Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ runtime:
- flink-cdc-runtime/**/*
e2e-tests:
- flink-cdc-e2e-tests/**/*
migration-tests:
- flink-cdc-migration-tests/**/*
base:
- flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/**/*
debezium:
Expand Down
12 changes: 12 additions & 0 deletions .github/workflows/flink_cdc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,18 @@ jobs:
- name: Run license check
run: gem install rubyzip -v 2.3.0 && ./tools/ci/license_check.rb

migration_test:
runs-on: ubuntu-latest
steps:
- name: Check out repository code
uses: actions/checkout@v4
with:
submodules: true
- name: Compile snapshot CDC version
run: mvn --no-snapshot-updates -B install -DskipTests
- name: Run migration tests
run: cd flink-cdc-migration-tests && mvn clean verify

compile_and_test:
# Only run the CI pipeline for the flink-cdc-connectors repository
# if: github.repository == 'apache/flink-cdc-connectors'
Expand Down
62 changes: 62 additions & 0 deletions flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-migration-tests</artifactId>
<version>${revision}</version>
</parent>

<artifactId>flink-cdc-migration-testcases</artifactId>
<name>flink-cdc-migration-testcases</name>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-release-3.0.0</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-release-3.0.1</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-release-3.1.0</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-release-snapshot</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.migration.tests;

import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Utilities for migration tests. */
public class MigrationTestBase {

private static final Logger LOG = LoggerFactory.getLogger(MigrationTestBase.class);

/** Flink CDC versions since 3.0. */
public enum FlinkCdcVersion {
v3_0_0,
v3_0_1,
v3_1_0,
SNAPSHOT;

public String getShadedClassPrefix() {
switch (this) {
case v3_0_0:
return "com.ververica.cdc.v3_0_0";
case v3_0_1:
return "com.ververica.cdc.v3_0_1";
case v3_1_0:
return "org.apache.flink.cdc.v3_1_0";
case SNAPSHOT:
return "org.apache.flink.cdc.snapshot";
default:
throw new RuntimeException("Unknown Flink CDC version: " + this);
}
}
}

private static final List<FlinkCdcVersion> versions =
Arrays.asList(
FlinkCdcVersion.v3_0_0,
FlinkCdcVersion.v3_0_1,
FlinkCdcVersion.v3_1_0,
FlinkCdcVersion.SNAPSHOT);

public static List<FlinkCdcVersion> getAllVersions() {
return versions.subList(0, versions.size());
}

public static List<FlinkCdcVersion> getVersionSince(FlinkCdcVersion sinceVersion) {
return versions.subList(versions.indexOf(sinceVersion), versions.size());
}

public static List<FlinkCdcVersion> getAllVersionExcept(FlinkCdcVersion... excludedVersions) {
List<FlinkCdcVersion> excluded = Arrays.asList(excludedVersions);
return versions.stream().filter(e -> !excluded.contains(e)).collect(Collectors.toList());
}

public static FlinkCdcVersion getSnapshotVersion() {
return versions.get(versions.size() - 1);
}

private static Class<?> getMockClass(FlinkCdcVersion version, String caseName)
throws Exception {
return Class.forName(version.getShadedClassPrefix() + ".migration.tests." + caseName);
}

protected void testMigrationFromTo(
FlinkCdcVersion fromVersion, FlinkCdcVersion toVersion, String caseName)
throws Exception {

LOG.info("Testing {} compatibility case from {} -> {}", caseName, fromVersion, toVersion);

// Serialize dummy object to bytes in early versions
Class<?> fromVersionMockClass = getMockClass(fromVersion, caseName);
Object fromVersionMockObject = fromVersionMockClass.newInstance();

int serializerVersion =
(int)
fromVersionMockClass
.getDeclaredMethod("getSerializerVersion")
.invoke(fromVersionMockObject);
byte[] serializedObject =
(byte[])
fromVersionMockClass
.getDeclaredMethod("serializeObject")
.invoke(fromVersionMockObject);

// Deserialize object in latest versions
Class<?> toVersionMockClass = getMockClass(toVersion, caseName);
Object toVersionMockObject = toVersionMockClass.newInstance();

Assert.assertTrue(
(boolean)
toVersionMockClass
.getDeclaredMethod(
"deserializeAndCheckObject", int.class, byte[].class)
.invoke(toVersionMockObject, serializerVersion, serializedObject));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.migration.tests;

import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;

import org.junit.Test;

import static org.apache.flink.cdc.migration.tests.MigrationTestBase.FlinkCdcVersion.v3_1_0;

/** Migration test cases for {@link SchemaManager}. */
public class SchemaManagerMigrationTest extends MigrationTestBase {

public static String mockCaseName = "SchemaManagerMigrationMock";

@Test
public void testMigration() throws Exception {
// It is known that 3.1.0 that breaks backwards compatibility.
// No state compatibility is guaranteed.
for (FlinkCdcVersion version : getAllVersionExcept(v3_1_0)) {
testMigrationFromTo(version, getSnapshotVersion(), mockCaseName);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.migration.tests;

import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;

import org.junit.Test;

import static org.apache.flink.cdc.migration.tests.MigrationTestBase.FlinkCdcVersion.v3_1_0;

/** Migration test cases for {@link SchemaRegistry}. */
public class SchemaRegistryMigrationTest extends MigrationTestBase {
public static String mockCaseName = "SchemaRegistryMigrationMock";

@Test
public void testMigration() throws Exception {
// It is known that 3.1.0 that breaks backwards compatibility.
// No state compatibility is guaranteed.
for (FlinkCdcVersion version : getAllVersionExcept(v3_1_0)) {
testMigrationFromTo(version, getSnapshotVersion(), mockCaseName);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.migration.tests;

import org.apache.flink.cdc.runtime.operators.transform.TableChangeInfo;

import org.junit.Test;

import static org.apache.flink.cdc.migration.tests.MigrationTestBase.FlinkCdcVersion.v3_0_0;
import static org.apache.flink.cdc.migration.tests.MigrationTestBase.FlinkCdcVersion.v3_0_1;
import static org.apache.flink.cdc.migration.tests.MigrationTestBase.FlinkCdcVersion.v3_1_0;

/** Migration test cases for {@link TableChangeInfo}. */
public class TableChangeInfoMigrationTest extends MigrationTestBase {

public static String mockCaseName = "TableChangeInfoMigrationMock";

@Test
public void testMigration() throws Exception {
// Transform feature does not present until 3.1.0, and
// CDC 3.1.0 breaks backwards compatibility.
for (FlinkCdcVersion version : getAllVersionExcept(v3_0_0, v3_0_1, v3_1_0)) {
testMigrationFromTo(version, getSnapshotVersion(), mockCaseName);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level=INFO
rootLogger.appenderRef.test.ref = TestLogger

appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
Loading