Initial CDC support for DHC #1819

Merged · 22 commits · Jan 13, 2022
165 changes: 165 additions & 0 deletions Integrations/python/deephaven/ConsumeCdc.py
@@ -0,0 +1,165 @@
# -*-Python-*-
#
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
#

import collections
import jpy
import wrapt
import deephaven.ConsumeKafka as ck

from deephaven.conversion_utils import _dictToProperties, _isStr

# None until the first _defineSymbols() call
_java_type_ = None
ALL_PARTITIONS = None

def _defineSymbols():
"""
Defines appropriate java symbol, which requires that the jvm has been initialized through the :class:`jpy` module,
for use throughout the module AT RUNTIME. This is versus static definition upon first import, which would lead to an
exception if the jvm wasn't initialized BEFORE importing the module.
"""

if not jpy.has_jvm():
raise SystemError("No java functionality can be used until the JVM has been initialized through the jpy module")

global _java_type_, ALL_PARTITIONS
if _java_type_ is None:
# This will raise an exception if the desired object is not on the classpath
_java_type_ = jpy.get_type("io.deephaven.kafka.CdcTools")
ck._defineSymbols()
ALL_PARTITIONS = ck.ALL_PARTITIONS


# every module method should be decorated with @_passThrough
@wrapt.decorator
def _passThrough(wrapped, instance, args, kwargs):
"""
For decoration of module methods, to define necessary symbols at runtime

:param wrapped: the method to be decorated
:param instance: the object to which the wrapped function was bound when it was called
:param args: the argument list for `wrapped`
:param kwargs: the keyword argument dictionary for `wrapped`
:return: the decorated version of the method
"""

_defineSymbols()
return wrapped(*args, **kwargs)

@_passThrough
def consumeToTable(
kafka_config:dict,
cdc_spec,
partitions = None,
as_stream_table = False,
drop_columns = None,
):
"""
Consume from a Change Data Capture (CDC) Kafka stream (as produced, e.g., by Debezium),
tracking the underlying database table, to a Deephaven table.

:param kafka_config: Dictionary with properties to configure the associated Kafka consumer and
also the resulting table. Passed to the org.apache.kafka.clients.consumer.KafkaConsumer constructor;
pass any desired KafkaConsumer-specific configuration here.
Note this should include the relevant property for the schema server URL where the
necessary Avro schemas for the key and/or value are stored.
:param cdc_spec: A CDC Spec opaque object obtained from calling either the cdc_long_spec method
or the cdc_short_spec method.
:param partitions: Either a sequence of integer partition numbers or the predefined constant
ALL_PARTITIONS for all partitions. Defaults to ALL_PARTITIONS if unspecified.
:param as_stream_table: If true, produce a streaming table of changed rows, keeping
the CDC 'op' column indicating the type of row change; if false, return
a DHC ticking table that tracks the underlying database table through the CDC stream.
:param drop_columns: A sequence of column names to omit from the resulting DHC table.
Note that only columns not included in the primary key for the table can be dropped at this stage;
chain a drop-column operation after this call if you need to drop a primary-key column.
:return: A Deephaven live table that will update based on the CDC messages consumed for the given topic.
:raises: ValueError or TypeError if arguments provided can't be processed.
"""

partitions = ck._jpy_partitions(partitions)
kafka_config = _dictToProperties(kafka_config)
return _java_type_.consumeToTable(
kafka_config,
cdc_spec,
partitions,
as_stream_table,
drop_columns)
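
A hedged usage sketch (the broker and registry addresses and the `mysql`/`shop`/`purchases` names come from the debezium-demo README later in this PR; they are demo values, not defaults):

```
# Sketch only: addresses and table identifiers come from the debezium-demo in this PR.
import deephaven.ConsumeCdc as cc

kafka_config = {
    'bootstrap.servers': 'redpanda:9092',
    'schema.registry.url': 'http://redpanda:8081',
}
spec = cc.cdc_short_spec('mysql', 'shop', 'purchases')

# Ticking table that tracks the underlying database table.
purchases = cc.consumeToTable(kafka_config, spec)

# Streaming table of per-row change events, keeping the CDC 'op' column.
purchase_events = cc.consumeToTable(kafka_config, spec, as_stream_table=True)
```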

@_passThrough
def consumeRawToTable(
kafka_config:dict,
cdc_spec,
partitions = None,
table_type:str = 'stream'
):
"""
Consume the raw events from a Change Data Capture (CDC) Kafka stream to a Deephaven table.

:param kafka_config: Dictionary with properties to configure the associated Kafka consumer and
also the resulting table. Passed to the org.apache.kafka.clients.consumer.KafkaConsumer constructor;
pass any desired KafkaConsumer-specific configuration here.
Note this should include the relevant property for the schema server URL where the
necessary Avro schemas for the key and/or value are stored.
:param cdc_spec: A CDC Spec opaque object obtained from calling either the cdc_long_spec method
or the cdc_short_spec method.
:param partitions: Either a sequence of integer partition numbers or the predefined constant
ALL_PARTITIONS for all partitions. Defaults to ALL_PARTITIONS if unspecified.
:param table_type: A string specifying the resulting table type: one of 'stream' (default), 'append',
'stream_map' or 'append_map'.
:return: A Deephaven live table for the raw CDC events.
"""
partitions = ck._jpy_partitions(partitions)
kafka_config = _dictToProperties(kafka_config)
table_type_enum = ck._jpy_table_type(table_type)
return _java_type_.consumeRawToTable(kafka_config, cdc_spec, partitions, table_type_enum)
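
A sketch of consuming the raw CDC events as an append-only table ('stream' is the default `table_type`; the connection values are the demo values used above):

```
# Sketch: raw CDC event envelopes accumulated into an append table.
import deephaven.ConsumeCdc as cc

raw_events = cc.consumeRawToTable(
    {'bootstrap.servers': 'redpanda:9092',
     'schema.registry.url': 'http://redpanda:8081'},
    cc.cdc_short_spec('mysql', 'shop', 'purchases'),
    table_type='append')
```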

@_passThrough
def cdc_long_spec(
topic:str,
key_schema_name:str,
key_schema_version:str,
value_schema_name:str,
value_schema_version:str,
):
"""
Create a CdcSpec opaque object (necessary for one argument in a call to consume*ToTable)
by explicitly specifying all configuration options.

:param topic: The Kafka topic for the CDC events associated with the desired table data.
:param key_schema_name: The schema name for the Key Kafka field in the CDC events for the topic.
This schema should include definitions for the columns forming the PRIMARY KEY of the underlying table.
This schema name will be looked up in the schema server.
:param key_schema_version: The version of the Key schema to look up in the schema server.
None or "latest" implies using the latest version when the Key is not ignored.
:param value_schema_name: The schema name for the Value Kafka field in the CDC events for the topic.
This schema should include definitions for all the columns of the underlying table.
This schema name will be looked up in the schema server.
:param value_schema_version: The version of the Value schema to look up in the schema server.
None or "latest" implies using the latest version.
:return: A CDCSpec object representing the inputs.
"""
return _java_type_.cdcLongSpec(topic, key_schema_name, key_schema_version, value_schema_name, value_schema_version)
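
A sketch of the fully explicit form; the topic and schema names shown follow the naming convention described under cdc_short_spec below, and None selects the latest schema version:

```
import deephaven.ConsumeCdc as cc

# Explicit spec: topic and schema names are given directly rather than derived.
spec = cc.cdc_long_spec(
    topic='mysql.shop.purchases',
    key_schema_name='mysql.shop.purchases-key',
    key_schema_version=None,    # None => latest version
    value_schema_name='mysql.shop.purchases-value',
    value_schema_version=None,  # None => latest version
)
```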

@_passThrough
def cdc_short_spec(
server_name:str,
db_name:str,
table_name:str,
):
"""
Create a CdcSpec opaque object (necessary for one argument in a call to consume*ToTable)
in the Debezium style, specifying server name, database name, and table name.
The topic name and the key and value schema names are implied by convention:
- Topic is the concatenation of the arguments using "." as separator.
- Key schema name is topic with a "-key" suffix added.
- Value schema name is topic with a "-value" suffix added.

:param server_name: The server_name configuration value used when the CDC Stream was created.
:param db_name: The database name configuration value used when the CDC Stream was created.
:param table_name: The table name configuration value used when the CDC Stream was created.
:return: A CDCSpec object representing the inputs.
"""
return _java_type_.cdcShortSpec(server_name, db_name, table_name)
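
That convention makes the short and long forms interchangeable; a sketch using the demo's names:

```
import deephaven.ConsumeCdc as cc

server, db, table = 'mysql', 'shop', 'purchases'
topic = '.'.join((server, db, table))  # "mysql.shop.purchases"

# These two specs identify the same CDC stream.
short_spec = cc.cdc_short_spec(server, db, table)
long_spec = cc.cdc_long_spec(topic, topic + '-key', None, topic + '-value', None)
```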
50 changes: 30 additions & 20 deletions Integrations/python/deephaven/ConsumeKafka.py
@@ -94,6 +94,34 @@ def _custom_avroSchemaToColumnDefinitions(schema, mapping:dict = None):
except Exception as e:
pass

@_passThrough
def _jpy_partitions(partitions):
if partitions is None:
return ALL_PARTITIONS
if isinstance(partitions, collections.abc.Sequence):
try:
jarr = jpy.array('int', partitions)
except Exception as e:
raise ValueError(
"when not one of the predefined constants, keyword argument 'partitions' has to " +
"represent a sequence of integer partition with values >= 0, instead got " +
str(partitions) + " of type " + type(partitions).__name__
) from e
return _java_type_.partitionFilterFromArray(jarr)
if not isinstance(partitions, jpy.JType):
raise TypeError(
"argument 'partitions' has to be of str or sequence type, " +
"or a predefined compatible constant, instead got partitions " +
str(partitions) + " of type " + type(partitions).__name__)
return partitions

@_passThrough
def _jpy_table_type(table_type):
table_type_enum = _java_type_.friendlyNameToTableType(table_type)
if table_type_enum is None:
raise ValueError("unknown value " + table_type + " for argument 'table_type'")
return table_type_enum

@_passThrough
def consumeToTable(
kafka_config:dict,
@@ -137,23 +165,7 @@ def consumeToTable(
if not _isStr(topic):
raise ValueError("argument 'topic' has to be of str type, instead got " + topic)

if partitions is None:
partitions = ALL_PARTITIONS
elif isinstance(partitions, collections.Sequence):
try:
jarr = jpy.array('int', partitions)
except Exception as e:
raise ValueError(
"when not one of the predefined constants, keyword argument 'partitions' has to " +
"represent a sequence of integer partition with values >= 0, instead got " +
str(partitions) + " of type " + type(partitions).__name__
) from e
partitions = _java_type_.partitionFilterFromArray(jarr)
elif not isinstance(partitions, jpy.JType):
raise TypeError(
"argument 'partitions' has to be of str or sequence type, " +
"or a predefined compatible constant, instead got partitions " +
str(partitions) + " of type " + type(partitions).__name__)
partitions = _jpy_partitions(partitions)

if offsets is None:
offsets = ALL_PARTITIONS_DONT_SEEK
Expand Down Expand Up @@ -186,9 +198,7 @@ def consumeToTable(
raise TypeError(
"argument 'table_type' expected to be of type str, instead got " +
str(table_type) + " of type " + type(table_type).__name__)
table_type_enum = _java_type_.friendlyNameToTableType(table_type)
if table_type_enum is None:
raise ValueError("unknown value " + table_type + " for argument 'table_type'")
table_type_enum = _jpy_table_type(table_type)

kafka_config = _dictToProperties(kafka_config)
return _java_type_.consumeToTable(kafka_config, topic, partitions, offsets, key, value, table_type_enum)
7 changes: 7 additions & 0 deletions debezium-demo/.env
@@ -0,0 +1,7 @@
COMPOSE_PROJECT_NAME=core-redpanda
REPO=ghcr.io/
TAG=main
PORT=10000
DEEPHAVEN_CONSOLE_TYPE=python
DEEPHAVEN_APPLICATION_DIR=/data/app.d
DEEPHAVEN_SERVER_IMAGE=deephaven/server:local-build
32 changes: 32 additions & 0 deletions debezium-demo/README.md
@@ -0,0 +1,32 @@
Debezium - Kafka Demo
=====================

Start `docker-compose` with the compose file in this
directory, then start a DH Web console in Python and try:

```
import deephaven.ConsumeCdc as cc
server_name = 'mysql'; db_name='shop'; table_name = 'purchases'
purchases = cc.consumeToTable(
{'bootstrap.servers' : 'redpanda:9092',
'schema.registry.url' : 'http://redpanda:8081'},
cc.cdc_short_spec(server_name,
db_name,
table_name))
```

If you browse `loadgen/generate_load.py`, you can find
other table names in the file to play with.
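
For example, assuming a hypothetical `pageviews` table exists in the load generator:

```
# 'pageviews' is a hypothetical table name used for illustration only.
pageviews = cc.consumeToTable(
    {'bootstrap.servers' : 'redpanda:9092',
     'schema.registry.url' : 'http://redpanda:8081'},
    cc.cdc_short_spec('mysql', 'shop', 'pageviews'))
```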

At the beginning of that file, the constant `purchaseGenEveryMS`
controls how frequently purchases are triggered.

Attributions
============

Files in this directory are based on demo code by
Debezium, Redpanda, and Materialize:

* Debezium https://github.com/debezium/debezium
* Redpanda https://github.com/vectorizedio/redpanda
* Materialize https://github.com/MaterializeInc/materialize
93 changes: 93 additions & 0 deletions debezium-demo/docker-compose.yml
@@ -0,0 +1,93 @@
# docker compose file to run the debezium-kafka ecommerce demo
# with DHC ticking tables and dashboard

version: '3.4'

services:
server:
extends:
file: ../docker-compose-common.yml
service: server

web:
extends:
file: ../docker-compose-common.yml
service: web

# Should only be used for non-production deployments, see grpc-proxy/README.md for more info
grpc-proxy:
extends:
file: ../docker-compose-common.yml
service: grpc-proxy
depends_on:
server:
condition: service_healthy

envoy:
# A reverse proxy configured for no SSL on localhost. It fronts the requests
# for the static content and the websocket proxy.
extends:
file: ../docker-compose-common.yml
service: envoy
depends_on:
server:
condition: service_healthy
grpc-proxy:
condition: service_started
web:
condition: service_started

redpanda:
image: docker.vectorized.io/vectorized/redpanda:v21.9.5
command:
- redpanda start
- --overprovisioned
- --smp 1
- --memory 1G
- --reserve-memory 0M
- --node-id 0
- --check=false
- --kafka-addr 0.0.0.0:9092
- --advertise-kafka-addr redpanda:9092
- --pandaproxy-addr 0.0.0.0:8082
- --advertise-pandaproxy-addr redpanda:8082
- --set redpanda.enable_transactions=true
- --set redpanda.enable_idempotence=true
ports:
- 9092:9092
- 8081:8081
- 8082:8082

mysql:
image: debezium/example-mysql:1.4
ports:
- 3306:3306
environment:
- MYSQL_ROOT_PASSWORD=debezium
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw

debezium:
image: debezium/connect:1.4
environment:
BOOTSTRAP_SERVERS: redpanda:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://redpanda:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://redpanda:8081
depends_on:
redpanda:
condition: service_started
ports:
- 8083:8083

loadgen:
build: loadgen
depends_on:
mysql:
condition: service_started
debezium:
condition: service_started
11 changes: 11 additions & 0 deletions debezium-demo/loadgen/Dockerfile
@@ -0,0 +1,11 @@
FROM python:3

RUN apt-get update && apt-get -qy install curl

RUN pip install barnum kafka-python mysql-connector-python requests noise wait-for-it

COPY . /loadgen

COPY docker-entrypoint.sh /usr/local/bin

ENTRYPOINT ["docker-entrypoint.sh"]
10 changes: 10 additions & 0 deletions debezium-demo/loadgen/docker-entrypoint.sh
@@ -0,0 +1,10 @@
#!/bin/bash

set -e

wait-for-it --timeout 60 --service mysql:3306
wait-for-it --timeout 60 --service debezium:8083

cd /loadgen

exec python generate_load.py