Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-8016 Introduce lowercase and uppercase naming strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
Naros committed Jul 3, 2024
1 parent 74cf333 commit d63221b
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.DebeziumException;
import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.config.Field.ValidationOutput;
import io.debezium.config.Instantiator;
import io.debezium.connector.jdbc.filter.FieldFilterFactory;
import io.debezium.connector.jdbc.filter.FieldFilterFactory.FieldNameFilter;
import io.debezium.connector.jdbc.naming.ColumnNamingStrategy;
import io.debezium.connector.jdbc.naming.DefaultColumnNamingStrategy;
import io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy;
import io.debezium.connector.jdbc.naming.LowercaseColumnNamingStrategy;
import io.debezium.connector.jdbc.naming.LowercaseTableNamingStrategy;
import io.debezium.connector.jdbc.naming.TableNamingStrategy;
import io.debezium.connector.jdbc.naming.UppercaseColumnNamingStrategy;
import io.debezium.connector.jdbc.naming.UppercaseTableNamingStrategy;
import io.debezium.util.Strings;

/**
Expand Down Expand Up @@ -533,8 +539,8 @@ public JdbcSinkConnectorConfig(Map<String, String> props) {
this.schemaEvolutionMode = SchemaEvolutionMode.parse(config.getString(SCHEMA_EVOLUTION));
this.quoteIdentifiers = config.getBoolean(QUOTE_IDENTIFIERS_FIELD);
// this.dataTypeMapping = Strings.setOf(config.getString(DATA_TYPE_MAPPING_FIELD), String::new);
this.tableNamingStrategy = config.getInstance(TABLE_NAMING_STRATEGY_FIELD, TableNamingStrategy.class);
this.columnNamingStrategy = config.getInstance(COLUMN_NAMING_STRATEGY_FIELD, ColumnNamingStrategy.class);
this.tableNamingStrategy = resolveTableNamingStrategy(config);
this.columnNamingStrategy = resolveColumnNamingStrategy(config);
this.databaseTimezone = config.getString(DATABASE_TIME_ZONE_FIELD);
this.postgresPostgisSchema = config.getString(POSTGRES_POSTGIS_SCHEMA_FIELD);
this.sqlServerIdentityInsert = config.getBoolean(SQLSERVER_IDENTITY_INSERT_FIELD);
Expand Down Expand Up @@ -711,4 +717,32 @@ private static int validateDeleteEnabled(Configuration config, Field field, Vali
}
return 0;
}

private static ColumnNamingStrategy resolveColumnNamingStrategy(Configuration config) {
final String className = config.getString(COLUMN_NAMING_STRATEGY_FIELD);
if (!Strings.isNullOrEmpty(className)) {
if (className.equalsIgnoreCase("lowercase")) {
return Instantiator.getInstance(LowercaseColumnNamingStrategy.class.getName());
}
else if (className.equalsIgnoreCase("uppercase")) {
return Instantiator.getInstance(UppercaseColumnNamingStrategy.class.getName());
}
return config.getInstance(COLUMN_NAMING_STRATEGY_FIELD, ColumnNamingStrategy.class);
}
throw new DebeziumException("Failed to resolve column naming strategy.");
}

private static TableNamingStrategy resolveTableNamingStrategy(Configuration config) {
final String className = config.getString(TABLE_NAMING_STRATEGY_FIELD);
if (!Strings.isNullOrEmpty(className)) {
if (className.equalsIgnoreCase("lowercase")) {
return Instantiator.getInstance(LowercaseTableNamingStrategy.class.getName());
}
else if (className.equalsIgnoreCase("uppercase")) {
return Instantiator.getInstance(UppercaseTableNamingStrategy.class.getName());
}
return config.getInstance(TABLE_NAMING_STRATEGY_FIELD, TableNamingStrategy.class);
}
throw new DebeziumException("Failed to resolve table naming strategy.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.jdbc.naming;

/**
* The lower-case implementation of the {@link ColumnNamingStrategy} that simply returns the field's
* name as the column name in the destination table but in all lower-case.
*
* @author Chris Cranford
*/
public class LowercaseColumnNamingStrategy implements ColumnNamingStrategy {
@Override
public String resolveColumnName(String fieldName) {
return fieldName.toLowerCase();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.jdbc.naming;

import org.apache.kafka.connect.sink.SinkRecord;

import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;

/**
* A lower-case implementation of the {@link DefaultTableNamingStrategy} where the computed table name
* will also be returned in all lower-case.
*
* @author Chris Cranford
*/
public class LowercaseTableNamingStrategy extends DefaultTableNamingStrategy {
@Override
public String resolveTableName(JdbcSinkConnectorConfig config, SinkRecord record) {
return super.resolveTableName(config, record).toLowerCase();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.jdbc.naming;

/**
* The upper-case implementation of the {@link ColumnNamingStrategy} that simply returns the field's
* name as the column name in the destination table but in all upper-case.
*
* @author Chris Cranford
*/
public class UppercaseColumnNamingStrategy implements ColumnNamingStrategy {
@Override
public String resolveColumnName(String fieldName) {
return fieldName.toUpperCase();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.jdbc.naming;

import org.apache.kafka.connect.sink.SinkRecord;

import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;

/**
* An upper-case implementation of the {@link DefaultTableNamingStrategy} where the computed table name
* will also be returned in all upper-case.
*
* @author Chris Cranford
*/
public class UppercaseTableNamingStrategy extends DefaultTableNamingStrategy {
@Override
public String resolveTableName(JdbcSinkConnectorConfig config, SinkRecord record) {
return super.resolveTableName(config, record).toUpperCase();
}
}

0 comments on commit d63221b

Please sign in to comment.