
JSON script: mysqlcdc to PostgreSQL database reports error: type cannot be converted #1463

Closed
1 of 4 tasks
Iamgreat0 opened this issue Jan 5, 2023 · 2 comments · Fixed by #1464
Labels
question Further information is requested

Comments

@Iamgreat0

Search before asking

  • I had searched in the issues and found no similar question.

  • I had googled my question but I didn't get any help.

  • I had read the documentation: ChunJun doc but it didn't help me.

Description

JSON script:
{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "host": "172.2.0.22",
            "port": 30001,
            "serverId": 1,
            "databaseList": ["dps"],
            "tableList": ["dps.qixin_source"],
            "username": "root",
            "password": "wayz@1234",
            "column": [
              { "name": "id", "type": "int" },
              { "name": "name", "type": "string" },
              { "name": "gender", "type": "int" },
              { "name": "data", "type": "string" },
              { "name": "create_time", "type": "datetime" },
              { "name": "update_time", "type": "datetime" }
            ],
            "writeMode": "update",
            "updateKey": ["id"]
          },
          "table": { "tableName": "qixin_source" },
          "name": "mysqlcdcreader"
        },
        "writer": {
          "parameter": {
            "username": "pdmap",
            "password": "pdmap@123",
            "connection": [
              {
                "jdbcUrl": "jdbc:postgresql://114.67.106.133:5433/test?useUnicode=true&characterEncoding=utf8&useSSL=false&autoReconnect=true&failOverReadOnly=false",
                "table": ["qixin_sink"],
                "schema": "public"
              }
            ],
            "column": [
              { "name": "id", "type": "int8" },
              { "name": "name", "type": "string" },
              { "name": "gender", "type": "string" },
              { "name": "data", "type": "string" },
              { "name": "create_time", "type": "string" },
              { "name": "update_time", "type": "string" }
            ],
            "mode": "insert"
          },
          "table": { "tableName": "qixin_sink" },
          "name": "postgresqlwriter"
        },
        "transformer": {
          "transformSql": "select id, if(name IS NULL,'',name) as name, if(gender IS NULL,'',cast(gender as CHAR)) as gender, if(data IS NULL,'',data) as data, if(create_time IS NULL,'',cast(create_time as CHAR)) as create_time, if(update_time IS NULL,'',cast(update_time as CHAR)) as update_time from qixin_source"
        }
      }
    ],
    "setting": {
      "errorLimit": { "record": 100 },
      "speed": {
        "bytes": 0,
        "channel": 1,
        "readerChannel": 1,
        "writerChannel": 1
      }
    }
  }
}

Job submission command: sh bin/chunjun-local.sh -job json/qixin/mysql_pg_cdc.json

Error:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:811)
at com.dtstack.chunjun.environment.MyLocalStreamEnvironment.execute(MyLocalStreamEnvironment.java:174)
at com.dtstack.chunjun.Main.exeSyncJob(Main.java:224)
at com.dtstack.chunjun.Main.main(Main.java:117)
at com.dtstack.chunjun.client.local.LocalClusterClientHelper.submit(LocalClusterClientHelper.java:35)
at com.dtstack.chunjun.client.Launcher.main(Launcher.java:119)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassCastException: com.dtstack.chunjun.connector.jdbc.converter.JdbcRowConverter cannot be cast to com.dtstack.chunjun.connector.postgresql.converter.PostgresqlColumnConverter
at com.dtstack.chunjun.connector.postgresql.sink.PostgresOutputFormat.openInternal(PostgresOutputFormat.java:92)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.open(BaseRichOutputFormat.java:262)
at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.open(DtOutputFormatSinkFunction.java:95)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:433)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
at java.lang.Thread.run(Thread.java:748)
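The root cause line shows a blind downcast between two sibling converter types: a JdbcRowConverter instance is cast to PostgresqlColumnConverter, which fails because they only share a common base class. A minimal, self-contained sketch of that failure mode (the class names here are placeholders, not ChunJun's real converter hierarchy):

```java
// Hypothetical stand-ins for ChunJun's converter hierarchy: two sibling
// subclasses of one base type cannot be cast to each other at runtime.
class AbstractRowConverter {}
class JdbcRowConverter extends AbstractRowConverter {}
class PostgresqlColumnConverter extends AbstractRowConverter {}

public class CastDemo {
    // Safe check: true only when the converter really is the PostgreSQL
    // subtype; a blind (PostgresqlColumnConverter) cast throws instead.
    static boolean isPostgresConverter(AbstractRowConverter c) {
        return c instanceof PostgresqlColumnConverter;
    }

    public static void main(String[] args) {
        AbstractRowConverter rowConverter = new JdbcRowConverter();
        boolean threw = false;
        try {
            // Same shape as the failing cast in PostgresOutputFormat.openInternal
            PostgresqlColumnConverter pg = (PostgresqlColumnConverter) rowConverter;
        } catch (ClassCastException e) {
            threw = true;
        }
        // Prints "true": the cast threw and the instanceof guard correctly
        // reports that this converter is not the PostgreSQL subtype.
        System.out.println(threw && !isPostgresConverter(rowConverter));
    }
}
```

This is why the linked fix (#1464) adjusts which converter type the PostgreSQL sink casts to: an `instanceof` check (or casting to the common base type) avoids the exception.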

This problem has been troubling me for a week now. Thank you, everyone!!!

Code of Conduct

@Iamgreat0 Iamgreat0 added the question Further information is requested label Jan 5, 2023
FlechazoW added a commit to FlechazoW/chunjun that referenced this issue Jan 5, 2023
…rter' instance of the 'JdbcColumnConverter' should cast.
FlechazoW added a commit that referenced this issue Jan 5, 2023
…nstance of the 'JdbcColumnConverter' should cast. (#1464)
@Iamgreat0
Author

Is this a bug that cannot be fixed for now?

ll076110 pushed a commit that referenced this issue Mar 3, 2023
…nstance of the 'JdbcColumnConverter' should cast. (#1464)

(cherry picked from commit 1d87b29)
@junekzhong

junekzhong commented Jul 17, 2023

I have already synced commit 1d87b291d9029d8bf9b971570034eb08e534ae46, but this problem still occurs.
