diff --git a/.github/workflows/flink_cdc.yml b/.github/workflows/flink_cdc.yml
index 2ccb59c8638..84d62776aba 100644
--- a/.github/workflows/flink_cdc.yml
+++ b/.github/workflows/flink_cdc.yml
@@ -73,12 +73,14 @@ jobs:
java-version: [ '8' ]
flink-version: ['1.19.1', '1.20.0']
module: [ 'pipeline_e2e' ]
+ parallelism: [ 1, 4 ]
name: Pipeline E2E Tests
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-version: ${{ matrix.java-version }}
flink-version: ${{ matrix.flink-version }}
module: ${{ matrix.module }}
+ parallelism: ${{ matrix.parallelism }}
source_e2e:
strategy:
matrix:
diff --git a/.github/workflows/flink_cdc_base.yml b/.github/workflows/flink_cdc_base.yml
index 707282fa177..ddb650ab6a0 100644
--- a/.github/workflows/flink_cdc_base.yml
+++ b/.github/workflows/flink_cdc_base.yml
@@ -31,6 +31,11 @@ on:
description: "Flink CDC module to test against."
required: true
type: string
+ parallelism:
+ description: "Flink parallelism."
+ required: false
+ type: number
+ default: 4
custom-maven-parameter:
description: "Custom maven parameter."
required: false
@@ -206,7 +211,7 @@ jobs:
build_maven_parameter="${build_maven_parameter:+$build_maven_parameter }${{ inputs.custom-maven-parameter }}"
- mvn --no-snapshot-updates -B -DskipTests -pl $compile_modules -am install && mvn --no-snapshot-updates -B $build_maven_parameter -pl $modules verify
+ mvn --no-snapshot-updates -B -DskipTests -pl $compile_modules -am install && mvn --no-snapshot-updates -B $build_maven_parameter -pl $modules -DspecifiedParallelism=${{ inputs.parallelism }} verify
- name: Print JVM thread dumps when cancelled
if: ${{ failure() }}
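For context on how the forwarded value might be consumed: Maven passes `-DspecifiedParallelism` into the test JVMs, where a test can read it as a system property. A minimal sketch, assuming the e2e tests configure their Flink environment this way (the reading code below is illustrative, not part of this patch):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismFromProperty {
    public static void main(String[] args) {
        // Read the -DspecifiedParallelism value forwarded by the workflow above,
        // falling back to the workflow input's default of 4 when absent.
        int parallelism = Integer.parseInt(System.getProperty("specifiedParallelism", "4"));
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
    }
}
```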
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AddColumnEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AddColumnEvent.java
index 3966ff46ffc..bba105c58ee 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AddColumnEvent.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AddColumnEvent.java
@@ -178,4 +178,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.ADD_COLUMN;
}
+
+ @Override
+ public SchemaChangeEvent copy(TableId newTableId) {
+ return new AddColumnEvent(newTableId, addedColumns);
+ }
}
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java
index 51acb43198a..2257f134d96 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AlterColumnTypeEvent.java
@@ -146,4 +146,9 @@ public boolean trimRedundantChanges() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.ALTER_COLUMN_TYPE;
}
+
+ @Override
+ public SchemaChangeEvent copy(TableId newTableId) {
+ return new AlterColumnTypeEvent(newTableId, typeMapping, oldTypeMapping);
+ }
}
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/CreateTableEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/CreateTableEvent.java
index 6d3e547172d..36ef4bcc50c 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/CreateTableEvent.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/CreateTableEvent.java
@@ -77,4 +77,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.CREATE_TABLE;
}
+
+ @Override
+ public SchemaChangeEvent copy(TableId newTableId) {
+ return new CreateTableEvent(newTableId, schema);
+ }
}
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DataChangeEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DataChangeEvent.java
index da4d454ddee..63cade18824 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DataChangeEvent.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DataChangeEvent.java
@@ -167,6 +167,16 @@ public static DataChangeEvent projectAfter(
dataChangeEvent.meta);
}
+ /** Updates the {@link TableId} info of the current data change event. */
+ public static DataChangeEvent route(DataChangeEvent dataChangeEvent, TableId tableId) {
+ return new DataChangeEvent(
+ tableId,
+ dataChangeEvent.before,
+ dataChangeEvent.after,
+ dataChangeEvent.op,
+ dataChangeEvent.meta);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
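For illustration, the new `route` helper rewrites only the `TableId`, sharing the before/after images, operation type, and meta map with the original event. A sketch with hypothetical table names:

```java
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.TableId;

class RouteExample {
    // Redirect an upstream change event to a (hypothetical) routed sink table.
    DataChangeEvent reroute(DataChangeEvent upstreamEvent) {
        TableId sinkTable = TableId.tableId("sink_db", "orders_all");
        return DataChangeEvent.route(upstreamEvent, sinkTable);
    }
}
```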
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DropColumnEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DropColumnEvent.java
index 76cabbde7b7..0ae5cf2f978 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DropColumnEvent.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DropColumnEvent.java
@@ -81,4 +81,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.DROP_COLUMN;
}
+
+ @Override
+ public SchemaChangeEvent copy(TableId newTableId) {
+ return new DropColumnEvent(newTableId, droppedColumnNames);
+ }
}
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DropTableEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DropTableEvent.java
index dd5efdd0818..382d9d7b61c 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DropTableEvent.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DropTableEvent.java
@@ -68,4 +68,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.DROP_TABLE;
}
+
+ @Override
+ public SchemaChangeEvent copy(TableId newTableId) {
+ return new DropTableEvent(newTableId);
+ }
}
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java
index 798552e0499..787e09912ad 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java
@@ -17,6 +17,9 @@
package org.apache.flink.cdc.common.event;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
import java.util.Objects;
/**
@@ -25,21 +28,35 @@
*/
public class FlushEvent implements Event {
- /** The schema changes from which table. */
- private final TableId tableId;
+ /**
+ * The table from which the schema changes originate. If tableId is null, it means this
+ * {@code FlushEvent} should flush all pending events, no matter which table they belong to.
+ */
+ private final @Nullable TableId tableId;
/**
- * Nonce code to distinguish flush events corresponding to each schema change event from
- * different subTasks.
+ * With the Schema Operator - Registry topology, a nonce code is required to distinguish flush
+ * events corresponding to each schema change event from different subTasks.
*/
private final long nonce;
- public FlushEvent(TableId tableId, long nonce) {
+ /** With the distributed topology, we don't need to track flush events for various tables. */
+ private static final FlushEvent FLUSH_ALL_EVENT = new FlushEvent(null, -1L);
+
+ protected FlushEvent(@Nullable TableId tableId, long nonce) {
this.tableId = tableId;
this.nonce = nonce;
}
- public TableId getTableId() {
+ public static FlushEvent ofAll() {
+ return FLUSH_ALL_EVENT;
+ }
+
+ public static FlushEvent of(@Nonnull TableId tableId, long nonce) {
+ return new FlushEvent(tableId, nonce);
+ }
+
+ public @Nullable TableId getTableId() {
return tableId;
}
@@ -66,6 +83,10 @@ public int hashCode() {
@Override
public String toString() {
- return "FlushEvent{" + "tableId=" + tableId + ", nonce=" + nonce + '}';
+ if (tableId == null) {
+ return "FlushEvent{ << not table-specific >> }";
+ } else {
+ return "FlushEvent{" + "tableId=" + tableId + ", nonce=" + nonce + '}';
+ }
}
}
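The two factory methods map onto the two topologies. A brief sketch with illustrative values:

```java
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.TableId;

class FlushEventExamples {
    void emit() {
        // Schema Operator - Registry topology: flush one table, tagged with a nonce
        // so flush responses from different subTasks can be told apart.
        FlushEvent perTable = FlushEvent.of(TableId.tableId("db", "tbl"), 42L);

        // Distributed topology: a single shared event that flushes all pending
        // events; its tableId is null and its nonce carries no meaning.
        FlushEvent flushAll = FlushEvent.ofAll();
    }
}
```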
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/RenameColumnEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/RenameColumnEvent.java
index 8bde3787207..f38d1fd04d9 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/RenameColumnEvent.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/RenameColumnEvent.java
@@ -78,4 +78,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.RENAME_COLUMN;
}
+
+ @Override
+ public SchemaChangeEvent copy(TableId newTableId) {
+ return new RenameColumnEvent(newTableId, nameMapping);
+ }
}
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEvent.java
index d5596e3b3de..fdb6041417c 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEvent.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/SchemaChangeEvent.java
@@ -29,4 +29,7 @@
public interface SchemaChangeEvent extends ChangeEvent, Serializable {
/** Returns its {@link SchemaChangeEventType}. */
SchemaChangeEventType getType();
+
+ /** Creates a copy of this {@link SchemaChangeEvent} with a new {@link TableId}. */
+ SchemaChangeEvent copy(TableId newTableId);
}
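This lets routing code rewrite the destination table without switching on concrete subclasses, since each event type supplies its own `copy` implementation. A hypothetical routing step:

```java
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;

class SchemaRouteExample {
    // Forward any schema change event to a routed sink table; the concrete
    // subclass (AddColumnEvent, DropTableEvent, ...) provides the copy logic.
    SchemaChangeEvent routeSchemaChange(SchemaChangeEvent event, TableId sinkTable) {
        return event.copy(sinkTable);
    }
}
```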
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/TruncateTableEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/TruncateTableEvent.java
index 2144ff2837d..29ef6d6b8e3 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/TruncateTableEvent.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/TruncateTableEvent.java
@@ -68,4 +68,9 @@ public TableId tableId() {
public SchemaChangeEventType getType() {
return SchemaChangeEventType.TRUNCATE_TABLE;
}
+
+ @Override
+ public SchemaChangeEvent copy(TableId newTableId) {
+ return new TruncateTableEvent(newTableId);
+ }
}
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java
index 4fbfb61b661..2974732fb1c 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java
@@ -24,6 +24,10 @@ public class RouteRule implements Serializable {
private static final long serialVersionUID = 1L;
+ public RouteRule(String sourceTable, String sinkTable) {
+ this(sourceTable, sinkTable, null);
+ }
+
public RouteRule(String sourceTable, String sinkTable, String replaceSymbol) {
this.sourceTable = sourceTable;
this.sinkTable = sinkTable;
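The new two-argument constructor simply defaults `replaceSymbol` to null, so the two rules below are equivalent (table patterns are illustrative):

```java
import org.apache.flink.cdc.common.route.RouteRule;

class RouteRuleExample {
    RouteRule plain = new RouteRule("source_db.orders", "sink_db.orders");
    RouteRule explicit = new RouteRule("source_db.orders", "sink_db.orders", null);
}
```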
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/DataSource.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/DataSource.java
index 1027207a416..54da0a9ff5f 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/DataSource.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/DataSource.java
@@ -17,6 +17,7 @@
package org.apache.flink.cdc.common.source;
+import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
/**
@@ -31,4 +32,18 @@ public interface DataSource {
/** Get the {@link MetadataAccessor} for accessing metadata from external systems. */
MetadataAccessor getMetadataAccessor();
+
+ /**
+ * Indicates whether this source guarantees that, for each TableId, the schema will not be
+ * evolved differently among subTasks. Returning {@code false} yields a regular operator
+ * topology that is compatible with single-incremented sources like MySQL. Return {@code true}
+ * for sources that do not maintain a globally sequential schema change event stream, like
+ * MongoDB or Kafka.
+ *
+ * <p>Note that the new topology is still an experimental feature. It returns {@code false} by
+ * default to avoid unexpected behaviors.
+ */
+ @Experimental
+ default boolean canContainDistributedTables() {
+ return false;
+ }
}
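A sketch of a source opting in, assuming a Kafka-style connector whose subTasks may observe schema changes in different orders (the class is hypothetical; the remaining DataSource methods are omitted for brevity):

```java
import org.apache.flink.cdc.common.source.DataSource;

abstract class KafkaStyleSource implements DataSource {
    @Override
    public boolean canContainDistributedTables() {
        // No globally sequential schema change stream is guaranteed, so opt in
        // to the experimental distributed topology.
        return true;
    }
}
```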
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
new file mode 100644
index 00000000000..c547eee7687
--- /dev/null
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
@@ -0,0 +1,815 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.utils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.BinaryType;
+import org.apache.flink.cdc.common.types.BooleanType;
+import org.apache.flink.cdc.common.types.CharType;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeFamily;
+import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.DateType;
+import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.DoubleType;
+import org.apache.flink.cdc.common.types.FloatType;
+import org.apache.flink.cdc.common.types.IntType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.TinyIntType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
+import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Streams;
+import org.apache.flink.shaded.guava31.com.google.common.io.BaseEncoding;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Utils for merging {@link Schema}s and {@link DataType}s. Prefer using this over {@link
+ * SchemaUtils} to get consistent schema merging behaviors.
+ */
+@PublicEvolving
+public class SchemaMergingUtils {
+ /**
+ * Checks if the given {@code upcomingSchema} fits into the currently known {@code
+ * currentSchema}. The current schema could be null (the cold-opening state, in which case it
+ * always returns {@code false}), but the upcoming schema should never be null.
+ */
+ public static boolean isSchemaCompatible(
+ @Nullable Schema currentSchema, Schema upcomingSchema) {
+ if (currentSchema == null) {
+ return false;
+ }
+ Map<String, DataType> currentColumnTypes =
+ currentSchema.getColumns().stream()
+ .collect(Collectors.toMap(Column::getName, Column::getType));
+ List<Column> upcomingColumns = upcomingSchema.getColumns();
+
+ for (Column upcomingColumn : upcomingColumns) {
+ String columnName = upcomingColumn.getName();
+ DataType upcomingColumnType = upcomingColumn.getType();
+ DataType currentColumnType = currentColumnTypes.get(columnName);
+
+ if (!isDataTypeCompatible(currentColumnType, upcomingColumnType)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Tries to merge {@code upcomingSchema} into {@code currentSchema} by performing lenient
+ * schema changes, and returns the evolved schema.
+ */
+ public static Schema getLeastCommonSchema(
+ @Nullable Schema currentSchema, Schema upcomingSchema) {
+ // No current schema record, we need to create it first.
+ if (currentSchema == null) {
+ return upcomingSchema;
+ }
+
+ // Current schema is compatible with upcoming ones, just return it and perform no schema
+ // evolution.
+ if (isSchemaCompatible(currentSchema, upcomingSchema)) {
+ return currentSchema;
+ }
+
+ Map<String, DataType> newTypeMapping = new HashMap<>();
+
+ Map<String, Column> currentColumns =
+ currentSchema.getColumns().stream()
+ .collect(Collectors.toMap(Column::getName, col -> col));
+ List<Column> upcomingColumns = upcomingSchema.getColumns();
+
+ List<Column> appendedColumns = new ArrayList<>();
+
+ for (Column upcomingColumn : upcomingColumns) {
+ String columnName = upcomingColumn.getName();
+ DataType upcomingColumnType = upcomingColumn.getType();
+ if (currentColumns.containsKey(columnName)) {
+ Column currentColumn = currentColumns.get(columnName);
+ DataType currentColumnType = currentColumn.getType();
+ DataType leastCommonType =
+ getLeastCommonType(currentColumnType, upcomingColumnType);
+ if (!Objects.equals(leastCommonType, currentColumnType)) {
+ newTypeMapping.put(columnName, leastCommonType);
+ }
+ } else {
+ appendedColumns.add(upcomingColumn);
+ }
+ }
+
+ List<Column> commonColumns = new ArrayList<>();
+ for (Column column : currentSchema.getColumns()) {
+ if (newTypeMapping.containsKey(column.getName())) {
+ commonColumns.add(column.copy(newTypeMapping.get(column.getName())));
+ } else {
+ commonColumns.add(column);
+ }
+ }
+
+ commonColumns.addAll(appendedColumns);
+ return currentSchema.copy(commonColumns);
+ }
+
+ /** Merges a list of compatible schemas into one common schema. */
+ public static Schema getCommonSchema(List<Schema> schemas) {
+ if (schemas.isEmpty()) {
+ return null;
+ } else if (schemas.size() == 1) {
+ return schemas.get(0);
+ } else {
+ Schema outputSchema = null;
+ for (Schema schema : schemas) {
+ outputSchema = getLeastCommonSchema(outputSchema, schema);
+ }
+ return outputSchema;
+ }
+ }
+
+ /**
+ * Generates the schema change events needed to convert a compatible {@code beforeSchema}
+ * into {@code afterSchema}.
+ */
+ public static List<SchemaChangeEvent> getSchemaDifference(
+ TableId tableId, @Nullable Schema beforeSchema, Schema afterSchema) {
+ if (beforeSchema == null) {
+ return Collections.singletonList(new CreateTableEvent(tableId, afterSchema));
+ }
+
+ Map<String, Column> beforeColumns =
+ beforeSchema.getColumns().stream()
+ .collect(Collectors.toMap(Column::getName, col -> col));
+
+ Map<String, DataType> oldTypeMapping = new HashMap<>();
+ Map<String, DataType> newTypeMapping = new HashMap<>();
+ List<AddColumnEvent.ColumnWithPosition> appendedColumns = new ArrayList<>();
+
+ String afterWhichColumnPosition = null;
+ for (Column afterColumn : afterSchema.getColumns()) {
+ String columnName = afterColumn.getName();
+ DataType afterType = afterColumn.getType();
+ if (beforeColumns.containsKey(columnName)) {
+ DataType beforeType = beforeColumns.get(columnName).getType();
+ if (!Objects.equals(beforeType, afterType)) {
+ oldTypeMapping.put(columnName, beforeType);
+ newTypeMapping.put(columnName, afterType);
+ }
+ } else {
+ if (afterWhichColumnPosition == null) {
+ appendedColumns.add(
+ new AddColumnEvent.ColumnWithPosition(
+ afterColumn, AddColumnEvent.ColumnPosition.FIRST, null));
+ } else {
+ appendedColumns.add(
+ new AddColumnEvent.ColumnWithPosition(
+ afterColumn,
+ AddColumnEvent.ColumnPosition.AFTER,
+ afterWhichColumnPosition));
+ }
+ }
+ afterWhichColumnPosition = afterColumn.getName();
+ }
+
+ List<SchemaChangeEvent> schemaChangeEvents = new ArrayList<>();
+ if (!appendedColumns.isEmpty()) {
+ schemaChangeEvents.add(new AddColumnEvent(tableId, appendedColumns));
+ }
+
+ if (!newTypeMapping.isEmpty()) {
+ schemaChangeEvents.add(
+ new AlterColumnTypeEvent(tableId, newTypeMapping, oldTypeMapping));
+ }
+
+ return schemaChangeEvents;
+ }
+
+ /**
+ * Coerces {@code upcomingRow} from the {@code upcomingSchema} into the {@code currentSchema}.
+ * Invoking this method implicitly assumes that {@code isSchemaCompatible(currentSchema,
+ * upcomingSchema)} returns true; otherwise, some upstream records might be lost.
+ */
+ public static Object[] coerceRow(
+ String timezone,
+ Schema currentSchema,
+ Schema upcomingSchema,
+ List