Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Collection<MultiTableCommittable> prepareCommit() throws IOException {
for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) {
Identifier key = entry.getKey();
StoreSinkWrite write = entry.getValue();
boolean waitCompaction = false;
boolean waitCompaction = true;
committables.addAll(
// here we set it to lastCheckpointId+1 to
// avoid prepareCommit the same checkpointId with the first round.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.jupiter.params.provider.CsvSource;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -140,7 +140,7 @@ private void initialize(String metastore)
.dropDatabase(TEST_DATABASE, true, true);
}

private List<Event> createTestEvents() throws SchemaEvolveException {
private List<Event> createTestEvents(boolean enableDeleteVectors) throws SchemaEvolveException {
List<Event> testEvents = new ArrayList<>();
// create table
Schema schema =
Expand All @@ -149,6 +149,7 @@ private List<Event> createTestEvents() throws SchemaEvolveException {
.physicalColumn("col2", DataTypes.STRING())
.primaryKey("col1")
.option("bucket", "1")
.option("deletion-vectors.enabled", String.valueOf(enableDeleteVectors))
.build();
CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
testEvents.add(createTableEvent);
Expand Down Expand Up @@ -180,8 +181,8 @@ private List<Event> createTestEvents() throws SchemaEvolveException {
}

@ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"})
public void testSinkWithDataChange(String metastore)
@CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
public void testSinkWithDataChange(String metastore, boolean enableDeleteVector)
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException, SchemaEvolveException {
initialize(metastore);
Expand All @@ -192,7 +193,7 @@ public void testSinkWithDataChange(String metastore)
Committer<MultiTableCommittable> committer = paimonSink.createCommitter();

// insert
for (Event event : createTestEvents()) {
for (Event event : createTestEvents(enableDeleteVector)) {
writer.write(event, null);
}
writer.flush(false);
Expand All @@ -215,7 +216,7 @@ public void testSinkWithDataChange(String metastore)
// delete
Event event =
DataChangeEvent.deleteEvent(
TableId.tableId("test", "table1"),
table1,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
Expand All @@ -240,7 +241,7 @@ public void testSinkWithDataChange(String metastore)
// update
event =
DataChangeEvent.updateEvent(
TableId.tableId("test", "table1"),
table1,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
Expand Down Expand Up @@ -273,17 +274,19 @@ public void testSinkWithDataChange(String metastore)
.collect()
.forEachRemaining(result::add);
// Each commit will generate one sequence number(equal to checkpointId).
Assertions.assertEquals(
Arrays.asList(
Row.ofKind(RowKind.INSERT, 1L),
Row.ofKind(RowKind.INSERT, 2L),
Row.ofKind(RowKind.INSERT, 3L)),
result);
List<Row> expected =
enableDeleteVector
? Collections.singletonList(Row.ofKind(RowKind.INSERT, 3L))
: Arrays.asList(
Row.ofKind(RowKind.INSERT, 1L),
Row.ofKind(RowKind.INSERT, 2L),
Row.ofKind(RowKind.INSERT, 3L));
Assertions.assertEquals(expected, result);
}

@ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"})
public void testSinkWithSchemaChange(String metastore)
@CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVector)
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException, SchemaEvolveException {
initialize(metastore);
Expand All @@ -294,7 +297,7 @@ public void testSinkWithSchemaChange(String metastore)
Committer<MultiTableCommittable> committer = paimonSink.createCommitter();

// 1. receive only DataChangeEvents during one checkpoint
for (Event event : createTestEvents()) {
for (Event event : createTestEvents(enableDeleteVector)) {
writer.write(event, null);
}
writer.flush(false);
Expand Down Expand Up @@ -427,8 +430,8 @@ public void testSinkWithSchemaChange(String metastore)
}

@ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"})
public void testSinkWithMultiTables(String metastore)
@CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector)
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException, SchemaEvolveException {
initialize(metastore);
Expand All @@ -437,7 +440,7 @@ public void testSinkWithMultiTables(String metastore)
catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
PaimonWriter<Event> writer = paimonSink.createWriter(new MockInitContext());
Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
List<Event> testEvents = createTestEvents();
List<Event> testEvents = createTestEvents(enableDeleteVector);
// create table
TableId table2 = TableId.tableId("test", "table2");
Schema schema =
Expand Down Expand Up @@ -492,8 +495,8 @@ public void testSinkWithMultiTables(String metastore)
}

@ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"})
public void testDuplicateCommitAfterRestore(String metastore)
@CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
public void testDuplicateCommitAfterRestore(String metastore, boolean enableDeleteVector)
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException, SchemaEvolveException {
initialize(metastore);
Expand All @@ -504,7 +507,7 @@ public void testDuplicateCommitAfterRestore(String metastore)
Committer<MultiTableCommittable> committer = paimonSink.createCommitter();

// insert
for (Event event : createTestEvents()) {
for (Event event : createTestEvents(enableDeleteVector)) {
writer.write(event, null);
}
writer.flush(false);
Expand Down Expand Up @@ -553,8 +556,13 @@ public void testDuplicateCommitAfterRestore(String metastore)
.execute()
.collect()
.forEachRemaining(result::add);
// 8 APPEND and 1 COMPACT
Assertions.assertEquals(result.size(), 9);
if (enableDeleteVector) {
// Each APPEND will trigger COMPACT once enable deletion-vectors.
Assertions.assertEquals(16, result.size());
} else {
// 8 APPEND and 1 COMPACT
Assertions.assertEquals(9, result.size());
}
result.clear();

tEnv.sqlQuery("select * from paimon_catalog.test.`table1`")
Expand Down
Loading