add a simple flink and an aggregated join #136

Open · wants to merge 2 commits into main
88 changes: 88 additions & 0 deletions flink-join/README.md
@@ -0,0 +1,88 @@
# Flink Foreign Key Joins

This example demonstrates how two Debezium change data topics can be joined via Flink.

The source database contains two tables, `customers` and `addresses`, with a foreign key relationship from the latter to the former,
i.e. a customer can have multiple addresses.

Using Flink, the change events for the parent `customers` table are represented as a dynamic table defined with `CREATE TABLE`, while the child `addresses` are represented as a stream of POJOs that is first aggregated by the foreign key.
Each insertion, update or deletion of a record on either side will re-trigger the join.
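
As an illustration of the aggregated variant, a similar result could also be expressed declaratively in Flink SQL along these lines (a minimal sketch only; the job classes in this example build the pipeline programmatically, and the aggregated `streets` column is an assumption for illustration):

```sql
-- sketch: collapse all addresses of a customer into a single row;
-- any change on either side re-emits the affected customer's row
SELECT c.id, c.email, LISTAGG(a.street) AS streets
FROM customers AS c
INNER JOIN addresses AS a ON c.id = a.customer_id
GROUP BY c.id, c.email;
```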

## Building

Prepare the Java components by first performing a Maven build.

```console
$ mvn clean install
```

## Environment

Set up the necessary environment variables:

```console
$ export DEBEZIUM_VERSION=1.2
```

The `DEBEZIUM_VERSION` variable specifies which version of the Debezium container images should be used.

## Start the demo

Start all Debezium components:

```console
$ docker-compose up connect
```

This starts the Kafka Connect service and all dependent services defined in the `docker-compose.yaml` file.
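
Once the containers are up, the state of each service can be checked with standard Docker Compose tooling:

```console
$ docker-compose ps
```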

## Configure the Debezium connector

Register the connector to stream changes from the inventory database:

```console
$ http PUT http://localhost:8083/connectors/inventory-connector/config < register-postgres.json
HTTP/1.1 201 Created
```
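
You can confirm the connector is up via Kafka Connect's REST API:

```console
$ http GET http://localhost:8083/connectors/inventory-connector/status
```
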
## Run the Flink Job

To run the Flink job in local mode, simply compile and start the job class:

```console
$ mvn clean install
$ mvn exec:java \
-Dexec.mainClass="io.debezium.examples.flink.join.FlinkJoinTableStream" \
-Dexec.classpathScope=compile
```

Running the Flink job against a remote cluster is a little more involved. The simplest approach is to create a Docker Compose session cluster (`docker-compose up jobmanager`), copy the Flink Kafka dependency into the cluster's `lib` directory, and follow the instructions for submitting this project's jar as a job; see the [Flink docs](https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html#session-cluster-with-docker-compose).
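
One possible sequence for the session-cluster route, sketching only the submission mechanics and assuming the jar built above (`target/flink-join-1.0.0-SNAPSHOT.jar`, per the artifact coordinates in `pom.xml`):

```console
$ docker-compose up -d jobmanager taskmanager
$ docker cp target/flink-join-1.0.0-SNAPSHOT.jar \
    "$(docker-compose ps -q jobmanager)":/job.jar
$ docker-compose exec jobmanager flink run \
    -c io.debezium.examples.flink.join.FlinkJoinTableStream /job.jar
```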

## Review the outcome

Examine the joined events using _kafkacat_:

```console
$ docker run --tty --rm \
--network flink-join-network \
debezium/tooling:1.1 \
kafkacat -b kafka:9092 -C -o beginning -q \
-t customers-with-addresses | jq .
```
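
If you run the simple join job (`FlinkJoin`) instead, its output goes to the `customer-with-address` topic (`TOPIC_OUT` in the source) and can be examined the same way:

```console
$ docker run --tty --rm \
    --network flink-join-network \
    debezium/tooling:1.1 \
    kafkacat -b kafka:9092 -C -o beginning -q \
    -t customer-with-address | jq .
```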

## Useful Commands

Getting a session in the Postgres database:

```console
$ docker run --tty --rm -i \
--network flink-join-network \
debezium/tooling:1.1 \
bash -c 'pgcli postgresql://postgres:postgres@postgres:5432/postgres'
```

E.g. to update a customer record:

```sql
update inventory.customers set first_name = 'Sarah' where id = 1001;
```
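
Or to insert an additional address, which will equally re-trigger the join (the values here are made up, but follow the schema from `inventory-addresses.sql`):

```sql
insert into inventory.addresses values (default, 1001, '3 Debezium Plaza', 'Monterey', '90213', 'US');
```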
76 changes: 76 additions & 0 deletions flink-join/docker-compose.yaml
@@ -0,0 +1,76 @@
version: '3.5'

services:

zookeeper:
image: debezium/zookeeper:${DEBEZIUM_VERSION}
ports:
- 2181:2181
- 2888:2888
- 3888:3888
networks:
- my-network
kafka:
image: debezium/kafka:${DEBEZIUM_VERSION}
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS=100
networks:
- my-network

postgres:
image: debezium/example-postgres:${DEBEZIUM_VERSION}
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
volumes:
- ./inventory-addresses.sql:/docker-entrypoint-initdb.d/zzz.sql
networks:
- my-network

connect:
image: debezium/connect:${DEBEZIUM_VERSION}
ports:
- 8083:8083
links:
- kafka
- postgres
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
networks:
- my-network

jobmanager:
image: flink:1.11.2-scala_2.11
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager

taskmanager:
image: flink:1.11.2-scala_2.11
depends_on:
- jobmanager
command: taskmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2

networks:
my-network:
name: flink-join-network
18 changes: 18 additions & 0 deletions flink-join/inventory-addresses.sql
@@ -0,0 +1,18 @@
CREATE TABLE inventory.addresses (
id SERIAL NOT NULL PRIMARY KEY,
customer_id INTEGER NOT NULL,
street VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
zipcode VARCHAR(255) NOT NULL,
country VARCHAR(255) NOT NULL,
FOREIGN KEY (customer_id) REFERENCES inventory.customers(id)
);
ALTER SEQUENCE inventory.addresses_id_seq RESTART WITH 100001;
ALTER TABLE inventory.addresses REPLICA IDENTITY FULL;

INSERT INTO inventory.addresses
VALUES (default, 1001, '42 Main Street', 'Hamburg', '90210', 'Canada'),
(default, 1001, '11 Post Dr.', 'Berlin', '90211', 'Canada'),
(default, 1002, '12 Rodeo Dr.', 'Los Angeles', '90212', 'US'),
(default, 1002, '1 Debezium Plaza', 'Monterey', '90213', 'US'),
(default, 1002, '2 Debezium Plaza', 'Monterey', '90213', 'US');
71 changes: 71 additions & 0 deletions flink-join/pom.xml
@@ -0,0 +1,71 @@
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>

<groupId>io.debezium.examples.flink.join</groupId>
<artifactId>flink-join</artifactId>
<version>1.0.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<surefire-plugin.version>2.22.0</surefire-plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.11.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.11.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.11.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_2.11</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.11.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.11.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.11.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<configuration>
<systemProperties>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
</systemProperties>
</configuration>
</plugin>
</plugins>
</build>
</project>
16 changes: 16 additions & 0 deletions flink-join/register-postgres.json
@@ -0,0 +1,16 @@
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.whitelist": "inventory",
"decimal.handling.mode" : "string",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
95 changes: 95 additions & 0 deletions flink-join/src/main/java/io/debezium/examples/flink/join/FlinkJoin.java
@@ -0,0 +1,95 @@
package io.debezium.examples.flink.join;

import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

/**
* Performs an inner join of a customer and an address
*/
public class FlinkJoin {

public static String TOPIC_OUT = "customer-with-address";

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.executeSql("CREATE TABLE customers (\n" +
" id INT PRIMARY KEY,\n" +
" first_name STRING,\n" +
" last_name STRING,\n" +
" email STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'dbserver1.inventory.customers',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'properties.group.id' = '1',\n" +
" 'format' = 'debezium-json',\n" +
" 'scan.startup.mode' = 'earliest-offset'\n" +
")");

tableEnv.executeSql("CREATE TABLE addresses (\n" +
" id BIGINT PRIMARY KEY,\n" +
" customer_id INT,\n" +
" street STRING,\n" +
" city STRING,\n" +
" zipcode STRING,\n" +
" country STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'dbserver1.inventory.addresses',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'properties.group.id' = '1',\n" +
" 'format' = 'debezium-json',\n" +
" 'scan.startup.mode' = 'earliest-offset'\n" +
")");

Table addressWithEmail = tableEnv.sqlQuery("select c.id, c.email, a.country, a.id as address_id "
+ "from customers as c inner join addresses as a on c.id = a.customer_id");

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

//-- coming soon in flink, we should be able to output a changelog/cdc stream
//DebeziumJsonSerializationSchema schema ...

//we cannot directly write this table result, which is backed by a retract stream,
//into an output kafka table, so we create an output stream to direct into the output topic.
//the UPDATE_BEFORE rows have to be filtered out as we want an unwrapped result
DataStream<Tuple2<Boolean, Row>> output = tableEnv.toRetractStream(addressWithEmail, Row.class)
.filter((t)->t.f1.getKind()!=RowKind.UPDATE_BEFORE);

TypeInformation<Row> rowType = ((TupleTypeInfo)output.getType()).getTypeAt(1);
JsonRowSerializationSchema rowSerialization = JsonRowSerializationSchema.builder().withTypeInfo(rowType).build();
//output the key as the first field
JsonRowSerializationSchema keySerialization = JsonRowSerializationSchema.builder()
.withTypeInfo(new RowTypeInfo(Types.INT)).build();

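//note: retractions (f0 == false) are written as tombstones (a null value for the key),
//so downstream consumers can drop rows that left the join result; EXACTLY_ONCE relies
//on Kafka transactions, so consume with isolation.level=read_committed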
FlinkKafkaProducer<Tuple2<Boolean, Row>> kafkaProducer = new FlinkKafkaProducer<Tuple2<Boolean, Row>>(TOPIC_OUT,
((record, timestamp) -> new ProducerRecord<byte[], byte[]>(TOPIC_OUT,
keySerialization.serialize(record.f1),
record.f0 ? rowSerialization.serialize(record.f1) : null)),
properties,
Semantic.EXACTLY_ONCE);

output.addSink(kafkaProducer);

env.execute("Debezium Join");
}

}