Skip to content

Commit

Permalink
[FLINK-36763][runtime] Support new "distributed" schema evolution top…
Browse files Browse the repository at this point in the history
…ology
  • Loading branch information
yuxiqian committed Dec 13, 2024
1 parent cf4c706 commit 57a89d4
Show file tree
Hide file tree
Showing 148 changed files with 8,937 additions and 5,326 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/flink_cdc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,14 @@ jobs:
java-version: [ '8' ]
flink-version: ['1.19.1', '1.20.0']
module: [ 'pipeline_e2e' ]
parallelism: [ 1, 4 ]
name: Pipeline E2E Tests
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-version: ${{ matrix.java-version }}
flink-version: ${{ matrix.flink-version }}
module: ${{ matrix.module }}
parallelism: ${{ matrix.parallelism }}
source_e2e:
strategy:
matrix:
Expand Down
7 changes: 6 additions & 1 deletion .github/workflows/flink_cdc_base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ on:
description: "Flink CDC module to test against."
required: true
type: string
parallelism:
description: "Flink parallelism."
required: false
type: number
default: 4
custom-maven-parameter:
description: "Custom maven parameter."
required: false
Expand Down Expand Up @@ -206,7 +211,7 @@ jobs:
build_maven_parameter="${build_maven_parameter:+$build_maven_parameter }${{ inputs.custom-maven-parameter }}"
mvn --no-snapshot-updates -B -DskipTests -pl $compile_modules -am install && mvn --no-snapshot-updates -B $build_maven_parameter -pl $modules verify
mvn --no-snapshot-updates -B -DskipTests -pl $compile_modules -am install && mvn --no-snapshot-updates -B $build_maven_parameter -pl $modules -DspecifiedParallelism=${{ inputs.parallelism }} verify
- name: Print JVM thread dumps when cancelled
if: ${{ failure() }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.ADD_COLUMN;
}

@Override
public SchemaChangeEvent copy(TableId newTableId) {
return new AddColumnEvent(newTableId, addedColumns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,9 @@ public boolean trimRedundantChanges() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.ALTER_COLUMN_TYPE;
}

@Override
public SchemaChangeEvent copy(TableId newTableId) {
return new AlterColumnTypeEvent(newTableId, typeMapping, oldTypeMapping);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.CREATE_TABLE;
}

@Override
public SchemaChangeEvent copy(TableId newTableId) {
return new CreateTableEvent(newTableId, schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ public static DataChangeEvent projectAfter(
dataChangeEvent.meta);
}

/** Updates the {@link TableId} info of current data change event. */
public static DataChangeEvent route(DataChangeEvent dataChangeEvent, TableId tableId) {
return new DataChangeEvent(
tableId,
dataChangeEvent.before,
dataChangeEvent.after,
dataChangeEvent.op,
dataChangeEvent.meta);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.DROP_COLUMN;
}

@Override
public SchemaChangeEvent copy(TableId newTableId) {
return new DropColumnEvent(newTableId, droppedColumnNames);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.DROP_TABLE;
}

@Override
public SchemaChangeEvent copy(TableId newTableId) {
return new DropTableEvent(newTableId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.flink.cdc.common.event;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.Objects;

/**
Expand All @@ -25,21 +28,35 @@
*/
public class FlushEvent implements Event {

/** The schema changes from which table. */
private final TableId tableId;
/**
* The schema changes from which table. If tableId is null, it means this {@code FlushEvent}
* should flush all pending events, no matter which table it belongs to.
*/
private final @Nullable TableId tableId;

/**
* Nonce code to distinguish flush events corresponding to each schema change event from
* different subTasks.
* With the Schema Operator - Registry topology, a nonce code is required to distinguish flush
* events corresponding to each schema change event from different subTasks.
*/
private final long nonce;

public FlushEvent(TableId tableId, long nonce) {
/** With the distributed topology, we don't need to track flush events for various tables. */
private static final FlushEvent FLUSH_ALL_EVENT = new FlushEvent(null, -1L);

protected FlushEvent(@Nullable TableId tableId, long nonce) {
this.tableId = tableId;
this.nonce = nonce;
}

public TableId getTableId() {
public static FlushEvent ofAll() {
return FLUSH_ALL_EVENT;
}

public static FlushEvent of(@Nonnull TableId tableId, long nonce) {
return new FlushEvent(tableId, nonce);
}

public @Nullable TableId getTableId() {
return tableId;
}

Expand All @@ -66,6 +83,10 @@ public int hashCode() {

@Override
public String toString() {
return "FlushEvent{" + "tableId=" + tableId + ", nonce=" + nonce + '}';
if (tableId == null) {
return "FlushEvent{ << not table-specific >> }";
} else {
return "FlushEvent{" + "tableId=" + tableId + ", nonce=" + nonce + '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.RENAME_COLUMN;
}

@Override
public SchemaChangeEvent copy(TableId newTableId) {
return new RenameColumnEvent(newTableId, nameMapping);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@
public interface SchemaChangeEvent extends ChangeEvent, Serializable {
/** Returns its {@link SchemaChangeEventType}. */
SchemaChangeEventType getType();

/** Creates a copy of {@link SchemaChangeEvent} with new {@link TableId}. */
SchemaChangeEvent copy(TableId newTableId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.TRUNCATE_TABLE;
}

@Override
public SchemaChangeEvent copy(TableId newTableId) {
return new TruncateTableEvent(newTableId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ public class RouteRule implements Serializable {

private static final long serialVersionUID = 1L;

public RouteRule(String sourceTable, String sinkTable) {
this(sourceTable, sinkTable, null);
}

public RouteRule(String sourceTable, String sinkTable, String replaceSymbol) {
this.sourceTable = sourceTable;
this.sinkTable = sinkTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.cdc.common.source;

import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.common.annotation.PublicEvolving;

/**
Expand All @@ -31,4 +32,18 @@ public interface DataSource {

/** Get the {@link MetadataAccessor} for accessing metadata from external systems. */
MetadataAccessor getMetadataAccessor();

/**
* Indicating if this source guarantees for each TableId, it will not be evolved differently
* among subTasks. If returns {@code false}, you'll get a regular operator topology that is
* compatible with single-incremented sources like MySQL. Returns {@code true} for sources that
* does not maintain a globally sequential schema change events stream, like MongoDB or Kafka.
* <br>
* Note that new topology still an experimental feature. Return {@code false} by default to
* avoid unexpected behaviors.
*/
@Experimental
default boolean canContainDistributedTables() {
return false;
}
}
Loading

0 comments on commit 57a89d4

Please sign in to comment.