Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ sdks/java/maven-archetypes/examples-java8/src/main/resources/archetype-resources
sdks/java/maven-archetypes/gcp-bom-examples/src/main/resources/archetype-resources/src/
sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/sample.txt

# Ignore generated debezium data
sdks/java/io/debezium/data/

# Ignore files generated by the Python build process.
**/*.pyc
**/*.pyo
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
## Breaking Changes

* AWS V1 I/Os have been removed (Java). As part of this, x-lang Python Kinesis I/O has been updated to consume the V2 IO and it also no longer supports setting producer_properties ([#33430](https://github.com/apache/beam/issues/33430)).
* Debezium IO (Java) has been upgraded from depending on version 1.3.1.Final of io.debezium to 2.7.4.Final. This may cause some breaking changes since the libraries do not maintain full compatibility ([#33526](https://github.com/apache/beam/issues/33526)).

## Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,8 @@ class BeamModulePlugin implements Plugin<Project> {
activemq_junit : "org.apache.activemq.tooling:activemq-junit:$activemq_version",
activemq_kahadb_store : "org.apache.activemq:activemq-kahadb-store:$activemq_version",
activemq_mqtt : "org.apache.activemq:activemq-mqtt:$activemq_version",
antlr : "org.antlr:antlr4:4.7",
antlr_runtime : "org.antlr:antlr4-runtime:4.7",
antlr : "org.antlr:antlr4:4.10",
antlr_runtime : "org.antlr:antlr4-runtime:4.10",
args4j : "args4j:args4j:2.33",
auto_value_annotations : "com.google.auto.value:auto-value-annotations:$autovalue_version",
avro : "org.apache.avro:avro:1.11.4",
Expand Down
6 changes: 3 additions & 3 deletions sdks/java/io/debezium/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ dependencies {
permitUnusedDeclared "org.apache.kafka:connect-json:2.5.0" // BEAM-11761

// Debezium dependencies
implementation group: 'io.debezium', name: 'debezium-core', version: '1.3.1.Final'
testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '1.3.1.Final'
testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '1.3.1.Final'
implementation group: 'io.debezium', name: 'debezium-core', version: '2.7.4.Final'
testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '2.7.4.Final'
testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '2.7.4.Final'
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,10 +554,11 @@ public Map<String, String> getConfigurationMap() {
configuration.computeIfAbsent(entry.getKey(), k -> entry.getValue());
}

// Set default Database History impl. if not provided
// Set default Database History impl and kafka topic prefix if not provided
configuration.computeIfAbsent(
"database.history",
k -> KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName());
configuration.computeIfAbsent("topic.prefix", k -> "beam-debezium-connector");

String stringProperties = Joiner.on('\n').withKeyValueSeparator(" -> ").join(configuration);
LOG.debug("---------------- Connector configuration: {}", stringProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,13 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
connectorConfiguration
.withConnectionProperty("table.include.list", configuration.getTable())
.withConnectionProperty("include.schema.changes", "false")
.withConnectionProperty("database.server.name", "beam-pipeline-server");
// add random unique name/identifier for server to identify connector
.withConnectionProperty("database.server.name", "beam-pipeline-server")
.withConnectionProperty("database.server.id", "579676")
.withConnectionProperty(
"schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory")
.withConnectionProperty(
"schema.history.internal.file.filename", "data/schema_history.dat");
if (configuration.getDatabase().equals("POSTGRES")) {
LOG.info(
"As Database is POSTGRES, we set the `database.dbname` property to {}.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.SchemaHistoryException;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -430,7 +430,7 @@ public IsBounded isBounded() {
}
}

public static class DebeziumSDFDatabaseHistory extends AbstractDatabaseHistory {
public static class DebeziumSDFDatabaseHistory extends AbstractSchemaHistory {
private List<byte[]> history;

public DebeziumSDFDatabaseHistory() {
Expand All @@ -453,7 +453,7 @@ public void start() {
}

@Override
protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
LOG.debug("------------- Adding history! {}", record);

history.add(DocumentWriter.defaultWriter().writeAsBytes(record.document()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public void testDebeziumIOMySql() {
.withMaxNumberOfRecords(30)
.withCoder(StringUtf8Coder.of()));
String expected =
"{\"metadata\":{\"connector\":\"mysql\",\"version\":\"1.3.1.Final\",\"name\":\"dbserver1\","
"{\"metadata\":{\"connector\":\"mysql\",\"version\":\"1.9.8.Final\",\"name\":\"dbserver1\","
+ "\"database\":\"inventory\",\"schema\":\"mysql-bin.000003\",\"table\":\"addresses\"},\"before\":null,"
+ "\"after\":{\"fields\":{\"zip\":\"76036\",\"city\":\"Euless\","
+ "\"street\":\"3183 Moore Avenue\",\"id\":10,\"state\":\"Texas\",\"customer_id\":1001,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;

import io.debezium.DebeziumException;
import java.time.Duration;
import java.util.Arrays;
import java.util.stream.Collectors;
Expand All @@ -28,7 +29,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.connect.errors.ConnectException;
import org.hamcrest.Matchers;
import org.junit.ClassRule;
import org.junit.Test;
Expand Down Expand Up @@ -124,15 +124,16 @@ public void testNoProblem() {
result.getSchema().getFields().stream()
.map(field -> field.getName())
.collect(Collectors.toList()),
Matchers.containsInAnyOrder("before", "after", "source", "op", "ts_ms", "transaction"));
Matchers.containsInAnyOrder(
"before", "after", "source", "op", "ts_ms", "ts_us", "ts_ns", "transaction"));
}

@Test
public void testWrongUser() {
Pipeline readPipeline = Pipeline.create();
ConnectException ex =
DebeziumException ex =
assertThrows(
ConnectException.class,
DebeziumException.class,
() -> {
PCollectionRowTuple.empty(readPipeline)
.apply(
Expand All @@ -151,9 +152,9 @@ public void testWrongUser() {
@Test
public void testWrongPassword() {
Pipeline readPipeline = Pipeline.create();
ConnectException ex =
DebeziumException ex =
assertThrows(
ConnectException.class,
DebeziumException.class,
() -> {
PCollectionRowTuple.empty(readPipeline)
.apply(
Expand All @@ -172,9 +173,9 @@ public void testWrongPassword() {
@Test
public void testWrongPort() {
Pipeline readPipeline = Pipeline.create();
ConnectException ex =
DebeziumException ex =
assertThrows(
ConnectException.class,
DebeziumException.class,
() -> {
PCollectionRowTuple.empty(readPipeline)
.apply(makePtransform(userName, password, database, 12345, "localhost"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.gcp.pubsub;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't used in this file and keeps getting removed by my linter, including it here.

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.auth.Credentials;
Expand Down
Loading