FLINK-36891: MySQL CDC connector produces corrupted state in case of serialization failure
morozov committed Dec 12, 2024
1 parent 9f8268c commit 4537562
Showing 4 changed files with 98 additions and 42 deletions.
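
Context for the fix: both PendingSplitsStateSerializer implementations reuse a per-thread DataOutputSerializer (SERIALIZER_CACHE) and, before this commit, cleared it only on the success path. A serialize() call that threw, for example for an unsupported state class, left its partially written bytes in the shared buffer, and the next state serialized on that thread picked them up, producing corrupted enumerator state in the checkpoint. A minimal sketch of the pattern, with a hypothetical LeakyStateSerializer rather than the connector's actual classes, only the Flink DataOutputSerializer API is real:

import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;

/** Hypothetical sketch of the pre-fix behavior; not the connector's code. */
final class LeakyStateSerializer {
    // Reused per-thread buffer, mirroring the SERIALIZER_CACHE pattern in the diff below.
    private static final ThreadLocal<DataOutputSerializer> CACHE =
            ThreadLocal.withInitial(() -> new DataOutputSerializer(64));

    byte[] serialize(Object state) throws IOException {
        final DataOutputSerializer out = CACHE.get();
        out.writeInt(1); // version is written before the state type is validated
        if (!(state instanceof String)) {
            // the version int written above is still sitting in the shared buffer
            throw new IOException("Unsupported state: " + state.getClass().getName());
        }
        out.writeUTF((String) state);
        final byte[] result = out.getCopyOfBuffer();
        out.clear(); // only reached on success; a failed call leaves stale bytes behind
        return result;
    }
}

The next serialize() call on the same thread would prepend those stale bytes to its own output. Moving out.clear() into a finally block, as both changed serializers below do, resets the buffer on the failure path as well.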
@@ -80,28 +80,32 @@ public byte[] serialize(PendingSplitsState state) throws IOException {
         }
         final DataOutputSerializer out = SERIALIZER_CACHE.get();
 
-        out.writeInt(splitSerializer.getVersion());
-        if (state instanceof SnapshotPendingSplitsState) {
-            out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
-            serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
-        } else if (state instanceof StreamPendingSplitsState) {
-            out.writeInt(STREAM_PENDING_SPLITS_STATE_FLAG);
-            serializeStreamPendingSplitsState((StreamPendingSplitsState) state, out);
-        } else if (state instanceof HybridPendingSplitsState) {
-            out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
-            serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
-        } else {
-            throw new IOException(
-                    "Unsupported to serialize PendingSplitsState class: "
-                            + state.getClass().getName());
-        }
-
-        final byte[] result = out.getCopyOfBuffer();
-        // optimization: cache the serialized from, so we avoid the byte work during repeated
-        // serialization
-        state.serializedFormCache = result;
-        out.clear();
-        return result;
+        try {
+            out.writeInt(splitSerializer.getVersion());
+            if (state instanceof SnapshotPendingSplitsState) {
+                out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
+                serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
+            } else if (state instanceof StreamPendingSplitsState) {
+                out.writeInt(STREAM_PENDING_SPLITS_STATE_FLAG);
+                serializeStreamPendingSplitsState((StreamPendingSplitsState) state, out);
+            } else if (state instanceof HybridPendingSplitsState) {
+                out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
+                serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
+            } else {
+                throw new IOException(
+                        "Unsupported to serialize PendingSplitsState class: "
+                                + state.getClass().getName());
+            }
+
+            final byte[] result = out.getCopyOfBuffer();
+            // optimization: cache the serialized from, so we avoid the byte work during repeated
+            // serialization
+            state.serializedFormCache = result;
+
+            return result;
+        } finally {
+            out.clear();
+        }
     }
 
     @Override
@@ -46,6 +46,8 @@
 import java.util.Map;
 
 import static org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit.generateSplitId;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThrows;
 
 /** Tests for {@link PendingSplitsStateSerializer}. */
 public class PendingSplitsStateSerializerTest {
@@ -154,6 +156,23 @@ public void testPendingSplitsStateSerializerCompatibilityVersion6() throws IOExc
         Assert.assertEquals(expectedHybridPendingSplitsState, hybridPendingSplitsStateAfter);
     }
 
+    @Test
+    public void testOutputIsFinallyCleared() throws Exception {
+        PendingSplitsStateSerializer serializer =
+                new PendingSplitsStateSerializer(constructSourceSplitSerializer());
+        StreamPendingSplitsState state = new StreamPendingSplitsState(true);
+
+        final byte[] ser1 = serializer.serialize(state);
+        state.serializedFormCache = null;
+
+        PendingSplitsState unsupportedState = new UnsupportedPendingSplitsState();
+
+        assertThrows(IOException.class, () -> serializer.serialize(unsupportedState));
+
+        final byte[] ser2 = serializer.serialize(state);
+        assertArrayEquals(ser1, ser2);
+    }
+
     private SourceSplitSerializer constructSourceSplitSerializer() {
         return new SourceSplitSerializer() {
             @Override
@@ -294,4 +313,7 @@ private static Table createTable(TableId id) {
         editor.setPrimaryKeyNames(Arrays.asList("id"));
         return editor.create();
     }
+
+    /** An implementation for {@link PendingSplitsState} which will cause a serialization error. */
+    static class UnsupportedPendingSplitsState extends PendingSplitsState {}
 }
@@ -72,28 +72,32 @@ public byte[] serialize(PendingSplitsState state) throws IOException {
         }
         final DataOutputSerializer out = SERIALIZER_CACHE.get();
 
-        out.writeInt(splitSerializer.getVersion());
-        if (state instanceof SnapshotPendingSplitsState) {
-            out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
-            serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
-        } else if (state instanceof BinlogPendingSplitsState) {
-            out.writeInt(BINLOG_PENDING_SPLITS_STATE_FLAG);
-            serializeBinlogPendingSplitsState((BinlogPendingSplitsState) state, out);
-        } else if (state instanceof HybridPendingSplitsState) {
-            out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
-            serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
-        } else {
-            throw new IOException(
-                    "Unsupported to serialize PendingSplitsState class: "
-                            + state.getClass().getName());
-        }
-
-        final byte[] result = out.getCopyOfBuffer();
-        // optimization: cache the serialized from, so we avoid the byte work during repeated
-        // serialization
-        state.serializedFormCache = result;
-        out.clear();
-        return result;
+        try {
+            out.writeInt(splitSerializer.getVersion());
+            if (state instanceof SnapshotPendingSplitsState) {
+                out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
+                serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
+            } else if (state instanceof BinlogPendingSplitsState) {
+                out.writeInt(BINLOG_PENDING_SPLITS_STATE_FLAG);
+                serializeBinlogPendingSplitsState((BinlogPendingSplitsState) state, out);
+            } else if (state instanceof HybridPendingSplitsState) {
+                out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
+                serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
+            } else {
+                throw new IOException(
+                        "Unsupported to serialize PendingSplitsState class: "
+                                + state.getClass().getName());
+            }
+
+            final byte[] result = out.getCopyOfBuffer();
+            // optimization: cache the serialized from, so we avoid the byte work during repeated
+            // serialization
+            state.serializedFormCache = result;
+
+            return result;
+        } finally {
+            out.clear();
+        }
     }
 
     @Override
@@ -33,6 +33,7 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -42,8 +43,10 @@
 import java.util.Map;
 
 import static org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit.generateSplitId;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
 
 /**
  * Tests for {@link
@@ -102,6 +105,26 @@ public void testRepeatedSerializationCache() throws Exception {
         assertSame(ser1, ser3);
     }
 
+    @Test
+    public void testOutputIsFinallyCleared() throws Exception {
+        final PendingSplitsStateSerializer serializer =
+                new PendingSplitsStateSerializer(MySqlSplitSerializer.INSTANCE);
+
+        final byte[] ser1 = serializer.serialize(state);
+        state.serializedFormCache = null;
+
+        PendingSplitsState unsupportedState = new UnsupportedPendingSplitsState();
+
+        assertThrows(
+                IOException.class,
+                () -> {
+                    serializer.serialize(unsupportedState);
+                });
+
+        final byte[] ser2 = serializer.serialize(state);
+        assertArrayEquals(ser1, ser2);
+    }
+
     static PendingSplitsState serializeAndDeserializeSourceEnumState(PendingSplitsState state)
             throws Exception {
         final PendingSplitsStateSerializer serializer =
@@ -270,4 +293,7 @@ public TableEditor edit() {
             throw new UnsupportedOperationException("Not implemented.");
         }
     }
+
+    /** An implementation for {@link PendingSplitsState} which will cause a serialization error. */
+    static class UnsupportedPendingSplitsState extends PendingSplitsState {}
 }
