A Python wrapper for the KSQLdb REST API
pip install ksqldb
from ksqldb import KSQLdbClient
client = KSQLdbClient('http://localhost:8088')
It can also be configured with Basic authentication:
from ksqldb import KSQLdbClient
client = KSQLdbClient('http://localhost:8088', api_key="<KEY>", api_secret="<SECRET>")
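For reference, HTTP Basic authentication sends the key and secret as a base64-encoded key:secret pair in the Authorization header. The sketch below builds that header with the standard library only. The helper basic_auth_header is hypothetical and not part of this package, and it is an assumption that the client forms the header the same way internally.

```python
import base64


def basic_auth_header(api_key: str, api_secret: str) -> dict:
    """Build an HTTP Basic Authorization header (hypothetical helper)."""
    # Join key and secret with a colon, then base64-encode the UTF-8 bytes.
    credentials = f"{api_key}:{api_secret}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return {"Authorization": f"Basic {token}"}


headers = basic_auth_header("<KEY>", "<SECRET>")
```

A header built this way can help when debugging a request with another HTTP tool against the same server.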
Returns KSQLdb properties
client.get_properties()
For the full list of statement options, see the /ksql endpoint in the KSQLdb REST API documentation.
Examples of executing KSQL statements:
client.ksql("show topics;")
client.ksql("show streams;")
client.ksql("show tables;")
client.ksql("describe <STREAM_NAME> extended;")
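To make the link to the REST API concrete, here is a minimal sketch of the JSON body that the /ksql endpoint accepts: a ksql field holding the statement and an optional streamsProperties map. The function build_ksql_request is a hypothetical name used for illustration; whether the client builds its request exactly like this is an assumption.

```python
import json
from typing import Optional


def build_ksql_request(statement: str, stream_properties: Optional[dict] = None) -> str:
    """Serialize a statement into a JSON body for POST /ksql (hypothetical helper)."""
    body = {
        "ksql": statement,
        # An empty map means the server defaults are used.
        "streamsProperties": stream_properties or {},
    }
    return json.dumps(body)


payload = build_ksql_request("show streams;")
```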
Runs a query synchronously. It can't be used with EMIT CHANGES queries.
Examples of executing KSQL queries:
client.query_sync("select * from STREAM_NAME;")
# To read data from the beginning of the stream, use:
client.query_sync("select * from STREAM_NAME;", stream_properties={"ksql.streams.auto.offset.reset": "earliest"})
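Because the synchronous call does not accept EMIT CHANGES queries, it can be useful to check a statement before choosing which method to call. The sketch below is one possible guard, not something the package provides: is_push_query is a hypothetical helper, and the check is a plain text match that would also fire on the phrase inside a string literal.

```python
import re

# Matches "emit changes" in any letter case, with any whitespace between the words.
_EMIT_CHANGES = re.compile(r"\bemit\s+changes\b", re.IGNORECASE)


def is_push_query(statement: str) -> bool:
    """Return True if the statement contains an EMIT CHANGES clause."""
    return bool(_EMIT_CHANGES.search(statement))


statement = "select * from STREAM_NAME;"
use_sync = not is_push_query(statement)
```

With the real client, use_sync would decide between query_sync and query_async.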
Runs a query asynchronously.
To test this in the Python shell, start it with python -m asyncio, which allows async for and await at the top level.
Examples of executing KSQL async queries:
async for x in client.query_async("select * from STREAM_NAME emit changes;", timeout=None):
    print(x)
# To read data from the beginning of the stream, use:
async for x in client.query_async("select * from STREAM_NAME emit changes;", stream_properties={"ksql.streams.auto.offset.reset": "earliest"}, timeout=None):
    print(x)
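An EMIT CHANGES query keeps producing rows until it is stopped, so a script usually needs a way to end the loop. The sketch below shows one pattern: collect a fixed number of rows and then break. It runs without a server because fake_query_async is a stand-in async generator written for this example, not part of the package, and take is a hypothetical helper.

```python
import asyncio


async def fake_query_async(rows):
    """Stand-in for an async query stream (NOT part of the ksqldb package)."""
    for row in rows:
        await asyncio.sleep(0)  # hand control back to the event loop between rows
        yield row


async def take(stream, limit):
    """Collect at most `limit` items from an async iterator, then stop."""
    collected = []
    if limit <= 0:
        return collected
    async for item in stream:
        collected.append(item)
        if len(collected) >= limit:
            break
    return collected


first_two = asyncio.run(take(fake_query_async([{"id": 1}, {"id": 2}, {"id": 3}]), limit=2))
```

With the real client, the stream argument would presumably be the result of client.query_async(...), called inside a running event loop.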
You usually don't need to close a sync query, but you should close async ones.
client.close_query("QUERY_ID")
Inserts data into a stream.
rows = [
    {
        "col1": "val1",
        "col2": 2.3,
        "col3": True
    },
    {
        "col1": "val1",
        "col2": 2.3,
        "col3": True
    },
]
client.inserts_stream("STREAM_NAME", rows)
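For context, the ksqlDB /inserts-stream endpoint expects a JSON object naming the target stream, followed by one JSON object per row, each on its own line. The sketch below encodes rows in that shape with the standard library. The function encode_inserts is hypothetical, and it is an assumption that the client serializes rows in exactly this way.

```python
import json


def encode_inserts(stream_name: str, rows: list) -> str:
    """Encode a target line plus rows as newline-delimited JSON (hypothetical helper)."""
    # First line names the target stream; each following line is one row.
    lines = [json.dumps({"target": stream_name})]
    lines.extend(json.dumps(row) for row in rows)
    return "\n".join(lines) + "\n"


body = encode_inserts("STREAM_NAME", [{"col1": "val1", "col2": 2.3, "col3": True}])
```

Note that json.dumps writes Python's True as the JSON literal true, so row values can be given as ordinary Python types.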