streamsx.database package

Database integration for IBM Streams

For details of implementing applications in Python for IBM Streams including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud see:

Overview

Provides classes and functions to run SQL statements against a database.

Credentials

Db2 Warehouse credentials are defined using service credentials JSON.

The mandatory JSON elements are “username”, “password” and “jdbcurl”:

{
    "username": "<JDBC_USER>",
    "password": "<JDBC_PASSWORD>",
    "jdbcurl":  "<JDBC_URL>"
}
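
Checking the mandatory elements before passing the credentials on can catch a malformed JSON document early. The following is a minimal sketch under stated assumptions: the helper name load_db2_credentials is hypothetical and not part of streamsx.database, and the sample values are placeholders.

```python
import json

# Mandatory elements named in the Credentials section.
REQUIRED_KEYS = ("username", "password", "jdbcurl")

def load_db2_credentials(text):
    """Parse service credentials JSON and verify the mandatory elements.

    Hypothetical helper for illustration; not part of streamsx.database.
    """
    credentials = json.loads(text)
    # Treat absent and empty values alike as missing.
    missing = [key for key in REQUIRED_KEYS if not credentials.get(key)]
    if missing:
        raise ValueError("missing credential elements: " + ", ".join(missing))
    return credentials

# Placeholder values only; substitute your own service credentials.
sample = json.dumps({
    "username": "user1",
    "password": "secret",
    "jdbcurl": "jdbc:db2://example.com:50000/BLUDB",
})
credentials = load_db2_credentials(sample)
```

The returned dict has the shape that the credentials parameter described below expects when it is given as a dict.
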
class streamsx.database.JDBCStatement(credentials, **options)

Bases: streamsx.topology.composite.Map

Composite map transformation for JDBC statement

The statement is called once for each input tuple received. Result sets that are produced by the statement are emitted as output stream tuples.

By default, the JDBC driver for Db2 (‘com.ibm.db2.jcc.DB2Driver’) is included in the application bundle.

To use a different driver, for example for a database other than Db2, set the properties jdbc_driver_lib and jdbc_driver_class.

There are two ways to specify the statement:

  • Statement is part of the input stream. You can specify which input stream attribute contains the statement with sql_attribute. If the input stream is of type CommonSchema.String, you do not need to specify the sql_attribute property.

  • Statement is given with the sql property. The statement can contain parameter markers that are set with input stream attributes specified by sql_params.

Example of a Streams application that inserts generated data as rows into a table:

from streamsx.topology.topology import *
from streamsx.topology.schema import StreamSchema
from streamsx.topology.context import submit
import streamsx.database as db
import random
import time

# generates some data with schema (ID, NAME, AGE)
def generate_data():
    counter = 0
    while True:
        # yield a random id, name and age
        counter += 1
        yield {"NAME": "Name_" + str(random.randint(0, 500)), "ID": counter, "AGE": random.randint(10, 99)}
        time.sleep(0.10)

topo = Topology()

tuple_schema = StreamSchema("tuple<int64 ID, rstring NAME, int32 AGE>")
# Generates data for a stream of three attributes. Each attribute maps to a column using the same name of the Db2 database table.
sample_data = topo.source(generate_data, name="GeneratedData").map(lambda tpl: (tpl["ID"], tpl["NAME"], tpl["AGE"]), schema=tuple_schema)

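# credentials: dict with "username", "password" and "jdbcurl", or the name of an application configuration
statement = db.JDBCStatement(credentials)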
statement.sql = 'INSERT INTO SAMPLE_DEMO (ID, NAME, AGE) VALUES (? , ?, ?)'
statement.sql_params = 'ID, NAME, AGE'

sample_data.map(statement, name='INSERT')

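# Use for IBM Streams including IBM Cloud Pak for Data.
# cfg is the submission configuration, for example the dict returned by icpd_util.get_service_instance_details()
submit('DISTRIBUTED', topo, cfg)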

Example with key value arguments for the options parameter:

config = {
    'sql': 'INSERT INTO SAMPLE_DEMO (ID, NAME, AGE) VALUES (? , ?, ?)',
    'sql_params': 'ID, NAME, AGE'
}
inserts = sample_stream.map(db.JDBCStatement(credentials, **config))

Example with “select count” statement and defined output schema with attribute TOTAL having the result of the query:

sample_schema = StreamSchema('tuple<int32 TOTAL, rstring string>')
sql_query = 'SELECT COUNT(*) AS TOTAL FROM SAMPLE.TAB1'
query = topo.source([sql_query]).as_string()
res = query.map(db.JDBCStatement(credentials), schema=sample_schema)

Example with “drop table” statement and default output schema (set to input schema):

sql_drop = 'DROP TABLE RUN_SAMPLE'
s = topo.source([sql_drop]).as_string()
res_sql = s.map(db.JDBCStatement(credentials))
res_sql.print()

Example for using configured external connection with the name ‘Db2-Cloud’ (Cloud Pak for Data only), see Connecting to data sources:

from icpd_core import icpd_util
db_external_connection = icpd_util.get_connection('Db2-Cloud', conn_class='external')
res = query.map(db.JDBCStatement(db_external_connection), schema=sample_schema)

New in version 1.5.

credentials

The credentials of the IBM Cloud Db2 Warehouse service as a dict, a configured external connection of kind “Db2 Warehouse” (Cloud Pak for Data only) as a dict, or the name of the application configuration.

Type

dict|str

options

The additional optional parameters as variable keyword arguments.

Type

kwargs

property batch_on_punct

Set to True to execute the batch when a window punctuation marker is received.

New in version 1.6.

Type

bool

property batch_size

Number of statements transmitted in a batch.

New in version 1.6.

Type

int

property commit_on_punct

Set to True to commit when a window punctuation marker is received.

New in version 1.6.

Type

bool

property jdbc_driver_class

Class name of the JDBC driver. The default is the Db2 driver, com.ibm.db2.jcc.DB2Driver.

Type

str

property jdbc_driver_lib

Path to the JDBC driver library file. Specify the absolute path of the jar file that contains the class given by the jdbc_driver_class property. By default, db2jcc4.jar is added to the ‘opt’ directory in the application bundle.

Type

str

property keystore

Path to the key store file for the SSL connection.

Type

str

property keystore_password

Password for the key store file given by the keystore property.

Type

str

property keystore_type

Type of the key store file (JKS, PKCS12).

Type

str

property plugin_name

Name of the security plugin.

Type

str

populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)
Parameters
  • topology – Topology containing the composite map.

  • stream – Stream to be transformed.

  • schema – Schema passed into map.

  • name – Name passed into map.

  • **options – Future options passed to map.

Returns

Single stream representing the transformation of stream.

Return type

Stream

property security_mechanism

Value of the security mechanism.

Type

int

property sql

String containing the SQL statement. Use this as an alternative to the sql_attribute property.

Type

str

property sql_attribute

Name of the input stream attribute containing the SQL statement. Use this as an alternative to the sql property.

Type

str

property sql_params

The values of SQL statement parameters. These values are associated with the SQL statement parameter markers by position. For example, the first parameter marker in the SQL statement is associated with the first sql_params value.

Type

str
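
The association between parameter markers and sql_params values follows position: the first marker takes the first value, the second marker the second, and so on. The sketch below illustrates that pairing in plain Python. The function bind_markers is hypothetical and only mimics the idea; the actual binding happens inside the Streams operator.

```python
def bind_markers(sql, sql_params, tpl):
    """Pair each '?' marker with the attribute named at the same position
    in sql_params and look up that attribute's value in the tuple.

    Hypothetical illustration; not part of streamsx.database.
    """
    names = [name.strip() for name in sql_params.split(",")]
    # Naive marker count: assumes no '?' inside string literals.
    if sql.count("?") != len(names):
        raise ValueError("number of markers and sql_params names differ")
    return [(name, tpl[name]) for name in names]

sql = "INSERT INTO SAMPLE_DEMO (ID, NAME, AGE) VALUES (?, ?, ?)"
# The key order of the input dict does not matter; sql_params decides the order.
bound = bind_markers(sql, "ID, NAME, AGE", {"NAME": "Name_7", "AGE": 42, "ID": 1})
print(bound)  # [('ID', 1), ('NAME', 'Name_7'), ('AGE', 42)]
```
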

property ssl_connection

Set to True to enable an SSL connection.

Type

bool

property transaction_size

The number of tuples to commit per transaction. The default value is 1.

Type

int
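
To make the effect of transaction_size concrete, the following sketch commits once per group of rows instead of once per row. It uses the sqlite3 module from the Python standard library as a stand-in database and is not how the Streams operator is implemented. The function name insert_with_transaction_size is hypothetical, and committing a final partial group is a choice of this sketch, not a documented behavior of the operator.

```python
import sqlite3

def insert_with_transaction_size(conn, rows, transaction_size=1):
    """Insert rows, committing once per transaction_size rows.

    Returns the number of commits issued.
    """
    commits = 0
    pending = 0
    for row in rows:
        conn.execute(
            "INSERT INTO SAMPLE_DEMO (ID, NAME, AGE) VALUES (?, ?, ?)", row)
        pending += 1
        if pending == transaction_size:
            conn.commit()
            commits += 1
            pending = 0
    if pending:
        # Flush the final partial group (assumption of this sketch).
        conn.commit()
        commits += 1
    return commits

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE SAMPLE_DEMO (ID INTEGER, NAME TEXT, AGE INTEGER)")
rows = [(i, "Name_%d" % i, 20 + i) for i in range(1, 8)]
commits = insert_with_transaction_size(conn, rows, transaction_size=3)
row_count = conn.execute("SELECT COUNT(*) FROM SAMPLE_DEMO").fetchone()[0]
```

With seven rows and a transaction size of 3, the sketch issues three commits: two full groups and one partial group. A larger value reduces commit overhead at the cost of more uncommitted work if a failure occurs.
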

property truststore

Path to the trust store file for the SSL connection.

Type

str

property truststore_password

Password for the trust store file given by the truststore property.

Type

str

property truststore_type

Type of the trust store file (JKS, PKCS12).

Type

str
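
The SSL-related properties are usually set together. The sketch below collects them into a dict of keyword options. The helper name ssl_options and the file path are hypothetical, and passing these keys through the options parameter of JDBCStatement is an assumption based on the keyword-argument example earlier in this section.

```python
def ssl_options(truststore, truststore_password, truststore_type):
    """Collect SSL-related properties into a dict of keyword options.

    Hypothetical helper; the keys are property names of JDBCStatement.
    """
    # The two store types listed for truststore_type.
    if truststore_type not in ("JKS", "PKCS12"):
        raise ValueError("truststore_type must be JKS or PKCS12")
    return {
        "ssl_connection": True,
        "truststore": truststore,
        "truststore_password": truststore_password,
        "truststore_type": truststore_type,
    }

# Placeholder path and password.
options = ssl_options("/path/to/truststore.jks", "changeit", "JKS")
# Assumed usage, mirroring the options example earlier in this section:
# statement = db.JDBCStatement(credentials, **options)
```
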

property vm_arg

Arbitrary JVM arguments to pass to the Streams operator.

Type

str

streamsx.database.download_toolkit(url=None, target_dir=None)

Downloads the latest JDBC toolkit from GitHub.

Example for updating the JDBC toolkit for your topology with the latest toolkit from GitHub:

import streamsx.database as db
import streamsx.spl.toolkit
# download toolkit from GitHub
jdbc_toolkit_location = db.download_toolkit()
# add the toolkit to topology
streamsx.spl.toolkit.add_toolkit(topology, jdbc_toolkit_location)

Example for updating the topology with a specific version of the JDBC toolkit using a URL:

import streamsx.database as db
import streamsx.spl.toolkit
url171 = 'https://github.com/IBMStreams/streamsx.jdbc/releases/download/v1.7.1/streamsx.jdbc.toolkits-1.7.1-20190703-1017.tgz'
jdbc_toolkit_location = db.download_toolkit(url=url171)
streamsx.spl.toolkit.add_toolkit(topology, jdbc_toolkit_location)
Parameters
  • url (str) – Link to toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.

  • target_dir (str) – Directory into which the toolkit is unpacked. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None, a location relative to the system temporary directory is chosen.

Returns

the location of the downloaded toolkit

Return type

str

Note

This function requires an outgoing Internet connection.

New in version 1.4.
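
The rule for a relative target_dir can be pictured with the standard library. This is a sketch of the documented rule only, not the code of download_toolkit. The helper name resolve_target_dir is hypothetical, and the None case is left out because the text does not name the location that is chosen.

```python
import os
import tempfile

def resolve_target_dir(target_dir):
    """Return the directory a given target_dir would refer to.

    Hypothetical helper; models only the relative-path rule from the text.
    """
    if os.path.isabs(target_dir):
        # Absolute paths are used as given.
        return target_dir
    # Relative paths are appended to the system temporary directory.
    return os.path.join(tempfile.gettempdir(), target_dir)

resolved = resolve_target_dir("jdbc-toolkit")
print(resolved)
```

On a typical Linux system the printed path is below /tmp, although the temporary directory can differ depending on environment variables such as TMPDIR.
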

streamsx.database.configure_connection(instance, name='database', credentials=None)

Configures IBM Streams for a certain connection.

Creates or updates an application configuration object containing the required properties with connection information.

Example for creating a configuration for a Streams instance with connection details:

from icpd_core import icpd_util
from streamsx.rest_primitives import Instance
from streamsx.topology import context
import streamsx.database as db

cfg = icpd_util.get_service_instance_details(name='your-streams-instance')
cfg[context.ConfigParams.SSL_VERIFY] = False
instance = Instance.of_service(cfg)
app_cfg = db.configure_connection(instance, credentials='my_credentials_json')

In Cloud Pak for Data you can configure a connection to Db2, see Connecting to data sources. Example using this configured external connection with the name ‘Db2-Cloud’ to create an application configuration for IBM Streams:

db_external_connection = icpd_util.get_connection('Db2-Cloud', conn_class='external')
app_cfg = db.configure_connection(instance, credentials=db_external_connection)
Parameters
  • instance (streamsx.rest_primitives.Instance) – IBM Streams instance object.

  • name (str) – Name of the application configuration, default name is ‘database’.

  • credentials (str|dict) – The service credentials, for example Db2 Warehouse service credentials.

Returns

Name of the application configuration.

streamsx.database.run_statement(stream, credentials, schema=None, sql=None, sql_attribute=None, sql_params=None, transaction_size=1, jdbc_driver_class='com.ibm.db2.jcc.DB2Driver', jdbc_driver_lib=None, ssl_connection=None, truststore=None, truststore_password=None, keystore=None, keystore_password=None, keystore_type=None, truststore_type=None, plugin_name=None, security_mechanism=None, vm_arg=None, name=None)

Runs an SQL statement using the Db2 client driver and the JDBC database interface.

The statement is called once for each input tuple received. Result sets that are produced by the statement are emitted as output stream tuples.

By default, this function includes the JDBC driver for Db2 (‘com.ibm.db2.jcc.DB2Driver’) in the application bundle.

To use a different driver, for example for a database other than Db2, specify both the jdbc_driver_class and jdbc_driver_lib parameters.

Supports two ways to specify the statement:

  • Statement is part of the input stream. You can specify which input stream attribute contains the statement with the sql_attribute argument. If the input stream is of type CommonSchema.String, you do not need to specify the sql_attribute argument.

  • Statement is given with the sql argument. The statement can contain parameter markers that are set with input stream attributes specified by the sql_params argument.

Example with “insert” statement and values passed with input stream, where the input stream “sample_stream” is of type “sample_schema”:

from streamsx.topology.schema import StreamSchema
import streamsx.database as db

sample_schema = StreamSchema('tuple<rstring A, rstring B>')
...
sql_insert = 'INSERT INTO RUN_SAMPLE (A, B) VALUES (?, ?)'
inserts = db.run_statement(sample_stream, credentials=credentials, schema=sample_schema, sql=sql_insert, sql_params="A, B")

Example with “select count” statement and defined output schema with attribute TOTAL having the result of the query:

sample_schema = StreamSchema('tuple<int32 TOTAL, rstring string>')
sql_query = 'SELECT COUNT(*) AS TOTAL FROM SAMPLE.TAB1'
query = topo.source([sql_query]).as_string()
res = db.run_statement(query, credentials=credentials, schema=sample_schema)

Example for using configured external connection with the name ‘Db2-Cloud’ (Cloud Pak for Data only), see Connecting to data sources:

db_external_connection = icpd_util.get_connection('Db2-Cloud',conn_class='external')
res = db.run_statement(query, credentials=db_external_connection, schema=sample_schema)
Parameters
  • stream (streamsx.topology.topology.Stream) – Stream of tuples containing the SQL statements or SQL statement parameter values. Supports streamsx.topology.schema.StreamSchema (schema for a structured stream) or CommonSchema.String as input.

  • credentials (dict|str) – The credentials of the IBM Cloud Db2 Warehouse service as a dict, a configured external connection of kind “Db2 Warehouse” (Cloud Pak for Data only) as a dict, or the name of the application configuration.

  • schema (StreamSchema) – Schema for returned stream. Defaults to input stream schema if not set.

  • sql (str) – String containing the SQL statement. Use this as an alternative to the sql_attribute parameter.

  • sql_attribute (str) – Name of the input stream attribute containing the SQL statement. Use this as an alternative to the sql parameter.

  • sql_params (str) – The values of SQL statement parameters. These values are associated with the SQL statement parameter markers by position. For example, the first parameter marker in the SQL statement is associated with the first sql_params value.

  • transaction_size (int) – The number of tuples to commit per transaction. The default value is 1.

  • jdbc_driver_class (str) – Class name of the JDBC driver. The default is the Db2 driver, ‘com.ibm.db2.jcc.DB2Driver’.

  • jdbc_driver_lib (str) – Path to the JDBC driver library file. Specify the absolute path of the jar file that contains the class given by the jdbc_driver_class parameter. By default, ‘db2jcc4.jar’ is added to the ‘opt’ directory in the application bundle.

  • ssl_connection (bool) – Set to True to enable SSL connection.

  • truststore (str) – Path to the trust store file for the SSL connection.

  • truststore_password (str) – Password for the trust store file given by the truststore parameter.

  • keystore (str) – Path to the key store file for the SSL connection.

  • keystore_password (str) – Password for the key store file given by the keystore parameter.

  • keystore_type (str) – Type of the key store file (JKS, PKCS12).

  • truststore_type (str) – Type of the trust store file (JKS, PKCS12).

  • plugin_name (str) – Name of the security plugin.

  • security_mechanism (int) – Value of the security mechanism.

  • vm_arg (str) – Arbitrary JVM arguments can be passed to the Streams operator.

  • name (str) – Sink name in the Streams context, defaults to a generated name.

Returns

Output Stream.

Return type

streamsx.topology.topology.Stream

Deprecated since version 1.5.0: Use JDBCStatement instead.
