Skip to content

Commit

Permalink
minor
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Apr 17, 2023
1 parent 8596f49 commit 9d60c43
Showing 1 changed file with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ public void handle(ConnectorServiceProto.GetEventStreamRequest request) {
}
}

private void ensureArgNotNull(Map<String, String> props, String key) {
if (!props.containsKey(key)) {
throw new RuntimeException(String.format("'%s' not found", key));
private void ensurePropNotNull(Map<String, String> props, String name) {
if (!props.containsKey(name)) {
throw new RuntimeException(
String.format("'%s' not found, please check the WITH properties", name));
}
}

Expand All @@ -96,9 +97,10 @@ private void validateProperties(
throws Exception {
var props = validate.getPropertiesMap();

ensureArgNotNull(props, DbzConnectorConfig.HOST);
ensureArgNotNull(props, DbzConnectorConfig.PORT);
ensureArgNotNull(props, DbzConnectorConfig.DB_NAME);
ensurePropNotNull(props, DbzConnectorConfig.HOST);
ensurePropNotNull(props, DbzConnectorConfig.PORT);
ensurePropNotNull(props, DbzConnectorConfig.DB_NAME);
ensurePropNotNull(props, DbzConnectorConfig.TABLE_NAME);
String jdbcUrl =
getJdbcPrefix(validate.getSourceType())
+ "://"
Expand All @@ -118,12 +120,13 @@ private void validateProperties(
throw new RuntimeException(e);
}

ensureArgNotNull(props, DbzConnectorConfig.USER);
ensureArgNotNull(props, DbzConnectorConfig.PASSWORD);
ensurePropNotNull(props, DbzConnectorConfig.USER);
ensurePropNotNull(props, DbzConnectorConfig.PASSWORD);
String dbUser = props.get(DbzConnectorConfig.USER);
String dbPassword = props.get(DbzConnectorConfig.PASSWORD);
switch (validate.getSourceType()) {
case POSTGRES:
ensurePropNotNull(props, DbzConnectorConfig.PG_SCHEMA_NAME);
try (var validator =
new PostgresValidator(
jdbcUrl,
Expand All @@ -137,6 +140,7 @@ private void validateProperties(
break;

case CITUS:
ensurePropNotNull(props, DbzConnectorConfig.PG_SCHEMA_NAME);
try (var coordinatorValidator =
new PostgresValidator(
jdbcUrl,
Expand All @@ -149,7 +153,7 @@ private void validateProperties(
coordinatorValidator.validateTableSchema();
}

ensureArgNotNull(props, DbzConnectorConfig.DB_SERVERS);
ensurePropNotNull(props, DbzConnectorConfig.DB_SERVERS);
var servers = props.get(DbzConnectorConfig.DB_SERVERS);
var workerAddrs = StringUtils.split(servers, ',');
var jdbcPrefix = getJdbcPrefix(validate.getSourceType());
Expand Down

0 comments on commit 9d60c43

Please sign in to comment.