Skip to content

Commit

Permalink
[Improve][Connector-v2] Support '-' in filedName placeholders in jdbc…
Browse files Browse the repository at this point in the history
… sink
  • Loading branch information
dailai committed Dec 6, 2024
1 parent e6f92fd commit 3c9be86
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -667,29 +669,25 @@ public static FieldNamedPreparedStatement prepareStatement(
connection.prepareStatement(parsedSQL), indexMapping);
}

private static String parseNamedStatement(String sql, Map<String, List<Integer>> paramMap) {
StringBuilder parsedSql = new StringBuilder();
int fieldIndex = 1; // SQL statement parameter index starts from 1
int length = sql.length();
for (int i = 0; i < length; i++) {
char c = sql.charAt(i);
if (':' == c) {
int j = i + 1;
while (j < length && Character.isJavaIdentifierPart(sql.charAt(j))) {
j++;
}
String parameterName = sql.substring(i + 1, j);
checkArgument(
!parameterName.isEmpty(),
"Named parameters in SQL statement must not be empty.");
paramMap.computeIfAbsent(parameterName, n -> new ArrayList<>()).add(fieldIndex);
fieldIndex++;
i = j - 1;
parsedSql.append('?');
} else {
parsedSql.append(c);
}
public static String parseNamedStatement(String sql, Map<String, List<Integer>> paramMap) {
Pattern pattern =
Pattern.compile(":([\\p{L}\\p{Nl}\\p{Nd}\\p{Pc}\\$\\-\\.@%&*#~!?^+=<>|]+)");
Matcher matcher = pattern.matcher(sql);

StringBuffer result = new StringBuffer();
int fieldIndex = 1;

while (matcher.find()) {
String parameterName = matcher.group(1);
checkArgument(
!parameterName.isEmpty(),
"Named parameters in SQL statement must not be empty.");
paramMap.computeIfAbsent(parameterName, n -> new ArrayList<>()).add(fieldIndex++);
matcher.appendReplacement(result, "?");
}
return parsedSql.toString();

matcher.appendTail(result);

return result.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
private static final String CREATE_SQL =
"CREATE TABLE IF NOT EXISTS %s\n"
+ "(\n"
+ " `c_bit_1` bit(1) DEFAULT NULL,\n"
+ " `c-bit_1` bit(1) DEFAULT NULL,\n"
+ " `c_bit_8` bit(8) DEFAULT NULL,\n"
+ " `c_bit_16` bit(16) DEFAULT NULL,\n"
+ " `c_bit_32` bit(32) DEFAULT NULL,\n"
Expand Down Expand Up @@ -191,7 +191,7 @@ protected void checkResult(
String executeKey, TestContainer container, Container.ExecResult execResult) {
String[] fieldNames =
new String[] {
"c_bit_1",
"c-bit_1",
"c_bit_8",
"c_bit_16",
"c_bit_32",
Expand Down Expand Up @@ -249,7 +249,7 @@ String driverUrl() {
Pair<String[], List<SeaTunnelRow>> initTestData() {
String[] fieldNames =
new String[] {
"c_bit_1",
"c-bit_1",
"c_bit_8",
"c_bit_16",
"c_bit_32",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ sink {
user = "root"
password = "Abc!@#135_seatunnel"

query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
query = """insert into sink (`c-bit_1`, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ CREATE TABLE sink_table WITH (


INSERT INTO sink_table
SELECT c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
SELECT `c-bit_1`, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ sink {
user = "root"
password = "Abc!@#135_seatunnel"
connection_check_timeout_sec = 100
query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
query = """insert into sink (`c-bit_1`, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ CREATE TABLE sink_table WITH (


CREATE TABLE temp1 AS
SELECT c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
SELECT `c-bit_1`, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
Expand All @@ -58,4 +58,4 @@ CREATE TABLE temp1 AS


INSERT INTO sink_table SELECT * FROM temp1;

Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ sink {
user = "root"
password = "Abc!@#135_seatunnel"
connection_check_timeout_sec = 100
query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
query = """insert into sink (`c-bit_1`, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public SeaTunnelRowType typeMapping(List<String> inputColumnsMapping) {
for (SelectItem selectItem : selectItems) {
if (selectItem.getExpression() instanceof AllColumns) {
for (int i = 0; i < inputRowType.getFieldNames().length; i++) {
fieldNames[idx] = inputRowType.getFieldName(i);
fieldNames[idx] = cleanEscape(inputRowType.getFieldName(i));
seaTunnelDataTypes[idx] = inputRowType.getFieldType(i);
if (inputColumnsMapping != null) {
inputColumnsMapping.set(idx, inputRowType.getFieldName(i));
Expand All @@ -194,16 +194,12 @@ public SeaTunnelRowType typeMapping(List<String> inputColumnsMapping) {
Expression expression = selectItem.getExpression();
if (selectItem.getAlias() != null) {
String aliasName = selectItem.getAlias().getName();
if (aliasName.startsWith(ESCAPE_IDENTIFIER)
&& aliasName.endsWith(ESCAPE_IDENTIFIER)) {
aliasName = aliasName.substring(1, aliasName.length() - 1);
}
fieldNames[idx] = aliasName;
fieldNames[idx] = cleanEscape(aliasName);
} else {
if (expression instanceof Column) {
fieldNames[idx] = ((Column) expression).getColumnName();
fieldNames[idx] = cleanEscape(((Column) expression).getColumnName());
} else {
fieldNames[idx] = expression.toString();
fieldNames[idx] = cleanEscape(expression.toString());
}
}

Expand All @@ -225,6 +221,13 @@ public SeaTunnelRowType typeMapping(List<String> inputColumnsMapping) {
fieldNames, seaTunnelDataTypes, lateralViews, inputColumnsMapping);
}

private static String cleanEscape(String columnName) {
if (columnName.startsWith(ESCAPE_IDENTIFIER) && columnName.endsWith(ESCAPE_IDENTIFIER)) {
columnName = columnName.substring(1, columnName.length() - 1);
}
return columnName;
}

@Override
public List<SeaTunnelRow> transformBySQL(SeaTunnelRow inputRow, SeaTunnelRowType outRowType) {
// ------Physical Query Plan Execution------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,13 @@ public void testEscapeIdentifier() {
ReadonlyConfig.fromMap(
Collections.singletonMap(
"query",
"select id, trim(`apply`) as `apply` from test where `apply` = 'a'"));
"select `id`, trim(`apply`) as `apply` from test where `apply` = 'a'"));
SQLTransform sqlTransform = new SQLTransform(config, table);
TableSchema tableSchema = sqlTransform.transformTableSchema();
List<SeaTunnelRow> result =
sqlTransform.transformRow(
new SeaTunnelRow(new Object[] {Integer.valueOf(1), String.valueOf("a")}));
Assertions.assertEquals("id", tableSchema.getFieldNames()[0]);
Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
Assertions.assertEquals("a", result.get(0).getField(1));
result =
Expand Down

0 comments on commit 3c9be86

Please sign in to comment.