fix: mimicking snowflake algorithm
Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>

# Conflicts:
#	flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
yuxiqian committed Nov 12, 2024
1 parent 6746523 commit f1ce16b
Showing 3 changed files with 57 additions and 37 deletions.

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java

@@ -76,6 +76,7 @@
 
 import java.io.Serializable;
 import java.time.Duration;
+import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -84,7 +85,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
@@ -130,8 +130,6 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
     private transient SchemaOperatorMetrics schemaOperatorMetrics;
     private transient int subTaskId;
 
-    private transient AtomicInteger schemaEvolutionVersionCode;
-
     @VisibleForTesting
     public SchemaOperator(List<RouteRule> routingRules) {
         this.routingRules = routingRules;
@@ -181,7 +179,6 @@ public void open() throws Exception {
                 new SchemaOperatorMetrics(
                         getRuntimeContext().getMetricGroup(), schemaChangeBehavior);
         subTaskId = getRuntimeContext().getIndexOfThisSubtask();
-        schemaEvolutionVersionCode = new AtomicInteger();
     }
 
     @Override
@@ -432,6 +429,11 @@ private TableId resolveReplacement(
         return TableId.parse(route.f1);
     }
 
+    @VisibleForTesting
+    protected int getCurrentTimestamp() {
+        return (int) Instant.now().getEpochSecond();
+    }
+
     private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent)
             throws InterruptedException, TimeoutException {
 
@@ -446,10 +448,7 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh
 
         long nonce =
                 NonceUtils.generateNonce(
-                        schemaEvolutionVersionCode.incrementAndGet(),
-                        subTaskId,
-                        tableId,
-                        schemaChangeEvent);
+                        getCurrentTimestamp(), subTaskId, tableId, schemaChangeEvent);
 
         LOG.info("{}> Sending the FlushEvent for table {} (nonce: {}).", subTaskId, tableId, nonce);
         output.collect(new StreamRecord<>(new FlushEvent(tableId, nonce)));

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/NonceUtils.java

@@ -17,36 +17,36 @@
 
 package org.apache.flink.cdc.runtime.typeutils;
 
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.TableId;
 
-/** Generates schema evolution nonce value. */
-public class NonceUtils {
-
-    /** Calculates a hashCode with Long type instead of Integer. */
-    public static long longHash(Object... a) {
-        if (a == null) {
-            return 0;
-        }
-
-        long result = 1;
-
-        for (Object element : a) {
-            result = 31L * result + (element == null ? 0 : element.hashCode());
-        }
+import java.util.Objects;
 
-        return result;
-    }
+/**
+ * Generates schema evolution nonce value and corresponding {@link FlushEvent}s. It is guaranteed to
+ * be unique by combining epoch timestamp, subTaskId, Table ID and schema change event into a long
+ * hashCode.
+ */
+@PublicEvolving
+public class NonceUtils {
+
+    /**
+     * Generating a nonce value with current {@code timestamp}, {@code subTaskId}, {@code tableId},
+     * and {@code schemaChangeEvent}. The higher 32 bits are current UTC timestamp in epoch seconds,
+     * and the lower 32 bits are Java hashCode of the rest parameters.
+     */
     public static long generateNonce(
-            int versionCode, int subTaskId, TableId tableId, Event schemaChangeEvent) {
-        return longHash(versionCode, subTaskId, tableId, schemaChangeEvent);
+            int timestamp, int subTaskId, TableId tableId, Event schemaChangeEvent) {
+        return (long) timestamp << 32
+                | Integer.toUnsignedLong(Objects.hash(subTaskId, tableId, schemaChangeEvent));
     }
 
     /** Generating a {@link FlushEvent} carrying a nonce. */
     public static FlushEvent generateFlushEvent(
-            int versionCode, int subTaskId, TableId tableId, Event schemaChangeEvent) {
+            int timestamp, int subTaskId, TableId tableId, Event schemaChangeEvent) {
         return new FlushEvent(
-                tableId, generateNonce(versionCode, subTaskId, tableId, schemaChangeEvent));
+                tableId, generateNonce(timestamp, subTaskId, tableId, schemaChangeEvent));
     }
 }
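
The new layout is essentially a Snowflake-style ID packed into one long: the upper 32 bits carry the epoch-second timestamp supplied by SchemaOperator#getCurrentTimestamp(), and the lower 32 bits carry the unsigned Java hash of (subTaskId, tableId, schemaChangeEvent). The following minimal, self-contained sketch shows that packing and how either half can be recovered; the class name and the plain String stand-ins for the CDC TableId/Event types are hypothetical and not part of this commit.

import java.time.Instant;
import java.util.Objects;

/** Hypothetical illustration of the nonce layout above; not part of the commit. */
public class NonceLayoutSketch {

    /** Upper 32 bits: epoch-second timestamp; lower 32 bits: unsigned hash of the other fields. */
    static long pack(int epochSeconds, int subTaskId, String tableId, String schemaChangeEvent) {
        return (long) epochSeconds << 32
                | Integer.toUnsignedLong(Objects.hash(subTaskId, tableId, schemaChangeEvent));
    }

    /** Recovers the epoch-second timestamp from the upper 32 bits. */
    static int timestampOf(long nonce) {
        return (int) (nonce >>> 32);
    }

    /** Recovers the unsigned 32-bit hash from the lower 32 bits. */
    static long hashOf(long nonce) {
        return nonce & 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        int now = (int) Instant.now().getEpochSecond();
        long nonce = pack(now, 3, "my_company.my_branch.customers", "AddColumnEvent");
        System.out.println("nonce     = " + nonce);
        System.out.println("timestamp = " + timestampOf(nonce)); // equals `now`
        System.out.println("hash      = " + hashOf(nonce));
    }
}

Because the timestamp occupies the high bits, overriding getCurrentTimestamp() with an incrementing counter (as SchemaEvolveTest does below) makes the generated nonces deterministic and monotonically increasing within a test.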

SchemaEvolveTest.java

@@ -31,6 +31,7 @@
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataType;
@@ -69,6 +70,26 @@ public class SchemaEvolveTest {
     private static final TableId CUSTOMERS_TABLE_ID =
             TableId.tableId("my_company", "my_branch", "customers");
 
+    /**
+     * A mocked schema operator that always yields predictable nonce with incrementing timestamp.
+     */
+    public static class MockedSchemaOperator extends SchemaOperator {
+
+        public MockedSchemaOperator(
+                List<RouteRule> routingRules,
+                Duration rpcTimeOut,
+                SchemaChangeBehavior schemaChangeBehavior) {
+            super(routingRules, rpcTimeOut, schemaChangeBehavior);
+        }
+
+        private int i = 0;
+
+        @Override
+        protected int getCurrentTimestamp() {
+            return ++i;
+        }
+    }
+
     /** Tests common evolve schema changes without exceptions. */
     @Test
     public void testEvolveSchema() throws Exception {
@@ -84,7 +105,7 @@ public void testEvolveSchema() throws Exception {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+                new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
         EventOperatorTestHarness<SchemaOperator, Event> harness =
                 new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior);
         harness.open();
@@ -354,7 +375,7 @@ public void testTryEvolveSchema() throws Exception {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.TRY_EVOLVE;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+                new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
         EventOperatorTestHarness<SchemaOperator, Event> harness =
                 new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior);
         harness.open();
@@ -624,7 +645,7 @@ public void testExceptionEvolveSchema() throws Exception {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.EXCEPTION;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+                new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
         EventOperatorTestHarness<SchemaOperator, Event> harness =
                 new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior);
         harness.open();
@@ -738,7 +759,7 @@ public void testIgnoreEvolveSchema() throws Exception {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.IGNORE;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+                new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
         EventOperatorTestHarness<SchemaOperator, Event> harness =
                 new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior);
         harness.open();
@@ -1023,7 +1044,7 @@ public void testEvolveSchemaWithFailure() throws Exception {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+                new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
         EventOperatorTestHarness<SchemaOperator, Event> harness =
                 new EventOperatorTestHarness<>(
                         schemaOperator,
@@ -1107,7 +1128,7 @@ public void testTryEvolveSchemaWithFailure() throws Exception {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.TRY_EVOLVE;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+                new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
 
         // All types of schema change events will be sent to the sink
         // AddColumn and RenameColumn events will always fail
@@ -1427,7 +1448,7 @@ public void testFineGrainedSchemaEvolves() throws Exception {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+                new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
 
         // All types of schema change events will be sent to the sink
         // AddColumn and RenameColumn events will always fail
@@ -1748,7 +1769,7 @@ public void testLenientSchemaEvolves() throws Exception {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+                new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
         EventOperatorTestHarness<SchemaOperator, Event> harness =
                 new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior);
         harness.open();
@@ -2134,7 +2155,7 @@ public void testLenientEvolveTweaks() throws Exception {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+                new MockedSchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
         EventOperatorTestHarness<SchemaOperator, Event> harness =
                 new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior);
         harness.open();
