Skip to content

Commit

Permalink
DBZ-7813 Exclude Column List
Browse files Browse the repository at this point in the history
  • Loading branch information
nrkljo authored and jpechane committed Jun 13, 2024
1 parent 700718e commit 04020d8
Show file tree
Hide file tree
Showing 19 changed files with 325 additions and 220 deletions.
17 changes: 10 additions & 7 deletions .github/workflows/cross-maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ on:
pull_request:
branches:
- main
- 1.*
- 2.*
- 3.*
paths-ignore:
- '*.md'

Expand Down Expand Up @@ -65,14 +65,15 @@ jobs:
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-m2
- name: Maven build core
run: >
./core/mvnw clean install -B -f core/pom.xml
-pl debezium-bom,debezium-core,debezium-embedded,debezium-storage/debezium-storage-file,debezium-storage/debezium-storage-kafka -am
run: ./core/mvnw clean install -f core/pom.xml -am
-pl debezium-bom,debezium-core,debezium-embedded,debezium-storage/debezium-storage-file,debezium-storage/debezium-storage-kafka
-DskipTests=true
-DskipITs=true
-Dcheckstyle.skip=true
-Dformat.skip=true
-Drevapi.skip=true
-Dhttp.keepAlive=false
-Dmaven.wagon.http.pool=false

# This job builds and creates/restores the cache based on the hash of the POM files from the core
# repository; therefore, we need to output this so that the matrix job can reuse this cache.
Expand Down Expand Up @@ -105,10 +106,12 @@ jobs:
restore-keys: ${{ needs.build-core.outputs.cache-key }}
- name: Maven build Informix (${{ matrix.informix-plugin }})
run: >
./informix/mvnw clean install -B -f informix/pom.xml
./informix/mvnw clean install -f informix/pom.xml
-P${{ matrix.informix-plugin }}
-Dformat.formatter.goal=validate
-Dformat.imports.goal=check
-Ddebezium.test.records.waittime=7
-Ddebezium.test.records.waittime.after.nulls=13
-Ddebezium.test.records.waittime=16
-Ddebezium.test.records.waittime.after.nulls=16
-Dhttp.keepAlive=false
-Dmaven.wagon.http.pool=false
-DfailFlakyTests=false
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ on:
push:
branches:
- main
- 1.*
- 2.*
- 3.*
paths-ignore:
- '*.md'

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@
<name>${docker.dbs}:${informix.version}</name>
<run>
<env>
<SIZE>small</SIZE>
<SIZE>medium</SIZE>
<LICENSE>accept</LICENSE>
<USEOSTIME>1</USEOSTIME>
</env>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import com.informix.jdbc.IfmxReadableType;

import io.debezium.DebeziumException;
import io.debezium.data.Envelope.Operation;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.TableSchema;
Expand Down Expand Up @@ -56,7 +57,8 @@ protected Object[] getNewColumnValues() {
@Override
protected void emitTruncateRecord(Receiver<InformixPartition> receiver, TableSchema tableSchema) throws InterruptedException {
receiver.changeRecord(getPartition(), tableSchema, Operation.TRUNCATE, null,
tableSchema.getEnvelopeSchema().truncate(getOffset().getSourceInfo(), getClock().currentTimeAsInstant()), getOffset(), null);
tableSchema.getEnvelopeSchema().truncate(getOffset().getSourceInfo(), getClock().currentTimeAsInstant()),
getOffset(), null);
}

/**
Expand All @@ -79,8 +81,11 @@ private static <X> X propagate(Callable<X> callable) {
try {
return callable.call();
}
catch (Exception e) {
throw (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
catch (RuntimeException rex) {
throw rex;
}
catch (Exception ex) {
throw new DebeziumException(ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.time.Instant;
Expand All @@ -28,8 +26,6 @@
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Table;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import io.debezium.util.Strings;

Expand Down Expand Up @@ -80,6 +76,19 @@ public InformixConnection(JdbcConfiguration config) {
realDatabaseName = retrieveRealDatabaseName().trim();
}

private String retrieveRealDatabaseName() {
try {
return queryAndMap(GET_DATABASE_NAME, singleResultMapper(rs -> rs.getString(1), "Could not retrieve database name"));
}
catch (SQLException e) {
throw new RuntimeException("Couldn't obtain database name", e);
}
}

public String getRealDatabaseName() {
return realDatabaseName;
}

/**
* Calculates the highest available Log Sequence Number.
* Tecnically, the _exact_ highest available LSN is not available in the JDBC session, but the current page of the active
Expand All @@ -100,16 +109,33 @@ public Lsn getMaxLsn() throws SQLException {
}, "Maximum LSN query must return exactly one value"));
}

public String getRealDatabaseName() {
return realDatabaseName;
/**
* Calculates the lowest available Log Sequence Number.
* @see InformixConnection#getMaxLsn()
* @return the current lowest log sequence number
*/
private Lsn getMinLsn() throws SQLException {
return queryAndMap(GET_MIN_LSN, singleResultMapper(rs -> {
final Lsn lsn = Lsn.of(rs.getLong("uniqid"), rs.getLong("logpage") << 12);
LOGGER.trace("Current minimum lsn is {}", lsn.toLongString());
return lsn;
}, "Minimum LSN query must return exactly one value"));
}

private String retrieveRealDatabaseName() {
public boolean validateLogPosition(Partition partition, OffsetContext offset, CommonConnectorConfig config) {
final TxLogPosition lastPosition = ((InformixOffsetContext) offset).getChangePosition();
final Lsn lastBeginLsn = lastPosition.getBeginLsn();
final Lsn restartLsn = lastBeginLsn.isAvailable() ? lastBeginLsn : lastPosition.getCommitLsn();
LOGGER.trace("Restart LSN is '{}'", restartLsn);

try {
return queryAndMap(GET_DATABASE_NAME, singleResultMapper(rs -> rs.getString(1), "Could not retrieve database name"));
final Lsn minLsn = getMinLsn();
LOGGER.trace("Lowest available LSN is '{}'", minLsn);

return restartLsn.isAvailable() && minLsn.isAvailable() && restartLsn.compareTo(minLsn) >= 0;
}
catch (SQLException e) {
throw new RuntimeException("Couldn't obtain database name", e);
throw new DebeziumException("Couldn't obtain lowest available Log Sequence Number", e);
}
}

Expand Down Expand Up @@ -158,22 +184,6 @@ public String quotedColumnIdString(String columnName) {
return columnName;
}

public Table getTableSchemaFromTableId(TableId tableId) throws SQLException {
final DatabaseMetaData metadata = connection().getMetaData();
try (ResultSet columns = metadata.getColumns(
tableId.catalog(),
tableId.schema(),
tableId.table(),
null)) {
final TableEditor tableEditor = Table.editor().tableId(tableId);
while (columns.next()) {
readTableColumn(columns, tableId, null).ifPresent(columnEditor -> tableEditor.addColumns(columnEditor.create()));
}
tableEditor.setPrimaryKeyNames(readPrimaryKeyNames(metadata, tableId));
return tableEditor.create();
}
}

public DataSource datasource() {
return new DataSource() {
@Override
Expand Down Expand Up @@ -228,31 +238,4 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
};
}

public boolean validateLogPosition(Partition partition, OffsetContext offset, CommonConnectorConfig config) {

final Lsn storedLsn = ((InformixOffsetContext) offset).getChangePosition().getCommitLsn();

try {
final Lsn oldestLsn = getOldestLsn();

if (oldestLsn == null) {
return false;
}

LOGGER.trace("Oldest SCN in logs is '{}'", oldestLsn);
return storedLsn == null || oldestLsn.compareTo(storedLsn) < 0;
}
catch (SQLException e) {
throw new DebeziumException("Unable to get last available log position", e);
}
}

private Lsn getOldestLsn() throws SQLException {
return queryAndMap(GET_MIN_LSN, singleResultMapper(rs -> {
final Lsn lsn = Lsn.of(rs.getLong("uniqid"), rs.getLong("logpage") << 12);
LOGGER.trace("Current minimum lsn is {}", lsn.toLongString());
return lsn;
}, "Minimum LSN query must return exactly one value"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.SchemaChangeEvent;
Expand Down Expand Up @@ -75,4 +76,14 @@ else if (schemaChange.getType() == SchemaChangeEventType.ALTER) {
protected DdlParser getDdlParser() {
return null;
}

@Override
public Tables tables() {
return super.tables();
}

@Override
public Tables.TableFilter getTableFilter() {
return super.getTableFilter();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ else if (connectorConfig.getSnapshotIsolationMode() == SnapshotIsolationMode.EXC
LOGGER.info("Locking table {}", tableId);
statement.execute(lockingStatement.get());
}

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
Expand Down Expand Up @@ -88,19 +87,25 @@ public void execute(ChangeEventSourceContext context, InformixPartition partitio
throws InterruptedException {

// Need to refresh schema before CDCEngine is started, to capture columns added in off-line schema evolution
try {
for (TableId tableId : schema.tableIds()) {
final Table table = metadataConnection.getTableSchemaFromTableId(tableId);
schema.refresh(table);
schema.tableIds().stream().map(TableId::schema).distinct().forEach(schemaName -> {
try {
metadataConnection.readSchema(
schema.tables(),
null,
schemaName,
schema.getTableFilter(),
null,
true);
}
}
catch (SQLException e) {
LOGGER.error("Caught SQLException", e);
errorHandler.setProducerThrowable(e);
}
catch (SQLException e) {
LOGGER.error("Caught SQLException", e);
errorHandler.setProducerThrowable(e);
}
});

TxLogPosition lastPosition = offsetContext.getChangePosition();
Lsn lastCommitLsn = lastPosition.getCommitLsn();
Lsn lastChangeLsn = lastPosition.getChangeLsn();
Lsn lastBeginLsn = lastPosition.getBeginLsn();
Lsn beginLsn = lastBeginLsn.isAvailable() ? lastBeginLsn : lastCommitLsn;

Expand Down Expand Up @@ -136,9 +141,15 @@ public void execute(ChangeEventSourceContext context, InformixPartition partitio
transactionRecord.getTransactionId(), commitLsn, lastCommitLsn);
break;
}
if (commitLsn.equals(lastCommitLsn) && lastChangeLsn.equals(lastCommitLsn)) {
LOGGER.info("Skipping transaction with id: '{}' since commitLsn='{}' == lastCommitLsn='{}' and lastChangeLsn='{}' == lastCommitLsn",
transactionRecord.getTransactionId(), commitLsn, lastCommitLsn, lastChangeLsn);
break;
}
if (commitLsn.compareTo(lastCommitLsn) > 0) {
Lsn currentLsn = Lsn.of(transactionRecord.getBeginRecord().getSequenceId());
LOGGER.info("Recover finished: from lastBeginLsn='{}' to lastCommitLsn='{}', current Lsn='{}'",
lastBeginLsn, lastCommitLsn, commitLsn);
lastBeginLsn, lastCommitLsn, currentLsn);
recovering = false;
}
handleTransaction(transactionEngine, partition, offsetContext, transactionRecord, recovering);
Expand Down Expand Up @@ -248,6 +259,8 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart
throws InterruptedException, IfxStreamException {
long tStart = System.nanoTime();

long lastChangeSeq = offsetContext.getChangePosition().getChangeLsn().sequence();

int transactionId = transactionRecord.getTransactionId();

IfxCDCBeginTransactionRecord beginRecord = transactionRecord.getBeginRecord();
Expand All @@ -257,11 +270,11 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart

long beginTs = beginRecord.getTime();
long beginSeq = beginRecord.getSequenceId();
long lowestBeginSeq = engine.getLowestBeginSequence().orElse(beginSeq);
long endSeq = endRecord.getSequenceId();
long restartSeq = engine.getLowestBeginSequence().orElse(endSeq);

if (!recover) {
updateChangePosition(offsetContext, endSeq, beginSeq, transactionId, lowestBeginSeq < beginSeq ? lowestBeginSeq : beginSeq);
updateChangePosition(offsetContext, endSeq, beginSeq, transactionId, Math.min(restartSeq, beginSeq));
dispatcher.dispatchTransactionStartedEvent(
partition,
String.valueOf(transactionId),
Expand All @@ -276,7 +289,7 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart

if (IfmxStreamRecordType.COMMIT.equals(endRecord.getType())) {
IfxCDCCommitTransactionRecord commitRecord = (IfxCDCCommitTransactionRecord) endRecord;
long commitSeq = commitRecord.getSequenceId();
long commitSeq = endSeq;
long commitTs = commitRecord.getTime();

Map<String, IfmxReadableType> before = null;
Expand All @@ -287,7 +300,7 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart

long changeSeq = streamRecord.getSequenceId();

if (recover && Lsn.of(changeSeq).compareTo(offsetContext.getChangePosition().getChangeLsn()) <= 0) {
if (recover && changeSeq <= lastChangeSeq) {
LOGGER.info("Skipping already processed record {}", changeSeq);
continue;
}
Expand Down Expand Up @@ -370,7 +383,7 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart

start = System.nanoTime();

updateChangePosition(offsetContext, commitSeq, commitSeq, transactionId, lowestBeginSeq);
updateChangePosition(offsetContext, commitSeq, commitSeq, transactionId, restartSeq);
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, Instant.ofEpochSecond(commitTs));

end = System.nanoTime();
Expand All @@ -384,7 +397,7 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart
if (IfmxStreamRecordType.ROLLBACK.equals(endRecord.getType())) {

if (!recover) {
updateChangePosition(offsetContext, endSeq, endSeq, transactionId, null);
updateChangePosition(offsetContext, endSeq, endSeq, transactionId, restartSeq);
offsetContext.getTransactionContext().endTransaction();
}

Expand Down
Loading

0 comments on commit 04020d8

Please sign in to comment.