Skip to content

Commit

Permalink
DBZ-7616 Honor the message.key.columns order while building query for…
Browse files Browse the repository at this point in the history
… incremental snapshot

(cherry picked from commit 71256cf)
  • Loading branch information
mfvitale committed Mar 14, 2024
1 parent a617e2f commit e60f880
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.connect.data.Struct;
import org.junit.After;
Expand Down Expand Up @@ -249,6 +250,37 @@ record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()
}
}

@Test
@FixFor("DBZ-7617")
public void incrementalSnapshotMustRespectMessageKeyColumnsOrder() throws Exception {
// Testing.Print.enable();

try (JdbcConnection connection = databaseConnection()) {
connection.setAutoCommit(false);
connection.executeWithoutCommitting("INSERT INTO s1.a4 (pk1, pk2, pk3, pk4, aa) VALUES (3, 1, 1, 1, 0)");
connection.executeWithoutCommitting("INSERT INTO s1.a4 (pk1, pk2, pk3, pk4, aa) VALUES (2, 2, 2, 2, 1)");
connection.executeWithoutCommitting("INSERT INTO s1.a4 (pk1, pk2, pk3, pk4, aa) VALUES (1, 2, 2, 2, 2)");

connection.commit();
}

startConnector(builder -> mutableConfig(false, true)
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a4")
.with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, String.format("%s:%s", "s1.a4", "pk2,pk1")));

sendAdHocSnapshotSignal("s1.a4");

Thread.sleep(5000);

SourceRecords sourceRecords = consumeAvailableRecordsByTopic();
List<Integer> ordered = sourceRecords.recordsForTopic("test_server.s1.a4").stream()
.map(sourceRecord -> ((Struct) sourceRecord.value()).getStruct("after").getInt32(valueFieldName()))
.collect(Collectors.toList());

assertThat(ordered).containsExactly(0, 2, 1);

}

@Test
public void inserts4PksWithKafkaSignal() throws Exception {
// Testing.Print.enable();
Expand Down
41 changes: 25 additions & 16 deletions debezium-core/src/main/java/io/debezium/relational/Key.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,31 +118,40 @@ public static KeyMapper getInstance(String fullyQualifiedColumnNames, TableIdToS
// ex: message.key.columns=inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4
// will become => [inventory.customers.pk1,inventory.customers.pk2,(.*).purchaseorders.pk3,(.*).purchaseorders.pk4]
// then joining those values
String regexes = Arrays.stream(PATTERN_SPLIT.split(fullyQualifiedColumnNames))
List<Predicate<ColumnId>> predicates = new ArrayList<>(Arrays.stream(PATTERN_SPLIT.split(fullyQualifiedColumnNames))
.map(TABLE_SPLIT::split)
.collect(
ArrayList<String>::new,
(m, p) -> Arrays.asList(COLUMN_SPLIT.split(p[1])).forEach(c -> m.add(p[0] + "." + c)),
ArrayList::addAll)
ArrayList::addAll))
.stream()
.collect(Collectors.joining(","));

Predicate<ColumnId> delegate = Predicates.includes(regexes, ColumnId::toString);
.map(regex -> Predicates.includes(regex, ColumnId::toString))
.collect(Collectors.toList());

return (table) -> {
List<Column> candidates = table.columns()
.stream()
.filter(c -> {
final TableId tableId = table.id();
if (tableIdMapper == null) {
return delegate.test(new ColumnId(tableId.catalog(), tableId.schema(), tableId.table(), c.name()));
}
return delegate.test(new ColumnId(tableId.catalog(), tableId.schema(), tableId.table(), c.name()))
|| delegate.test(new ColumnId(new TableId(tableId.catalog(), tableId.schema(), tableId.table(), tableIdMapper), c.name()));
})
.collect(Collectors.toList());
List<Column> candidates = new ArrayList<>();
for (Predicate<ColumnId> predicate : predicates) {

candidates.addAll(
table.columns()
.stream()
.filter(c -> matchColumn(tableIdMapper, table, predicate, c))
.collect(Collectors.toList()));
}

return candidates.isEmpty() ? table.primaryKeyColumns() : candidates;
};
}
}

private static boolean matchColumn(TableIdToStringMapper tableIdMapper, Table table, Predicate<ColumnId> predicate, Column c) {

final TableId tableId = table.id();
if (tableIdMapper == null) {
return predicate.test(new ColumnId(tableId.catalog(), tableId.schema(), tableId.table(), c.name()));
}
return predicate.test(new ColumnId(tableId.catalog(), tableId.schema(), tableId.table(), c.name()))
|| predicate.test(
new ColumnId(new TableId(tableId.catalog(), tableId.schema(), tableId.table(), tableIdMapper), c.name()));
}
}

0 comments on commit e60f880

Please sign in to comment.