diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java index cad3b37fc1c..ccf6d879895 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java @@ -22,13 +22,14 @@ import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; -import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.sink.MetadataApplier; @@ -47,7 +48,6 @@ import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.cfg.DorisOptions; -import org.apache.doris.flink.sink.schema.SchemaChangeManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,20 +61,22 @@ import 
static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE; import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN; +import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE; import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX; /** Supports {@link DorisDataSink} to schema evolution. */ public class DorisMetadataApplier implements MetadataApplier { private static final Logger LOG = LoggerFactory.getLogger(DorisMetadataApplier.class); private DorisOptions dorisOptions; - private SchemaChangeManager schemaChangeManager; + private DorisSchemaChangeManager schemaChangeManager; private Configuration config; private Set enabledSchemaEvolutionTypes; public DorisMetadataApplier(DorisOptions dorisOptions, Configuration config) { this.dorisOptions = dorisOptions; - this.schemaChangeManager = new SchemaChangeManager(dorisOptions); + this.schemaChangeManager = new DorisSchemaChangeManager(dorisOptions); this.config = config; this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes(); } @@ -93,7 +95,13 @@ public boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEven @Override public Set getSupportedSchemaEvolutionTypes() { - return Sets.newHashSet(ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN); + return Sets.newHashSet( + ADD_COLUMN, + ALTER_COLUMN_TYPE, + DROP_COLUMN, + DROP_TABLE, + RENAME_COLUMN, + TRUNCATE_TABLE); } @Override @@ -117,14 +125,16 @@ public void applySchemaChange(SchemaChangeEvent event) { return null; }, dropTableEvent -> { - throw new UnsupportedSchemaChangeEventException(event); + applyDropTableEvent(dropTableEvent); + return null; }, 
renameColumnEvent -> { applyRenameColumnEvent(renameColumnEvent); return null; }, truncateTableEvent -> { - throw new UnsupportedSchemaChangeEventException(event); + applyTruncateTableEvent(truncateTableEvent); + return null; }); } @@ -275,4 +285,23 @@ private void applyAlterColumnTypeEvent(AlterColumnTypeEvent event) throw new SchemaEvolveException(event, "fail to apply alter column type event", e); } } + + private void applyTruncateTableEvent(TruncateTableEvent truncateTableEvent) + throws SchemaEvolveException { + TableId tableId = truncateTableEvent.tableId(); + try { + schemaChangeManager.truncateTable(tableId.getSchemaName(), tableId.getTableName()); + } catch (Exception e) { + throw new SchemaEvolveException(truncateTableEvent, "fail to truncate table", e); + } + } + + private void applyDropTableEvent(DropTableEvent dropTableEvent) throws SchemaEvolveException { + TableId tableId = dropTableEvent.tableId(); + try { + schemaChangeManager.dropTable(tableId.getSchemaName(), tableId.getTableName()); + } catch (Exception e) { + throw new SchemaEvolveException(dropTableEvent, "fail to drop table", e); + } + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java new file mode 100644 index 00000000000..a4636f0457c --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaChangeManager.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.doris.sink; + +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.exception.IllegalArgumentException; +import org.apache.doris.flink.sink.schema.SchemaChangeManager; + +import java.io.IOException; + +import static org.apache.doris.flink.catalog.doris.DorisSystem.identifier; + +/** An enriched version of Doris' {@link SchemaChangeManager}. */ +public class DorisSchemaChangeManager extends SchemaChangeManager { + public DorisSchemaChangeManager(DorisOptions dorisOptions) { + super(dorisOptions); + } + + public boolean truncateTable(String databaseName, String tableName) + throws IOException, IllegalArgumentException { + String truncateTableDDL = + "TRUNCATE TABLE " + identifier(databaseName) + "." + identifier(tableName); + return this.execute(truncateTableDDL, databaseName); + } + + public boolean dropTable(String databaseName, String tableName) + throws IOException, IllegalArgumentException { + String dropTableDDL = + "DROP TABLE IF EXISTS " + identifier(databaseName) + "." 
+ identifier(tableName); + return this.execute(dropTableDDL, databaseName); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java index 8aa0c542243..d71bfe05bb7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java @@ -20,18 +20,24 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.PhysicalColumn; import 
org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.composer.definition.SinkDef; import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator; @@ -39,6 +45,7 @@ import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator; import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer; import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -47,10 +54,12 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import java.sql.SQLSyntaxErrorException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -64,6 +73,7 @@ import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE; import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE; import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME; +import static org.junit.Assert.fail; /** IT tests for {@link DorisMetadataApplier}. 
*/ @RunWith(Parameterized.class) @@ -429,8 +439,90 @@ public void testDorisNarrowingAlterColumnType() throws Exception { runJobWithEvents(generateNarrowingAlterColumnTypeEvents(tableId)); } + @Test + public void testDorisTruncateTable() throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + List preparationTestingEvents = + Arrays.asList( + new CreateTableEvent(tableId, schema), + DataChangeEvent.insertEvent(tableId, generate(schema, 1, 2.3, "Alice")), + DataChangeEvent.insertEvent(tableId, generate(schema, 2, 3.4, "Bob"))); + runJobWithEvents(preparationTestingEvents); + waitAndVerify( + tableId, + 3, + Arrays.asList("1 | 2.3 | Alice", "2 | 3.4 | Bob"), + DATABASE_OPERATION_TIMEOUT_SECONDS * 1000L); + + List truncateTestingEvents = + Arrays.asList( + new CreateTableEvent(tableId, schema), + new TruncateTableEvent(tableId), + DataChangeEvent.insertEvent(tableId, generate(schema, 3, 4.5, "Cecily")), + DataChangeEvent.insertEvent(tableId, generate(schema, 4, 5.6, "Derrida"))); + runJobWithEvents(truncateTestingEvents); + waitAndVerify( + tableId, + 3, + Arrays.asList("3 | 4.5 | Cecily", "4 | 5.6 | Derrida"), + DATABASE_OPERATION_TIMEOUT_SECONDS * 1000L); + } + + @Test + public void testDorisDropTable() throws Exception { + TableId tableId = + TableId.tableId( + DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME); + + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + List 
preparationTestingEvents = + Arrays.asList( + new CreateTableEvent(tableId, schema), + DataChangeEvent.insertEvent(tableId, generate(schema, 1, 2.3, "Alice")), + DataChangeEvent.insertEvent(tableId, generate(schema, 2, 3.4, "Bob"))); + runJobWithEvents(preparationTestingEvents); + + waitAndVerify( + tableId, + 3, + Arrays.asList("1 | 2.3 | Alice", "2 | 3.4 | Bob"), + DATABASE_OPERATION_TIMEOUT_SECONDS * 1000L); + + runJobWithEvents( + Arrays.asList(new CreateTableEvent(tableId, schema), new DropTableEvent(tableId))); + + SQLSyntaxErrorException thrown = + Assertions.assertThrows( + SQLSyntaxErrorException.class, () -> fetchTableContent(tableId, 3)); + Assertions.assertTrue( + thrown.getMessage() + .contains( + String.format( + "errCode = 2, detailMessage = Unknown table '%s'", + tableId.getTableName()))); + } + private void runJobWithEvents(List events) throws Exception { - DataStream stream = env.fromCollection(events, TypeInformation.of(Event.class)); + DataStream stream = + env.fromCollection(events, TypeInformation.of(Event.class)).setParallelism(1); Configuration config = new Configuration() @@ -477,4 +569,38 @@ private void runJobWithEvents(List events) throws Exception { env.execute("Doris Schema Evolution Test"); } + + BinaryRecordData generate(Schema schema, Object... fields) { + return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))) + .generate( + Arrays.stream(fields) + .map( + e -> + (e instanceof String) + ? 
BinaryStringData.fromString((String) e) + : e) + .toArray()); + } + + private void waitAndVerify( + TableId tableId, int numberOfColumns, List expected, long timeoutMilliseconds) + throws Exception { + long timeout = System.currentTimeMillis() + timeoutMilliseconds; + while (System.currentTimeMillis() < timeout) { + List actual = fetchTableContent(tableId, numberOfColumns); + if (expected.stream() + .sorted() + .collect(Collectors.toList()) + .equals(actual.stream().sorted().collect(Collectors.toList()))) { + return; + } + LOG.info( + "Content of {} isn't ready.\nExpected: {}\nActual: {}", + tableId, + expected, + actual); + Thread.sleep(1000L); + } + fail(String.format("Failed to verify content of %s.", tableId)); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java index 928e7b6dfcb..0ac52637b54 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java @@ -21,10 +21,12 @@ import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; +import 
org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; @@ -41,6 +43,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableCommit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,14 +140,16 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) return null; }, dropTableEvent -> { - throw new UnsupportedSchemaChangeEventException(dropTableEvent); + applyDropTable(dropTableEvent); + return null; }, renameColumnEvent -> { applyRenameColumn(renameColumnEvent); return null; }, truncateTableEvent -> { - throw new UnsupportedSchemaChangeEventException(truncateTableEvent); + applyTruncateTable(truncateTableEvent); + return null; }); } @@ -180,10 +185,7 @@ private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveExcepti .primaryKey(primaryKeys) .options(tableOptions) .options(schema.options()); - catalog.createTable( - new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), - builder.build(), - true); + catalog.createTable(tableIdToIdentifier(event), builder.build(), true); } catch (Catalog.TableAlreadyExistException | Catalog.DatabaseNotExistException | Catalog.DatabaseAlreadyExistException e) { @@ -194,10 +196,7 @@ private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveExcepti private void applyAddColumn(AddColumnEvent event) throws SchemaEvolveException { try { List tableChangeList = applyAddColumnEventWithPosition(event); - catalog.alterTable( - new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), - tableChangeList, - true); + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); } catch 
(Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) { @@ -286,10 +285,7 @@ private void applyDropColumn(DropColumnEvent event) throws SchemaEvolveException List tableChangeList = new ArrayList<>(); event.getDroppedColumnNames() .forEach((column) -> tableChangeList.add(SchemaChangeProvider.drop(column))); - catalog.alterTable( - new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), - tableChangeList, - true); + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) { @@ -305,10 +301,7 @@ private void applyRenameColumn(RenameColumnEvent event) throws SchemaEvolveExcep (oldName, newName) -> tableChangeList.add( SchemaChangeProvider.rename(oldName, newName))); - catalog.alterTable( - new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), - tableChangeList, - true); + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) { @@ -325,14 +318,38 @@ private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolv tableChangeList.add( SchemaChangeProvider.updateColumnType( oldName, newType))); - catalog.alterTable( - new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), - tableChangeList, - true); + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) { throw new SchemaEvolveException(event, e.getMessage(), e); } } + + private void applyTruncateTable(TruncateTableEvent event) throws SchemaEvolveException { + try { + Table table = catalog.getTable(tableIdToIdentifier(event)); + if 
(Boolean.parseBoolean(table.options().get("deletion-vectors.enabled"))) { + throw new UnsupportedSchemaChangeEventException( + event, "Unable to truncate a table with deletion vectors enabled.", null); + } + try (BatchTableCommit batchTableCommit = table.newBatchWriteBuilder().newCommit()) { + batchTableCommit.truncateTable(); + } + } catch (Exception e) { + throw new SchemaEvolveException(event, "Failed to apply truncate table event", e); + } + } + + private void applyDropTable(DropTableEvent event) throws SchemaEvolveException { + try { + catalog.dropTable(tableIdToIdentifier(event), true); + } catch (Catalog.TableNotExistException e) { + throw new SchemaEvolveException(event, "Failed to apply drop table event", e); + } + } + + private static Identifier tableIdToIdentifier(SchemaChangeEvent event) { + return new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index 34dddb93fe1..bc257d0db90 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -26,17 +26,22 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; 
import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; -import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; @@ -51,11 +56,12 @@ import org.apache.flink.types.RowKind; import org.apache.flink.util.UserCodeClassLoader; +import org.apache.calcite.sql.validate.SqlValidatorException; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.FlinkCatalogFactory; import org.apache.paimon.flink.sink.MultiTableCommittable; import org.apache.paimon.options.Options; -import org.junit.jupiter.api.Assertions; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -75,6 +81,8 @@ import java.util.UUID; import java.util.stream.Collectors; +import static org.apache.flink.cdc.common.types.DataTypes.STRING; + /** An ITCase for {@link PaimonWriter} and {@link PaimonCommitter}. 
*/ public class PaimonSinkITCase { @@ -86,9 +94,8 @@ public class PaimonSinkITCase { private String warehouse; - private TableId table1; - - private BinaryRecordDataGenerator generator; + private final TableId table1 = TableId.tableId("test", "table1"); + private final TableId table2 = TableId.tableId("test", "table2"); private static int checkpointId = 1; @@ -115,7 +122,6 @@ private void initialize(String metastore) catalogOptions.setString("metastore", metastore); catalogOptions.setString("warehouse", warehouse); catalogOptions.setString("cache-enabled", "false"); - table1 = TableId.tableId("test", "table1"); if ("hive".equals(metastore)) { catalogOptions.setString("hadoop-conf-dir", HADOOP_CONF_DIR); catalogOptions.setString("hive-conf-dir", HIVE_CONF_DIR); @@ -145,8 +151,8 @@ private List createTestEvents(boolean enableDeleteVectors) throws SchemaE // create table Schema schema = Schema.newBuilder() - .physicalColumn("col1", DataTypes.STRING()) - .physicalColumn("col2", DataTypes.STRING()) + .physicalColumn("col1", STRING()) + .physicalColumn("col2", STRING()) .primaryKey("col1") .option("bucket", "1") .option("deletion-vectors.enabled", String.valueOf(enableDeleteVectors)) @@ -156,27 +162,13 @@ private List createTestEvents(boolean enableDeleteVectors) throws SchemaE PaimonMetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions); metadataApplier.applySchemaChange(createTableEvent); - generator = - new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING())); // insert - DataChangeEvent insertEvent1 = - DataChangeEvent.insertEvent( - table1, - generator.generate( - new Object[] { - BinaryStringData.fromString("1"), - BinaryStringData.fromString("1") - })); - testEvents.add(insertEvent1); - DataChangeEvent insertEvent2 = - DataChangeEvent.insertEvent( - table1, - generator.generate( - new Object[] { - BinaryStringData.fromString("2"), - BinaryStringData.fromString("2") - })); - testEvents.add(insertEvent2); + 
testEvents.add( + generateInsert( + table1, Arrays.asList(Tuple2.of(STRING(), "1"), Tuple2.of(STRING(), "1")))); + testEvents.add( + generateInsert( + table1, Arrays.asList(Tuple2.of(STRING(), "2"), Tuple2.of(STRING(), "2")))); return testEvents; } @@ -193,96 +185,44 @@ public void testSinkWithDataChange(String metastore, boolean enableDeleteVector) Committer committer = paimonSink.createCommitter(); // insert - for (Event event : createTestEvents(enableDeleteVector)) { - writer.write(event, null); - } - writer.flush(false); - Collection> commitRequests = - writer.prepareCommit().stream() - .map(this::correctCheckpointId) - .map(MockCommitRequestImpl::new) - .collect(Collectors.toList()); - committer.commit(commitRequests); - List result = new ArrayList<>(); - tEnv.sqlQuery("select * from paimon_catalog.test.table1") - .execute() - .collect() - .forEachRemaining(result::add); - Assertions.assertEquals( - Arrays.asList( - Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2")), - result); + writeAndCommit( + writer, committer, createTestEvents(enableDeleteVector).toArray(new Event[0])); + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2")); // delete - Event event = - DataChangeEvent.deleteEvent( - table1, - generator.generate( - new Object[] { - BinaryStringData.fromString("1"), - BinaryStringData.fromString("1") - })); - writer.write(event, null); - writer.flush(false); - commitRequests = - writer.prepareCommit().stream() - .map(this::correctCheckpointId) - .map(MockCommitRequestImpl::new) - .collect(Collectors.toList()); - committer.commit(commitRequests); - result = new ArrayList<>(); - tEnv.sqlQuery("select * from paimon_catalog.test.table1") - .execute() - .collect() - .forEachRemaining(result::add); - Assertions.assertEquals( - Collections.singletonList(Row.ofKind(RowKind.INSERT, "2", "2")), result); + writeAndCommit( + writer, + committer, 
+ generateDelete( + table1, Arrays.asList(Tuple2.of(STRING(), "1"), Tuple2.of(STRING(), "1")))); + + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "2", "2")); // update - event = - DataChangeEvent.updateEvent( + writeAndCommit( + writer, + committer, + generateUpdate( table1, - generator.generate( - new Object[] { - BinaryStringData.fromString("2"), - BinaryStringData.fromString("2") - }), - generator.generate( - new Object[] { - BinaryStringData.fromString("2"), - BinaryStringData.fromString("x") - })); - writer.write(event, null); - writer.flush(false); - commitRequests = - writer.prepareCommit().stream() - .map(this::correctCheckpointId) - .map(MockCommitRequestImpl::new) - .collect(Collectors.toList()); - committer.commit(commitRequests); - result = new ArrayList<>(); - tEnv.sqlQuery("select * from paimon_catalog.test.table1") - .execute() - .collect() - .forEachRemaining(result::add); - Assertions.assertEquals( - Collections.singletonList(Row.ofKind(RowKind.INSERT, "2", "x")), result); + Arrays.asList(Tuple2.of(STRING(), "2"), Tuple2.of(STRING(), "2")), + Arrays.asList(Tuple2.of(STRING(), "2"), Tuple2.of(STRING(), "x")))); + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "2", "x")); - result = new ArrayList<>(); - tEnv.sqlQuery("select max_sequence_number from paimon_catalog.test.`table1$files`") - .execute() - .collect() - .forEachRemaining(result::add); - // Each commit will generate one sequence number(equal to checkpointId). - List expected = - enableDeleteVector - ? 
Arrays.asList( - Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 3L)) - : Arrays.asList( - Row.ofKind(RowKind.INSERT, 1L), - Row.ofKind(RowKind.INSERT, 2L), - Row.ofKind(RowKind.INSERT, 3L)); - Assertions.assertEquals(expected, result); + if (enableDeleteVector) { + Assertions.assertThat(fetchMaxSequenceNumber(table1.getTableName())) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 3L)); + } else { + Assertions.assertThat(fetchMaxSequenceNumber(table1.getTableName())) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, 1L), + Row.ofKind(RowKind.INSERT, 2L), + Row.ofKind(RowKind.INSERT, 3L)); + } } @ParameterizedTest @@ -292,142 +232,106 @@ public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVecto Catalog.DatabaseNotExistException, SchemaEvolveException { initialize(metastore); PaimonSink paimonSink = - new PaimonSink( + new PaimonSink<>( catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault())); PaimonWriter writer = paimonSink.createWriter(new MockInitContext()); Committer committer = paimonSink.createCommitter(); // 1. 
receive only DataChangeEvents during one checkpoint - for (Event event : createTestEvents(enableDeleteVector)) { - writer.write(event, null); - } - writer.flush(false); - Collection> commitRequests = - writer.prepareCommit().stream() - .map(this::correctCheckpointId) - .map(MockCommitRequestImpl::new) - .collect(Collectors.toList()); - committer.commit(commitRequests); - List result = new ArrayList<>(); - tEnv.sqlQuery("select * from paimon_catalog.test.table1") - .execute() - .collect() - .forEachRemaining(result::add); - Assertions.assertEquals( - Arrays.asList( - Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2")), - result); + writeAndCommit( + writer, committer, createTestEvents(enableDeleteVector).toArray(new Event[0])); + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2")); // 2. receive DataChangeEvents and SchemaChangeEvents during one checkpoint - DataChangeEvent insertEvent3 = - DataChangeEvent.insertEvent( - table1, - generator.generate( - new Object[] { - BinaryStringData.fromString("3"), - BinaryStringData.fromString("3") - })); - writer.write(insertEvent3, null); - writer.flush(false); + writeAndCommit( + writer, + committer, + generateInsert( + table1, Arrays.asList(Tuple2.of(STRING(), "3"), Tuple2.of(STRING(), "3")))); // add column AddColumnEvent.ColumnWithPosition columnWithPosition = - new AddColumnEvent.ColumnWithPosition( - Column.physicalColumn("col3", DataTypes.STRING())); + new AddColumnEvent.ColumnWithPosition(Column.physicalColumn("col3", STRING())); AddColumnEvent addColumnEvent = new AddColumnEvent(table1, Collections.singletonList(columnWithPosition)); PaimonMetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions); metadataApplier.applySchemaChange(addColumnEvent); writer.write(addColumnEvent, null); - generator = - new BinaryRecordDataGenerator( - RowType.of(DataTypes.STRING(), 
DataTypes.STRING(), DataTypes.STRING())); - DataChangeEvent insertEvent4 = - DataChangeEvent.insertEvent( + + writeAndCommit( + writer, + committer, + generateInsert( table1, - generator.generate( - new Object[] { - BinaryStringData.fromString("4"), - BinaryStringData.fromString("4"), - BinaryStringData.fromString("4") - })); - writer.write(insertEvent4, null); - writer.flush(false); - commitRequests = - writer.prepareCommit().stream() - .map(this::correctCheckpointId) - .map(MockCommitRequestImpl::new) - .collect(Collectors.toList()); - committer.commit(commitRequests); - result = new ArrayList<>(); - tEnv.sqlQuery("select * from paimon_catalog.test.table1") - .execute() - .collect() - .forEachRemaining(result::add); - Assertions.assertEquals( - Arrays.asList( + Arrays.asList( + Tuple2.of(STRING(), "4"), + Tuple2.of(STRING(), "4"), + Tuple2.of(STRING(), "4")))); + + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( Row.ofKind(RowKind.INSERT, "1", "1", null), Row.ofKind(RowKind.INSERT, "2", "2", null), Row.ofKind(RowKind.INSERT, "3", "3", null), - Row.ofKind(RowKind.INSERT, "4", "4", "4")), - result); + Row.ofKind(RowKind.INSERT, "4", "4", "4")); // 2. 
receive DataChangeEvents and SchemaChangeEvents during one checkpoint - DataChangeEvent insertEvent5 = - DataChangeEvent.insertEvent( + writeAndCommit( + writer, + committer, + generateInsert( table1, - generator.generate( - new Object[] { - BinaryStringData.fromString("5"), - BinaryStringData.fromString("5"), - BinaryStringData.fromString("5") - })); - writer.write(insertEvent5, null); - writer.flush(false); + Arrays.asList( + Tuple2.of(STRING(), "5"), + Tuple2.of(STRING(), "5"), + Tuple2.of(STRING(), "5")))); + // drop column DropColumnEvent dropColumnEvent = new DropColumnEvent(table1, Collections.singletonList("col2")); metadataApplier.applySchemaChange(dropColumnEvent); writer.write(dropColumnEvent, null); - generator = - new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING())); - DataChangeEvent insertEvent6 = - DataChangeEvent.insertEvent( - table1, - generator.generate( - new Object[] { - BinaryStringData.fromString("6"), - BinaryStringData.fromString("6") - })); - writer.write(insertEvent6, null); - writer.flush(false); - commitRequests = - writer.prepareCommit().stream() - .map(this::correctCheckpointId) - .map(MockCommitRequestImpl::new) - .collect(Collectors.toList()); - committer.commit(commitRequests); - result = new ArrayList<>(); - tEnv.sqlQuery("select * from paimon_catalog.test.table1") - .execute() - .collect() - .forEachRemaining(result::add); - Assertions.assertEquals( - Arrays.asList( + + writeAndCommit( + writer, + committer, + generateInsert( + table1, Arrays.asList(Tuple2.of(STRING(), "6"), Tuple2.of(STRING(), "6")))); + + List result = fetchResults(TableId.tableId("test", "`table1$files`")); + Set deduplicated = new HashSet<>(result); + Assertions.assertThat(result).hasSameSizeAs(deduplicated); + + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( Row.ofKind(RowKind.INSERT, "1", null), Row.ofKind(RowKind.INSERT, "2", null), Row.ofKind(RowKind.INSERT, "3", null), Row.ofKind(RowKind.INSERT, 
"4", "4"), Row.ofKind(RowKind.INSERT, "5", "5"), - Row.ofKind(RowKind.INSERT, "6", "6")), - result); - result = new ArrayList<>(); - tEnv.sqlQuery("select min_sequence_number from paimon_catalog.test.`table1$files`") - .execute() - .collect() - .forEachRemaining(result::add); - Set deduplicated = new HashSet<>(result); - Assertions.assertEquals(result.size(), deduplicated.size()); + Row.ofKind(RowKind.INSERT, "6", "6")); + + TruncateTableEvent truncateTableEvent = new TruncateTableEvent(table1); + if (enableDeleteVector) { + Assertions.assertThatThrownBy( + () -> metadataApplier.applySchemaChange(truncateTableEvent)) + .isExactlyInstanceOf(SchemaEvolveException.class) + .cause() + .isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class) + .extracting("exceptionMessage") + .isEqualTo("Unable to truncate a table with deletion vectors enabled."); + } else { + metadataApplier.applySchemaChange(truncateTableEvent); + Assertions.assertThat(fetchResults(table1)).isEmpty(); + } + + DropTableEvent dropTableEvent = new DropTableEvent(table1); + metadataApplier.applySchemaChange(dropTableEvent); + Assertions.assertThatThrownBy(() -> fetchResults(table1)) + .hasRootCauseExactlyInstanceOf(SqlValidatorException.class) + .hasRootCauseMessage("Object 'table1' not found within 'paimon_catalog.test'"); } @ParameterizedTest @@ -443,11 +347,10 @@ public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector Committer committer = paimonSink.createCommitter(); List testEvents = createTestEvents(enableDeleteVector); // create table - TableId table2 = TableId.tableId("test", "table2"); Schema schema = Schema.newBuilder() - .physicalColumn("col1", DataTypes.STRING()) - .physicalColumn("col2", DataTypes.STRING()) + .physicalColumn("col1", STRING()) + .physicalColumn("col2", STRING()) .primaryKey("col1") .option("bucket", "1") .build(); @@ -456,43 +359,89 @@ public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector PaimonMetadataApplier 
metadataApplier = new PaimonMetadataApplier(catalogOptions); metadataApplier.applySchemaChange(createTableEvent); // insert - DataChangeEvent insertEvent1 = - DataChangeEvent.insertEvent( - table2, - generator.generate( - new Object[] { - BinaryStringData.fromString("1"), - BinaryStringData.fromString("1") - })); - testEvents.add(insertEvent1); + testEvents.add( + generateInsert( + table2, Arrays.asList(Tuple2.of(STRING(), "1"), Tuple2.of(STRING(), "1")))); // insert - for (Event event : testEvents) { - writer.write(event, null); - } - writer.flush(false); + writeAndCommit(writer, committer, testEvents.toArray(new Event[0])); + + Assertions.assertThat(fetchResults(table1)) + .containsExactlyInAnyOrder( + Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2")); + Assertions.assertThat(fetchResults(table2)) + .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1", "1")); + } + + private static void commit( + PaimonWriter writer, Committer committer) + throws IOException, InterruptedException { Collection> commitRequests = writer.prepareCommit().stream() - .map(this::correctCheckpointId) + .map(PaimonSinkITCase::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList()); committer.commit(commitRequests); - List result = new ArrayList<>(); - tEnv.sqlQuery("select * from paimon_catalog.test.table1") + } + + private static void writeAndCommit( + PaimonWriter writer, Committer committer, Event... events) + throws IOException, InterruptedException { + for (Event event : events) { + writer.write(event, null); + } + writer.flush(false); + commit(writer, committer); + } + + private List fetchResults(TableId tableId) { + List results = new ArrayList<>(); + tEnv.sqlQuery("select * from paimon_catalog." 
+ tableId.toString()) .execute() .collect() - .forEachRemaining(result::add); - Assertions.assertEquals( - Arrays.asList( - Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2")), - result); - result = new ArrayList<>(); - tEnv.sqlQuery("select * from paimon_catalog.test.table2") + .forEachRemaining(results::add); + return results; + } + + private List fetchMaxSequenceNumber(String tableName) { + List results = new ArrayList<>(); + tEnv.sqlQuery( + "select max_sequence_number from paimon_catalog.test.`" + + tableName + + "$files`") .execute() .collect() - .forEachRemaining(result::add); - Assertions.assertEquals( - Collections.singletonList(Row.ofKind(RowKind.INSERT, "1", "1")), result); + .forEachRemaining(results::add); + return results; + } + + private BinaryRecordData generate(List> elements) { + BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator( + RowType.of(elements.stream().map(e -> e.f0).toArray(DataType[]::new))); + return generator.generate( + elements.stream() + .map(e -> e.f1) + .map(o -> o instanceof String ? 
BinaryStringData.fromString((String) o) : o) + .toArray(Object[]::new)); + } + + private DataChangeEvent generateInsert( + TableId tableId, List> elements) { + return DataChangeEvent.insertEvent(tableId, generate(elements)); + } + + private DataChangeEvent generateUpdate( + TableId tableId, + List> beforeElements, + List> afterElements) { + return DataChangeEvent.updateEvent( + tableId, generate(beforeElements), generate(afterElements)); + } + + private DataChangeEvent generateDelete( + TableId tableId, List> elements) { + return DataChangeEvent.deleteEvent(tableId, generate(elements)); } @ParameterizedTest @@ -514,7 +463,7 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele writer.flush(false); Collection> commitRequests = writer.prepareCommit().stream() - .map(this::correctCheckpointId) + .map(PaimonSinkITCase::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList()); committer.commit(commitRequests); @@ -529,25 +478,24 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele // CommitterOperator will try to re-commit recovered transactions. 
committer.commit(commitRequests); List events = - Arrays.asList( - DataChangeEvent.insertEvent( + Collections.singletonList( + generateInsert( table1, - generator.generate( - new Object[] { - BinaryStringData.fromString(Integer.toString(i)), - BinaryStringData.fromString(Integer.toString(i)) - }))); - Assertions.assertDoesNotThrow( - () -> { - for (Event event : events) { - writer.write(event, null); - } - }); + Arrays.asList( + Tuple2.of(STRING(), String.valueOf(i)), + Tuple2.of(STRING(), String.valueOf(i))))); + Assertions.assertThatCode( + () -> { + for (Event event : events) { + writer.write(event, null); + } + }) + .doesNotThrowAnyException(); writer.flush(false); // Checkpoint id start from 1 committer.commit( writer.prepareCommit().stream() - .map(this::correctCheckpointId) + .map(PaimonSinkITCase::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList())); } @@ -559,10 +507,10 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele .forEachRemaining(result::add); if (enableDeleteVector) { // Each APPEND will trigger COMPACT once enable deletion-vectors. 
- Assertions.assertEquals(16, result.size()); + Assertions.assertThat(result).hasSize(16); } else { // 8 APPEND and 1 COMPACT - Assertions.assertEquals(9, result.size()); + Assertions.assertThat(result).hasSize(9); } result.clear(); @@ -570,8 +518,8 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele .execute() .collect() .forEachRemaining(result::add); - Assertions.assertEquals( - Arrays.asList( + Assertions.assertThat(result) + .containsExactlyInAnyOrder( Row.ofKind(RowKind.INSERT, "1", "1"), Row.ofKind(RowKind.INSERT, "2", "2"), Row.ofKind(RowKind.INSERT, "3", "3"), @@ -579,11 +527,10 @@ public void testDuplicateCommitAfterRestore(String metastore, boolean enableDele Row.ofKind(RowKind.INSERT, "5", "5"), Row.ofKind(RowKind.INSERT, "6", "6"), Row.ofKind(RowKind.INSERT, "7", "7"), - Row.ofKind(RowKind.INSERT, "8", "8")), - result); + Row.ofKind(RowKind.INSERT, "8", "8")); } - private MultiTableCommittable correctCheckpointId(MultiTableCommittable committable) { + private static MultiTableCommittable correctCheckpointId(MultiTableCommittable committable) { // update the right checkpointId for MultiTableCommittable return new MultiTableCommittable( committable.getDatabase(), diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java index 9811a010dc2..24bd943556e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksDataSink.java @@ -24,7 +24,6 @@ import 
org.apache.flink.cdc.common.sink.FlinkSinkProvider; import org.apache.flink.cdc.common.sink.MetadataApplier; -import com.starrocks.connector.flink.catalog.StarRocksCatalog; import com.starrocks.connector.flink.table.sink.SinkFunctionFactory; import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; import com.starrocks.connector.flink.table.sink.v2.StarRocksSink; @@ -72,8 +71,8 @@ public EventSinkProvider getEventSinkProvider() { @Override public MetadataApplier getMetadataApplier() { - StarRocksCatalog catalog = - new StarRocksCatalog( + StarRocksEnrichedCatalog catalog = + new StarRocksEnrichedCatalog( sinkOptions.getJdbcUrl(), sinkOptions.getUsername(), sinkOptions.getPassword()); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java new file mode 100644 index 00000000000..24f1f444cf2 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.starrocks.sink; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import com.starrocks.connector.flink.catalog.StarRocksCatalog; +import com.starrocks.connector.flink.catalog.StarRocksCatalogException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +/** An enriched {@code StarRocksCatalog} with more schema evolution abilities. */ +public class StarRocksEnrichedCatalog extends StarRocksCatalog { + public StarRocksEnrichedCatalog(String jdbcUrl, String username, String password) { + super(jdbcUrl, username, password); + } + + private static final Logger LOG = LoggerFactory.getLogger(StarRocksEnrichedCatalog.class); + + public void truncateTable(String databaseName, String tableName) + throws StarRocksCatalogException { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "Database name cannot be null or empty."); + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(tableName), + "Table name cannot be null or empty."); + String alterSql = this.buildTruncateTableSql(databaseName, tableName); + try { + // TRUNCATE TABLE is not regarded as a column-based schema change for StarRocks, so + // there's no need to check the evolution state. + executeUpdateStatement(alterSql); + } catch (Exception e) { + LOG.error( + "Failed to truncate table `{}`.`{}`. 
SQL executed: {}", + databaseName, + tableName, + alterSql); + throw new StarRocksCatalogException( + String.format("Failed to truncate table `%s`.`%s`.", databaseName, tableName), + e); + } + } + + public void dropTable(String databaseName, String tableName) throws StarRocksCatalogException { + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "Database name cannot be null or empty."); + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(tableName), + "Table name cannot be null or empty."); + String alterSql = this.buildDropTableSql(databaseName, tableName); + try { + // like TRUNCATE TABLE, DROP TABLE isn't a column-affecting operation and `executeAlter` + // method isn't appropriate. + executeUpdateStatement(alterSql); + } catch (Exception e) { + LOG.error( + "Failed to drop table `{}`.`{}`. SQL executed: {}", + databaseName, + tableName, + alterSql); + throw new StarRocksCatalogException( + String.format("Failed to drop table `%s`.`%s`.", databaseName, tableName), e); + } + } + + private String buildTruncateTableSql(String databaseName, String tableName) { + return String.format("TRUNCATE TABLE `%s`.`%s`;", databaseName, tableName); + } + + private String buildDropTableSql(String databaseName, String tableName) { + return String.format("DROP TABLE `%s`.`%s`;", databaseName, tableName); + } + + private void executeUpdateStatement(String sql) throws StarRocksCatalogException { + try { + Method m = + getClass() + .getSuperclass() + .getDeclaredMethod("executeUpdateStatement", String.class); + m.setAccessible(true); + m.invoke(this, sql); + } catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java index 25bb2656c68..4204dbf9c9e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java @@ -21,10 +21,12 @@ import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; @@ -33,7 +35,6 @@ import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; -import com.starrocks.connector.flink.catalog.StarRocksCatalog; import com.starrocks.connector.flink.catalog.StarRocksCatalogException; import com.starrocks.connector.flink.catalog.StarRocksColumn; import com.starrocks.connector.flink.catalog.StarRocksTable; @@ -53,14 +54,14 @@ public class StarRocksMetadataApplier implements MetadataApplier { private static final Logger LOG = LoggerFactory.getLogger(StarRocksMetadataApplier.class); - private final StarRocksCatalog catalog; + private final StarRocksEnrichedCatalog catalog; private 
final TableCreateConfig tableCreateConfig; private final SchemaChangeConfig schemaChangeConfig; private boolean isOpened; private Set enabledSchemaEvolutionTypes; public StarRocksMetadataApplier( - StarRocksCatalog catalog, + StarRocksEnrichedCatalog catalog, TableCreateConfig tableCreateConfig, SchemaChangeConfig schemaChangeConfig) { this.catalog = catalog; @@ -87,7 +88,9 @@ public Set getSupportedSchemaEvolutionTypes() { return Sets.newHashSet( SchemaChangeEventType.CREATE_TABLE, SchemaChangeEventType.ADD_COLUMN, - SchemaChangeEventType.DROP_COLUMN); + SchemaChangeEventType.DROP_COLUMN, + SchemaChangeEventType.DROP_TABLE, + SchemaChangeEventType.TRUNCATE_TABLE); } @Override @@ -117,14 +120,16 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) return null; }, dropTableEvent -> { - throw new UnsupportedSchemaChangeEventException(dropTableEvent); + applyDropTable(dropTableEvent); + return null; }, renameColumnEvent -> { applyRenameColumn(renameColumnEvent); return null; }, truncateTableEvent -> { - throw new UnsupportedSchemaChangeEventException(truncateTableEvent); + applyTruncateTable(truncateTableEvent); + return null; }); } @@ -315,4 +320,24 @@ private void applyAlterColumnType(AlterColumnTypeEvent alterColumnTypeEvent) // the alter after a discussion. 
throw new UnsupportedSchemaChangeEventException(alterColumnTypeEvent); } + + private void applyTruncateTable(TruncateTableEvent truncateTableEvent) { + try { + catalog.truncateTable( + truncateTableEvent.tableId().getSchemaName(), + truncateTableEvent.tableId().getTableName()); + } catch (StarRocksCatalogException e) { + throw new SchemaEvolveException(truncateTableEvent, e.getMessage(), e); + } + } + + private void applyDropTable(DropTableEvent dropTableEvent) { + try { + catalog.dropTable( + dropTableEvent.tableId().getSchemaName(), + dropTableEvent.tableId().getTableName()); + } catch (StarRocksCatalogException e) { + throw new SchemaEvolveException(dropTableEvent, e.getMessage(), e); + } + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/MockStarRocksCatalog.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/MockStarRocksCatalog.java index 643a8b31a86..c12be85a735 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/MockStarRocksCatalog.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/MockStarRocksCatalog.java @@ -29,7 +29,7 @@ import java.util.Optional; /** Mock {@link StarRocksCatalog} for testing. */ -public class MockStarRocksCatalog extends StarRocksCatalog { +public class MockStarRocksCatalog extends StarRocksEnrichedCatalog { /** database name -> table name -> table. 
*/ private final Map> tables; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java index 338af86d50b..8afd11fe7d0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java @@ -20,18 +20,24 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.PhysicalColumn; import org.apache.flink.cdc.common.schema.Schema; 
import org.apache.flink.cdc.common.sink.DataSink; +import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.composer.definition.SinkDef; import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator; @@ -39,11 +45,14 @@ import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator; import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksContainer; import org.apache.flink.cdc.connectors.starrocks.sink.utils.StarRocksSinkTestBase; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; @@ -350,8 +359,70 @@ public void testStarRocksNarrowingAlterColumnType() throws Exception { runJobWithEvents(generateNarrowingAlterColumnTypeEvents(tableId)); } + @Test + public void testStarRocksTruncateTable() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + List truncateTableTestingEvents = + Arrays.asList( + new CreateTableEvent(tableId, schema), + DataChangeEvent.insertEvent(tableId, generate(schema, 1, 2.3, "Alice")), + DataChangeEvent.insertEvent(tableId, generate(schema, 2, 3.4, "Bob")), + new TruncateTableEvent(tableId), + DataChangeEvent.insertEvent(tableId, generate(schema, 3, 
4.5, "Cecily")), + DataChangeEvent.insertEvent(tableId, generate(schema, 4, 5.6, "Derrida"))); + runJobWithEvents(truncateTableTestingEvents); + + assertEqualsInAnyOrder( + Arrays.asList("3 | 4.5 | Cecily", "4 | 5.6 | Derrida"), + fetchTableContent(tableId, 3)); + } + + @Test + public void testStarRocksDropTable() throws Exception { + TableId tableId = + TableId.tableId( + StarRocksContainer.STARROCKS_DATABASE_NAME, + StarRocksContainer.STARROCKS_TABLE_NAME); + + Schema schema = + Schema.newBuilder() + .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null)) + .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null)) + .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null)) + .primaryKey("id") + .build(); + + List dropTableTestingEvents = + Arrays.asList( + new CreateTableEvent(tableId, schema), + DataChangeEvent.insertEvent(tableId, generate(schema, 1, 2.3, "Alice")), + DataChangeEvent.insertEvent(tableId, generate(schema, 2, 3.4, "Bob")), + new DropTableEvent(tableId)); + runJobWithEvents(dropTableTestingEvents); + + Assert.assertThrows( + String.format( + "Getting analyzing error. Detail message: Unknown table '%s.%s'.", + tableId.getSchemaName(), tableId.getTableName()), + MySQLSyntaxErrorException.class, + () -> fetchTableContent(tableId, 3)); + } + private void runJobWithEvents(List events) throws Exception { - DataStream stream = env.fromCollection(events, TypeInformation.of(Event.class)); + DataStream stream = + env.fromCollection(events, TypeInformation.of(Event.class)).setParallelism(1); Configuration config = new Configuration() @@ -392,4 +463,16 @@ private void runJobWithEvents(List events) throws Exception { env.execute("StarRocks Schema Evolution Test"); } + + BinaryRecordData generate(Schema schema, Object... fields) { + return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))) + .generate( + Arrays.stream(fields) + .map( + e -> + (e instanceof String) + ? 
BinaryStringData.fromString((String) e) + : e) + .toArray()); + } } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java index 508eebdb704..a83c6484a23 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java @@ -34,7 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.Container; -import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; import org.testcontainers.lifecycle.Startables; @@ -43,9 +42,9 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.SQLSyntaxErrorException; import java.sql.Statement; import java.time.Duration; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -53,9 +52,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** End-to-end tests for mysql cdc to Doris pipeline job. 
*/ @RunWith(Parameterized.class) @@ -68,8 +67,8 @@ public class MySqlToDorisE2eITCase extends PipelineTestEnvironment { protected static final String MYSQL_TEST_USER = "mysqluser"; protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; - public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240; - public static final int TESTCASE_TIMEOUT_SECONDS = 60; + public static final Duration DEFAULT_STARTUP_TIMEOUT = Duration.ofSeconds(240); + public static final Duration DEFAULT_RESULT_VERIFY_TIMEOUT = Duration.ofSeconds(30); @ClassRule public static final MySqlContainer MYSQL = @@ -82,14 +81,11 @@ public class MySqlToDorisE2eITCase extends PipelineTestEnvironment { .withUsername("flinkuser") .withPassword("flinkpw") .withNetwork(NETWORK) - .withNetworkAliases("mysql") - .withLogConsumer(new Slf4jLogConsumer(LOG)); + .withNetworkAliases("mysql"); @ClassRule public static final DorisContainer DORIS = - new DorisContainer(NETWORK) - .withNetworkAliases("doris") - .withLogConsumer(new Slf4jLogConsumer(LOG)); + new DorisContainer(NETWORK).withNetworkAliases("doris"); protected final UniqueDatabase mysqlInventoryDatabase = new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); @@ -108,14 +104,13 @@ public static void initializeContainers() { new LogMessageWaitStrategy() .withRegEx(".*get heartbeat from FE.*") .withTimes(1) - .withStartupTimeout( - Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, ChronoUnit.SECONDS)) + .withStartupTimeout(DEFAULT_STARTUP_TIMEOUT) .waitUntilReady(DORIS); while (!checkBackendAvailability()) { try { if (System.currentTimeMillis() - startWaitingTimestamp - > DEFAULT_STARTUP_TIMEOUT_SECONDS * 1000) { + > DEFAULT_STARTUP_TIMEOUT.toMillis()) { throw new RuntimeException("Doris backend startup timed out."); } LOG.info("Waiting for backends to be available"); @@ -173,6 +168,7 @@ public void after() { @Test public void testSyncWholeDatabase() 
throws Exception { + String databaseName = mysqlInventoryDatabase.getDatabaseName(); String pipelineJob = String.format( "source:\n" @@ -197,7 +193,7 @@ public void testSyncWholeDatabase() throws Exception { + " parallelism: %d", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, - mysqlInventoryDatabase.getDatabaseName(), + databaseName, DORIS.getUsername(), DORIS.getPassword(), parallelism); @@ -208,8 +204,19 @@ public void testSyncWholeDatabase() throws Exception { waitUntilJobRunning(Duration.ofSeconds(30)); LOG.info("Pipeline job is running"); + validateSinkSchema( + databaseName, + "products", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "description | VARCHAR(1536) | Yes | false | null", + "weight | FLOAT | Yes | false | null", + "enum_c | TEXT | Yes | false | red", + "json_c | TEXT | Yes | false | null", + "point_c | TEXT | Yes | false | null")); validateSinkResult( - mysqlInventoryDatabase.getDatabaseName(), + databaseName, "products", 7, Arrays.asList( @@ -223,8 +230,16 @@ public void testSyncWholeDatabase() throws Exception { "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null")); + validateSinkSchema( + databaseName, + "customers", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "address | VARCHAR(3072) | Yes | false | null", + "phone_number | VARCHAR(1536) | Yes | false | null")); validateSinkResult( - mysqlInventoryDatabase.getDatabaseName(), + databaseName, "customers", 4, Arrays.asList( @@ -238,9 +253,7 @@ public void testSyncWholeDatabase() throws Exception { String mysqlJdbcUrl = String.format( "jdbc:mysql://%s:%s/%s", - MYSQL.getHost(), - MYSQL.getDatabasePort(), - mysqlInventoryDatabase.getDatabaseName()); + MYSQL.getHost(), MYSQL.getDatabasePort(), databaseName); try (Connection conn = DriverManager.getConnection( mysqlJdbcUrl, MYSQL_TEST_USER, 
MYSQL_TEST_PASSWORD); @@ -250,7 +263,7 @@ public void testSyncWholeDatabase() throws Exception { "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null);"); // 110 validateSinkResult( - mysqlInventoryDatabase.getDatabaseName(), + databaseName, "products", 7, Arrays.asList( @@ -267,39 +280,65 @@ public void testSyncWholeDatabase() throws Exception { stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); stat.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + validateSinkResult( + databaseName, + "products", + 7, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106 | hammer | 18oz carpenter hammer | 1.0 | null | null | null", + "107 | rocks | box of assorted rocks | 5.1 | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null")); // modify table schema stat.execute("ALTER TABLE products DROP COLUMN point_c;"); - stat.execute("DELETE FROM products WHERE id=101;"); + validateSinkSchema( + databaseName, + "products", + Arrays.asList( + "id | INT | Yes | true | null", + "name | 
VARCHAR(765) | Yes | false | flink", + "description | VARCHAR(1536) | Yes | false | null", + "weight | FLOAT | Yes | false | null", + "enum_c | TEXT | Yes | false | red", + "json_c | TEXT | Yes | false | null")); + stat.execute("DELETE FROM products WHERE id=101;"); stat.execute( "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null);"); // 111 stat.execute( "INSERT INTO products VALUES (default,'finally', null, 2.14, null, null);"); // 112 + validateSinkResult( + databaseName, + "products", + 7, + Arrays.asList( + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | null", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | null", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | null", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | null", + "106 | hammer | 18oz carpenter hammer | 1.0 | null | null | null", + "107 | rocks | box of assorted rocks | 5.1 | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null", + "111 | scooter | Big 2-wheel scooter | 5.18 | null | null | null", + "112 | finally | null | 2.14 | null | null | null")); } catch (SQLException e) { LOG.error("Update table for CDC failed.", e); throw e; } - validateSinkResult( - mysqlInventoryDatabase.getDatabaseName(), - "products", - 7, - Arrays.asList( - "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | null", - "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | null", - "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | null", - "105 | hammer | 14oz carpenter's hammer | 
0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | null", - "106 | hammer | 18oz carpenter hammer | 1.0 | null | null | null", - "107 | rocks | box of assorted rocks | 5.1 | null | null | null", - "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", - "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null", - "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null", - "111 | scooter | Big 2-wheel scooter | 5.18 | null | null | null", - "112 | finally | null | 2.14 | null | null | null")); } @Test public void testComplexDataTypes() throws Exception { + String databaseName = complexDataTypesDatabase.getDatabaseName(); String pipelineJob = String.format( "source:\n" @@ -328,32 +367,87 @@ public void testComplexDataTypes() throws Exception { + " parallelism: %d", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, - complexDataTypesDatabase.getDatabaseName(), + databaseName, DORIS.getUsername(), DORIS.getPassword(), - complexDataTypesDatabase.getDatabaseName(), + databaseName, parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); submitPipelineJob(pipelineJob, mysqlCdcJar, dorisCdcConnector, mysqlDriverJar); waitUntilJobRunning(Duration.ofSeconds(30)); - LOG.info("Pipeline job is running"); + + LOG.info("Verifying snapshot stage of DATA_TYPES_TABLE..."); + validateSinkSchema( + databaseName, + "DATA_TYPES_TABLE", + Arrays.asList( + "id | INT | Yes | true | null", + "tiny_c | TINYINT | Yes | false | null", + "tiny_un_c | SMALLINT | Yes | false | null", + "tiny_un_z_c | SMALLINT | Yes | false | null", + "small_c | SMALLINT | Yes | false | null", + "small_un_c | INT | Yes | false | null", + "small_un_z_c | INT | Yes | false | null", + "medium_c | INT | Yes | false | null", + "medium_un_c | INT | Yes | false | null", + "medium_un_z_c | INT | Yes 
| false | null", + "int_c | INT | Yes | false | null", + "int_un_c | BIGINT | Yes | false | null", + "int_un_z_c | BIGINT | Yes | false | null", + "int11_c | INT | Yes | false | null", + "big_c | BIGINT | Yes | false | null", + "varchar_c | VARCHAR(765) | Yes | false | null", + "char_c | CHAR(9) | Yes | false | null", + "real_c | DOUBLE | Yes | false | null", + "float_c | FLOAT | Yes | false | null", + "float_un_c | FLOAT | Yes | false | null", + "float_un_z_c | FLOAT | Yes | false | null", + "double_c | DOUBLE | Yes | false | null", + "double_un_c | DOUBLE | Yes | false | null", + "double_un_z_c | DOUBLE | Yes | false | null", + "decimal_c | DECIMAL(8, 4) | Yes | false | null", + "decimal_un_c | DECIMAL(8, 4) | Yes | false | null", + "decimal_un_z_c | DECIMAL(8, 4) | Yes | false | null", + "numeric_c | DECIMAL(6, 0) | Yes | false | null", + "big_decimal_c | TEXT | Yes | false | null", + "bit1_c | BOOLEAN | Yes | false | null", + "tiny1_c | BOOLEAN | Yes | false | null", + "boolean_c | BOOLEAN | Yes | false | null", + "date_c | DATE | Yes | false | null", + "datetime3_c | DATETIME(3) | Yes | false | null", + "datetime6_c | DATETIME(6) | Yes | false | null", + "timestamp_c | DATETIME | Yes | false | null", + "text_c | TEXT | Yes | false | null", + "tiny_blob_c | TEXT | Yes | false | null", + "blob_c | TEXT | Yes | false | null", + "medium_blob_c | TEXT | Yes | false | null", + "long_blob_c | TEXT | Yes | false | null", + "year_c | INT | Yes | false | null", + "enum_c | TEXT | Yes | false | red", + "point_c | TEXT | Yes | false | null", + "geometry_c | TEXT | Yes | false | null", + "linestring_c | TEXT | Yes | false | null", + "polygon_c | TEXT | Yes | false | null", + "multipoint_c | TEXT | Yes | false | null", + "multiline_c | TEXT | Yes | false | null", + "multipolygon_c | TEXT | Yes | false | null", + "geometrycollection_c | TEXT | Yes | false | null", + "FINE | TEXT | Yes | false | null")); validateSinkResult( - complexDataTypesDatabase.getDatabaseName(), + 
databaseName, "DATA_TYPES_TABLE", 52, Collections.singletonList( "1 | 127 | 255 | 255 | 32767 | 65535 | 65535 | 8388607 | 16777215 | 16777215 | 2147483647 | 4294967295 | 4294967295 | 2147483647 | 9223372036854775807 | Hello World | abc | 123.102 | 123.102 | 123.103 | 123.104 | 404.4443 | 404.4444 | 404.4445 | 123.4567 | 123.4568 | 123.4569 | 346 | 34567892.1 | 0 | 1 | 1 | 2020-07-17 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22.0 | 2020-07-17 18:00:22 | text | EA== | EA== | EA== | EA== | 2021 | red | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0} | {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0} | {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0} | {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0} | {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0} | fine")); - LOG.info("Begin incremental reading stage."); + LOG.info("Verifying streaming stage of DATA_TYPES_TABLE..."); // generate binlogs String mysqlJdbcUrl = String.format( "jdbc:mysql://%s:%s/%s", - MYSQL.getHost(), - MYSQL.getDatabasePort(), - complexDataTypesDatabase.getDatabaseName()); + MYSQL.getHost(), MYSQL.getDatabasePort(), databaseName); try (Connection conn = DriverManager.getConnection( mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); @@ -383,7 +477,7 @@ public void testComplexDataTypes() throws Exception { } validateSinkResult( - complexDataTypesDatabase.getDatabaseName(), + databaseName, "DATA_TYPES_TABLE", 52, Arrays.asList( @@ -396,6 +490,296 @@ 
public void testComplexDataTypes() throws Exception { } } + @Test + public void testSchemaEvolution() throws Exception { + String databaseName = mysqlInventoryDatabase.getDatabaseName(); + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: mysql\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: doris\n" + + " fenodes: doris:8030\n" + + " benodes: doris:8040\n" + + " username: %s\n" + + " password: \"%s\"\n" + + " table.create.properties.replication_num: 1\n" + + "\n" + + "pipeline:\n" + + " schema.change.behavior: evolve\n" + + " parallelism: %d", + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + databaseName, + DORIS.getUsername(), + DORIS.getPassword(), + parallelism); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, dorisCdcConnector, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + + LOG.info("Verifying snapshot data from `products`..."); + validateSinkSchema( + databaseName, + "products", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "description | VARCHAR(1536) | Yes | false | null", + "weight | FLOAT | Yes | false | null", + "enum_c | TEXT | Yes | false | red", + "json_c | TEXT | Yes | false | null", + "point_c | TEXT | Yes | false | null")); + validateSinkResult( + databaseName, + "products", + 7, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103 | 
12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null", + "107 | rocks | box of assorted rocks | 5.3 | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null")); + + LOG.info("Verifying snapshot data from `customers`..."); + validateSinkSchema( + databaseName, + "customers", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "address | VARCHAR(3072) | Yes | false | null", + "phone_number | VARCHAR(1536) | Yes | false | null")); + validateSinkResult( + databaseName, + "customers", + 4, + Arrays.asList( + "101 | user_1 | Shanghai | 123567891234", + "102 | user_2 | Shanghai | 123567891234", + "103 | user_3 | Shanghai | 123567891234", + "104 | user_4 | Shanghai | 123567891234")); + + // generate binlogs + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), MYSQL.getDatabasePort(), databaseName); + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + + LOG.info("Switching to streaming stage..."); + stat.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null);"); // 110 + + // Ensure we've entered binlog reading stage + validateSinkResult( + databaseName, + "products", + 7, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | 
{\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null", + "107 | rocks | box of assorted rocks | 5.3 | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null")); + + // Schema change - Add Column + LOG.info("Test Schema Change - Add Column..."); + stat.execute("ALTER TABLE products ADD COLUMN extras INT;"); + validateSinkSchema( + databaseName, + "products", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "description | VARCHAR(1536) | Yes | false | null", + "weight | FLOAT | Yes | false | null", + "enum_c | TEXT | Yes | false | red", + "json_c | TEXT | Yes | false | null", + "point_c | TEXT | Yes | false | null", + "extras | INT | Yes | false | null")); + stat.execute( + "INSERT INTO products VALUES (default, 'blt', 'bacon, lettuce and tomato sandwich', 0.2, null, null, null, 17)"); // 111 + validateSinkResult( + databaseName, + "products", + 8, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | null", + "102 | car battery | 12V 
car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0} | null", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0} | null", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0} | null", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0} | null", + "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null | null", + "107 | rocks | box of assorted rocks | 5.3 | null | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null | null", + "111 | blt | bacon, lettuce and tomato sandwich | 0.2 | null | null | null | 17")); + + // Schema change - Rename Column + LOG.info("Test Schema Change - Rename Column..."); + stat.execute("ALTER TABLE products RENAME COLUMN extras TO extra_col;"); + validateSinkSchema( + databaseName, + "products", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "description | VARCHAR(1536) | Yes | false | null", + "weight | FLOAT | Yes | false | null", + "enum_c | TEXT | Yes | false | red", + "json_c | TEXT | Yes | false | null", + "point_c | TEXT | Yes | false | null", + "extra_col | INT | Yes | false | null")); + stat.execute( + "INSERT INTO products VALUES (default, 'cheeseburger', 'meat patty, cheese slice and onions', 0.1, null, null, null, 18)"); // 112 + validateSinkResult( + databaseName, + "products", + 8, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | 
{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | null", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0} | null", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0} | null", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0} | null", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0} | null", + "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null | null", + "107 | rocks | box of assorted rocks | 5.3 | null | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null | null", + "111 | blt | bacon, lettuce and tomato sandwich | 0.2 | null | null | null | 17", + "112 | cheeseburger | meat patty, cheese slice and onions | 0.1 | null | null | null | 18")); + + // Schema change - Alter Column Type + LOG.info("Test Schema Change - Alter Column Type..."); + stat.execute("ALTER TABLE products MODIFY COLUMN extra_col DOUBLE;"); + validateSinkSchema( + databaseName, + "products", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "description | VARCHAR(1536) | Yes | false | null", + "weight | FLOAT | Yes | false | null", + "enum_c | TEXT | Yes | false | red", + "json_c | TEXT | Yes | false | null", + "point_c | TEXT | Yes | false | null", + "extra_col | DOUBLE | Yes | false | null")); + stat.execute( + "INSERT INTO products VALUES (default, 'fries', 'potato and salt', 0.05, null, null, null, 19.5)"); // 113 + 
validateSinkResult( + databaseName, + "products", + 8, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0} | null", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0} | null", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0} | null", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0} | null", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0} | null", + "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null | null", + "107 | rocks | box of assorted rocks | 5.3 | null | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null | null", + "111 | blt | bacon, lettuce and tomato sandwich | 0.2 | null | null | null | 17.0", + "112 | cheeseburger | meat patty, cheese slice and onions | 0.1 | null | null | null | 18.0", + "113 | fries | potato and salt | 0.05 | null | null | null | 19.5")); + + // Schema change - Drop Column + LOG.info("Test Schema Change - Drop Column..."); + stat.execute("ALTER TABLE products DROP COLUMN extra_col;"); + validateSinkSchema( + databaseName, + "products", + Arrays.asList( + "id | INT | Yes | true | null", + "name | VARCHAR(765) | Yes | false | flink", + "description | VARCHAR(1536) | Yes | false | null", + "weight | FLOAT | Yes | false | null", + "enum_c | TEXT | Yes | false | red", + "json_c | TEXT | Yes | false | null", + "point_c | 
TEXT | Yes | false | null")); + stat.execute( + "INSERT INTO products VALUES (default, 'mac', 'cheese', 0.025, null, null, null)"); // 114 + validateSinkResult( + databaseName, + "products", + 7, + Arrays.asList( + "101 | scooter | Small 2-wheel scooter | 3.14 | red | {\"key1\": \"value1\"} | {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}", + "102 | car battery | 12V car battery | 8.1 | white | {\"key2\": \"value2\"} | {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}", + "103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | red | {\"key3\": \"value3\"} | {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}", + "104 | hammer | 12oz carpenter's hammer | 0.75 | white | {\"key4\": \"value4\"} | {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}", + "105 | hammer | 14oz carpenter's hammer | 0.875 | red | {\"k1\": \"v1\", \"k2\": \"v2\"} | {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}", + "106 | hammer | 16oz carpenter's hammer | 1.0 | null | null | null", + "107 | rocks | box of assorted rocks | 5.3 | null | null | null", + "108 | jacket | water resistent black wind breaker | 0.1 | null | null | null", + "109 | spare tire | 24 inch spare tire | 22.2 | null | null | null", + "110 | jacket | water resistent white wind breaker | 0.2 | null | null | null", + "111 | blt | bacon, lettuce and tomato sandwich | 0.2 | null | null | null", + "112 | cheeseburger | meat patty, cheese slice and onions | 0.1 | null | null | null", + "113 | fries | potato and salt | 0.05 | null | null | null", + "114 | mac | cheese | 0.025 | null | null | null")); + + stat.execute("TRUNCATE TABLE products;"); + Thread.sleep(5000L); + stat.execute( + "INSERT INTO products VALUES (default, 'pasta', 'noodles', 0, null, null, null);"); // 1, because truncating resets auto_increment id + + validateSinkResult( + databaseName, + "products", + 7, + Collections.singletonList("1 | pasta | noodles | 0.0 | null | null | null")); + + stat.execute("DROP 
TABLE products;"); + Thread.sleep(5000L); + SQLException thrown = + assertThrows( + SQLSyntaxErrorException.class, + () -> { + try (Connection connection = + DriverManager.getConnection( + DORIS.getJdbcUrl( + databaseName, + DORIS.getUsername())); + Statement statement = connection.createStatement()) { + statement.executeQuery("SELECT * FROM products;"); + } + }); + assertTrue( + thrown.getMessage() + .contains("errCode = 2, detailMessage = Unknown table 'products'")); + } catch (SQLException e) { + throw new RuntimeException("Failed to trigger schema change.", e); + } + } + public static void createDorisDatabase(String databaseName) { try { Container.ExecResult rs = @@ -437,57 +821,87 @@ public static void dropDorisDatabase(String databaseName) { private void validateSinkResult( String databaseName, String tableName, int columnCount, List expected) throws Exception { - long startWaitingTimestamp = System.currentTimeMillis(); - while (true) { - if (System.currentTimeMillis() - startWaitingTimestamp - > TESTCASE_TIMEOUT_SECONDS * 1000) { - throw new RuntimeException("Doris backend startup timed out."); - } - List results = new ArrayList<>(); - try (Connection conn = - DriverManager.getConnection( - DORIS.getJdbcUrl(databaseName, DORIS.getUsername())); - Statement stat = conn.createStatement()) { - ResultSet rs = - stat.executeQuery( - String.format("SELECT * FROM `%s`.`%s`;", databaseName, tableName)); - - while (rs.next()) { - List columns = new ArrayList<>(); - for (int i = 1; i <= columnCount; i++) { - try { - columns.add(rs.getString(i)); - } catch (SQLException ignored) { - // Column count could change after schema evolution - columns.add(null); - } - } - results.add(String.join(" | ", columns)); - } + waitAndVerify( + databaseName, + "SELECT * FROM " + tableName, + columnCount, + expected, + DEFAULT_RESULT_VERIFY_TIMEOUT.toMillis(), + true); + } - if (expected.size() == results.size()) { - assertEqualsInAnyOrder(expected, results); - break; + private void 
validateSinkSchema(String databaseName, String tableName, List expected) + throws Exception { + waitAndVerify( + databaseName, + "DESCRIBE " + tableName, + 5, + expected, + DEFAULT_RESULT_VERIFY_TIMEOUT.toMillis(), + false); + } + + private void waitAndVerify( + String databaseName, + String sql, + int numberOfColumns, + List expected, + long timeoutMilliseconds, + boolean inAnyOrder) + throws Exception { + long deadline = System.currentTimeMillis() + timeoutMilliseconds; + while (System.currentTimeMillis() < deadline) { + try { + List actual = fetchTableContent(databaseName, sql, numberOfColumns); + if (inAnyOrder) { + if (expected.stream() + .sorted() + .collect(Collectors.toList()) + .equals(actual.stream().sorted().collect(Collectors.toList()))) { + return; + } } else { - Thread.sleep(1000); + if (expected.equals(actual)) { + return; + } } - } catch (SQLException e) { - LOG.info("Validate sink result failure, waiting for next turn...", e); - Thread.sleep(1000); + LOG.info( + "Executing {}::{} didn't get expected results.\nExpected: {}\n Actual: {}", + databaseName, + sql, + expected, + actual); + } catch (SQLSyntaxErrorException t) { + LOG.info("Database {} isn't ready yet. 
Waiting for the next loop...", databaseName); } + Thread.sleep(1000L); } + fail(String.format("Failed to verify content of %s::%s.", databaseName, sql)); } - public static void assertEqualsInAnyOrder(List expected, List actual) { - assertTrue(expected != null && actual != null); - assertEqualsInOrder( - expected.stream().sorted().collect(Collectors.toList()), - actual.stream().sorted().collect(Collectors.toList())); - } + private List fetchTableContent(String databaseName, String sql, int columnCount) + throws Exception { - public static void assertEqualsInOrder(List expected, List actual) { - assertTrue(expected != null && actual != null); - assertEquals(expected.size(), actual.size()); - assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); + List results = new ArrayList<>(); + try (Connection conn = + DriverManager.getConnection( + DORIS.getJdbcUrl(databaseName, DORIS.getUsername())); + Statement stat = conn.createStatement()) { + ResultSet rs = stat.executeQuery(sql); + + while (rs.next()) { + List columns = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + try { + columns.add(rs.getString(i)); + } catch (SQLException ignored) { + // Column count could change after schema evolution + columns.add(null); + } + } + results.add(String.join(" | ", columns)); + } + } + return results; } }