[Bug] [SQL Transform] Cast bigint to timestamp error #5809

Closed
3 tasks done
EricJoy2048 opened this issue Nov 8, 2023 · 0 comments · Fixed by #5812
Search before asking

  • I have searched the issues and found no similar issues.

What happened

2023-11-08 14:39:35,068 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:192)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.example.engine.TransformV2Example.main(TransformV2Example.java:43)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.IndexOutOfBoundsException: Index: 3, Size: 2
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at org.apache.seatunnel.transform.sql.zeta.functions.SystemFunction.castAs(SystemFunction.java:118)
at org.apache.seatunnel.transform.sql.zeta.ZetaSQLFunction.executeCastExpr(ZetaSQLFunction.java:448)
at org.apache.seatunnel.transform.sql.zeta.ZetaSQLFunction.computeForValue(ZetaSQLFunction.java:230)
at org.apache.seatunnel.transform.sql.zeta.ZetaSQLEngine.project(ZetaSQLEngine.java:261)
at org.apache.seatunnel.transform.sql.zeta.ZetaSQLEngine.transformBySQL(ZetaSQLEngine.java:228)
at org.apache.seatunnel.transform.sql.SQLTransform.transformRow(SQLTransform.java:115)
at org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform.map(AbstractSeaTunnelTransform.java:40)
at org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform.map(AbstractSeaTunnelTransform.java:26)
at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.received(TransformFlowLifeCycle.java:94)
at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.received(TransformFlowLifeCycle.java:37)
at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.sendRecordToNext(SeaTunnelSourceCollector.java:208)
at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.collect(SeaTunnelSourceCollector.java:123)
at org.apache.seatunnel.connectors.seatunnel.fake.source.FakeDataGenerator.collectFakedRows(FakeDataGenerator.java:99)
at org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSourceReader.pollNext(FakeSourceReader.java:75)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:117)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:122)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:634)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:185)
... 2 more
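
The innermost frames show a plain `ArrayList.get` with index 3 on a list of size 2, called from `SystemFunction.castAs`. The sketch below only illustrates that failure mode; it is not the `castAs` source. The contents of the argument list (a value plus a target type name) and the helper `argOrNull` are assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CastArgsSketch {
    // Hypothetical helper: read an optional positional argument,
    // returning null instead of throwing when it is absent.
    static Object argOrNull(List<Object> args, int index) {
        return index < args.size() ? args.get(index) : null;
    }

    public static void main(String[] unused) {
        // Assumed shape of the arguments: the value and the target type name.
        List<Object> castArgs =
                new ArrayList<>(Arrays.<Object>asList(100000000000L, "TIMESTAMP"));

        try {
            // Reading a fourth argument from a two-element list fails here.
            castArgs.get(3);
        } catch (IndexOutOfBoundsException e) {
            // The message wording differs between JDK versions.
            System.out.println("caught: " + e.getMessage());
        }

        // The bounds-checked read returns null for the missing argument.
        System.out.println(argOrNull(castArgs, 3));
    }
}
```

A two-argument cast such as `cast(c4 as timestamp)` would hit this if the code path reads positions that only exist for parameterized types like `DECIMAL(16,4)`; whether that is the actual cause is not confirmed by this report.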

SeaTunnel Version

all

SeaTunnel Config

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
  execution.parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  FakeSource {
    result_table_name = "fake"
    schema = {
      fields {
        id = "bigint"
        name = "string"
        c1 = "string"
        c2 = "timestamp"
        c3 = "string"
        c4 = "bigint"
      }
    }
    rows = [
      {fields = [1, "Joy Ding", "12.4", "2012-12-21T12:34:56", null, 100000000000], kind = INSERT}
    ]
  }
}

transform {
  Sql {
    source_table_name = "fake"
    result_table_name = "fake1"
    query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1, ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1, nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as decimal(16,4)) as c4_2 from fake"
  }
}

sink {
  Console {
    source_table_name = "fake1"
  }
  Assert {
    source_table_name = "fake1"
    rules = {
      field_rules = [
        {
          field_name = "id"
          field_type = "string"
          field_value = [
            {equals_to = "1"}
          ]
        },
        {
          field_name = "id2"
          field_type = "int"
          field_value = [
            {equals_to = 1}
          ]
        },
        {
          field_name = "id3"
          field_type = "double"
          field_value = [
            {equals_to = 1}
          ]
        },
        {
          field_name = "c1_1"
          field_type = "double"
          field_value = [
            {equals_to = 12.4}
          ]
        },
        {
          field_name = "c1_2"
          field_type = "decimal(10,2)"
          field_value = [
            {equals_to = "12.40"}
          ]
        },
        {
          field_name = "c2_1"
          field_type = "date"
          field_value = [
            {equals_to = "2012-12-21"}
          ]
        },
        {
          field_name = "c3_1"
          field_type = "string"
          field_value = [
            {equals_to = "Unknown"}
          ]
        },
        {
          field_name = "c3_2"
          field_type = "string"
          field_value = [
            {equals_to = "Unknown"}
          ]
        },
        {
          field_name = "name1"
          field_type = "string"
          field_value = [
            {equals_to = "NULL"}
          ]
        },
        {
          field_name = "name2"
          field_type = "string"
          field_value = [
            {equals_to = "Joy Ding"}
          ]
        },
        {
          field_name = "c4_1"
          field_type = "timestamp"
          field_value = [
            {equals_to = 100000000000}
          ]
        },
        {
          field_name = "c4_2"
          field_type = "decimal(16,4)"
          field_value = [
            {equals_to = "100000000000.0000"}
          ]
        }
      ]
    }
  }
}
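
For reference, the config casts `c4` (the bigint `100000000000`) to a timestamp. The report does not say which unit or time zone the cast should assume. The sketch below shows one plausible reading, epoch milliseconds interpreted in UTC, using only `java.time`. The class and method names are hypothetical and this is not SeaTunnel's implementation.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

class EpochMillisCastSketch {
    // Assumption: the bigint is milliseconds since the Unix epoch, read in UTC.
    static LocalDateTime toTimestamp(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    public static void main(String[] unused) {
        // The c4 value from the FakeSource row above.
        System.out.println(toTimestamp(100000000000L)); // prints 1973-03-03T09:46:40
    }
}
```

If the value were instead treated as seconds, or converted in the system default zone, the resulting timestamp would differ, so the expected semantics should be pinned down alongside the fix.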

Running Command

*

Error Exception

*

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
