From 2c95dc28836f338cb24d5a6b45ec89bd67a4212a Mon Sep 17 00:00:00 2001 From: Lars M Johansson Date: Mon, 3 Jun 2024 16:03:51 +0200 Subject: [PATCH 1/2] DBZ-7813 Exclude Column List --- .github/workflows/cross-maven.yml | 17 +- .github/workflows/maven.yml | 2 +- pom.xml | 2 +- .../informix/InformixChangeRecordEmitter.java | 11 +- .../informix/InformixConnection.java | 87 +++--- .../informix/InformixDatabaseSchema.java | 11 + .../InformixSnapshotChangeEventSource.java | 1 - .../InformixStreamingChangeEventSource.java | 47 ++- .../AbstractInformixDefaultValueIT.java | 7 +- .../informix/BlockingSnapshotIT.java | 7 - .../informix/IncrementalSnapshotIT.java | 6 - .../informix/InformixCdcTypesIT.java | 3 +- .../informix/InformixConnectorIT.java | 295 +++++++++++++----- .../InformixOfflineDefaultValueIT.java | 3 - .../InformixReselectColumnsProcessorIT.java | 6 +- .../InformixValidateColumnOrderIT.java | 13 +- .../informix/SchemaHistoryTopicIT.java | 3 +- .../informix/TransactionMetadataIT.java | 3 +- .../connector/informix/util/TestHelper.java | 21 +- 19 files changed, 325 insertions(+), 220 deletions(-) diff --git a/.github/workflows/cross-maven.yml b/.github/workflows/cross-maven.yml index 88008b6..939b45c 100644 --- a/.github/workflows/cross-maven.yml +++ b/.github/workflows/cross-maven.yml @@ -15,8 +15,8 @@ on: pull_request: branches: - main - - 1.* - 2.* + - 3.* paths-ignore: - '*.md' @@ -65,14 +65,15 @@ jobs: key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} restore-keys: ${{ runner.os }}-m2 - name: Maven build core - run: > - ./core/mvnw clean install -B -f core/pom.xml - -pl debezium-bom,debezium-core,debezium-embedded,debezium-storage/debezium-storage-file,debezium-storage/debezium-storage-kafka -am + run: ./core/mvnw clean install -f core/pom.xml -am + -pl debezium-bom,debezium-core,debezium-embedded,debezium-storage/debezium-storage-file,debezium-storage/debezium-storage-kafka -DskipTests=true -DskipITs=true -Dcheckstyle.skip=true -Dformat.skip=true -Drevapi.skip=true + -Dhttp.keepAlive=false + -Dmaven.wagon.http.pool=false # This job builds and creates/restores the cache based on the hash of the POM files from the core # repository; therefore, we need to output this so that the matrix job can reuse this cache. @@ -105,10 +106,12 @@ jobs: restore-keys: ${{ needs.build-core.outputs.cache-key }} - name: Maven build Informix (${{ matrix.informix-plugin }}) run: > - ./informix/mvnw clean install -B -f informix/pom.xml + ./informix/mvnw clean install -f informix/pom.xml -P${{ matrix.informix-plugin }} -Dformat.formatter.goal=validate -Dformat.imports.goal=check - -Ddebezium.test.records.waittime=7 - -Ddebezium.test.records.waittime.after.nulls=13 + -Ddebezium.test.records.waittime=16 + -Ddebezium.test.records.waittime.after.nulls=16 + -Dhttp.keepAlive=false + -Dmaven.wagon.http.pool=false -DfailFlakyTests=false diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index a69ade2..05adb6e 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -9,8 +9,8 @@ on: push: branches: - main - - 1.* - 2.* + - 3.* paths-ignore: - '*.md' diff --git a/pom.xml b/pom.xml index 9db27e6..13f966e 100644 --- a/pom.xml +++ b/pom.xml @@ -270,7 +270,7 @@ ${docker.dbs}:${informix.version} - small + medium accept 1 diff --git a/src/main/java/io/debezium/connector/informix/InformixChangeRecordEmitter.java b/src/main/java/io/debezium/connector/informix/InformixChangeRecordEmitter.java index 6d915e4..2702df4 100644 --- a/src/main/java/io/debezium/connector/informix/InformixChangeRecordEmitter.java +++ b/src/main/java/io/debezium/connector/informix/InformixChangeRecordEmitter.java @@ -12,6 +12,7 @@ import com.informix.jdbc.IfmxReadableType; +import io.debezium.DebeziumException; import io.debezium.data.Envelope.Operation; import io.debezium.relational.RelationalChangeRecordEmitter; import io.debezium.relational.TableSchema; @@ -56,7 +57,8 @@ protected Object[] getNewColumnValues() { @Override protected void emitTruncateRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { receiver.changeRecord(getPartition(), tableSchema, Operation.TRUNCATE, null, - tableSchema.getEnvelopeSchema().truncate(getOffset().getSourceInfo(), getClock().currentTimeAsInstant()), getOffset(), null); + tableSchema.getEnvelopeSchema().truncate(getOffset().getSourceInfo(), getClock().currentTimeAsInstant()), + getOffset(), null); } /** @@ -79,8 +81,11 @@ private static X propagate(Callable callable) { try { return callable.call(); } - catch (Exception e) { - throw (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e); + catch (RuntimeException rex) { + throw rex; + } + catch (Exception ex) { + throw new DebeziumException(ex); } } } diff --git a/src/main/java/io/debezium/connector/informix/InformixConnection.java b/src/main/java/io/debezium/connector/informix/InformixConnection.java index 28a4f74..3dc6c95 100644 --- a/src/main/java/io/debezium/connector/informix/InformixConnection.java +++ b/src/main/java/io/debezium/connector/informix/InformixConnection.java @@ -7,8 +7,6 @@ import java.io.PrintWriter; import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.time.Instant; @@ -28,8 +26,6 @@ import io.debezium.jdbc.JdbcConnection; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.Partition; -import io.debezium.relational.Table; -import io.debezium.relational.TableEditor; import io.debezium.relational.TableId; import io.debezium.util.Strings; @@ -80,6 +76,19 @@ public InformixConnection(JdbcConfiguration config) { realDatabaseName = retrieveRealDatabaseName().trim(); } + private String retrieveRealDatabaseName() { + try { + return queryAndMap(GET_DATABASE_NAME, singleResultMapper(rs -> rs.getString(1), "Could not retrieve database name")); + } + catch (SQLException e) { + throw new RuntimeException("Couldn't obtain database name", e); + } + } + + public String getRealDatabaseName() { + return realDatabaseName; + } + /** * Calculates the highest available Log Sequence Number. * Tecnically, the _exact_ highest available LSN is not available in the JDBC session, but the current page of the active @@ -100,16 +109,33 @@ public Lsn getMaxLsn() throws SQLException { }, "Maximum LSN query must return exactly one value")); } - public String getRealDatabaseName() { - return realDatabaseName; + /** + * Calculates the lowest available Log Sequence Number. + * @see InformixConnection#getMaxLsn() + * @return the current lowest log sequence number + */ + private Lsn getMinLsn() throws SQLException { + return queryAndMap(GET_MIN_LSN, singleResultMapper(rs -> { + final Lsn lsn = Lsn.of(rs.getLong("uniqid"), rs.getLong("logpage") << 12); + LOGGER.trace("Current minimum lsn is {}", lsn.toLongString()); + return lsn; + }, "Minimum LSN query must return exactly one value")); } - private String retrieveRealDatabaseName() { + public boolean validateLogPosition(Partition partition, OffsetContext offset, CommonConnectorConfig config) { + final TxLogPosition lastPosition = ((InformixOffsetContext) offset).getChangePosition(); + final Lsn lastBeginLsn = lastPosition.getBeginLsn(); + final Lsn restartLsn = lastBeginLsn.isAvailable() ? lastBeginLsn : lastPosition.getCommitLsn(); + LOGGER.trace("Restart LSN is '{}'", restartLsn); + try { - return queryAndMap(GET_DATABASE_NAME, singleResultMapper(rs -> rs.getString(1), "Could not retrieve database name")); + final Lsn minLsn = getMinLsn(); + LOGGER.trace("Lowest available LSN is '{}'", minLsn); + + return restartLsn.isAvailable() && minLsn.isAvailable() && restartLsn.compareTo(minLsn) >= 0; } catch (SQLException e) { - throw new RuntimeException("Couldn't obtain database name", e); + throw new DebeziumException("Couldn't obtain lowest available Log Sequence Number", e); } } @@ -158,22 +184,6 @@ public String quotedColumnIdString(String columnName) { return columnName; } - public Table getTableSchemaFromTableId(TableId tableId) throws SQLException { - final DatabaseMetaData metadata = connection().getMetaData(); - try (ResultSet columns = metadata.getColumns( - tableId.catalog(), - tableId.schema(), - tableId.table(), - null)) { - final TableEditor tableEditor = Table.editor().tableId(tableId); - while (columns.next()) { - readTableColumn(columns, tableId, null).ifPresent(columnEditor -> tableEditor.addColumns(columnEditor.create())); - } - tableEditor.setPrimaryKeyNames(readPrimaryKeyNames(metadata, tableId)); - return tableEditor.create(); - } - } - public DataSource datasource() { return new DataSource() { @Override @@ -228,31 +238,4 @@ public boolean isWrapperFor(Class iface) throws SQLException { }; } - public boolean validateLogPosition(Partition partition, OffsetContext offset, CommonConnectorConfig config) { - - final Lsn storedLsn = ((InformixOffsetContext) offset).getChangePosition().getCommitLsn(); - - try { - final Lsn oldestLsn = getOldestLsn(); - - if (oldestLsn == null) { - return false; - } - - LOGGER.trace("Oldest SCN in logs is '{}'", oldestLsn); - return storedLsn == null || oldestLsn.compareTo(storedLsn) < 0; - } - catch (SQLException e) { - throw new DebeziumException("Unable to get last available log position", e); - } - } - - private Lsn getOldestLsn() throws SQLException { - return queryAndMap(GET_MIN_LSN, singleResultMapper(rs -> { - final Lsn lsn = Lsn.of(rs.getLong("uniqid"), rs.getLong("logpage") << 12); - LOGGER.trace("Current minimum lsn is {}", lsn.toLongString()); - return lsn; - }, "Minimum LSN query must return exactly one value")); - } - } diff --git a/src/main/java/io/debezium/connector/informix/InformixDatabaseSchema.java b/src/main/java/io/debezium/connector/informix/InformixDatabaseSchema.java index d536dbd..f9f091f 100644 --- a/src/main/java/io/debezium/connector/informix/InformixDatabaseSchema.java +++ b/src/main/java/io/debezium/connector/informix/InformixDatabaseSchema.java @@ -12,6 +12,7 @@ import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.TableSchemaBuilder; +import io.debezium.relational.Tables; import io.debezium.relational.ddl.DdlParser; import io.debezium.relational.history.TableChanges; import io.debezium.schema.SchemaChangeEvent; @@ -75,4 +76,14 @@ else if (schemaChange.getType() == SchemaChangeEventType.ALTER) { protected DdlParser getDdlParser() { return null; } + + @Override + public Tables tables() { + return super.tables(); + } + + @Override + public Tables.TableFilter getTableFilter() { + return super.getTableFilter(); + } } diff --git a/src/main/java/io/debezium/connector/informix/InformixSnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/informix/InformixSnapshotChangeEventSource.java index f59b3a4..b6e0037 100644 --- a/src/main/java/io/debezium/connector/informix/InformixSnapshotChangeEventSource.java +++ b/src/main/java/io/debezium/connector/informix/InformixSnapshotChangeEventSource.java @@ -100,7 +100,6 @@ else if (connectorConfig.getSnapshotIsolationMode() == SnapshotIsolationMode.EXC LOGGER.info("Locking table {}", tableId); statement.execute(lockingStatement.get()); } - } } } diff --git a/src/main/java/io/debezium/connector/informix/InformixStreamingChangeEventSource.java b/src/main/java/io/debezium/connector/informix/InformixStreamingChangeEventSource.java index 1402b9e..14f5d8d 100644 --- a/src/main/java/io/debezium/connector/informix/InformixStreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/informix/InformixStreamingChangeEventSource.java @@ -30,7 +30,6 @@ import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; -import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.schema.SchemaChangeEvent; import io.debezium.util.Clock; @@ -88,19 +87,25 @@ public void execute(ChangeEventSourceContext context, InformixPartition partitio throws InterruptedException { // Need to refresh schema before CDCEngine is started, to capture columns added in off-line schema evolution - try { - for (TableId tableId : schema.tableIds()) { - final Table table = metadataConnection.getTableSchemaFromTableId(tableId); - schema.refresh(table); + schema.tableIds().stream().map(TableId::schema).distinct().forEach(schemaName -> { + try { + metadataConnection.readSchema( + schema.tables(), + null, + schemaName, + schema.getTableFilter(), + null, + true); } - } - catch (SQLException e) { - LOGGER.error("Caught SQLException", e); - errorHandler.setProducerThrowable(e); - } + catch (SQLException e) { + LOGGER.error("Caught SQLException", e); + errorHandler.setProducerThrowable(e); + } + }); TxLogPosition lastPosition = offsetContext.getChangePosition(); Lsn lastCommitLsn = lastPosition.getCommitLsn(); + Lsn lastChangeLsn = lastPosition.getChangeLsn(); Lsn lastBeginLsn = lastPosition.getBeginLsn(); Lsn beginLsn = lastBeginLsn.isAvailable() ? lastBeginLsn : lastCommitLsn; @@ -136,9 +141,15 @@ public void execute(ChangeEventSourceContext context, InformixPartition partitio transactionRecord.getTransactionId(), commitLsn, lastCommitLsn); break; } + if (commitLsn.equals(lastCommitLsn) && lastChangeLsn.equals(lastCommitLsn)) { + LOGGER.info("Skipping transaction with id: '{}' since commitLsn='{}' == lastCommitLsn='{}' and lastChangeLsn='{}' == lastCommitLsn", + transactionRecord.getTransactionId(), commitLsn, lastCommitLsn, lastChangeLsn); + break; + } if (commitLsn.compareTo(lastCommitLsn) > 0) { + Lsn currentLsn = Lsn.of(transactionRecord.getBeginRecord().getSequenceId()); LOGGER.info("Recover finished: from lastBeginLsn='{}' to lastCommitLsn='{}', current Lsn='{}'", - lastBeginLsn, lastCommitLsn, commitLsn); + lastBeginLsn, lastCommitLsn, currentLsn); recovering = false; } handleTransaction(transactionEngine, partition, offsetContext, transactionRecord, recovering); @@ -248,6 +259,8 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart throws InterruptedException, IfxStreamException { long tStart = System.nanoTime(); + long lastChangeSeq = offsetContext.getChangePosition().getChangeLsn().sequence(); + int transactionId = transactionRecord.getTransactionId(); IfxCDCBeginTransactionRecord beginRecord = transactionRecord.getBeginRecord(); @@ -257,11 +270,11 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart long beginTs = beginRecord.getTime(); long beginSeq = beginRecord.getSequenceId(); - long lowestBeginSeq = engine.getLowestBeginSequence().orElse(beginSeq); long endSeq = endRecord.getSequenceId(); + long restartSeq = engine.getLowestBeginSequence().orElse(endSeq); if (!recover) { - updateChangePosition(offsetContext, endSeq, beginSeq, transactionId, lowestBeginSeq < beginSeq ? lowestBeginSeq : beginSeq); + updateChangePosition(offsetContext, endSeq, beginSeq, transactionId, Math.min(restartSeq, beginSeq)); dispatcher.dispatchTransactionStartedEvent( partition, String.valueOf(transactionId), @@ -276,7 +289,7 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart if (IfmxStreamRecordType.COMMIT.equals(endRecord.getType())) { IfxCDCCommitTransactionRecord commitRecord = (IfxCDCCommitTransactionRecord) endRecord; - long commitSeq = commitRecord.getSequenceId(); + long commitSeq = endSeq; long commitTs = commitRecord.getTime(); Map before = null; @@ -287,7 +300,7 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart long changeSeq = streamRecord.getSequenceId(); - if (recover && Lsn.of(changeSeq).compareTo(offsetContext.getChangePosition().getChangeLsn()) <= 0) { + if (recover && changeSeq <= lastChangeSeq) { LOGGER.info("Skipping already processed record {}", changeSeq); continue; } @@ -370,7 +383,7 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart start = System.nanoTime(); - updateChangePosition(offsetContext, commitSeq, commitSeq, transactionId, lowestBeginSeq); + updateChangePosition(offsetContext, commitSeq, commitSeq, transactionId, restartSeq); dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, Instant.ofEpochSecond(commitTs)); end = System.nanoTime(); @@ -384,7 +397,7 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart if (IfmxStreamRecordType.ROLLBACK.equals(endRecord.getType())) { if (!recover) { - updateChangePosition(offsetContext, endSeq, endSeq, transactionId, null); + updateChangePosition(offsetContext, endSeq, endSeq, transactionId, restartSeq); offsetContext.getTransactionContext().endTransaction(); } diff --git a/src/test/java/io/debezium/connector/informix/AbstractInformixDefaultValueIT.java b/src/test/java/io/debezium/connector/informix/AbstractInformixDefaultValueIT.java index 8d8fbdc..839ccda 100644 --- a/src/test/java/io/debezium/connector/informix/AbstractInformixDefaultValueIT.java +++ b/src/test/java/io/debezium/connector/informix/AbstractInformixDefaultValueIT.java @@ -13,7 +13,6 @@ import java.util.Iterator; import java.util.List; import java.util.Objects; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.apache.kafka.connect.data.Field; @@ -329,6 +328,8 @@ private void testDefaultValuesCreateTableAndSnapshot(List colu connection.execute("INSERT INTO dv_test (id) values (1)"); + waitForAvailableRecords(); + records = consumeRecordsByTopic(1); tableRecords = records.recordsForTopic(TestHelper.TEST_DATABASE + ".informix.dv_test"); assertThat(tableRecords).hasSize(1); @@ -365,6 +366,8 @@ private void testDefaultValuesAlterTableModifyExisting(List co connection.execute("INSERT INTO dv_test (id) values (2)"); + waitForAvailableRecords(); + SourceRecords records = consumeRecordsByTopic(1); // Verify we got only 1 record for our test @@ -449,7 +452,7 @@ private void testDefaultValuesAlterTableAdd(List columnDefinit connection.execute("INSERT INTO dv_test (id) values (3)"); // TODO: ALTER TABLE ADD columns sometimes(!) result in 'ghost' inserts for all existing rows(?) - waitForAvailableRecords(10, TimeUnit.SECONDS); + waitForAvailableRecords(); SourceRecords records = consumeRecordsByTopicUntil((integer, record) -> Objects.equals(3, ((Struct) record.key()).get("id"))); assertNoRecordsToConsume(); diff --git a/src/test/java/io/debezium/connector/informix/BlockingSnapshotIT.java b/src/test/java/io/debezium/connector/informix/BlockingSnapshotIT.java index c47befe..7c45c01 100644 --- a/src/test/java/io/debezium/connector/informix/BlockingSnapshotIT.java +++ b/src/test/java/io/debezium/connector/informix/BlockingSnapshotIT.java @@ -21,7 +21,6 @@ import io.debezium.connector.informix.util.TestHelper; import io.debezium.jdbc.JdbcConnection; import io.debezium.junit.ConditionalFail; -import io.debezium.junit.Flaky; import io.debezium.pipeline.AbstractBlockingSnapshotTest; import io.debezium.relational.history.SchemaHistory; @@ -158,10 +157,4 @@ protected int insertMaxSleep() { public void readsSchemaOnlyForSignaledTables() throws Exception { super.readsSchemaOnlyForSignaledTables(); } - - @Test - @Flaky("DBZ-7543") - public void executeBlockingSnapshotWhileStreaming() throws Exception { - super.executeBlockingSnapshotWhileStreaming(); - } } diff --git a/src/test/java/io/debezium/connector/informix/IncrementalSnapshotIT.java b/src/test/java/io/debezium/connector/informix/IncrementalSnapshotIT.java index 02d7069..2995c73 100644 --- a/src/test/java/io/debezium/connector/informix/IncrementalSnapshotIT.java +++ b/src/test/java/io/debezium/connector/informix/IncrementalSnapshotIT.java @@ -162,12 +162,6 @@ protected String server() { return TestHelper.TEST_DATABASE; } - @Override - protected void sendAdHocSnapshotSignal() throws SQLException { - super.sendAdHocSnapshotSignal(); - TestHelper.waitForCDC(); - } - @Test @Ignore("Informix does not support DDL operations on tables defined for replication") @Override diff --git a/src/test/java/io/debezium/connector/informix/InformixCdcTypesIT.java b/src/test/java/io/debezium/connector/informix/InformixCdcTypesIT.java index d99dc7e..cad4b9e 100644 --- a/src/test/java/io/debezium/connector/informix/InformixCdcTypesIT.java +++ b/src/test/java/io/debezium/connector/informix/InformixCdcTypesIT.java @@ -14,7 +14,6 @@ import java.util.List; import java.util.Map; import java.util.Random; -import java.util.concurrent.TimeUnit; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -179,7 +178,7 @@ private void insertOneAndValidate(String tableName, Schema valueSchema, String i String topicName = String.format("testdb.informix.%s", tableName); connection.execute(String.format("insert into %s values(%s)", tableName, insertValue)); - waitForAvailableRecords(1, TimeUnit.MINUTES); + waitForAvailableRecords(); List records = consumeRecordsByTopic(1).recordsForTopic(topicName); assertThat(records).isNotNull().hasSize(1); diff --git a/src/test/java/io/debezium/connector/informix/InformixConnectorIT.java b/src/test/java/io/debezium/connector/informix/InformixConnectorIT.java index 9349ad0..a736207 100644 --- a/src/test/java/io/debezium/connector/informix/InformixConnectorIT.java +++ b/src/test/java/io/debezium/connector/informix/InformixConnectorIT.java @@ -8,20 +8,20 @@ import static io.debezium.connector.informix.util.TestHelper.TYPE_LENGTH_PARAMETER_KEY; import static io.debezium.connector.informix.util.TestHelper.TYPE_NAME_PARAMETER_KEY; import static io.debezium.connector.informix.util.TestHelper.TYPE_SCALE_PARAMETER_KEY; -import static io.debezium.data.Envelope.FieldName.AFTER; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; import static org.junit.Assert.assertNull; +import java.math.BigDecimal; import java.sql.SQLException; import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.junit.After; @@ -36,13 +36,15 @@ import io.debezium.connector.informix.util.TestHelper; import io.debezium.converters.CloudEventsConverterTest; import io.debezium.converters.spi.CloudEventsMaker; -import io.debezium.data.Envelope; +import io.debezium.data.Envelope.FieldName; import io.debezium.data.SchemaAndValueField; +import io.debezium.data.SourceRecordAssert; +import io.debezium.data.SpecialValueDecimal; import io.debezium.data.VerifyRecord; import io.debezium.doc.FixFor; import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest; +import io.debezium.jdbc.JdbcValueConverters.DecimalMode; import io.debezium.junit.ConditionalFail; -import io.debezium.junit.Flaky; import io.debezium.junit.logging.LogInterceptor; import io.debezium.relational.RelationalDatabaseSchema; import io.debezium.relational.history.MemorySchemaHistory; @@ -123,13 +125,13 @@ public void deleteWithoutTombstone() throws Exception { // Wait for streaming to start waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - waitForAvailableRecords(10, TimeUnit.SECONDS); for (int i = 0; i < RECORDS_PER_TABLE; i++) { final int id = ID_START + i; connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')"); connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')"); } + waitForAvailableRecords(); consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES); @@ -151,7 +153,7 @@ public void deleteWithoutTombstone() throws Exception { new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")); final Struct deleteValue = (Struct) deleteRecord.value(); - assertRecord((Struct) deleteValue.get("before"), expectedValueBefore); + assertRecord((Struct) deleteValue.get(FieldName.BEFORE), expectedValueBefore); } } @@ -180,8 +182,7 @@ public void deleteWithTombstone() throws Exception { connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')"); connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')"); } - - waitForAvailableRecords(10, TimeUnit.SECONDS); + waitForAvailableRecords(); consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES); @@ -206,7 +207,7 @@ public void deleteWithTombstone() throws Exception { new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")); final Struct deleteValue = (Struct) deleteRecord.value(); - assertRecord((Struct) deleteValue.get("before"), expectedValueBefore); + assertRecord((Struct) deleteValue.get(FieldName.BEFORE), expectedValueBefore); } } } @@ -225,12 +226,12 @@ public void testTruncateTable() throws Exception { // Wait for streaming to start waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - waitForAvailableRecords(10, TimeUnit.SECONDS); for (int i = 0; i < RECORDS_PER_TABLE; i++) { final int id = ID_START + i; connection.execute("INSERT INTO truncate_table VALUES(" + id + ", 'name')"); } + waitForAvailableRecords(); consumeRecordsByTopic(RECORDS_PER_TABLE); @@ -262,14 +263,10 @@ public void updatePrimaryKey() throws Exception { waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); connection.execute("INSERT INTO tableb VALUES(1, 'b')"); - waitForAvailableRecords(1, TimeUnit.SECONDS); consumeRecordsByTopic(1); - connection.execute( - "UPDATE tablea SET id=100 WHERE id=1", - "UPDATE tableb SET id=100 WHERE id=1"); - - waitForAvailableRecords(10, TimeUnit.SECONDS); + connection.execute("UPDATE tablea SET id=100 WHERE id=1", "UPDATE tableb SET id=100 WHERE id=1"); + waitForAvailableRecords(); final SourceRecords records = consumeRecordsByTopic(6); final List tableA = records.recordsForTopic("testdb.informix.tablea"); @@ -295,9 +292,9 @@ public void updatePrimaryKey() throws Exception { final Struct deleteKeyA = (Struct) deleteRecordA.key(); final Struct deleteValueA = (Struct) deleteRecordA.value(); - assertRecord(deleteValueA.getStruct("before"), expectedDeleteRowA); + assertRecord(deleteValueA.getStruct(FieldName.BEFORE), expectedDeleteRowA); assertRecord(deleteKeyA, expectedDeleteKeyA); - assertNull(deleteValueA.get("after")); + assertNull(deleteValueA.get(FieldName.AFTER)); final Struct tombstoneKeyA = (Struct) tombstoneRecordA.key(); final Struct tombstoneValueA = (Struct) tombstoneRecordA.value(); @@ -306,9 +303,9 @@ public void updatePrimaryKey() throws Exception { final Struct insertKeyA = (Struct) insertRecordA.key(); final Struct insertValueA = (Struct) insertRecordA.value(); - assertRecord(insertValueA.getStruct("after"), expectedInsertRowA); + assertRecord(insertValueA.getStruct(FieldName.AFTER), expectedInsertRowA); assertRecord(insertKeyA, expectedInsertKeyA); - assertNull(insertValueA.get("before")); + assertNull(insertValueA.get(FieldName.BEFORE)); final List expectedDeleteRowB = List.of( new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1), @@ -327,9 +324,9 @@ public void updatePrimaryKey() throws Exception { final Struct deletekeyB = (Struct) deleteRecordB.key(); final Struct deleteValueB = (Struct) deleteRecordB.value(); - assertRecord(deleteValueB.getStruct("before"), expectedDeleteRowB); + assertRecord(deleteValueB.getStruct(FieldName.BEFORE), expectedDeleteRowB); assertRecord(deletekeyB, expectedDeleteKeyB); - assertNull(deleteValueB.get("after")); + assertNull(deleteValueB.get(FieldName.AFTER)); final Struct tombstonekeyB = (Struct) tombstoneRecordB.key(); final Struct tombstoneValueB = (Struct) tombstoneRecordB.value(); @@ -338,9 +335,9 @@ public void updatePrimaryKey() throws Exception { final Struct insertkeyB = (Struct) insertRecordB.key(); final Struct insertValueB = (Struct) insertRecordB.value(); - assertRecord(insertValueB.getStruct("after"), expectedInsertRowB); + assertRecord(insertValueB.getStruct(FieldName.AFTER), expectedInsertRowB); assertRecord(insertkeyB, expectedInsertKeyB); - assertNull(insertValueB.get("before")); + assertNull(insertValueB.get(FieldName.BEFORE)); } @Test @@ -364,11 +361,8 @@ public void updatePrimaryKeyWithRestartInMiddle() throws Exception { connection.execute("INSERT INTO tableb VALUES(1, 'b')"); consumeRecordsByTopic(1); - connection.execute( - "UPDATE tablea SET id=100 WHERE id=1", - "UPDATE tableb SET id=100 WHERE id=1"); - - waitForAvailableRecords(10, TimeUnit.SECONDS); + connection.execute("UPDATE tablea SET id=100 WHERE id=1", "UPDATE tableb SET id=100 WHERE id=1"); + waitForAvailableRecords(); final SourceRecords records1 = consumeRecordsByTopic(2); @@ -382,7 +376,7 @@ public void updatePrimaryKeyWithRestartInMiddle() throws Exception { // Wait for streaming to start waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - waitForAvailableRecords(10, TimeUnit.SECONDS); + waitForAvailableRecords(); final SourceRecords records2 = consumeRecordsByTopic(4); @@ -410,9 +404,9 @@ public void updatePrimaryKeyWithRestartInMiddle() throws Exception { final Struct deleteKeyA = (Struct) deleteRecordA.key(); final Struct deleteValueA = (Struct) deleteRecordA.value(); - assertRecord(deleteValueA.getStruct("before"), expectedDeleteRowA); + assertRecord(deleteValueA.getStruct(FieldName.BEFORE), expectedDeleteRowA); assertRecord(deleteKeyA, expectedDeleteKeyA); - assertNull(deleteValueA.get("after")); + assertNull(deleteValueA.get(FieldName.AFTER)); final Struct tombstoneKeyA = (Struct) tombstoneRecordA.key(); final Struct tombstoneValueA = (Struct) tombstoneRecordA.value(); @@ -421,9 +415,9 @@ public void updatePrimaryKeyWithRestartInMiddle() throws Exception { final Struct insertKeyA = (Struct) insertRecordA.key(); final Struct insertValueA = (Struct) insertRecordA.value(); - assertRecord(insertValueA.getStruct("after"), expectedInsertRowA); + assertRecord(insertValueA.getStruct(FieldName.AFTER), expectedInsertRowA); assertRecord(insertKeyA, expectedInsertKeyA); - assertNull(insertValueA.get("before")); + assertNull(insertValueA.get(FieldName.BEFORE)); final List expectedDeleteRowB = List.of( new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1), @@ -442,9 +436,9 @@ public void updatePrimaryKeyWithRestartInMiddle() throws Exception { final Struct deletekeyB = (Struct) deleteRecordB.key(); final Struct deleteValueB = (Struct) deleteRecordB.value(); - assertRecord(deleteValueB.getStruct("before"), expectedDeleteRowB); + assertRecord(deleteValueB.getStruct(FieldName.BEFORE), expectedDeleteRowB); assertRecord(deletekeyB, expectedDeleteKeyB); - assertNull(deleteValueB.get("after")); + assertNull(deleteValueB.get(FieldName.AFTER)); final Struct tombstonekeyB = (Struct) tombstoneRecordB.key(); final Struct tombstoneValueB = (Struct) tombstoneRecordB.value(); @@ -453,9 +447,9 @@ public void updatePrimaryKeyWithRestartInMiddle() throws Exception { final Struct insertkeyB = (Struct) insertRecordB.key(); final Struct insertValueB = (Struct) insertRecordB.value(); - assertRecord(insertValueB.getStruct("after"), expectedInsertRowB); + assertRecord(insertValueB.getStruct(FieldName.AFTER), expectedInsertRowB); assertRecord(insertkeyB, expectedInsertKeyB); - assertNull(insertValueB.get("before")); + assertNull(insertValueB.get(FieldName.BEFORE)); } @Test @@ -509,11 +503,12 @@ public void verifyOffsets(boolean withOnlineUpd) throws Exception { if (withOnlineUpd) { // Wait for streaming to start waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - waitForAvailableRecords(10, TimeUnit.SECONDS); connection.execute("UPDATE tablea SET cola = 'aa' WHERE id > 1"); connection.execute("UPDATE tableb SET colb = 'bb' WHERE id > 1"); + waitForAvailableRecords(); + final SourceRecords sourceRecords = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES); final List tableA = sourceRecords.recordsForTopic("testdb.informix.tablea"); final List tableB = sourceRecords.recordsForTopic("testdb.informix.tableb"); @@ -538,7 +533,7 @@ public void verifyOffsets(boolean withOnlineUpd) throws Exception { // Wait for streaming to start waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - waitForAvailableRecords(3, TimeUnit.MINUTES); + waitForAvailableRecords(); final SourceRecords sourceRecords = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES); final List tableA = sourceRecords.recordsForTopic("testdb.informix.tablea"); @@ -559,12 +554,12 @@ public void verifyOffsets(boolean withOnlineUpd) throws Exception { new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")); final Struct valueA = (Struct) recordA.value(); - assertRecord((Struct) valueA.get("after"), expectedRowA); - assertNull(valueA.get("before")); + assertRecord((Struct) valueA.get(FieldName.AFTER), expectedRowA); + assertNull(valueA.get(FieldName.BEFORE)); final Struct valueB = (Struct) recordB.value(); - assertRecord((Struct) valueB.get("after"), expectedRowB); - assertNull(valueB.get("before")); + assertRecord((Struct) valueB.get(FieldName.AFTER), expectedRowB); + assertNull(valueB.get(FieldName.BEFORE)); assertThat(recordA.sourceOffset().get("snapshot")).as("Streaming phase").isNull(); assertThat(recordA.sourceOffset().get("snapshot_completed")).as("Streaming phase").isNull(); @@ -590,13 +585,13 @@ public void testTableIncludeList() throws Exception { // Wait for streaming to start waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - waitForAvailableRecords(10, TimeUnit.SECONDS); for (int i = 0; i < RECORDS_PER_TABLE; i++) { final int id = ID_START + i; connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')"); connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')"); } + waitForAvailableRecords(); final SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE); final List tableA = records.recordsForTopic("testdb.informix.tablea"); @@ -632,8 +627,7 @@ public void testTableExcludeList() throws Exception { connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')"); connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')"); } - - waitForAvailableRecords(10, TimeUnit.SECONDS); + waitForAvailableRecords(); final SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE); final List tableA = records.recordsForTopic("testdb.informix.tablea"); @@ -643,6 +637,143 @@ public void testTableExcludeList() throws Exception { assertNoRecordsToConsume(); } + @Test + @FixFor("DBZ-7813") + public void testColumnIncludeListWithInitialSnapshot() throws Exception { + testColumnIncludeList(SnapshotMode.INITIAL); + } + + @Test + @FixFor("DBZ-7813") + public void testColumnIncludeListWithNoData() throws Exception { + testColumnIncludeList(SnapshotMode.NO_DATA); + } + + public void testColumnIncludeList(SnapshotMode snapshotMode) throws Exception { + final Configuration config = TestHelper.defaultConfig() + .with(InformixConnectorConfig.SNAPSHOT_MODE, snapshotMode) + .with(InformixConnectorConfig.TABLE_INCLUDE_LIST, "testdb.informix.dt_table") + .with(InformixConnectorConfig.COLUMN_INCLUDE_LIST, + "informix.dt_table.id,informix.dt_table.c1,informix.dt_table.c2,informix.dt_table.c3a,informix.dt_table.c3b") + .build(); + + final int expectedRecords; + if (snapshotMode == SnapshotMode.INITIAL) { + expectedRecords = 2; + connection.execute("INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (0,123,456,789.01,'test',1.228,234.56)"); + } + else { + expectedRecords = 1; + } + + start(InformixConnector.class, config); + assertConnectorIsRunning(); + + // Wait for snapshot completion + waitForSnapshotToBeCompleted(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); + + // Wait for streaming to start + waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); + + connection.execute("INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1,123,456,789.01,'test',1.228,234.56)"); + + waitForAvailableRecords(); + + final SourceRecords records = consumeRecordsByTopic(expectedRecords); + final List table = records.recordsForTopic("testdb.informix.dt_table"); + assertThat(table).hasSize(expectedRecords); + Schema aSchema = SchemaBuilder.struct().optional() + .name("testdb.informix.dt_table.Value") + .field("id", Schema.INT32_SCHEMA) + .field("c1", Schema.OPTIONAL_INT32_SCHEMA) + .field("c2", Schema.OPTIONAL_INT32_SCHEMA) + .field("c3a", SpecialValueDecimal.builder(DecimalMode.PRECISE, 5, 2).optional().build()) + .field("c3b", Schema.OPTIONAL_STRING_SCHEMA) + .build(); + Struct aStruct = new Struct(aSchema) + .put("c1", 123) + .put("c2", 456) + .put("c3a", BigDecimal.valueOf(789.01)) + .put("c3b", "test"); + if (snapshotMode == SnapshotMode.INITIAL) { + SourceRecordAssert.assertThat(table.get(0)).valueAfterFieldIsEqualTo(aStruct.put("id", 0)); + SourceRecordAssert.assertThat(table.get(1)).valueAfterFieldIsEqualTo(aStruct.put("id", 1)); + } + else { + SourceRecordAssert.assertThat(table.get(0)).valueAfterFieldIsEqualTo(aStruct.put("id", 1)); + } + + assertNoRecordsToConsume(); + } + + @Test + @FixFor("DBZ-7813") + public void testColumnExcludeListWithInitialSnapshot() throws Exception { + testColumnExcludeList(SnapshotMode.INITIAL); + } + + @Test + @FixFor("DBZ-7813") + public void testColumnExcludeListWithINoData() throws Exception { + testColumnExcludeList(SnapshotMode.NO_DATA); + } + + public void testColumnExcludeList(SnapshotMode snapshotMode) throws Exception { + final Configuration config = TestHelper.defaultConfig() + .with(InformixConnectorConfig.SNAPSHOT_MODE, snapshotMode) + .with(InformixConnectorConfig.TABLE_EXCLUDE_LIST, "testdb.informix.tablea") + .with(InformixConnectorConfig.COLUMN_EXCLUDE_LIST, "informix.dt_table.f1,informix.dt_table.f2") + .build(); + + final int expectedRecords; + if (snapshotMode == SnapshotMode.INITIAL) { + expectedRecords = 2; + connection.execute("INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (0,123,456,789.01,'test',1.228,234.56)"); + } + else { + expectedRecords = 1; + } + + start(InformixConnector.class, config); + assertConnectorIsRunning(); + + // Wait for snapshot completion + waitForSnapshotToBeCompleted(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); + + // Wait for streaming to start + waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); + + connection.execute("INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1,123,456,789.01,'test',1.228,234.56)"); + + waitForAvailableRecords(); + + final SourceRecords records = consumeRecordsByTopic(expectedRecords); + final List table = records.recordsForTopic("testdb.informix.dt_table"); + assertThat(table).hasSize(expectedRecords); + Schema aSchema = SchemaBuilder.struct().optional() + .name("testdb.informix.dt_table.Value") + .field("id", Schema.INT32_SCHEMA) + .field("c1", Schema.OPTIONAL_INT32_SCHEMA) + .field("c2", Schema.OPTIONAL_INT32_SCHEMA) + .field("c3a", SpecialValueDecimal.builder(DecimalMode.PRECISE, 5, 2).optional().build()) + .field("c3b", Schema.OPTIONAL_STRING_SCHEMA) + .build(); + Struct aStruct = new Struct(aSchema) + .put("c1", 123) + .put("c2", 456) + .put("c3a", BigDecimal.valueOf(789.01)) + .put("c3b", "test"); + if (snapshotMode == SnapshotMode.INITIAL) { + SourceRecordAssert.assertThat(table.get(0)).valueAfterFieldIsEqualTo(aStruct.put("id", 0)); + SourceRecordAssert.assertThat(table.get(1)).valueAfterFieldIsEqualTo(aStruct.put("id", 1)); + } + else { + SourceRecordAssert.assertThat(table.get(0)).valueAfterFieldIsEqualTo(aStruct.put("id", 1)); + } + + assertNoRecordsToConsume(); + } + private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean afterStreaming) throws Exception { final int RECORDS_PER_TABLE = 30; final int TABLES = 2; @@ -675,7 +806,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af return false; } final Struct envelope = (Struct) record.value(); - final Struct after = envelope.getStruct("after"); + final Struct after = envelope.getStruct(FieldName.AFTER); final Integer id = after.getInt32("id"); final String value = after.getString("cola"); return id != null && id == HALF_ID && "a".equals(value); @@ -685,7 +816,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af // Wait for snapshot to be completed or a first streaming message delivered if (restartJustAfterSnapshot) { waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - waitForAvailableRecords(2, TimeUnit.MINUTES); + waitForAvailableRecords(); } else { waitForSnapshotToBeCompleted(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); @@ -699,7 +830,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af final List expectedRow = List.of( new SchemaAndValueField("id", Schema.INT32_SCHEMA, -2), new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "-a")); - assertRecord(((Struct) records.allRecordsInOrder().get(0).value()).getStruct(Envelope.FieldName.AFTER), expectedRow); + assertRecord(((Struct) records.allRecordsInOrder().get(0).value()).getStruct(FieldName.AFTER), expectedRow); } connection.setAutoCommit(false); @@ -710,7 +841,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af } connection.commit(); - waitForAvailableRecords(1, TimeUnit.MINUTES); + waitForAvailableRecords(); List records = consumeRecordsByTopic(RECORDS_PER_TABLE).allRecordsInOrder(); @@ -720,7 +851,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af final List expectedLastRow = List.of( new SchemaAndValueField("id", Schema.INT32_SCHEMA, HALF_ID - 1), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")); - assertRecord((Struct) value.get("after"), expectedLastRow); + assertRecord((Struct) value.get(FieldName.AFTER), expectedLastRow); waitForConnectorShutdown(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); stopConnector(); @@ -730,7 +861,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af assertConnectorIsRunning(); waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - waitForAvailableRecords(1, TimeUnit.MINUTES); + waitForAvailableRecords(); SourceRecords sourceRecords = consumeRecordsByTopic(RECORDS_PER_TABLE); List tableA = sourceRecords.recordsForTopic("testdb.informix.tablea"); @@ -751,12 +882,12 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")); final Struct valueA = (Struct) recordA.value(); - assertRecord((Struct) valueA.get("after"), expectedRowA); - assertNull(valueA.get("before")); + assertRecord((Struct) valueA.get(FieldName.AFTER), expectedRowA); + assertNull(valueA.get(FieldName.BEFORE)); final Struct valueB = (Struct) recordB.value(); - assertRecord((Struct) valueB.get("after"), expectedRowB); - assertNull(valueB.get("before")); + assertRecord((Struct) valueB.get(FieldName.AFTER), expectedRowB); + assertNull(valueB.get(FieldName.BEFORE)); } for (int i = 0; i < RECORDS_PER_TABLE; i++) { @@ -765,8 +896,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af connection.executeWithoutCommitting("INSERT INTO tableb VALUES(" + id + ", 'b')"); connection.commit(); } - - waitForAvailableRecords(1, TimeUnit.MINUTES); + waitForAvailableRecords(); sourceRecords = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES); tableA = sourceRecords.recordsForTopic("testdb.informix.tablea"); @@ -787,12 +917,12 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")); final Struct valueA = (Struct) recordA.value(); - assertRecord((Struct) valueA.get("after"), expectedRowA); - assertNull(valueA.get("before")); + assertRecord((Struct) valueA.get(FieldName.AFTER), expectedRowA); + assertNull(valueA.get(FieldName.BEFORE)); final Struct valueB = (Struct) recordB.value(); - assertRecord((Struct) valueB.get("after"), expectedRowB); - assertNull(valueB.get("before")); + assertRecord((Struct) valueB.get(FieldName.AFTER), expectedRowB); + assertNull(valueB.get(FieldName.BEFORE)); } assertNoRecordsToConsume(); @@ -852,11 +982,12 @@ public void shouldConsumeEventsWithMaskedAndTruncatedColumns() throws Exception // Wait for streaming to start waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - waitForAvailableRecords(10, TimeUnit.SECONDS); connection.execute("INSERT INTO masked_hashed_column_table (id, name, name2, name3) VALUES (10, 'some_name', 'test', 'test')"); connection.execute("INSERT INTO truncated_column_table VALUES(11, 'some_name')"); + waitForAvailableRecords(); + final SourceRecords records = consumeRecordsByTopic(2); final List tableA = records.recordsForTopic("testdb.informix.masked_hashed_column_table"); final List tableB = records.recordsForTopic("testdb.informix.truncated_column_table"); @@ -867,8 +998,8 @@ public void shouldConsumeEventsWithMaskedAndTruncatedColumns() throws Exception VerifyRecord.isValidInsert(record, "id", 10); Struct value = (Struct) record.value(); - if (value.getStruct("after") != null) { - Struct after = value.getStruct("after"); + if (value.getStruct(FieldName.AFTER) != null) { + Struct after = value.getStruct(FieldName.AFTER); assertThat(after.getString("name")).isEqualTo("************"); assertThat(after.getString("name2")).isEqualTo("8e68c68edbbac316dfe2f6ada6b0d2d3e2002b487a985d4b7c7c82dd83b0f4d7"); assertThat(after.getString("name3")).isEqualTo("8e68c68edbbac316dfe2"); @@ -879,8 +1010,8 @@ record = tableB.get(0); VerifyRecord.isValidInsert(record, "id", 11); value = (Struct) record.value(); - if (value.getStruct("after") != null) { - assertThat(value.getStruct("after").getString("name")).isEqualTo("some"); + if (value.getStruct(FieldName.AFTER) != null) { + assertThat(value.getStruct(FieldName.AFTER).getString("name")).isEqualTo("some"); } } @@ -897,10 +1028,11 @@ public void shouldRewriteIdentityKey() throws Exception { // Wait for streaming to start waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - waitForAvailableRecords(10, TimeUnit.SECONDS); connection.execute("INSERT INTO tablea (id, cola) values (100, 'hundred')"); + waitForAvailableRecords(); + List records = consumeRecordsByTopic(1).recordsForTopic("testdb.informix.tablea"); assertThat(records).isNotNull().isNotEmpty(); assertThat(records.get(0).key()).isNotNull(); @@ -923,17 +1055,18 @@ public void shouldPropagateSourceTypeByDatatype() throws Exception { // Wait for streaming to start waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - waitForAvailableRecords(10, TimeUnit.SECONDS); connection.execute("INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1,123,456,789.01,'test',1.228,234.56)"); + waitForAvailableRecords(); + final SourceRecords records = consumeRecordsByTopic(1); List recordsForTopic = records.recordsForTopic("testdb.informix.dt_table"); assertThat(recordsForTopic).hasSize(1); assertNoRecordsToConsume(); - final Field before = recordsForTopic.get(0).valueSchema().field("before"); + final Field before = recordsForTopic.get(0).valueSchema().field(FieldName.BEFORE); assertThat(before.schema().field("id").schema().parameters()).isNull(); assertThat(before.schema().field("c1").schema().parameters()).isNull(); @@ -965,8 +1098,6 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception { .with(InformixConnectorConfig.TABLE_INCLUDE_LIST, "testdb.informix.tablea") .build(); - connection.execute("INSERT INTO tablea (id,cola) values (1001, 'DBZ3668')"); - start(InformixConnector.class, config); assertConnectorIsRunning(); @@ -987,9 +1118,9 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception { // Wait for streaming to start waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - connection.execute("INSERT INTO tablea (id,cola) VALUES (1002, 'DBZ3668')"); + connection.execute("INSERT INTO tablea (id,cola) VALUES (3668, 'DBZ3668')"); - waitForAvailableRecords(10, TimeUnit.SECONDS); + waitForAvailableRecords(); records = consumeRecordsByTopic(1); @@ -1029,10 +1160,10 @@ public void shouldNotUseOffsetWhenSnapshotIsAlways() throws Exception { int expectedRecordCount = 2; SourceRecords sourceRecords = consumeRecordsByTopic(expectedRecordCount); assertThat(sourceRecords.recordsForTopic("testdb.informix.always_snapshot")).hasSize(expectedRecordCount); - Struct struct = (Struct) ((Struct) sourceRecords.allRecordsInOrder().get(0).value()).get(AFTER); + Struct struct = (Struct) ((Struct) sourceRecords.allRecordsInOrder().get(0).value()).get(FieldName.AFTER); TestCase.assertEquals(1, struct.get("id")); TestCase.assertEquals("Test1", struct.get("data")); - struct = (Struct) ((Struct) sourceRecords.allRecordsInOrder().get(1).value()).get(AFTER); + struct = (Struct) ((Struct) sourceRecords.allRecordsInOrder().get(1).value()).get(FieldName.AFTER); TestCase.assertEquals(2, struct.get("id")); TestCase.assertEquals("Test2", struct.get("data")); @@ -1054,16 +1185,16 @@ public void shouldNotUseOffsetWhenSnapshotIsAlways() throws Exception { // Check we get up-to-date data in the snapshot. assertThat(sourceRecords.recordsForTopic("testdb.informix.always_snapshot")).hasSize(expectedRecordCount); - struct = (Struct) ((Struct) sourceRecords.recordsForTopic("testdb.informix.always_snapshot").get(0).value()).get(AFTER); + struct = (Struct) ((Struct) sourceRecords.recordsForTopic("testdb.informix.always_snapshot").get(0).value()).get(FieldName.AFTER); TestCase.assertEquals(3, struct.get("id")); TestCase.assertEquals("Test3", struct.get("data")); - struct = (Struct) ((Struct) sourceRecords.recordsForTopic("testdb.informix.always_snapshot").get(1).value()).get(AFTER); + struct = (Struct) ((Struct) sourceRecords.recordsForTopic("testdb.informix.always_snapshot").get(1).value()).get(FieldName.AFTER); TestCase.assertEquals(2, struct.get("id")); TestCase.assertEquals("Test2", struct.get("data")); } @Test - @Flaky("7699") + @FixFor("DBZ-7699") public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception { Configuration.Builder builder = TestHelper.defaultConfig() @@ -1097,7 +1228,7 @@ public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception { connection.execute("INSERT INTO tablea VALUES (100,'100')"); connection.execute("INSERT INTO tablea VALUES (200,'200')"); - waitForAvailableRecords(10, TimeUnit.SECONDS); + waitForAvailableRecords(); recordCount = 2; sourceRecords = consumeRecordsByTopic(recordCount); @@ -1144,7 +1275,7 @@ public void shouldAllowForCustomSnapshot() throws InterruptedException, SQLExcep connection.execute("INSERT INTO tablea VALUES (2, '1');"); connection.execute("INSERT INTO tableb VALUES (2, '1');"); - waitForAvailableRecords(10, TimeUnit.SECONDS); + waitForAvailableRecords(); actualRecords = consumeRecordsByTopic(2); diff --git a/src/test/java/io/debezium/connector/informix/InformixOfflineDefaultValueIT.java b/src/test/java/io/debezium/connector/informix/InformixOfflineDefaultValueIT.java index 09472ca..ebde52b 100644 --- a/src/test/java/io/debezium/connector/informix/InformixOfflineDefaultValueIT.java +++ b/src/test/java/io/debezium/connector/informix/InformixOfflineDefaultValueIT.java @@ -6,7 +6,6 @@ package io.debezium.connector.informix; import java.sql.SQLException; -import java.util.concurrent.TimeUnit; import org.junit.Before; @@ -52,7 +51,5 @@ protected void performSchemaChange(Configuration config, InformixConnection conn waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); assertConnectorIsRunning(); - - waitForAvailableRecords(1, TimeUnit.SECONDS); } } diff --git a/src/test/java/io/debezium/connector/informix/InformixReselectColumnsProcessorIT.java b/src/test/java/io/debezium/connector/informix/InformixReselectColumnsProcessorIT.java index 8124251..3e17ea7 100644 --- a/src/test/java/io/debezium/connector/informix/InformixReselectColumnsProcessorIT.java +++ b/src/test/java/io/debezium/connector/informix/InformixReselectColumnsProcessorIT.java @@ -5,8 +5,6 @@ */ package io.debezium.connector.informix; -import java.util.concurrent.TimeUnit; - import org.junit.After; import org.junit.Before; @@ -107,12 +105,12 @@ protected void waitForStreamingStarted() throws InterruptedException { } protected SourceRecords consumeRecordsByTopicReselectWhenNullStreaming() throws InterruptedException { - waitForAvailableRecords(10, TimeUnit.SECONDS); + waitForAvailableRecords(); return super.consumeRecordsByTopicReselectWhenNullStreaming(); } protected SourceRecords consumeRecordsByTopicReselectWhenNotNullStreaming() throws InterruptedException { - waitForAvailableRecords(10, TimeUnit.SECONDS); + waitForAvailableRecords(); return super.consumeRecordsByTopicReselectWhenNotNullStreaming(); } } diff --git a/src/test/java/io/debezium/connector/informix/InformixValidateColumnOrderIT.java b/src/test/java/io/debezium/connector/informix/InformixValidateColumnOrderIT.java index 104a507..09854f8 100644 --- a/src/test/java/io/debezium/connector/informix/InformixValidateColumnOrderIT.java +++ b/src/test/java/io/debezium/connector/informix/InformixValidateColumnOrderIT.java @@ -11,7 +11,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.kafka.connect.data.Struct; @@ -88,8 +87,6 @@ public void testColumnOrderWhileInsert() throws Exception { waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - waitForAvailableRecords(10, TimeUnit.SECONDS); - // insert a record Map recordToBeInsert = new LinkedHashMap<>() { { @@ -104,7 +101,7 @@ public void testColumnOrderWhileInsert() throws Exception { Strings.join(", ", recordToBeInsert.keySet()), Strings.join("\", \"", recordToBeInsert.values()))); - waitForAvailableRecords(10, TimeUnit.SECONDS); + waitForAvailableRecords(); String topicName = String.format("%s.informix.%s", TestHelper.TEST_DATABASE, testTableName); SourceRecords sourceRecords = consumeRecordsByTopic(1); @@ -144,8 +141,6 @@ public void testColumnOrderWhileUpdate() throws Exception { waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - waitForAvailableRecords(10, TimeUnit.SECONDS); - Map recordAfterUpdate = new LinkedHashMap<>(recordToBeUpdate); // new value recordAfterUpdate.put("address", "00:00:00:00:00:00"); @@ -154,7 +149,7 @@ public void testColumnOrderWhileUpdate() throws Exception { connection.execute(String.format("update %s set address = \"%s\" where id = \"%s\"", testTableName, recordAfterUpdate.get("address"), recordToBeUpdate.get("id"))); - waitForAvailableRecords(10, TimeUnit.SECONDS); + waitForAvailableRecords(); String topicName = String.format("%s.informix.%s", TestHelper.TEST_DATABASE, testTableName); SourceRecords sourceRecords = consumeRecordsByTopic(1); @@ -197,11 +192,9 @@ public void testColumnOrderWhileDelete() throws Exception { waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - waitForAvailableRecords(10, TimeUnit.SECONDS); - connection.execute(String.format("delete from %s where id = \"%s\"", testTableName, recordToBeDelete.get("id"))); - waitForAvailableRecords(10, TimeUnit.SECONDS); + waitForAvailableRecords(); String topicName = String.format("%s.informix.%s", TestHelper.TEST_DATABASE, testTableName); SourceRecords sourceRecords = consumeRecordsByTopic(1); diff --git a/src/test/java/io/debezium/connector/informix/SchemaHistoryTopicIT.java b/src/test/java/io/debezium/connector/informix/SchemaHistoryTopicIT.java index a7e185b..5236f0c 100644 --- a/src/test/java/io/debezium/connector/informix/SchemaHistoryTopicIT.java +++ b/src/test/java/io/debezium/connector/informix/SchemaHistoryTopicIT.java @@ -9,7 +9,6 @@ import java.sql.SQLException; import java.util.List; -import java.util.concurrent.TimeUnit; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -94,7 +93,7 @@ public void snapshotSchemaChanges() throws Exception { waitForSnapshotToBeCompleted(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - waitForAvailableRecords(10, TimeUnit.SECONDS); + waitForAvailableRecords(); // DDL for 3 tables SourceRecords records = consumeRecordsByTopic(3); diff --git a/src/test/java/io/debezium/connector/informix/TransactionMetadataIT.java b/src/test/java/io/debezium/connector/informix/TransactionMetadataIT.java index afdd280..8ebee75 100644 --- a/src/test/java/io/debezium/connector/informix/TransactionMetadataIT.java +++ b/src/test/java/io/debezium/connector/informix/TransactionMetadataIT.java @@ -11,7 +11,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.kafka.connect.data.Struct; @@ -98,7 +97,7 @@ public void transactionMetadata() throws Exception { connection.setAutoCommit(true); connection.execute("INSERT INTO tableb VALUES(1000, 'b')"); - waitForAvailableRecords(5, TimeUnit.SECONDS); + waitForAvailableRecords(); // BEGIN, data, END, BEGIN, data, END final SourceRecords records = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * 2 + 1 + 3); diff --git a/src/test/java/io/debezium/connector/informix/util/TestHelper.java b/src/test/java/io/debezium/connector/informix/util/TestHelper.java index 622ec52..61d058e 100644 --- a/src/test/java/io/debezium/connector/informix/util/TestHelper.java +++ b/src/test/java/io/debezium/connector/informix/util/TestHelper.java @@ -11,10 +11,8 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.List; +import java.util.concurrent.TimeUnit; -import org.apache.kafka.connect.data.Struct; -import org.awaitility.Durations; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,11 +20,9 @@ import io.debezium.config.Configuration; import io.debezium.connector.informix.InformixConnection; import io.debezium.connector.informix.InformixConnectorConfig; -import io.debezium.data.SchemaAndValueField; import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.storage.file.history.FileSchemaHistory; -import io.debezium.util.Clock; -import io.debezium.util.Metronome; import io.debezium.util.Testing; public class TestHelper { @@ -87,6 +83,7 @@ public static Configuration.Builder defaultConfig() { return Configuration.copy(defaultJdbcConfig().map(key -> InformixConnectorConfig.DATABASE_CONFIG_PREFIX + key)) .with(CommonConnectorConfig.TOPIC_PREFIX, TEST_DATABASE) + .with(RelationalDatabaseConnectorConfig.SNAPSHOT_LOCK_TIMEOUT_MS, TimeUnit.SECONDS.toMillis(30)) .with(InformixConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class) .with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH) .with(InformixConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) @@ -125,16 +122,4 @@ public static void assertCdcEnabled(InformixConnection conn) throws SQLException assertThat(is_logging + is_buff_logging).isPositive(); } - public static void assertRecord(Struct record, List expected) { - expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record)); - } - - public static void waitForCDC() { - try { - Metronome.parker(Durations.TWO_SECONDS, Clock.SYSTEM).pause(); - } - catch (InterruptedException e) { - // IGNORE - } - } } From 3f5fb69369ba330efed2e93d241e17cd08dcbb73 Mon Sep 17 00:00:00 2001 From: Lars M Johansson Date: Wed, 5 Jun 2024 17:56:59 +0200 Subject: [PATCH 2/2] DBZ-7813 (cross-)maven actions --- .github/workflows/cross-maven.yml | 68 ++++++++++++++++----------- .github/workflows/maven.yml | 76 ++++++++++++++++++++----------- 2 files changed, 91 insertions(+), 53 deletions(-) diff --git a/.github/workflows/cross-maven.yml b/.github/workflows/cross-maven.yml index 939b45c..0049ebf 100644 --- a/.github/workflows/cross-maven.yml +++ b/.github/workflows/cross-maven.yml @@ -21,7 +21,7 @@ on: - '*.md' jobs: - build-core: + build_core: runs-on: ubuntu-latest outputs: cache-key: ${{ steps.cache-key-generator.outputs.cache-key }} @@ -39,6 +39,7 @@ jobs: echo "BRANCH_FOUND=true" >> $GITHUB_OUTPUT fi done < SORTED_PULLS.txt + - name: Checkout core repository with pull request branch if: ${{ steps.branch.outputs.BRANCH_FOUND == 'true' }} uses: actions/checkout@v4 @@ -46,6 +47,7 @@ jobs: repository: ${{ github.event.pull_request.user.login }}/debezium ref: ${{ github.head_ref }} path: core + - name: Checkout core repository with default base branch if: ${{ steps.branch.outputs.BRANCH_FOUND != 'true' }} uses: actions/checkout@v4 @@ -53,65 +55,77 @@ jobs: repository: debezium/debezium ref: ${{ github.base_ref }} path: core - - name: Set up JDK + + - name: Set up Java uses: actions/setup-java@v4 with: distribution: temurin java-version: 17 - - name: Cache Maven packages + + - name: Cache Maven Repository uses: actions/cache@v4 with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2 - - name: Maven build core - run: ./core/mvnw clean install -f core/pom.xml -am - -pl debezium-bom,debezium-core,debezium-embedded,debezium-storage/debezium-storage-file,debezium-storage/debezium-storage-kafka - -DskipTests=true + path: ~/.m2/repository + key: ${{ runner.os }}-m2-${{ hashFiles('core/**/pom.xml') }} + restore-keys: ${{ runner.os }}-m2-${{ hashFiles('core/**/pom.xml') }} + + - name: Build Debezium (Core) + run: > + ./core/mvnw clean install -B -ntp -f core/pom.xml + -pl debezium-assembly-descriptors,debezium-bom,debezium-core,debezium-embedded,:debezium-ide-configs,:debezium-checkstyle,:debezium-revapi + -am + -DskipTests=true -DskipITs=true -Dcheckstyle.skip=true -Dformat.skip=true -Drevapi.skip=true -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false + -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 - # This job builds and creates/restores the cache based on the hash of the POM files from the core + # This job builds and creates/restores the cache based on the hash of the POM files from the core # repository; therefore, we need to output this so that the matrix job can reuse this cache. - name: Generate Cache Key id: cache-key-generator - run: echo "cache-key=${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}" >> "$GITHUB_OUTPUT" + run: echo "cache-key=${{ runner.os }}-m2-${{ hashFiles('core/**/pom.xml') }}" >> "$GITHUB_OUTPUT" - build-ifx: - needs: build-core - runs-on: ubuntu-latest + build_informix: strategy: + # Runs each combination concurrently matrix: - informix-plugin: [ "assembly,informix12", "assembly,informix14" ] + profile: [ "assembly,informix12", "assembly,informix14" ] fail-fast: false + name: "Informix - ${{ matrix.profile }}" + needs: [ build_core ] + runs-on: ubuntu-latest steps: - - name: Checkout informix repository + - name: Checkout Action (Informix) uses: actions/checkout@v4 with: path: informix - - name: Set up JDK + + - name: Set up Java uses: actions/setup-java@v4 with: distribution: temurin java-version: 17 - - name: Cache Maven packages + + - name: Cache Maven Repository uses: actions/cache@v4 with: - path: ~/.m2 - key: ${{ needs.build-core.outputs.cache-key }} - restore-keys: ${{ needs.build-core.outputs.cache-key }} - - name: Maven build Informix (${{ matrix.informix-plugin }}) + path: ~/.m2/repository + key: ${{ needs.build_core.outputs.cache-key }} + restore-keys: ${{ needs.build_core.outputs.cache-key }} + + - name: Build Informix run: > - ./informix/mvnw clean install -f informix/pom.xml - -P${{ matrix.informix-plugin }} + ./informix/mvnw clean install -B -ntp -f informix/pom.xml + -P${{ matrix.profile }} -Dformat.formatter.goal=validate -Dformat.imports.goal=check - -Ddebezium.test.records.waittime=16 - -Ddebezium.test.records.waittime.after.nulls=16 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false + -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 + -Ddebezium.test.records.waittime=7 + -Ddebezium.test.records.waittime.after.nulls=13 -DfailFlakyTests=false diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 05adb6e..e555c9e 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -12,67 +12,91 @@ on: - 2.* - 3.* paths-ignore: - - '*.md' + - '*.md' jobs: - build-core: + build_core: runs-on: ubuntu-latest + outputs: + cache-key: ${{ steps.cache-key-generator.outputs.cache-key }} steps: - - name: Checkout core repository + - name: Checkout Action (Core) uses: actions/checkout@v4 with: repository: debezium/debezium ref: ${{ github.ref }} path: core - - name: Set up JDK + + - name: Set up Java uses: actions/setup-java@v4 with: distribution: temurin java-version: 17 - - name: Cache Maven packages + + - name: Cache Maven Repository uses: actions/cache@v4 with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2 - - name: Maven build core - run: > - ./core/mvnw clean install -B -f core/pom.xml - -pl debezium-bom,debezium-core,debezium-embedded,debezium-storage/debezium-storage-file,debezium-storage/debezium-storage-kafka -am - -DskipTests=true + path: ~/.m2/repository + key: ${{ runner.os }}-m2-${{ hashFiles('core/**/pom.xml') }} + restore-keys: ${{ runner.os }}-m2-${{ hashFiles('core/**/pom.xml') }} + + - name: Build Debezium (Core) + run: > + ./core/mvnw clean install -B -ntp -f core/pom.xml + -pl debezium-assembly-descriptors,debezium-bom,debezium-core,debezium-embedded,:debezium-ide-configs,:debezium-checkstyle,:debezium-revapi + -am + -DskipTests=true -DskipITs=true -Dcheckstyle.skip=true -Dformat.skip=true -Drevapi.skip=true - build-ifx: - needs: build-core - runs-on: ubuntu-latest + -Dhttp.keepAlive=false + -Dmaven.wagon.http.pool=false + -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 + + # This job builds and creates/restores the cache based on the hash of the POM files from the core + # repository; therefore, we need to output this so that the matrix job can reuse this cache. + - name: Generate Cache Key + id: cache-key-generator + run: echo "cache-key=${{ runner.os }}-m2-${{ hashFiles('core/**/pom.xml') }}" >> "$GITHUB_OUTPUT" + + build_informix: strategy: + # Runs each combination concurrently matrix: - informix-plugin: [ "assembly,informix12", "assembly,informix14" ] + profile: [ "assembly,informix12", "assembly,informix14" ] fail-fast: false + name: "Informix - ${{ matrix.profile }}" + needs: [ build_core ] + runs-on: ubuntu-latest steps: - - name: Checkout informix repository + - name: Checkout Action (Informix) uses: actions/checkout@v4 with: path: informix - - name: Set up JDK + + - name: Set up Java uses: actions/setup-java@v4 with: distribution: temurin java-version: 17 - - name: Cache Maven packages + + - name: Cache Maven Repository uses: actions/cache@v4 with: - path: ~/.m2 - key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} - restore-keys: ${{ runner.os }}-m2 - - name: Maven build Informix (${{ matrix.informix-plugin }}) + path: ~/.m2/repository + key: ${{ needs.build_core.outputs.cache-key }} + restore-keys: ${{ needs.build_core.outputs.cache-key }} + + - name: Build Informix run: > - ./informix/mvnw clean install -B -f informix/pom.xml - -P${{ matrix.informix-plugin }} + ./informix/mvnw clean install -B -ntp -f informix/pom.xml + -P${{ matrix.profile }} -Dformat.formatter.goal=validate -Dformat.imports.goal=check + -Dhttp.keepAlive=false + -Dmaven.wagon.http.pool=false + -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Ddebezium.test.records.waittime=7 -Ddebezium.test.records.waittime.after.nulls=13 -DfailFlakyTests=false