Skip to content

Commit

Permalink
[mysql] Fix backward compatibility on deserializing binlog offset fro…
Browse files Browse the repository at this point in the history
…m old versions (#1758)

This closes #1757
  • Loading branch information
PatrickRen committed Nov 17, 2022
1 parent 8132bc8 commit e654159
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;

import io.debezium.connector.mysql.GtidSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.errors.ConnectException;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -94,7 +97,8 @@ public static BinlogOffset ofNonStopping() {
return builder().setOffsetKind(NON_STOPPING).build();
}

BinlogOffset(Map<String, String> offset) {
@VisibleForTesting
public BinlogOffset(Map<String, String> offset) {
this.offset = offset;
}

Expand Down Expand Up @@ -132,7 +136,11 @@ public Long getServerId() {
return longOffsetValue(offset, SERVER_ID_KEY);
}

@Nullable
public BinlogOffsetKind getOffsetKind() {
if (offset.get(OFFSET_KIND_KEY) == null) {
return null;
}
return BinlogOffsetKind.valueOf(offset.get(OFFSET_KIND_KEY));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.core.memory.DataOutputSerializer;

import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetKind;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetSerializer;
import io.debezium.DebeziumException;
import io.debezium.util.HexConverter;
Expand Down Expand Up @@ -78,6 +79,11 @@ public static BinlogOffset readBinlogPosition(DataInputDeserializer in) throws I
if (StringUtils.isEmpty(offset.getFilename()) && offset.getPosition() == 0L) {
return BinlogOffset.ofEarliest();
}
// For other cases we treat it as a specific offset
return BinlogOffset.builder()
.setOffsetKind(BinlogOffsetKind.SPECIFIC)
.setOffsetMap(offset.getOffset())
.build();
}
return offset;
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.connectors.mysql.source.utils;

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetKind;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;

public class SerializerUtilsTest {

@Test
public void testBinlogOffsetSerde() throws Exception {
for (BinlogOffset offset : createBinlogOffsets()) {
byte[] serialized = serializeBinlogOffset(offset);
BinlogOffset deserialized = deserializeBinlogOffset(serialized);
assertEquals(offset, deserialized);
}
}

/**
* Test deserializing from old binlog offsets without {@link BinlogOffsetKind}, which validates
* the backward compatibility.
*/
@Test
public void testDeserializeFromBinlogOffsetWithoutKind() throws Exception {
// Create the INITIAL offset in earlier versions
Map<String, String> initialOffsetMap = new HashMap<>();
initialOffsetMap.put(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY, "");
initialOffsetMap.put(BinlogOffset.BINLOG_POSITION_OFFSET_KEY, "0");
BinlogOffset initialOffset = new BinlogOffset(initialOffsetMap);
BinlogOffset deserialized = deserializeBinlogOffset(serializeBinlogOffset(initialOffset));
assertEquals(BinlogOffset.ofEarliest(), deserialized);

// Create the NON_STOPPING offset in earlier versions
Map<String, String> nonStoppingOffsetMap = new HashMap<>();
nonStoppingOffsetMap.put(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY, "");
nonStoppingOffsetMap.put(
BinlogOffset.BINLOG_POSITION_OFFSET_KEY, Long.toString(Long.MIN_VALUE));
BinlogOffset nonStoppingOffset = new BinlogOffset(nonStoppingOffsetMap);
deserialized = deserializeBinlogOffset(serializeBinlogOffset(nonStoppingOffset));
assertEquals(BinlogOffset.ofNonStopping(), deserialized);

// Create a specific offset in earlier versions
Map<String, String> specificOffsetMap = new HashMap<>();
specificOffsetMap.put(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY, "mysql-bin.000001");
specificOffsetMap.put(BinlogOffset.BINLOG_POSITION_OFFSET_KEY, "4");
specificOffsetMap.put(
BinlogOffset.GTID_SET_KEY, "24DA167-0C0C-11E8-8442-00059A3C7B00:1-19");
specificOffsetMap.put(BinlogOffset.TIMESTAMP_KEY, "1668690384");
specificOffsetMap.put(BinlogOffset.EVENTS_TO_SKIP_OFFSET_KEY, "15213");
specificOffsetMap.put(BinlogOffset.ROWS_TO_SKIP_OFFSET_KEY, "18613");
BinlogOffset specificOffset = new BinlogOffset(specificOffsetMap);
deserialized = deserializeBinlogOffset(serializeBinlogOffset(specificOffset));
assertEquals(
BinlogOffset.builder()
.setBinlogFilePosition("mysql-bin.000001", 4L)
.setGtidSet("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")
.setTimestampSec(1668690384L)
.setSkipEvents(15213L)
.setSkipRows(18613L)
.build(),
deserialized);
}

private List<BinlogOffset> createBinlogOffsets() {
return Arrays.asList(
// Specific offsets
BinlogOffset.ofBinlogFilePosition("foo-filename", 15213L),
BinlogOffset.ofGtidSet("foo-gtid"),
BinlogOffset.ofTimestampSec(15513L),

// Special offsets
BinlogOffset.ofNonStopping(),
BinlogOffset.ofEarliest(),
BinlogOffset.ofLatest(),

// Offsets with additional parameters
BinlogOffset.builder()
.setGtidSet("foo-gtid")
.setSkipEvents(18213L)
.setSkipRows(18613L)
.build());
}

private byte[] serializeBinlogOffset(BinlogOffset binlogOffset) throws IOException {
DataOutputSerializer dos = new DataOutputSerializer(64);
SerializerUtils.writeBinlogPosition(binlogOffset, dos);
return dos.getCopyOfBuffer();
}

private BinlogOffset deserializeBinlogOffset(byte[] serialized) throws IOException {
DataInputDeserializer did = new DataInputDeserializer(serialized);
return SerializerUtils.readBinlogPosition(4, did);
}
}

0 comments on commit e654159

Please sign in to comment.