Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-7824 Introduce dialect.starrocks.catalog_name config property
Browse files Browse the repository at this point in the history
  • Loading branch information
Naros committed Apr 26, 2024
1 parent ace18cd commit 17735b4
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class JdbcSinkConnectorConfig {
public static final String DATABASE_TIME_ZONE = "database.time_zone";
public static final String POSTGRES_POSTGIS_SCHEMA = "dialect.postgres.postgis.schema";
public static final String SQLSERVER_IDENTITY_INSERT = "dialect.sqlserver.identity.insert";
public static final String STARROCKS_CATALOG_NAME = "dialect.starrocks.catalog_name";
public static final String BATCH_SIZE = "batch.size";
public static final String FIELD_INCLUDE_LIST = "field.include.list";
public static final String FIELD_EXCLUDE_LIST = "field.exclude.list";
Expand Down Expand Up @@ -277,6 +278,14 @@ public class JdbcSinkConnectorConfig {
.withDefault(false)
.withDescription("Allowing to insert explicit value for identity column in table for SQLSERVER.");

public static final Field STARROCKS_CATALOG_NAME_FIELD = Field.create(STARROCKS_CATALOG_NAME)
.withDisplayName("Specifies the catalog name to use when connecting to StarRocks")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 4))
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDescription("The default catalog to use when connecting to StarRocks");

public static final Field BATCH_SIZE_FIELD = Field.create(BATCH_SIZE)
.withDisplayName("Specifies how many records to attempt to batch together into the destination table, when possible. " +
"You can also configure the connector’s underlying consumer’s max.poll.records using consumer.override.max.poll.records in the connector configuration.")
Expand Down Expand Up @@ -331,6 +340,7 @@ public class JdbcSinkConnectorConfig {
DATABASE_TIME_ZONE_FIELD,
POSTGRES_POSTGIS_SCHEMA_FIELD,
SQLSERVER_IDENTITY_INSERT_FIELD,
STARROCKS_CATALOG_NAME_FIELD,
BATCH_SIZE_FIELD,
FIELD_INCLUDE_LIST_FIELD,
FIELD_EXCLUDE_LIST_FIELD)
Expand Down Expand Up @@ -505,6 +515,7 @@ public String getValue() {
private final String databaseTimezone;
private final String postgresPostgisSchema;
private final boolean sqlServerIdentityInsert;
private final String starRocksCatalogName;
private FieldNameFilter fieldsFilter;

private final long batchSize;
Expand All @@ -525,6 +536,7 @@ public JdbcSinkConnectorConfig(Map<String, String> props) {
this.databaseTimezone = config.getString(DATABASE_TIME_ZONE_FIELD);
this.postgresPostgisSchema = config.getString(POSTGRES_POSTGIS_SCHEMA_FIELD);
this.sqlServerIdentityInsert = config.getBoolean(SQLSERVER_IDENTITY_INSERT_FIELD);
this.starRocksCatalogName = config.getString(STARROCKS_CATALOG_NAME_FIELD);
this.batchSize = config.getLong(BATCH_SIZE_FIELD);

String fieldExcludeList = config.getString(FIELD_EXCLUDE_LIST);
Expand Down Expand Up @@ -623,6 +635,10 @@ public String getPostgresPostgisSchema() {
return postgresPostgisSchema;
}

public String getStarRocksCatalogName() {
return starRocksCatalogName;
}

/** makes {@link org.hibernate.cfg.Configuration} from connector config
*
* @return {@link org.hibernate.cfg.Configuration}
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/debezium/connector/jdbc/RecordWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public void write(List<SinkRecordDescriptor> records, String sqlStatement) {
private Work processBatch(List<SinkRecordDescriptor> records, String sqlStatement) {

return conn -> {
// Allow doing some prep work for certain dialects/databases
dialect.prepareConnection(conn);

try (PreparedStatement prepareStatement = conn.prepareStatement(sqlStatement)) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,4 +369,12 @@ default String getTimeQueryBinding() {
* @return the list of bounded values
*/
List<ValueBindDescriptor> bindValue(FieldDescriptor field, int startIndex, Object value);

/**
* Prepares the connection for use
*
* @param connection the connection, should never be {@code null}
*/
default void prepareConnection(Connection connection) throws SQLException {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
*/
package io.debezium.connector.jdbc.dialect.mysql;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
Expand Down Expand Up @@ -65,9 +67,11 @@ public DatabaseDialect instantiate(JdbcSinkConnectorConfig config, SessionFactor
}

private final boolean connectionTimeZoneSet;
private final String starRocksCatalogName;

private MySqlDatabaseDialect(JdbcSinkConnectorConfig config, SessionFactory sessionFactory) {
super(config, sessionFactory);
this.starRocksCatalogName = config.getStarRocksCatalogName();

try (StatelessSession session = sessionFactory.openStatelessSession()) {
this.connectionTimeZoneSet = session.doReturningWork((connection) -> connection.getMetaData().getURL().contains("connectionTimeZone="));
Expand Down Expand Up @@ -190,4 +194,13 @@ protected void addColumnDefaultValue(SinkRecordDescriptor.FieldDescriptor field,
}
super.addColumnDefaultValue(field, columnSpec);
}

@Override
public void prepareConnection(Connection connection) throws SQLException {
if (!Strings.isNullOrBlank(starRocksCatalogName)) {
try (Statement statement = connection.createStatement()) {
statement.execute(String.format("USE %s", starRocksCatalogName));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public void testInsertModeInsertWithNoPrimaryKey(SinkRecordFactory factory) {
properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, SchemaEvolutionMode.BASIC.getValue());
properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, PrimaryKeyMode.NONE.getValue());
properties.put(JdbcSinkConnectorConfig.INSERT_MODE, InsertMode.INSERT.getValue());
properties.put(JdbcSinkConnectorConfig.STARROCKS_CATALOG_NAME, "demo");
startSinkConnector(properties);
assertSinkConnectorIsRunning();

Expand Down

0 comments on commit 17735b4

Please sign in to comment.