diff --git a/CHANGES.md b/CHANGES.md index 0c126e4087e7..e647d515a84e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -75,6 +75,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Debezium IO upgraded to 3.1.1, which requires Java 17 (Java) ([#34747](https://github.com/apache/beam/issues/34747)). ## New Features / Improvements * Adding Google Storage Requests Pays feature (Golang)([#30747](https://github.com/apache/beam/issues/30747)). diff --git a/sdks/java/io/debezium/build.gradle b/sdks/java/io/debezium/build.gradle index 2070bcfc873c..dcdfd33ce0ac 100644 --- a/sdks/java/io/debezium/build.gradle +++ b/sdks/java/io/debezium/build.gradle @@ -24,6 +24,7 @@ applyJavaNature( [id: 'io.confluent', url: 'https://packages.confluent.io/maven/'] ], enableSpotbugs: false, + requireJavaVersion: JavaVersion.VERSION_17, ) provideIntegrationTestingDependencies() @@ -38,10 +39,16 @@ dependencies { implementation library.java.joda_time provided library.java.jackson_dataformat_csv permitUnusedDeclared library.java.jackson_dataformat_csv - testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") - testImplementation project(path: ":sdks:java:io:common") + + // Kafka connect dependencies + implementation "org.apache.kafka:connect-api:3.9.0" + + // Debezium dependencies + implementation group: 'io.debezium', name: 'debezium-core', version: '3.1.1.Final' // Test dependencies + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") + testImplementation project(path: ":sdks:java:io:common") testImplementation library.java.junit testImplementation project(path: ":sdks:java:io:jdbc") testRuntimeOnly library.java.slf4j_jdk14 @@ -49,29 +56,66 @@ dependencies { testImplementation project(":runners:google-cloud-dataflow-java") testImplementation library.java.hamcrest testImplementation library.java.testcontainers_base - testImplementation library.java.testcontainers_mysql - testImplementation
library.java.testcontainers_postgresql - // TODO(https://github.com/apache/beam/issues/31678) HikariCP 5.x requires Java11+ - testImplementation 'com.zaxxer:HikariCP:4.0.3' + testImplementation "org.testcontainers:kafka" + testImplementation "org.testcontainers:mysql" + testImplementation "org.testcontainers:postgresql" + testImplementation "io.debezium:debezium-testing-testcontainers:3.1.1.Final" + testImplementation 'com.zaxxer:HikariCP:5.1.0' - // Kafka connect dependencies - implementation "org.apache.kafka:connect-api:2.5.0" - implementation "org.apache.kafka:connect-json:2.5.0" - permitUnusedDeclared "org.apache.kafka:connect-json:2.5.0" // BEAM-11761 + // Debezium connector implementations for testing + testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '3.1.1.Final' + testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '3.1.1.Final' +} - // Debezium dependencies - implementation group: 'io.debezium', name: 'debezium-core', version: '1.3.1.Final' - testImplementation group: 'io.debezium', name: 'debezium-connector-mysql', version: '1.3.1.Final' - testImplementation group: 'io.debezium', name: 'debezium-connector-postgres', version: '1.3.1.Final' +// TODO: Remove pin after Beam has unpinned it (PR #35231) +// Pin the Antlr version +configurations.all { + resolutionStrategy { + force 'org.antlr:antlr4:4.10', 'org.antlr:antlr4-runtime:4.10' + } +} + +// TODO: Remove pin after upgrading Beam's Jackson version +// Force Jackson versions for the test runtime classpath +configurations.named("testRuntimeClasspath") { + resolutionStrategy.force ( + 'com.fasterxml.jackson.core:jackson-core:2.17.1', + 'com.fasterxml.jackson.core:jackson-annotations:2.17.1', + 'com.fasterxml.jackson.core:jackson-databind:2.17.1', + 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.1', + 'com.fasterxml.jackson.module:jackson-module-afterburner:2.17.1' + ) +} + +def configureTestJvmArgs(Task task) { + List 
currentJvmArgs = task.jvmArgs ? new ArrayList<>(task.jvmArgs) : new ArrayList<>() + + // Add standard opens required for Jackson, Afterburner, and other reflective frameworks + // when dealing with Java Modules or complex classloader scenarios. + currentJvmArgs.addAll([ + '--add-opens=java.base/java.lang=ALL-UNNAMED', + '--add-opens=java.base/java.util=ALL-UNNAMED', + '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED', + '--add-opens=java.base/java.lang.reflect=ALL-UNNAMED', + '--add-opens=java.base/java.io=ALL-UNNAMED', + '--add-opens=java.base/java.nio=ALL-UNNAMED', + '--add-opens=java.base/java.math=ALL-UNNAMED', + '--add-opens=java.base/java.time=ALL-UNNAMED', // For JSR310 types + ]) + + task.jvmArgs = currentJvmArgs.unique() // Assign the new, filtered list back, removing duplicates + project.logger.lifecycle("Task ${task.name} final jvmArgs: ${task.jvmArgs.join(' ')}") } test { testLogging { outputs.upToDateWhen {false} showStandardStreams = true + exceptionFormat = 'full' } -} + configureTestJvmArgs(delegate) +} task integrationTest(type: Test, dependsOn: processTestResources) { group = "Verification" @@ -89,14 +133,3 @@ task integrationTest(type: Test, dependsOn: processTestResources) { useJUnit { } } - -configurations.all (Configuration it) -> { - resolutionStrategy { - // Force protobuf 3 because debezium is currently incompatible with protobuf 4. - // TODO - remove this and upgrade the version of debezium once a proto-4 compatible version is available - // https://github.com/apache/beam/pull/33526 does some of this, but was abandoned because it still doesn't - // work with protobuf 4. 
- force "com.google.protobuf:protobuf-java:3.25.5" - force "com.google.protobuf:protobuf-java-util:3.25.5" - } -} diff --git a/sdks/java/io/debezium/expansion-service/build.gradle b/sdks/java/io/debezium/expansion-service/build.gradle index 3cc6905893f3..3c244d3bcefa 100644 --- a/sdks/java/io/debezium/expansion-service/build.gradle +++ b/sdks/java/io/debezium/expansion-service/build.gradle @@ -25,6 +25,7 @@ applyJavaNature( exportJavadoc: false, validateShadowJar: false, shadowClosure: {}, + requireJavaVersion: JavaVersion.VERSION_17, ) description = "Apache Beam :: SDKs :: Java :: IO :: Debezium :: Expansion Service" @@ -38,10 +39,10 @@ dependencies { runtimeOnly library.java.slf4j_jdk14 // Debezium runtime dependencies - def debezium_version = '1.3.1.Final' + def debezium_version = '3.1.1.Final' runtimeOnly group: 'io.debezium', name: 'debezium-connector-mysql', version: debezium_version runtimeOnly group: 'io.debezium', name: 'debezium-connector-postgres', version: debezium_version runtimeOnly group: 'io.debezium', name: 'debezium-connector-sqlserver', version: debezium_version runtimeOnly group: 'io.debezium', name: 'debezium-connector-oracle', version: debezium_version runtimeOnly group: 'io.debezium', name: 'debezium-connector-db2', version: debezium_version -} \ No newline at end of file +} diff --git a/sdks/java/io/debezium/src/README.md b/sdks/java/io/debezium/src/README.md index 4cf9be81c618..e56ac370b705 100644 --- a/sdks/java/io/debezium/src/README.md +++ b/sdks/java/io/debezium/src/README.md @@ -25,7 +25,7 @@ DebeziumIO is an Apache Beam connector that lets users connect their Events-Driv ### Getting Started -DebeziumIO uses [Debezium Connectors v1.3](https://debezium.io/documentation/reference/1.3/connectors/) to connect to Apache Beam. 
All you need to do is choose the Debezium Connector that suits your Debezium setup and pick a [Serializable Function](https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/transforms/SerializableFunction.html), then you will be able to connect to Apache Beam and start building your own Pipelines. +DebeziumIO uses [Debezium Connectors v3.1](https://debezium.io/documentation/reference/3.1/connectors/) to connect to Apache Beam. All you need to do is choose the Debezium Connector that suits your Debezium setup and pick a [Serializable Function](https://beam.apache.org/releases/javadoc/2.65.0/org/apache/beam/sdk/transforms/SerializableFunction.html), then you will be able to connect to Apache Beam and start building your own Pipelines. These connectors have been successfully tested and are known to work fine: * MySQL Connector @@ -65,7 +65,7 @@ You can also add more configuration, such as Connector-specific Properties with |Method|Params|Description| |-|-|-| |`.withConnectionProperty(propName, propValue)`|_String_, _String_|Adds a custom property to the connector.| -> **Note:** For more information on custom properties, see your [Debezium Connector](https://debezium.io/documentation/reference/1.3/connectors/) specific documentation. +> **Note:** For more information on custom properties, see your [Debezium Connector](https://debezium.io/documentation/reference/3.1/connectors/) specific documentation. Example of a MySQL Debezium Connector setup: ``` @@ -149,7 +149,7 @@ p.run().waitUntilFinish(); ### KafkaSourceConsumerFn and Restrictions -KafkaSourceConsumerFn (KSC onwards) is a [DoFn](https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/transforms/DoFn.html) in charge of the Database replication and CDC. +KafkaSourceConsumerFn (KSC onwards) is a [DoFn](https://beam.apache.org/releases/javadoc/2.65.0/org/apache/beam/sdk/transforms/DoFn.html) in charge of the Database replication and CDC. 
There are two ways of initializing KSC: * Restricted by number of records @@ -164,9 +164,9 @@ By default, DebeziumIO initializes it with the former, though user may choose th ### Requirements and Supported versions -- JDK v8 -- Debezium Connectors v1.3 -- Apache Beam 2.25 +- JDK v17 +- Debezium Connectors v3.1 +- Apache Beam 2.66 ## Running Unit Tests diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java index 30ad8a5f9f74..be418aed5cab 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java @@ -75,7 +75,6 @@ * .withConnectorClass(MySqlConnector.class) * .withConnectionProperty("database.server.id", "184054") * .withConnectionProperty("database.server.name", "serverid") - * .withConnectionProperty("database.include.list", "dbname") * .withConnectionProperty("database.history", DebeziumSDFDatabaseHistory.class.getName()) * .withConnectionProperty("include.schema.changes", "false"); * @@ -513,9 +512,14 @@ public ConnectorConfiguration withConnectionProperty(String key, String value) { checkArgument( getConnectionProperties().get() != null, "connectionProperties can not be null"); - ConnectorConfiguration config = builder().build(); - config.getConnectionProperties().get().putIfAbsent(key, value); - return config; + // Create a new map, copy existing properties if they exist, or start fresh. + Map newRawMap = new HashMap<>(getConnectionProperties().get()); + newRawMap.put(key, value); + // Create a new ValueProvider for the updated map. + ValueProvider> newConnectionPropertiesProvider = + ValueProvider.StaticValueProvider.of(newRawMap); + // Create a new ConnectorConfiguration instance, replacing only the connectionProperties field.
+ return builder().setConnectionProperties(newConnectionPropertiesProvider).build(); } /** @@ -554,10 +558,16 @@ public Map getConfigurationMap() { configuration.computeIfAbsent(entry.getKey(), k -> entry.getValue()); } - // Set default Database History impl. if not provided + // Set defaults, when not provided, for the Database History implementation, the Kafka + // topic prefix, and the schema-history Kafka bootstrap servers and topic. configuration.computeIfAbsent( "database.history", k -> KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName()); + configuration.computeIfAbsent("topic.prefix", k -> "beam-debezium-connector"); + configuration.computeIfAbsent( + "schema.history.internal.kafka.bootstrap.servers", k -> "localhost:9092"); + configuration.computeIfAbsent( + "schema.history.internal.kafka.topic", k -> "schema-changes.inventory"); String stringProperties = Joiner.on('\n').withKeyValueSeparator(" -> ").join(configuration); LOG.debug("---------------- Connector configuration: {}", stringProperties); diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java index 9f227708e5e6..a0838174759c 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java @@ -128,7 +128,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { String[] parts = connectionProperty.split("=", -1); String key = parts[0]; String value = parts[1]; - connectorConfiguration.withConnectionProperty(key, value); + connectorConfiguration = connectorConfiguration.withConnectionProperty(key, value); } } diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java index 54330d620477..a1edfb210548 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java @@ -22,9 +22,9 @@ import io.debezium.document.Document; import io.debezium.document.DocumentReader; import io.debezium.document.DocumentWriter; -import io.debezium.relational.history.AbstractDatabaseHistory; -import io.debezium.relational.history.DatabaseHistoryException; +import io.debezium.relational.history.AbstractSchemaHistory; import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.SchemaHistoryException; import java.io.IOException; import java.io.Serializable; import java.lang.reflect.InvocationTargetException; @@ -474,7 +474,7 @@ public IsBounded isBounded() { } } - public static class DebeziumSDFDatabaseHistory extends AbstractDatabaseHistory { + public static class DebeziumSDFDatabaseHistory extends AbstractSchemaHistory { private List history; public DebeziumSDFDatabaseHistory() { @@ -497,7 +497,7 @@ public void start() { } @Override - protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException { + protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { LOG.debug("------------- Adding history! 
{}", record); history.add(DocumentWriter.defaultWriter().writeAsBytes(record.document())); diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java index 12ba57bad45d..e926a4b30625 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java @@ -21,6 +21,7 @@ import static org.apache.beam.sdk.testing.SerializableMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.fail; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; @@ -50,7 +51,9 @@ import org.junit.runners.JUnit4; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.Network; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.utility.DockerImageName; @@ -63,13 +66,20 @@ public class DebeziumIOMySqlConnectorIT { * *

Creates a docker container using the image used by the debezium tutorial. */ + private static final DockerImageName KAFKA_IMAGE = + DockerImageName.parse("confluentinc/cp-kafka:7.6.0"); + + @ClassRule public static Network network = Network.newNetwork(); + @ClassRule public static final MySQLContainer MY_SQL_CONTAINER = new MySQLContainer<>( - DockerImageName.parse("debezium/example-mysql:1.4") + DockerImageName.parse("quay.io/debezium/example-mysql:3.1.1.Final") .asCompatibleSubstituteFor("mysql")) .withPassword("debezium") .withUsername("mysqluser") + .withNetwork(network) + .withNetworkAliases("mysql") .withExposedPorts(3306) .waitingFor( new HttpWaitStrategy() @@ -77,16 +87,31 @@ public class DebeziumIOMySqlConnectorIT { .forStatusCodeMatching(response -> response == 200) .withStartupTimeout(Duration.ofMinutes(2))); + @ClassRule + public static final KafkaContainer KAFKA_CONTAINER = + new KafkaContainer(KAFKA_IMAGE) + .withNetwork(network) + .withNetworkAliases("kafka") + .dependsOn(MY_SQL_CONTAINER); + public static DataSource getMysqlDatasource(Void unused) { HikariConfig hikariConfig = new HikariConfig(); - hikariConfig.setJdbcUrl(MY_SQL_CONTAINER.getJdbcUrl()); + + String jdbcUrl = + String.format( + "jdbc:mysql://%s:%d/inventory?allowPublicKeyRetrieval=true", + MY_SQL_CONTAINER.getHost(), MY_SQL_CONTAINER.getMappedPort(3306)); + LOG.info("Hikari DataSource JDBC URL for mysqluser: {}", jdbcUrl); + + hikariConfig.setJdbcUrl(jdbcUrl); hikariConfig.setUsername(MY_SQL_CONTAINER.getUsername()); hikariConfig.setPassword(MY_SQL_CONTAINER.getPassword()); + hikariConfig.addDataSourceProperty("allowPublicKeyRetrieval", "true"); hikariConfig.setDriverClassName(MY_SQL_CONTAINER.getDriverClassName()); return new HikariDataSource(hikariConfig); } - private void monitorEssentialMetrics() { + private void monitorEssentialMetrics() throws SQLException { DataSource ds = getMysqlDatasource(null); try { Connection conn = ds.getConnection(); @@ -98,11 +123,16 @@ private void 
monitorEssentialMetrics() { rs.close(); Thread.sleep(4000); } else { - throw new IllegalArgumentException("OIOI"); + throw new IllegalArgumentException( + "Illegal Argument Exception in monitorEssentialMetrics."); } } - } catch (InterruptedException | SQLException ex) { - throw new IllegalArgumentException("Oi", ex); + } catch (SQLException ex) { + LOG.error("SQL error in monitoring thread. Shutting down.", ex); + throw (ex); + } catch (InterruptedException ex) { + LOG.info("Monitoring thread interrupted. Shutting down."); + Thread.currentThread().interrupt(); } } @@ -110,7 +140,6 @@ private void monitorEssentialMetrics() { public void testDebeziumSchemaTransformMysqlRead() throws InterruptedException { long writeSize = 500L; long testTime = writeSize * 200L; - MY_SQL_CONTAINER.start(); PipelineOptions options = PipelineOptionsFactory.create(); Pipeline writePipeline = Pipeline.create(options); @@ -158,6 +187,17 @@ public void testDebeziumSchemaTransformMysqlRead() throws InterruptedException { .setHost("localhost") .setTable("inventory.customers") .setPort(MY_SQL_CONTAINER.getMappedPort(3306)) + .setDebeziumConnectionProperties( + Lists.newArrayList( + "database.server.id=1849055", + "schema.history.internal.kafka.bootstrap.servers=" + + KAFKA_CONTAINER.getBootstrapServers(), + "schema.history.internal.kafka.topic=schema-history-mysql-transform-" + + System.nanoTime(), + "schema.history.internal=io.debezium.storage.kafka.history.KafkaSchemaHistory", + "schema.history.internal.store.only.captured.tables.ddl=false", + "table.include.list=inventory.customers", + "snapshot.mode=initial_only")) .build())) .get("output"); @@ -169,13 +209,29 @@ public void testDebeziumSchemaTransformMysqlRead() throws InterruptedException { return null; }); Thread writeThread = new Thread(() -> writePipeline.run().waitUntilFinish()); - Thread monitorThread = new Thread(this::monitorEssentialMetrics); + Thread monitorThread = + new Thread( + () -> { + try { + monitorEssentialMetrics(); + 
} catch (SQLException e) { + e.printStackTrace(); + fail("Failed because of SQLException in monitorEssentialMetrics!"); + } + }); monitorThread.start(); writeThread.start(); - readPipeline.run().waitUntilFinish(); + writeThread.join(); + LOG.info("Write thread for SchemaTransform test joined."); + + LOG.info("Starting read pipeline for SchemaTransform test..."); + readPipeline.run().waitUntilFinish(); + LOG.info("Read pipeline for SchemaTransform test finished."); + monitorThread.interrupt(); monitorThread.join(); + LOG.info("Monitor thread for SchemaTransform test joined."); } /** @@ -185,9 +241,11 @@ public void testDebeziumSchemaTransformMysqlRead() throws InterruptedException { */ @Test public void testDebeziumIOMySql() { - MY_SQL_CONTAINER.start(); - String host = MY_SQL_CONTAINER.getContainerIpAddress(); + String kafkaBootstrapServers = KAFKA_CONTAINER.getBootstrapServers(); + String schemaHistoryTopic = "mysql-schema-history-io-" + System.nanoTime(); + + String host = MY_SQL_CONTAINER.getHost(); String port = MY_SQL_CONTAINER.getMappedPort(3306).toString(); PipelineOptions options = PipelineOptionsFactory.create(); @@ -203,15 +261,24 @@ public void testDebeziumIOMySql() { .withHostName(host) .withPort(port) .withConnectionProperty("database.server.id", "184054") - .withConnectionProperty("database.server.name", "dbserver1") .withConnectionProperty("database.include.list", "inventory") - .withConnectionProperty("include.schema.changes", "false")) + .withConnectionProperty("include.schema.changes", "false") + .withConnectionProperty( + "schema.history.internal.kafka.bootstrap.servers", + kafkaBootstrapServers) + .withConnectionProperty( + "schema.history.internal.kafka.topic", schemaHistoryTopic) + .withConnectionProperty( + "schema.history.internal", + "io.debezium.storage.kafka.history.KafkaSchemaHistory") + .withConnectionProperty( + "schema.history.internal.store.only.captured.tables.ddl", "true")) .withFormatFunction(new 
SourceRecordJson.SourceRecordJsonMapper()) .withMaxNumberOfRecords(30) .withCoder(StringUtf8Coder.of())); String expected = - "{\"metadata\":{\"connector\":\"mysql\",\"version\":\"1.3.1.Final\",\"name\":\"dbserver1\"," - + "\"database\":\"inventory\",\"schema\":\"mysql-bin.000003\",\"table\":\"addresses\"},\"before\":null," + "{\"metadata\":{\"connector\":\"mysql\",\"version\":\"3.1.1.Final\",\"name\":\"beam-debezium-connector\"," + + "\"database\":\"inventory\",\"schema\":\"binlog.000002\",\"table\":\"addresses\"},\"before\":null," + "\"after\":{\"fields\":{\"zip\":\"76036\",\"city\":\"Euless\"," + "\"street\":\"3183 Moore Avenue\",\"id\":10,\"state\":\"Texas\",\"customer_id\":1001," + "\"type\":\"SHIPPING\"}}}"; @@ -224,6 +291,5 @@ public void testDebeziumIOMySql() { }); p.run().waitUntilFinish(); - MY_SQL_CONTAINER.stop(); } } diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java index 970d9483850c..c1f67e4e44d9 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java @@ -56,7 +56,7 @@ public class DebeziumIOPostgresSqlConnectorIT { @ClassRule public static final PostgreSQLContainer POSTGRES_SQL_CONTAINER = new PostgreSQLContainer<>( - DockerImageName.parse("quay.io/debezium/example-postgres:latest") + DockerImageName.parse("quay.io/debezium/example-postgres:3.1.1.Final") .asCompatibleSubstituteFor("postgres")) .withPassword("dbz") .withUsername("debezium") @@ -74,8 +74,10 @@ public class DebeziumIOPostgresSqlConnectorIT { static DataSource getPostgresDatasource() { PGSimpleDataSource dataSource = new PGSimpleDataSource(); dataSource.setDatabaseName("inventory"); - dataSource.setServerName(POSTGRES_SQL_CONTAINER.getContainerIpAddress()); 
- dataSource.setPortNumber(POSTGRES_SQL_CONTAINER.getMappedPort(5432)); + String[] serverNames = new String[] {POSTGRES_SQL_CONTAINER.getHost()}; + dataSource.setServerNames(serverNames); + int[] ports = new int[] {POSTGRES_SQL_CONTAINER.getMappedPort(5432)}; + dataSource.setPortNumbers(ports); dataSource.setUser("debezium"); dataSource.setPassword("dbz"); return dataSource; @@ -156,7 +158,7 @@ public void testDebeziumSchemaTransformPostgresRead() throws InterruptedExceptio public void testDebeziumIOPostgresSql() { POSTGRES_SQL_CONTAINER.start(); - String host = POSTGRES_SQL_CONTAINER.getContainerIpAddress(); + String host = POSTGRES_SQL_CONTAINER.getHost(); String port = POSTGRES_SQL_CONTAINER.getMappedPort(5432).toString(); PipelineOptions options = PipelineOptionsFactory.create(); @@ -173,13 +175,12 @@ public void testDebeziumIOPostgresSql() { .withPort(port) .withConnectionProperty("database.dbname", "inventory") .withConnectionProperty("database.server.name", "dbserver1") - .withConnectionProperty("database.include.list", "inventory") .withConnectionProperty("include.schema.changes", "false")) .withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper()) .withMaxNumberOfRecords(30) .withCoder(StringUtf8Coder.of())); String expected = - "{\"metadata\":{\"connector\":\"postgresql\",\"version\":\"1.3.1.Final\",\"name\":\"dbserver1\"," + "{\"metadata\":{\"connector\":\"postgresql\",\"version\":\"3.1.1.Final\",\"name\":\"beam-debezium-connector\"," + "\"database\":\"inventory\",\"schema\":\"inventory\",\"table\":\"customers\"},\"before\":null," + "\"after\":{\"fields\":{\"last_name\":\"Thomas\",\"id\":1001,\"first_name\":\"Sally\"," + "\"email\":\"sally.thomas@acme.com\"}}}"; diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java index 12d1d610ff9f..88ecc4fdd906 100644 --- 
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java @@ -44,7 +44,6 @@ public class DebeziumIOTest implements Serializable { .withConnectorClass(MySqlConnector.class) .withConnectionProperty("database.server.id", "184054") .withConnectionProperty("database.server.name", "dbserver1") - .withConnectionProperty("database.include.list", "inventory") .withConnectionProperty( "database.history", KafkaSourceConsumerFn.DebeziumSDFDatabaseHistory.class.getName()) .withConnectionProperty("include.schema.changes", "false"); diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java index c4b5d2d1f890..2fc8996ba55e 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java @@ -20,6 +20,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertThrows; +import io.debezium.DebeziumException; import java.time.Duration; import java.util.Arrays; import java.util.stream.Collectors; @@ -28,10 +29,11 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.kafka.connect.errors.ConnectException; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.hamcrest.Matchers; import org.junit.ClassRule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.testcontainers.containers.Container; @@ -43,6 +45,8 @@ @RunWith(Parameterized.class) public class DebeziumReadSchemaTransformTest { + 
@ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder(); + @ClassRule public static final PostgreSQLContainer POSTGRES_SQL_CONTAINER = new PostgreSQLContainer<>( @@ -104,6 +108,13 @@ private PTransform makePtransform( // is "database.table". .setTable("inventory.customers") .setPort(port) + .setDebeziumConnectionProperties( + Lists.newArrayList( + "database.server.id=579676", + "schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory", + String.format( + "schema.history.internal.file.filename=%s", + tempFolder.getRoot().toPath().resolve("schema_history.dat")))) .build()); } @@ -124,15 +135,16 @@ public void testNoProblem() { result.getSchema().getFields().stream() .map(field -> field.getName()) .collect(Collectors.toList()), - Matchers.containsInAnyOrder("before", "after", "source", "op", "ts_ms", "transaction")); + Matchers.containsInAnyOrder( + "before", "after", "source", "transaction", "op", "ts_ms", "ts_us", "ts_ns")); } @Test public void testWrongUser() { Pipeline readPipeline = Pipeline.create(); - ConnectException ex = + DebeziumException ex = assertThrows( - ConnectException.class, + DebeziumException.class, () -> { PCollectionRowTuple.empty(readPipeline) .apply( @@ -151,9 +163,9 @@ public void testWrongUser() { @Test public void testWrongPassword() { Pipeline readPipeline = Pipeline.create(); - ConnectException ex = + DebeziumException ex = assertThrows( - ConnectException.class, + DebeziumException.class, () -> { PCollectionRowTuple.empty(readPipeline) .apply( @@ -172,9 +184,9 @@ public void testWrongPassword() { @Test public void testWrongPort() { Pipeline readPipeline = Pipeline.create(); - ConnectException ex = + DebeziumException ex = assertThrows( - ConnectException.class, + DebeziumException.class, () -> { PCollectionRowTuple.empty(readPipeline) .apply(makePtransform(userName, password, database, 12345, "localhost"))