diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java index 0248594a1ce..09bac82faf0 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java @@ -79,6 +79,7 @@ import java.io.Serializable; import java.math.BigDecimal; import java.time.Duration; +import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.ArrayList; @@ -87,7 +88,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT; @@ -133,8 +133,6 @@ public class SchemaOperator extends AbstractStreamOperator private transient SchemaOperatorMetrics schemaOperatorMetrics; private transient int subTaskId; - private transient AtomicInteger schemaEvolutionVersionCode; - @VisibleForTesting public SchemaOperator(List routingRules) { this.routingRules = routingRules; @@ -184,7 +182,6 @@ public void open() throws Exception { new SchemaOperatorMetrics( getRuntimeContext().getMetricGroup(), schemaChangeBehavior); subTaskId = getRuntimeContext().getIndexOfThisSubtask(); - schemaEvolutionVersionCode = new AtomicInteger(); } @Override @@ -435,6 +432,11 @@ private TableId resolveReplacement( return TableId.parse(route.f1); } + @VisibleForTesting + protected int getCurrentTimestamp() { + return (int) Instant.now().getEpochSecond(); + } + private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) throws InterruptedException, TimeoutException { @@ -449,10 +451,7 @@ private 
void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh long nonce = NonceUtils.generateNonce( - schemaEvolutionVersionCode.incrementAndGet(), - subTaskId, - tableId, - schemaChangeEvent); + getCurrentTimestamp(), subTaskId, tableId, schemaChangeEvent); LOG.info("{}> Sending the FlushEvent for table {} (nonce: {}).", subTaskId, tableId, nonce); output.collect(new StreamRecord<>(new FlushEvent(tableId, nonce))); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/NonceUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/NonceUtils.java index 194b62e1695..0d9e0b74643 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/NonceUtils.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/NonceUtils.java @@ -17,36 +17,36 @@ package org.apache.flink.cdc.runtime.typeutils; +import org.apache.flink.cdc.common.annotation.PublicEvolving; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.FlushEvent; import org.apache.flink.cdc.common.event.TableId; -/** Generates schema evolution nonce value. */ -public class NonceUtils { - - /** Calculates a hashCode with Long type instead of Integer. */ - public static long longHash(Object... a) { - if (a == null) { - return 0; - } - - long result = 1; - - for (Object element : a) { - result = 31L * result + (element == null ? 0 : element.hashCode()); - } +import java.util.Objects; - return result; - } +/** + * Generates schema evolution nonce value and corresponding {@link FlushEvent}s. It is guaranteed to + * be unique by combining epoch timestamp, subTaskId, Table ID and schema change event into a long + * hashCode. + */ +@PublicEvolving +public class NonceUtils { + /** + * Generating a nonce value with current {@code timestamp}, {@code subTaskId}, {@code tableId}, + * and {@code schemaChangeEvent}.
The higher 32 bits are the current UTC timestamp in epoch seconds, + * and the lower 32 bits are the Java hashCode of the remaining parameters. + */ public static long generateNonce( - int versionCode, int subTaskId, TableId tableId, Event schemaChangeEvent) { - return longHash(versionCode, subTaskId, tableId, schemaChangeEvent); + int timestamp, int subTaskId, TableId tableId, Event schemaChangeEvent) { + return (long) timestamp << 32 + | Integer.toUnsignedLong(Objects.hash(subTaskId, tableId, schemaChangeEvent)); } + /** Generating a {@link FlushEvent} carrying a nonce. */ public static FlushEvent generateFlushEvent( - int versionCode, int subTaskId, TableId tableId, Event schemaChangeEvent) { + int timestamp, int subTaskId, TableId tableId, Event schemaChangeEvent) { return new FlushEvent( - tableId, generateNonce(versionCode, subTaskId, tableId, schemaChangeEvent)); + tableId, generateNonce(timestamp, subTaskId, tableId, schemaChangeEvent)); } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java index 586c5d0c996..d60b6d55a9d 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java @@ -31,6 +31,7 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataType; @@ -69,6 +70,26 @@ public class SchemaEvolveTest { private static final TableId CUSTOMERS_TABLE_ID = TableId.tableId("my_company", "my_branch", "customers"); +
/** + * A mocked schema operator that always yields predictable nonce with incrementing timestamp. + */ + public static class MockedSchemaOperator extends SchemaOperator { + + public MockedSchemaOperator( + List routingRules, + Duration rpcTimeOut, + SchemaChangeBehavior schemaChangeBehavior) { + super(routingRules, rpcTimeOut, schemaChangeBehavior); + } + + private int i = 0; + + @Override + protected int getCurrentTimestamp() { + return ++i; + } + } + /** Tests common evolve schema changes without exceptions. */ @Test public void testEvolveSchema() throws Exception { @@ -84,7 +105,7 @@ public void testEvolveSchema() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); EventOperatorTestHarness harness = new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); @@ -354,7 +375,7 @@ public void testTryEvolveSchema() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.TRY_EVOLVE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); EventOperatorTestHarness harness = new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); @@ -624,7 +645,7 @@ public void testExceptionEvolveSchema() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.EXCEPTION; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); EventOperatorTestHarness harness = new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); @@ -738,7 +759,7 @@ public void 
testIgnoreEvolveSchema() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.IGNORE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); EventOperatorTestHarness harness = new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); @@ -1023,7 +1044,7 @@ public void testEvolveSchemaWithFailure() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); EventOperatorTestHarness harness = new EventOperatorTestHarness<>( schemaOperator, @@ -1107,7 +1128,7 @@ public void testTryEvolveSchemaWithFailure() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.TRY_EVOLVE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); // All types of schema change events will be sent to the sink // AddColumn and RenameColumn events will always fail @@ -1427,7 +1448,7 @@ public void testFineGrainedSchemaEvolves() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); // All types of schema change events will be sent to the sink // AddColumn and RenameColumn events will always fail @@ -1748,7 +1769,7 @@ public void testLenientSchemaEvolves() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + 
new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); EventOperatorTestHarness harness = new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open(); @@ -2134,7 +2155,7 @@ public void testLenientEvolveTweaks() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); EventOperatorTestHarness harness = new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior); harness.open();