Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connector): support ingest Citus distributed table #8988

Merged
merged 10 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
public enum SourceTypeE {
MYSQL,
POSTGRES,
CITUS,
INVALID;

public static SourceTypeE valueOf(ConnectorServiceProto.SourceType type) {
Expand All @@ -27,6 +28,8 @@ public static SourceTypeE valueOf(ConnectorServiceProto.SourceType type) {
return SourceTypeE.MYSQL;
case POSTGRES:
return SourceTypeE.POSTGRES;
case CITUS:
return SourceTypeE.CITUS;
default:
return SourceTypeE.INVALID;
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,16 @@ public class DbzConnectorConfig {
public static final String DB_NAME = "database.name";
public static final String TABLE_NAME = "table.name";

public static final String DB_SERVERS = "database.servers";

/* MySQL specified configs */
public static final String MYSQL_SERVER_ID = "server.id";

/* Postgres specified configs */
public static final String PG_SLOT_NAME = "slot.name";
public static final String PG_SCHEMA_NAME = "schema.name";

private static final String DBZ_CONFIG_FILE = "debezium.properties";
private static final String MYSQL_CONFIG_FILE = "mysql.properties";
private static final String POSTGRES_CONFIG_FILE = "postgres.properties";

Expand Down Expand Up @@ -81,15 +84,8 @@ public Properties getResolvedDebeziumProps() {

public DbzConnectorConfig(
SourceTypeE source, long sourceId, String startOffset, Map<String, String> userProps) {
var dbzProps = new Properties();
try (var input = getClass().getClassLoader().getResourceAsStream("debezium.properties")) {
assert input != null;
dbzProps.load(input);
} catch (IOException e) {
throw new RuntimeException("failed to load debezium.properties", e);
}

StringSubstitutor substitutor = new StringSubstitutor(userProps);
var dbzProps = initiateDbConfig(DBZ_CONFIG_FILE, substitutor);
if (source == SourceTypeE.MYSQL) {
var mysqlProps = initiateDbConfig(MYSQL_CONFIG_FILE, substitutor);
// if offset is specified, we will continue binlog reading from the specified offset
Expand All @@ -104,9 +100,14 @@ public DbzConnectorConfig(
}

dbzProps.putAll(mysqlProps);
} else if (source == SourceTypeE.POSTGRES) {
} else if (source == SourceTypeE.POSTGRES || source == SourceTypeE.CITUS) {
var postgresProps = initiateDbConfig(POSTGRES_CONFIG_FILE, substitutor);

// citus needs all_tables publication to capture all shards
if (source == SourceTypeE.CITUS) {
postgresProps.setProperty("publication.autocreate.mode", "all_tables");
}

// if offset is specified, we will continue reading changes from the specified offset
if (null != startOffset && !startOffset.isBlank()) {
postgresProps.setProperty("snapshot.mode", "never");
Expand Down Expand Up @@ -134,7 +135,7 @@ private Properties initiateDbConfig(String fileName, StringSubstitutor substitut
var resolvedStr = substitutor.replace(inputStr);
dbProps.load(new StringReader(resolvedStr));
} catch (IOException e) {
throw new RuntimeException("failed to load " + fileName, e);
throw new RuntimeException("failed to load config file " + fileName, e);
}
return dbProps;
}
Expand Down
Loading