
Can not start FlinkCDC 2.3 with startupOptions = specificOffset #1795

Closed
ldwnt opened this issue Dec 5, 2022 · 9 comments
Labels
bug Something isn't working

Comments

@ldwnt

ldwnt commented Dec 5, 2022

I just upgraded Flink CDC in my job from 2.1.1 to 2.3.0 to leverage the new "Scan Newly Added Tables" feature (the table was not in the tableList in the first place), but I ended up with a "Name is null" exception:

java.lang.NullPointerException: Name is null
	at java.lang.Enum.valueOf(Enum.java:236) ~[?:1.8.0_312]
	at com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetKind.valueOf(BinlogOffsetKind.java:26) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
	at com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.getOffsetKind(BinlogOffset.java:136) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
	at com.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.readBinlogPosition(SerializerUtils.java:73) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
	at com.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.readBinlogPosition(SerializerUtils.java:59) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
	at com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer.deserializeSplit(MySqlSplitSerializer.java:153) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
	at com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer.deserialize(MySqlSplitSerializer.java:122) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
	at com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer.deserialize(MySqlSplitSerializer.java:46) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3.0]
	at org.apache.flink.core.io.SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedSerialization.java:165) ~[flink-core-1.13.5.jar:1.13.5]
	at org.apache.flink.streaming.api.operators.util.SimpleVersionedListState$DeserializingIterator.next(SimpleVersionedListState.java:138) ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
	at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_312]
	at org.apache.flink.util.CollectionUtil.iterableToList(CollectionUtil.java:95) ~[flink-core-1.13.5.jar:1.13.5]
	at org.apache.flink.streaming.api.operators.SourceOperator.open(SourceOperator.java:251) ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442) ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585) ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565) ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540) ~[flink-streaming-java_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]

I could not find a solution to this issue here, so I turned to another new feature: scan.startup.mode = specific-offset. This time I got the exception "No TableMapEventData has been found for table id". Issues #276 and #411 were closed but the problem was still being reported, which matches what I observed here. Hoping I can get some help.

Environment:

  • Flink version: 1.13.5
  • Flink CDC version: 2.3.0
  • Database and version: MySQL 8.0.25

To Reproduce
Steps to reproduce the behavior:

  1. The test data:
  2. The test code (a build-and-run sketch follows the error log below):
            MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
                .hostname(datasource.getConnect().getHost())
                .port(Integer.parseInt(datasource.getConnect().getPort()))
                .username(datasource.getConnect().getUsername())
                .password(datasource.getConnect().getPassword())
                .databaseList(datasource.getDatabase())
                .tableList(tableRegex)
                // pick up tables matched by tableRegex after the initial snapshot
                .scanNewlyAddedTableEnabled(true)
                // resume from an explicit binlog file/position instead of a snapshot
                .startupOptions(StartupOptions.specificOffset("binlog.000017", 499573306))
                .includeSchemaChanges(true)
                .deserializer(new ChangJsonDeserializationSchema(true))
                .serverId(serverId)
                .debeziumProperties(properties);
  3. The error:
2022-12-05 16:02:06,396 INFO  [blc-mysql-sit.deepq.tech:3306] io.debezium.connector.mysql.MySqlStreamingChangeEventSource [1185] - Connected to MySQL binlog at mysql-sit.deepq.tech:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=binlog.000017, currentBinlogPosition=499573306, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], partition={server=mysql_binlog_source}, snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=binlog.000017, restartBinlogPosition=499573306, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]
2022-12-05 16:02:06,399 INFO  [debezium-reader-0] io.debezium.connector.mysql.MySqlStreamingChangeEventSource [917] - Waiting for keepalive thread to start
2022-12-05 16:02:06,399 INFO  [blc-mysql-sit.deepq.tech:3306] io.debezium.util.Threads [287] - Creating thread debezium-mysqlconnector-mysql_binlog_source-binlog-client
2022-12-05 16:02:06,400 INFO  [debezium-reader-0] io.debezium.connector.mysql.MySqlStreamingChangeEventSource [924] - Keepalive thread is running
2022-12-05 16:02:06,405 ERROR [blc-mysql-sit.deepq.tech:3306] io.debezium.connector.mysql.MySqlStreamingChangeEventSource [1054] - Error during binlog processing. Last offset stored = null, binlog reader near position = binlog.000017/499573306
2022-12-05 16:02:06,430 ERROR [blc-mysql-sit.deepq.tech:3306] io.debezium.pipeline.ErrorHandler [31] - Producer failure
io.debezium.DebeziumException: Failed to deserialize data of EventHeaderV4{timestamp=1670226722000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=499573356, flags=0}
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1154) ~[debezium-connector-mysql-1.6.4.Final.jar:1.6.4.Final]
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onEventDeserializationFailure(MySqlStreamingChangeEventSource.java:1207) [debezium-connector-mysql-1.6.4.Final.jar:1.6.4.Final]
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:958) [mysql-binlog-connector-java-0.25.1.jar:0.25.1]
	at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) [mysql-binlog-connector-java-0.25.1.jar:0.25.1]
	at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) [mysql-binlog-connector-java-0.25.1.jar:0.25.1]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1670226722000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=499573356, flags=0}
	at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:309) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
	at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:232) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:230) ~[debezium-connector-mysql-1.6.4.Final.jar:1.6.4.Final]
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
	... 3 more
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.MissingTableMapEventException: No TableMapEventData has been found for table id:905. Usually that means that you have started reading binary log 'within the logical event group' (e.g. from WRITE_ROWS and not proceeding TABLE_MAP
	at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:109) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
	at com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserializeRows(UpdateRowsEventDataDeserializer.java:71) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
	at com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserialize(UpdateRowsEventDataDeserializer.java:58) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
	at com.github.shyiko.mysql.binlog.event.deserialization.UpdateRowsEventDataDeserializer.deserialize(UpdateRowsEventDataDeserializer.java:33) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
	at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:303) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
	at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:232) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:230) ~[debezium-connector-mysql-1.6.4.Final.jar:1.6.4.Final]
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945) ~[mysql-binlog-connector-java-0.25.1.jar:0.25.1]
	... 3 more
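
For context, here is a minimal sketch (not the reporter's actual job) of how such a builder is typically finished and attached to a Flink job; the sink, checkpoint interval, and job name below are placeholder assumptions:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Build the MySqlSource from the builder above and wire it into a job.
    // Checkpointing must be enabled so the binlog offset survives restarts.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(30_000);
    env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
        .print(); // placeholder sink
    env.execute("mysql-cdc-specific-offset");
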
@ldwnt ldwnt added the bug Something isn't working label Dec 5, 2022
@ruanhang1993
Contributor

Thanks for reporting. This is a duplicate of #1757 and has been fixed by #1758 on the master branch.

@ldwnt
Author

ldwnt commented Dec 7, 2022

Thanks for reporting. This is a duplicate of #1757 and has been fixed by #1758 on the master branch.

I built the CDC jar from source at commit c1a049e and repeated the scenario below:

  1. start job A with CDC 2.1.1 (snapshot finished and binlog read)
  2. stop job A and update some records in the source MySQL database
  3. replace the jars and restart job A with CDC 2.3

The updated records were correctly read in job A, but checkpoints could not be completed:

2022-12-07 18:44:56,860 INFO  [jobmanager-future-thread-1] org.apache.flink.runtime.source.coordinator.SourceCoordinator [303] - Restoring SplitEnumerator of source Source: @s -> @p -> (Sink: @ds, Sink: @ls) from checkpoint.
2022-12-07 18:44:56,901 ERROR [jobmanager-future-thread-1] org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator [384] - Failed to reset the coordinator to checkpoint and start.
java.lang.IllegalStateException: Invalid status code 16777216,the valid code range is [0, 4]
	at com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.fromStatusCode(AssignerStatus.java:164) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
	at com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializeSnapshotPendingSplitsState(PendingSplitsStateSerializer.java:258) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
	at com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializeHybridPendingSplitsState(PendingSplitsStateSerializer.java:322) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
	at com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializePendingSplitsState(PendingSplitsStateSerializer.java:143) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
	at com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserialize(PendingSplitsStateSerializer.java:112) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
	at com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserialize(PendingSplitsStateSerializer.java:50) ~[flink-connector-mysql-cdc-2.3.0.1.jar:2.3-SNAPSHOT]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:414) ~[flink-runtime_2.11-1.13.5.jar:?]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:309) ~[flink-runtime_2.11-1.13.5.jar:?]
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:377) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:136) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) ~[?:1.8.0_312]
	at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731) ~[?:1.8.0_312]
	at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023) ~[?:1.8.0_312]
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.resetToCheckpoint(RecreateOnResetOperatorCoordinator.java:131) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.resetToCheckpoint(OperatorCoordinatorHolder.java:273) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreStateToCoordinators(CheckpointCoordinator.java:1815) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1577) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1642) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:163) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:138) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-runtime_2.11-1.13.5.jar:1.13.5]
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-core-1.13.5.jar:1.13.5]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) [?:1.8.0_312]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_312]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_312]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_312]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_312]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_312]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_312]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
...
2022-12-07 18:45:16,608 INFO  [Checkpoint Timer] org.apache.flink.runtime.checkpoint.CheckpointCoordinator [742] - Triggering checkpoint 4 (type=CHECKPOINT) @ 1670409916573 for job bce7153aa983463f434472bc58f25a0d.
2022-12-07 19:45:16,608 INFO  [Checkpoint Timer] org.apache.flink.runtime.checkpoint.CheckpointCoordinator [1985] - Checkpoint 4 of job bce7153aa983463f434472bc58f25a0d expired before completing.

@PatrickRen
Contributor

Thanks for the issue @ldwnt ! I took a look and found that the incompatibility was already introduced in 2.2 by c94791f. So unfortunately the state from 2.1 is not compatible with connectors 2.2 and 2.3.

To be honest, the state serializer deserves a refactor to be fully version-managed.
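
For illustration only, a minimal sketch of what "fully version-managed" could look like, following Flink's SimpleVersionedSerializer contract; the state type MyState and its toBytesV5/fromBytesV4/fromBytesV5 helpers are hypothetical:

    import java.io.IOException;
    import org.apache.flink.core.io.SimpleVersionedSerializer;

    public class MyStateSerializer implements SimpleVersionedSerializer<MyState> {
        private static final int CURRENT_VERSION = 5;

        @Override
        public int getVersion() {
            return CURRENT_VERSION; // recorded next to every checkpointed blob
        }

        @Override
        public byte[] serialize(MyState state) throws IOException {
            return state.toBytesV5(); // always write the newest wire format
        }

        @Override
        public MyState deserialize(int version, byte[] serialized) throws IOException {
            // Dispatch on the version stored in the checkpoint, so state written
            // by an older connector restores cleanly instead of misreading bytes
            // (the "Invalid status code 16777216" above is such a misread).
            switch (version) {
                case 4:
                    return MyState.fromBytesV4(serialized);
                case 5:
                    return MyState.fromBytesV5(serialized);
                default:
                    throw new IOException("Unknown state version " + version);
            }
        }
    }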

@ldwnt
Author

ldwnt commented Dec 8, 2022

Thanks for the issue @ldwnt ! I took a look and found that the incompatibility was already introduced in 2.2 by c94791f. So unfortunately the state from 2.1 is not compatible with connectors 2.2 and 2.3.

To be honest, the state serializer deserves a refactor to be fully version-managed.

Sad to hear that. We're considering upgrading CDC to 2.3 because it supports adding new tables and consuming from a specific offset, at least for MySQL. However, if an in-place upgrade is impossible, we will have to replay the snapshots of dozens of datasources, which troubles the downstream stakeholders. Is there a planned fix for this issue in 2.3?

@xuhaiL

xuhaiL commented Mar 3, 2023

@ruanhang1993 @PatrickRen I also encountered this bug. Is there a planned fix for it in 2.3?

@caicancai
Member

@ruanhang1993 I also encountered this bug. It seems to be a problem in the underlying mysql-binlog-connector dependency.

@Cmelon9

Cmelon9 commented Jun 6, 2023

@ruanhang1993 @PatrickRen Flink CDC version 2.3 still has this bug.

@ldwnt
Author

ldwnt commented Dec 26, 2023

@PatrickRen I gave up on the upgrade and started a new job with CDC 2.4. The problem is that starting from a specified binlog offset often leads to the MissingTableMapEventException error, since the offset corresponds to a WRITE_ROWS binlog record. So how are we supposed to use the "specific-offset" startup mode?
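
One common workaround, sketched below as an assumption rather than official guidance: inspect the binlog with MySQL's SHOW BINLOG EVENTS and pass specific-offset only a position that starts an event group (a Gtid or Query/BEGIN event), never the position of a Table_map or row event. Connection details here are placeholders:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    // Prints binlog positions that are safe to pass to
    // StartupOptions.specificOffset(file, pos): positions of events that open
    // an event group, not row events inside one.
    public class SafeBinlogOffsets {
        public static void main(String[] args) throws SQLException {
            try (Connection conn = DriverManager.getConnection(
                         "jdbc:mysql://mysql-host:3306/", "user", "password");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(
                         "SHOW BINLOG EVENTS IN 'binlog.000017' LIMIT 2000")) {
                while (rs.next()) {
                    String eventType = rs.getString("Event_type");
                    // Gtid and Query (BEGIN) events sit between event groups;
                    // Table_map and Write_rows/Update_rows/Delete_rows sit inside one.
                    if ("Gtid".equals(eventType) || "Query".equals(eventType)) {
                        System.out.println(rs.getLong("Pos") + "  " + eventType);
                    }
                }
            }
        }
    }

If the server has GTIDs enabled, the GTID-set variant of specific-offset startup (where the connector version supports it) sidesteps file/position bookkeeping entirely.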

@PatrickRen
Contributor

Closing this issue as it has been migrated to Apache Jira.
