Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
Expand All @@ -47,7 +48,6 @@
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -61,20 +61,22 @@
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX;

/** Supports {@link DorisDataSink} to schema evolution. */
public class DorisMetadataApplier implements MetadataApplier {
private static final Logger LOG = LoggerFactory.getLogger(DorisMetadataApplier.class);
private DorisOptions dorisOptions;
private SchemaChangeManager schemaChangeManager;
private DorisSchemaChangeManager schemaChangeManager;
private Configuration config;
private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;

/**
 * Creates a metadata applier for the given Doris connection options.
 *
 * @param dorisOptions connection options used to reach the Doris frontend
 * @param config pipeline configuration, later consulted for table-creation properties
 */
public DorisMetadataApplier(DorisOptions dorisOptions, Configuration config) {
    this.dorisOptions = dorisOptions;
    // DorisSchemaChangeManager enriches the stock SchemaChangeManager with
    // TRUNCATE TABLE / DROP TABLE DDL support.
    this.schemaChangeManager = new DorisSchemaChangeManager(dorisOptions);
    this.config = config;
    // NOTE(review): this calls an overridable method from the constructor; safe while
    // this class is not subclassed — confirm before opening it for extension.
    this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
}
Expand All @@ -93,7 +95,13 @@ public boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEven

@Override
public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
    // Doris can evolve columns in place and additionally supports table-level
    // TRUNCATE and DROP, so all six event types are accepted.
    return Sets.newHashSet(
            ADD_COLUMN,
            ALTER_COLUMN_TYPE,
            DROP_COLUMN,
            DROP_TABLE,
            RENAME_COLUMN,
            TRUNCATE_TABLE);
}

@Override
Expand All @@ -117,14 +125,16 @@ public void applySchemaChange(SchemaChangeEvent event) {
return null;
},
dropTableEvent -> {
throw new UnsupportedSchemaChangeEventException(event);
applyDropTableEvent(dropTableEvent);
return null;
},
renameColumnEvent -> {
applyRenameColumnEvent(renameColumnEvent);
return null;
},
truncateTableEvent -> {
throw new UnsupportedSchemaChangeEventException(event);
applyTruncateTableEvent(truncateTableEvent);
return null;
});
}

Expand Down Expand Up @@ -275,4 +285,23 @@ private void applyAlterColumnTypeEvent(AlterColumnTypeEvent event)
throw new SchemaEvolveException(event, "fail to apply alter column type event", e);
}
}

/** Issues a TRUNCATE TABLE statement in Doris for the table referenced by the event. */
private void applyTruncateTableEvent(TruncateTableEvent event) throws SchemaEvolveException {
    TableId target = event.tableId();
    try {
        schemaChangeManager.truncateTable(target.getSchemaName(), target.getTableName());
    } catch (Exception cause) {
        // Surface any manager failure as a schema-evolution failure for this event.
        throw new SchemaEvolveException(event, "fail to truncate table", cause);
    }
}

/** Issues a DROP TABLE statement in Doris for the table referenced by the event. */
private void applyDropTableEvent(DropTableEvent event) throws SchemaEvolveException {
    TableId target = event.tableId();
    try {
        schemaChangeManager.dropTable(target.getSchemaName(), target.getTableName());
    } catch (Exception cause) {
        // Surface any manager failure as a schema-evolution failure for this event.
        throw new SchemaEvolveException(event, "fail to drop table", cause);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.doris.sink;

import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.sink.schema.SchemaChangeManager;

import java.io.IOException;

import static org.apache.doris.flink.catalog.doris.DorisSystem.identifier;

/**
 * An enriched version of Doris' {@link SchemaChangeManager} that adds table-level DDL
 * (TRUNCATE TABLE / DROP TABLE) on top of the column-level operations of the base class.
 */
public class DorisSchemaChangeManager extends SchemaChangeManager {
    public DorisSchemaChangeManager(DorisOptions dorisOptions) {
        super(dorisOptions);
    }

    /**
     * Truncates the given table, removing all rows while keeping the schema.
     *
     * @return {@code true} if the statement executed successfully
     * @throws IOException if executing the statement against the Doris frontend fails
     * @throws IllegalArgumentException if the request arguments are rejected
     */
    public boolean truncateTable(String databaseName, String tableName)
            throws IOException, IllegalArgumentException {
        String truncateTableDDL =
                "TRUNCATE TABLE " + identifier(databaseName) + "." + identifier(tableName);
        return this.execute(truncateTableDDL, databaseName);
    }

    /**
     * Drops the given table. {@code IF EXISTS} makes the statement idempotent, so replaying a
     * DropTableEvent (e.g. after a failover) does not fail on an already-dropped table.
     *
     * @return {@code true} if the statement executed successfully
     * @throws IOException if executing the statement against the Doris frontend fails
     * @throws IllegalArgumentException if the request arguments are rejected
     */
    public boolean dropTable(String databaseName, String tableName)
            throws IOException, IllegalArgumentException {
        String dropTableDDL =
                "DROP TABLE IF EXISTS " + identifier(databaseName) + "." + identifier(tableName);
        return this.execute(dropTableDDL, databaseName);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,32 @@
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisContainer;
import org.apache.flink.cdc.connectors.doris.sink.utils.DorisSinkTestBase;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand All @@ -47,10 +54,12 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import java.sql.SQLSyntaxErrorException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -64,6 +73,7 @@
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_BATCH_MODE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_ENABLE_DELETE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME;
import static org.junit.Assert.fail;

/** IT tests for {@link DorisMetadataApplier}. */
@RunWith(Parameterized.class)
Expand Down Expand Up @@ -429,8 +439,90 @@ public void testDorisNarrowingAlterColumnType() throws Exception {
runJobWithEvents(generateNarrowingAlterColumnTypeEvents(tableId));
}

@Test
public void testDorisTruncateTable() throws Exception {
    TableId tableId =
            TableId.tableId(
                    DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);

    Schema tableSchema =
            Schema.newBuilder()
                    .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
                    .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
                    .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
                    .primaryKey("id")
                    .build();

    // Seed the table with two rows first.
    runJobWithEvents(
            Arrays.asList(
                    new CreateTableEvent(tableId, tableSchema),
                    DataChangeEvent.insertEvent(tableId, generate(tableSchema, 1, 2.3, "Alice")),
                    DataChangeEvent.insertEvent(tableId, generate(tableSchema, 2, 3.4, "Bob"))));
    waitAndVerify(
            tableId,
            3,
            Arrays.asList("1 | 2.3 | Alice", "2 | 3.4 | Bob"),
            DATABASE_OPERATION_TIMEOUT_SECONDS * 1000L);

    // Truncate, then insert two fresh rows; only the fresh rows must remain.
    runJobWithEvents(
            Arrays.asList(
                    new CreateTableEvent(tableId, tableSchema),
                    new TruncateTableEvent(tableId),
                    DataChangeEvent.insertEvent(tableId, generate(tableSchema, 3, 4.5, "Cecily")),
                    DataChangeEvent.insertEvent(
                            tableId, generate(tableSchema, 4, 5.6, "Derrida"))));
    waitAndVerify(
            tableId,
            3,
            Arrays.asList("3 | 4.5 | Cecily", "4 | 5.6 | Derrida"),
            DATABASE_OPERATION_TIMEOUT_SECONDS * 1000L);
}

@Test
public void testDorisDropTable() throws Exception {
    TableId tableId =
            TableId.tableId(
                    DorisContainer.DORIS_DATABASE_NAME, DorisContainer.DORIS_TABLE_NAME);

    Schema tableSchema =
            Schema.newBuilder()
                    .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
                    .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
                    .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
                    .primaryKey("id")
                    .build();

    // Create the table and load two rows so there is something to drop.
    runJobWithEvents(
            Arrays.asList(
                    new CreateTableEvent(tableId, tableSchema),
                    DataChangeEvent.insertEvent(tableId, generate(tableSchema, 1, 2.3, "Alice")),
                    DataChangeEvent.insertEvent(tableId, generate(tableSchema, 2, 3.4, "Bob"))));

    waitAndVerify(
            tableId,
            3,
            Arrays.asList("1 | 2.3 | Alice", "2 | 3.4 | Bob"),
            DATABASE_OPERATION_TIMEOUT_SECONDS * 1000L);

    // Drop the table; any subsequent read must fail with an "unknown table" error.
    runJobWithEvents(
            Arrays.asList(
                    new CreateTableEvent(tableId, tableSchema), new DropTableEvent(tableId)));

    SQLSyntaxErrorException thrown =
            Assertions.assertThrows(
                    SQLSyntaxErrorException.class, () -> fetchTableContent(tableId, 3));
    Assertions.assertTrue(
            thrown.getMessage()
                    .contains(
                            String.format(
                                    "errCode = 2, detailMessage = Unknown table '%s'",
                                    tableId.getTableName())));
}

private void runJobWithEvents(List<Event> events) throws Exception {
DataStream<Event> stream = env.fromCollection(events, TypeInformation.of(Event.class));
DataStream<Event> stream =
env.fromCollection(events, TypeInformation.of(Event.class)).setParallelism(1);

Configuration config =
new Configuration()
Expand Down Expand Up @@ -477,4 +569,38 @@ private void runJobWithEvents(List<Event> events) throws Exception {

env.execute("Doris Schema Evolution Test");
}

/**
 * Builds a {@link BinaryRecordData} row for the given schema, converting plain Java Strings
 * to {@link BinaryStringData} and passing all other field values through unchanged.
 */
BinaryRecordData generate(Schema schema, Object... fields) {
    DataType[] columnTypes = schema.getColumnDataTypes().toArray(new DataType[0]);
    Object[] converted = new Object[fields.length];
    for (int i = 0; i < fields.length; i++) {
        Object field = fields[i];
        converted[i] =
                field instanceof String ? BinaryStringData.fromString((String) field) : field;
    }
    return new BinaryRecordDataGenerator(columnTypes).generate(converted);
}

/**
 * Polls the table until its content matches {@code expected} (order-insensitive) or the
 * timeout elapses, failing the test in the latter case.
 */
private void waitAndVerify(
        TableId tableId, int numberOfColumns, List<String> expected, long timeoutMilliseconds)
        throws Exception {
    final long deadline = System.currentTimeMillis() + timeoutMilliseconds;
    List<String> expectedSorted = new ArrayList<>(expected);
    Collections.sort(expectedSorted);
    while (System.currentTimeMillis() < deadline) {
        List<String> actual = fetchTableContent(tableId, numberOfColumns);
        List<String> actualSorted = new ArrayList<>(actual);
        Collections.sort(actualSorted);
        if (expectedSorted.equals(actualSorted)) {
            return;
        }
        LOG.info(
                "Content of {} isn't ready.\nExpected: {}\nActual: {}",
                tableId,
                expected,
                actual);
        Thread.sleep(1000L);
    }
    fail(String.format("Failed to verify content of %s.", tableId));
}
}
Loading
Loading