Skip to content

Commit

Permalink
avoid slot name conflict in the backfill task
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanhang1993 committed Jun 15, 2023
1 parent c4c3f91 commit 3a0891e
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ public class PostgresSourceConfig extends JdbcSourceConfig {

private static final long serialVersionUID = 1L;

private final int subtaskId;

public PostgresSourceConfig(
int subtaskId,
StartupOptions startupOptions,
List<String> databaseList,
List<String> schemaList,
Expand Down Expand Up @@ -81,6 +84,11 @@ public PostgresSourceConfig(
connectMaxRetries,
connectionPoolSize,
chunkKeyColumn);
this.subtaskId = subtaskId;
}

public int getSubtaskId() {
return subtaskId;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public PostgresSourceConfig create(int subtaskId) {

Configuration dbzConfiguration = Configuration.from(props);
return new PostgresSourceConfig(
subtaskId,
startupOptions,
Collections.singletonList(database),
schemaList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffset;
import com.ververica.cdc.connectors.postgres.source.offset.PostgresOffsetUtils;
import com.ververica.cdc.connectors.postgres.source.utils.PostgresQueryUtils;
Expand Down Expand Up @@ -152,11 +153,17 @@ private void executeBackfillTask(

// we should only capture events for the current table,
// otherwise, we may not find corresponding schema
PostgresSourceConfig pgSourceConfig = (PostgresSourceConfig) ctx.getSourceConfig();
Configuration dbzConf =
ctx.getDbzConnectorConfig()
.getConfig()
.edit()
.with("table.include.list", split.getTableId().toString())
.with(
"slot.name",
pgSourceConfig.getDbzProperties().getProperty("slot.name")
+ "_"
+ pgSourceConfig.getSubtaskId())
// Disable heartbeat event in snapshot split fetcher
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
.build();
Expand All @@ -174,6 +181,7 @@ private void executeBackfillTask(
ctx.getReplicationConnection(),
backfillSplit);
LOG.info("Execute backfillReadTask for split {}", split);
LOG.info("Slot name {}", dbzConf.getString("slot.name"));
backfillReadTask.execute(new PostgresChangeEventSourceContext(), postgresOffsetContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.Logger;

import javax.annotation.Nullable;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import io.debezium.connector.postgresql.PostgresOffsetContext;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/** Utils for handling {@link PostgresOffset}. */
public class PostgresOffsetUtils {
Expand All @@ -32,11 +32,13 @@ public static PostgresOffsetContext getPostgresOffsetContext(
Objects.requireNonNull(offset, "offset is null for the sourceSplitBase")
.getOffset();
// all the keys happen to be long type for PostgresOffsetContext.Loader.load
Map<String, Object> offsetMap =
offsetStrMap.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> Long.parseLong(e.getValue())));
Map<String, Object> offsetMap = new HashMap<>();
for (String key : offsetStrMap.keySet()) {
String value = offsetStrMap.get(key);
if (value != null) {
offsetMap.put(key, Long.parseLong(value));
}
}
return loader.load(offsetMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public abstract class PostgresTestBase extends AbstractTestBase {
// use newer version of postgresql image to support pgoutput plugin
// when testing postgres 13, only 13-alpine supports both amd64 and arm64
protected static final DockerImageName PG_IMAGE =
DockerImageName.parse("debezium/postgres:13")
.asCompatibleSubstituteFor("postgres");
DockerImageName.parse("debezium/postgres:13").asCompatibleSubstituteFor("postgres");

public static final PostgreSQLContainer<?> POSTGERS_CONTAINER =
new PostgreSQLContainer<>(PG_IMAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ private void checkStreamData(

private String getSlotName() {
final Random random = new Random();
int id = random.nextInt(100);
int id = random.nextInt(10000);
return "flink_" + id;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ public void testAllTypes() throws Throwable {
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"

+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'slot.name' = '%s'"
+ ")",
Expand Down

0 comments on commit 3a0891e

Please sign in to comment.