Skip to content

Commit

Permalink
[FLINK-36891[[source-connector][mysql] Fix corrupted state in case of…
Browse files Browse the repository at this point in the history
… serialization failure in MySQL CDC Source

This closes #3794.
  • Loading branch information
morozov authored Dec 14, 2024
1 parent 0037c43 commit b50d172
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,28 +80,32 @@ public byte[] serialize(PendingSplitsState state) throws IOException {
}
final DataOutputSerializer out = SERIALIZER_CACHE.get();

out.writeInt(splitSerializer.getVersion());
if (state instanceof SnapshotPendingSplitsState) {
out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
} else if (state instanceof StreamPendingSplitsState) {
out.writeInt(STREAM_PENDING_SPLITS_STATE_FLAG);
serializeStreamPendingSplitsState((StreamPendingSplitsState) state, out);
} else if (state instanceof HybridPendingSplitsState) {
out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
} else {
throw new IOException(
"Unsupported to serialize PendingSplitsState class: "
+ state.getClass().getName());
}
try {
out.writeInt(splitSerializer.getVersion());
if (state instanceof SnapshotPendingSplitsState) {
out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
} else if (state instanceof StreamPendingSplitsState) {
out.writeInt(STREAM_PENDING_SPLITS_STATE_FLAG);
serializeStreamPendingSplitsState((StreamPendingSplitsState) state, out);
} else if (state instanceof HybridPendingSplitsState) {
out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
} else {
throw new IOException(
"Unsupported to serialize PendingSplitsState class: "
+ state.getClass().getName());
}

final byte[] result = out.getCopyOfBuffer();
// optimization: cache the serialized from, so we avoid the byte work during repeated
// serialization
state.serializedFormCache = result;

final byte[] result = out.getCopyOfBuffer();
// optimization: cache the serialized from, so we avoid the byte work during repeated
// serialization
state.serializedFormCache = result;
out.clear();
return result;
return result;
} finally {
out.clear();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.base.source.assigner.state;

import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSerializer;

import io.debezium.relational.TableId;
import org.junit.Test;

import java.io.IOException;
import java.util.Map;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertThrows;

/** Tests for {@link PendingSplitsStateSerializer}. */
public class PendingSplitsStateSerializerTest {

private final TableId tableId = TableId.parse("catalog.schema.table1");

@Test
public void testOutputIsFinallyCleared() throws Exception {
PendingSplitsStateSerializer serializer =
new PendingSplitsStateSerializer(constructSourceSplitSerializer());
StreamPendingSplitsState state = new StreamPendingSplitsState(true);

final byte[] ser1 = serializer.serialize(state);
state.serializedFormCache = null;

PendingSplitsState unsupportedState = new UnsupportedPendingSplitsState();

assertThrows(IOException.class, () -> serializer.serialize(unsupportedState));

final byte[] ser2 = serializer.serialize(state);
assertArrayEquals(ser1, ser2);
}

private SourceSplitSerializer constructSourceSplitSerializer() {
return new SourceSplitSerializer() {
@Override
public OffsetFactory getOffsetFactory() {
return new OffsetFactory() {
@Override
public Offset newOffset(Map<String, String> offset) {
return null;
}

@Override
public Offset newOffset(String filename, Long position) {
return null;
}

@Override
public Offset newOffset(Long position) {
return null;
}

@Override
public Offset createTimestampOffset(long timestampMillis) {
return null;
}

@Override
public Offset createInitialOffset() {
return null;
}

@Override
public Offset createNoStoppingOffset() {
return null;
}
};
}
};
}

/** An implementation for {@link PendingSplitsState} which will cause a serialization error. */
static class UnsupportedPendingSplitsState extends PendingSplitsState {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,28 +72,32 @@ public byte[] serialize(PendingSplitsState state) throws IOException {
}
final DataOutputSerializer out = SERIALIZER_CACHE.get();

out.writeInt(splitSerializer.getVersion());
if (state instanceof SnapshotPendingSplitsState) {
out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
} else if (state instanceof BinlogPendingSplitsState) {
out.writeInt(BINLOG_PENDING_SPLITS_STATE_FLAG);
serializeBinlogPendingSplitsState((BinlogPendingSplitsState) state, out);
} else if (state instanceof HybridPendingSplitsState) {
out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
} else {
throw new IOException(
"Unsupported to serialize PendingSplitsState class: "
+ state.getClass().getName());
}
try {
out.writeInt(splitSerializer.getVersion());
if (state instanceof SnapshotPendingSplitsState) {
out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
} else if (state instanceof BinlogPendingSplitsState) {
out.writeInt(BINLOG_PENDING_SPLITS_STATE_FLAG);
serializeBinlogPendingSplitsState((BinlogPendingSplitsState) state, out);
} else if (state instanceof HybridPendingSplitsState) {
out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
} else {
throw new IOException(
"Unsupported to serialize PendingSplitsState class: "
+ state.getClass().getName());
}

final byte[] result = out.getCopyOfBuffer();
// optimization: cache the serialized from, so we avoid the byte work during repeated
// serialization
state.serializedFormCache = result;

final byte[] result = out.getCopyOfBuffer();
// optimization: cache the serialized from, so we avoid the byte work during repeated
// serialization
state.serializedFormCache = result;
out.clear();
return result;
return result;
} finally {
out.clear();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -42,8 +43,10 @@
import java.util.Map;

import static org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit.generateSplitId;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;

/**
* Tests for {@link
Expand Down Expand Up @@ -102,6 +105,22 @@ public void testRepeatedSerializationCache() throws Exception {
assertSame(ser1, ser3);
}

@Test
public void testOutputIsFinallyCleared() throws Exception {
final PendingSplitsStateSerializer serializer =
new PendingSplitsStateSerializer(MySqlSplitSerializer.INSTANCE);

final byte[] ser1 = serializer.serialize(state);
state.serializedFormCache = null;

PendingSplitsState unsupportedState = new UnsupportedPendingSplitsState();

assertThrows(IOException.class, () -> serializer.serialize(unsupportedState));

final byte[] ser2 = serializer.serialize(state);
assertArrayEquals(ser1, ser2);
}

static PendingSplitsState serializeAndDeserializeSourceEnumState(PendingSplitsState state)
throws Exception {
final PendingSplitsStateSerializer serializer =
Expand Down Expand Up @@ -270,4 +289,7 @@ public TableEditor edit() {
throw new UnsupportedOperationException("Not implemented.");
}
}

/** An implementation for {@link PendingSplitsState} which will cause a serialization error. */
static class UnsupportedPendingSplitsState extends PendingSplitsState {}
}

0 comments on commit b50d172

Please sign in to comment.