1 change: 1 addition & 0 deletions CHANGES.md
@@ -75,6 +75,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Debezium IO upgraded to 3.1.1, which requires Java 17 (Java) ([#34747](https://github.com/apache/beam/issues/34747)).

## New Features / Improvements
* Adding Google Storage Requests Pays feature (Golang)([#30747](https://github.com/apache/beam/issues/30747)).
85 changes: 59 additions & 26 deletions sdks/java/io/debezium/build.gradle
@@ -24,6 +24,7 @@ applyJavaNature(
[id: 'io.confluent', url: 'https://packages.confluent.io/maven/']
],
enableSpotbugs: false,
requireJavaVersion: JavaVersion.VERSION_17,
)
provideIntegrationTestingDependencies()

@@ -38,40 +39,83 @@ dependencies {
implementation library.java.joda_time
provided library.java.jackson_dataformat_csv
permitUnusedDeclared library.java.jackson_dataformat_csv
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation project(path: ":sdks:java:io:common")

// Kafka connect dependencies
implementation "org.apache.kafka:connect-api:3.9.0"

// Debezium dependencies
implementation group: 'io.debezium', name: 'debezium-core', version: '3.1.1.Final'

// Test dependencies
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation project(path: ":sdks:java:io:common")
testImplementation library.java.junit
testImplementation project(path: ":sdks:java:io:jdbc")
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testImplementation project(":runners:google-cloud-dataflow-java")
testImplementation library.java.hamcrest
testImplementation library.java.testcontainers_base
testImplementation library.java.testcontainers_mysql
testImplementation library.java.testcontainers_postgresql
// TODO(https://github.com/apache/beam/issues/31678) HikariCP 5.x requires Java11+
testImplementation 'com.zaxxer:HikariCP:4.0.3'
testImplementation "org.testcontainers:kafka"
testImplementation "org.testcontainers:mysql"
testImplementation "org.testcontainers:postgresql"
testImplementation "io.debezium:debezium-testing-testcontainers:3.1.1.Final"
testImplementation 'com.zaxxer:HikariCP:5.1.0'

// Kafka connect dependencies
implementation "org.apache.kafka:connect-api:2.5.0"
implementation "org.apache.kafka:connect-json:2.5.0"
permitUnusedDeclared "org.apache.kafka:connect-json:2.5.0" // BEAM-11761
// Debezium connector implementations for testing
testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '3.1.1.Final'
testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '3.1.1.Final'
}

// Debezium dependencies
implementation group: 'io.debezium', name: 'debezium-core', version: '1.3.1.Final'
testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '1.3.1.Final'
testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '1.3.1.Final'
// TODO: Remove pin after Beam has unpinned it (PR #35231)
// Pin the Antlr version
configurations.all {
resolutionStrategy {
force 'org.antlr:antlr4:4.10', 'org.antlr:antlr4-runtime:4.10'
}
}

// TODO: Remove pin after upgrading Beam's Jackson version
// Force Jackson versions for the test runtime classpath
configurations.named("testRuntimeClasspath") {
resolutionStrategy.force (
'com.fasterxml.jackson.core:jackson-core:2.17.1',
'com.fasterxml.jackson.core:jackson-annotations:2.17.1',
'com.fasterxml.jackson.core:jackson-databind:2.17.1',
'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.1',
'com.fasterxml.jackson.module:jackson-module-afterburner:2.17.1'
)
}

def configureTestJvmArgs(Task task) {
List<String> currentJvmArgs = task.jvmArgs ? new ArrayList<>(task.jvmArgs) : new ArrayList<>()

// Add standard opens required for Jackson, Afterburner, and other reflective frameworks
// when dealing with Java Modules or complex classloader scenarios.
currentJvmArgs.addAll([
'--add-opens=java.base/java.lang=ALL-UNNAMED',
'--add-opens=java.base/java.util=ALL-UNNAMED',
'--add-opens=java.base/java.util.concurrent=ALL-UNNAMED',
'--add-opens=java.base/java.lang.reflect=ALL-UNNAMED',
'--add-opens=java.base/java.io=ALL-UNNAMED',
'--add-opens=java.base/java.nio=ALL-UNNAMED',
'--add-opens=java.base/java.math=ALL-UNNAMED',
'--add-opens=java.base/java.time=ALL-UNNAMED', // For JSR310 types
])

task.jvmArgs = currentJvmArgs.unique() // Assign the deduplicated list back to the task
project.logger.lifecycle("Task ${task.name} final jvmArgs: ${task.jvmArgs.join(' ')}")
}

test {
testLogging {
outputs.upToDateWhen {false}
showStandardStreams = true
exceptionFormat = 'full'
}
configureTestJvmArgs(delegate)
}

task integrationTest(type: Test, dependsOn: processTestResources) {
group = "Verification"
@@ -89,14 +133,3 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
useJUnit {
}
}

configurations.all (Configuration it) -> {
resolutionStrategy {
// Force protobuf 3 because debezium is currently incompatible with protobuf 4.
// TODO - remove this and upgrade the version of debezium once a proto-4 compatible version is available
// https://github.com/apache/beam/pull/33526 does some of this, but was abandoned because it still doesn't
// work with protobuf 4.
force "com.google.protobuf:protobuf-java:3.25.5"
force "com.google.protobuf:protobuf-java-util:3.25.5"
}
}
5 changes: 3 additions & 2 deletions sdks/java/io/debezium/expansion-service/build.gradle
@@ -25,6 +25,7 @@ applyJavaNature(
exportJavadoc: false,
validateShadowJar: false,
shadowClosure: {},
requireJavaVersion: JavaVersion.VERSION_17,
)

description = "Apache Beam :: SDKs :: Java :: IO :: Debezium :: Expansion Service"
@@ -38,10 +39,10 @@ dependencies {
runtimeOnly library.java.slf4j_jdk14

// Debezium runtime dependencies
def debezium_version = '1.3.1.Final'
def debezium_version = '3.1.1.Final'
runtimeOnly group: 'io.debezium', name: 'debezium-connector-mysql', version: debezium_version
runtimeOnly group: 'io.debezium', name: 'debezium-connector-postgres', version: debezium_version
runtimeOnly group: 'io.debezium', name: 'debezium-connector-sqlserver', version: debezium_version
runtimeOnly group: 'io.debezium', name: 'debezium-connector-oracle', version: debezium_version
runtimeOnly group: 'io.debezium', name: 'debezium-connector-db2', version: debezium_version
}
12 changes: 6 additions & 6 deletions sdks/java/io/debezium/src/README.md
@@ -25,7 +25,7 @@ DebeziumIO is an Apache Beam connector that lets users connect their Events-Driv

### Getting Started

DebeziumIO uses [Debezium Connectors v1.3](https://debezium.io/documentation/reference/1.3/connectors/) to connect to Apache Beam. All you need to do is choose the Debezium Connector that suits your Debezium setup and pick a [Serializable Function](https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/transforms/SerializableFunction.html), then you will be able to connect to Apache Beam and start building your own Pipelines.
DebeziumIO uses [Debezium Connectors v3.1](https://debezium.io/documentation/reference/3.1/connectors/) to connect to Apache Beam. All you need to do is choose the Debezium Connector that suits your Debezium setup and pick a [Serializable Function](https://beam.apache.org/releases/javadoc/2.65.0/org/apache/beam/sdk/transforms/SerializableFunction.html); then you can connect to Apache Beam and start building your own pipelines.

These connectors have been successfully tested and are known to work fine:
* MySQL Connector
@@ -65,7 +65,7 @@ You can also add more configuration, such as Connector-specific Properties with
|Method|Params|Description|
|-|-|-|
|`.withConnectionProperty(propName, propValue)`|_String_, _String_|Adds a custom property to the connector.|
> **Note:** For more information on custom properties, see your [Debezium Connector](https://debezium.io/documentation/reference/1.3/connectors/) specific documentation.
> **Note:** For more information on custom properties, see your [Debezium Connector](https://debezium.io/documentation/reference/3.1/connectors/) specific documentation.

Example of a MySQL Debezium Connector setup:
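A minimal sketch of such a setup, assuming placeholder host, port, and credential values, and assuming the usual `ConnectorConfiguration` builder methods (`withUsername`, `withPassword`, `withHostName`, `withPort`) alongside the `withConnectorClass` and `withConnectionProperty` methods shown above:
```
// Sketch only: host, port, and credentials are placeholders.
DebeziumIO.ConnectorConfiguration mySqlConnectorConfig =
    DebeziumIO.ConnectorConfiguration.create()
        .withUsername("debezium")
        .withPassword("dbz")
        .withConnectorClass(MySqlConnector.class) // io.debezium.connector.mysql.MySqlConnector
        .withHostName("127.0.0.1")
        .withPort("3306")
        .withConnectionProperty("database.server.id", "184054")
        .withConnectionProperty("include.schema.changes", "false");
```
The resulting configuration is then passed to `DebeziumIO.read()` via `withConnectorConfiguration`.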
@@ -149,7 +149,7 @@ p.run().waitUntilFinish();

### KafkaSourceConsumerFn and Restrictions

KafkaSourceConsumerFn (KSC onwards) is a [DoFn](https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/transforms/DoFn.html) in charge of the Database replication and CDC.
KafkaSourceConsumerFn (hereafter KSC) is a [DoFn](https://beam.apache.org/releases/javadoc/2.65.0/org/apache/beam/sdk/transforms/DoFn.html) in charge of database replication and CDC.

There are two ways of initializing KSC:
* Restricted by number of records
@@ -164,9 +164,9 @@ By default, DebeziumIO initializes it with the former, though user may choose th
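As a hedged illustration of the record-count restriction, assuming DebeziumIO's `withFormatFunction`, `withMaxNumberOfRecords`, and `withCoder` options together with the `SourceRecordJson.SourceRecordJsonMapper` mapper, and reusing the `mySqlConnectorConfig` sketch above, a bounded read could look like:
```
// Sketch only: stops the CDC source after 100 change records.
PCollection<String> changeJson =
    p.apply(
        DebeziumIO.<String>read()
            .withConnectorConfiguration(mySqlConnectorConfig) // from the sketch above
            .withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper())
            .withMaxNumberOfRecords(100)
            .withCoder(StringUtf8Coder.of()));
```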

### Requirements and Supported versions

- JDK v8
- Debezium Connectors v1.3
- Apache Beam 2.25
- JDK v17
- Debezium Connectors v3.1
- Apache Beam 2.66

## Running Unit Tests

@@ -75,7 +75,6 @@
* .withConnectorClass(MySqlConnector.class)
* .withConnectionProperty("database.server.id", "184054")
* .withConnectionProperty("database.server.name", "serverid")
* .withConnectionProperty("database.include.list", "dbname")
* .withConnectionProperty("database.history", DebeziumSDFDatabaseHistory.class.getName())
* .withConnectionProperty("include.schema.changes", "false");
*
@@ -513,9 +512,14 @@ public ConnectorConfiguration withConnectionProperty(String key, String value) {
checkArgument(
getConnectionProperties().get() != null, "connectionProperties can not be null");

ConnectorConfiguration config = builder().build();
config.getConnectionProperties().get().putIfAbsent(key, value);
return config;
// Copy the existing connection properties into a new map (non-null is checked above).
Map<String, String> newRawMap = new HashMap<>(getConnectionProperties().get());
newRawMap.put(key, value);
// Create a new ValueProvider for the updated map.
ValueProvider<Map<String, String>> newConnectionPropertiesProvider =
ValueProvider.StaticValueProvider.of(newRawMap);
// Create a new ConnectorConfiguration instance, replacing only the connectionProperties field.
return builder().setConnectionProperties(newConnectionPropertiesProvider).build();
}

/**
@@ -554,10 +558,16 @@ public Map<String, String> getConfigurationMap() {
configuration.computeIfAbsent(entry.getKey(), k -> entry.getValue());
}

// Set default Database History impl. if not provided
// Set default Database History implementation and Kafka topic prefix,
// if not provided
configuration.computeIfAbsent(
"database.history",
k -> KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName());
configuration.computeIfAbsent("topic.prefix", k -> "beam-debezium-connector");
configuration.computeIfAbsent(
"schema.history.internal.kafka.bootstrap.servers", k -> "localhost:9092");
configuration.computeIfAbsent(
"schema.history.internal.kafka.topic", k -> "schema-changes.inventory");

String stringProperties = Joiner.on('\n').withKeyValueSeparator(" -> ").join(configuration);
LOG.debug("---------------- Connector configuration: {}", stringProperties);
@@ -128,7 +128,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
String[] parts = connectionProperty.split("=", -1);
String key = parts[0];
String value = parts[1];
connectorConfiguration.withConnectionProperty(key, value);
connectorConfiguration = connectorConfiguration.withConnectionProperty(key, value);
}
}

@@ -22,9 +22,9 @@
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.SchemaHistoryException;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
@@ -474,7 +474,7 @@ public IsBounded isBounded() {
}
}

public static class DebeziumSDFDatabaseHistory extends AbstractDatabaseHistory {
public static class DebeziumSDFDatabaseHistory extends AbstractSchemaHistory {
private List<byte[]> history;

public DebeziumSDFDatabaseHistory() {
@@ -497,7 +497,7 @@ public void start() {
}

@Override
protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
LOG.debug("------------- Adding history! {}", record);

history.add(DocumentWriter.defaultWriter().writeAsBytes(record.document()));