Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] [Sink-JDBC] Named parameters in SQL statement must not be empty #8530

Open
2 of 3 tasks
NookVoive opened this issue Jan 16, 2025 · 1 comment
Open
2 of 3 tasks

Comments

@NookVoive
Copy link

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

当我想要把一张表从一个HANA数据库同步到数据到另一个HANA数据库时,因为字段名包含:/ 而导致程序报错。
如果我不同步这些字段程序就没问题了!

字段名:

/BEV1/LULEINH
/BEV1/RPFAESS
/BEV1/RPKIST
/BEV1/RPCONT
/BEV1/RPSONST
/BEV1/RPFLGNR

When I tried to synchronize the data of a table from one HANA database to another, the program reported an error because the field name contained the character "/".
If I don't synchronize these fields, there will be no problem with the program!

 Named parameters in SQL statement must not be empty

根据提示我找到了这段源码:

 private static String parseNamedStatement(String sql, Map<String, List<Integer>> paramMap) {
        StringBuilder parsedSql = new StringBuilder();
        int fieldIndex = 1; // SQL statement parameter index starts from 1
        int length = sql.length();
        for (int i = 0; i < length; i++) {
            char c = sql.charAt(i);
            if (':' == c) {
                int j = i + 1;
                while (j < length && Character.isJavaIdentifierPart(sql.charAt(j))) {
                    j++;
                }
                String parameterName = sql.substring(i + 1, j);
                checkArgument(
                        !parameterName.isEmpty(),
                        "Named parameters in SQL statement must not be empty.");
                paramMap.computeIfAbsent(parameterName, n -> new ArrayList<>()).add(fieldIndex);
                fieldIndex++;
                i = j - 1;
                parsedSql.append('?');
            } else {
                parsedSql.append(c);
            }
        }
        return parsedSql.toString();
    }

我想是不是源码中的截取动作可以对字段名做下转义?
I wonder if the truncation action in the source code can escape the field names?

SeaTunnel Version

2.3.8

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  jdbc {
    # HANA
    url = "jdbc:sap://xxxxxx:30015"
    driver = "com.sap.db.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "xxxxx"
    password = "xxxxxx"
    use_select_count = true

    table_path = ""${p_database}"."${p_table}""
    where_condition = ${where_condition}
  }
}

sink {
  jdbc {
    # HANA 
    url = "jdbc:sap://xxxxxx:30115"
    driver = "com.sap.db.jdbc.Driver"
    user = "xxxxxx"
    password = "xxxxxx"

    
    schema_save_mode = "ERROR_WHEN_SCHEMA_NOT_EXIST"
    data_save_mode = "CUSTOM_PROCESSING"
    custom_sql = "delete from ${table_full_name} "${where_condition}
    
    generate_sink_sql = true
    enable_upsert = false
    database = "${database_name}"
    table = "${table_name}"
  }
}

Running Command

source /etc/profile.d/seatunnel.sh
export where_condition=$where_condition
bash $SEATUNNEL_HOME/bin/seatunnel.sh -c /apps/seatunnel/config/test.conf -m local -i p_database=${db_name} -i p_table=${table_name}

Error Exception

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:213)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalArgumentException: Named parameters in SQL statement must not be empty.
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:253)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:66)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:75)
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
        at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:693)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1018)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Named parameters in SQL statement must not be empty.
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.subSinkErrorCheck(MultiTableSinkWriter.java:136)
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.write(MultiTableSinkWriter.java:173)
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.write(MultiTableSinkWriter.java:47)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:249)
        ... 17 more
Caused by: java.lang.IllegalArgumentException: Named parameters in SQL statement must not be empty.
        at org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement.parseNamedStatement(FieldNamedPreparedStatement.java:682)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement.prepareStatement(FieldNamedPreparedStatement.java:638)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder.lambda$createInsertOrUpdateExecutor$1(JdbcOutputFormatBuilder.java:235)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.InsertOrUpdateBatchStatementExecutor.prepareStatements(InsertOrUpdateBatchStatementExecutor.java:72)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor.prepareStatements(BufferReducedBatchStatementExecutor.java:50)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.createAndOpenStatementExecutor(JdbcOutputFormat.java:80)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.open(JdbcOutputFormat.java:74)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.tryOpen(JdbcSinkWriter.java:112)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.write(JdbcSinkWriter.java:123)
        at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.write(JdbcSinkWriter.java:43)
        at org.apache.seatunnel.api.sink.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:62)
        ... 6 more

        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:205)
        ... 2 more

Zeta or Flink or Spark Version

No response

Java or Scala Version

1.8

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@akulabs8
Copy link

@Hisoka-X I can contribute here

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants