
Commit

[core] fix bug of watermark override with append only table
hongli.wwj committed Aug 1, 2024
1 parent 5bd3c6f commit f70aae1
Showing 33 changed files with 93 additions and 58 deletions.
2 changes: 1 addition & 1 deletion docs/content/concepts/spec/snapshot.md
@@ -61,5 +61,5 @@ Snapshot File is JSON, it includes:
12. totalRecordCount: record count of all changes occurred in this snapshot.
13. deltaRecordCount: record count of all new changes occurred in this snapshot.
14. changelogRecordCount: record count of all changelog produced in this snapshot.
-15. watermark: watermark for input records, from Flink watermark mechanism, null if there is no watermark.
+15. watermark: watermark for input records, from Flink watermark mechanism, Long.MIN_VALUE if there is no watermark.
16. statistics: stats file name for statistics of this table.
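
The sentinel change above ties into the commit's subject: with an append-only table, a commit that carries no watermark must not override a watermark that an earlier snapshot already recorded. A minimal sketch of that merge rule, assuming a committer-side helper; the method name and the max semantics are illustrative assumptions, not Paimon's actual API:

```java
public class WatermarkMergeSketch {

    /**
     * Hypothetical merge rule: Long.MIN_VALUE (or a null input) means "no
     * watermark", and an absent watermark must never override a present one.
     */
    static long mergeWatermark(long previous, Long current) {
        if (current == null) {
            // current commit carries no watermark: keep the previous one
            return previous;
        }
        return Math.max(previous, current);
    }

    public static void main(String[] args) {
        long none = Long.MIN_VALUE;
        System.out.println(mergeWatermark(none, 1_000L));  // 1000
        System.out.println(mergeWatermark(1_000L, null));  // 1000, not overridden
        System.out.println(mergeWatermark(1_000L, 500L));  // 1000, no regression
    }
}
```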
6 changes: 3 additions & 3 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -42,7 +42,7 @@
<td><h5>bucket</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Integer</td>
-<td>Bucket number for file store.<br />It should either be equal to -1 (dynamic bucket mode), or it must be greater than 0 (fixed bucket mode).</td>
+<td>Bucket number for file store.<br />It should either be equal to -1 (dynamic bucket mode or unaware bucket mode), or it must be greater than 0 (fixed bucket mode).</td>
</tr>
<tr>
<td><h5>bucket-key</h5></td>
@@ -742,7 +742,7 @@
<td><h5>source.split.target-size</h5></td>
<td style="word-wrap: break-word;">128 mb</td>
<td>MemorySize</td>
-<td>Target size of a source split when scanning a bucket.</td>
+<td>Target size of a source split when scanning a partition (unaware bucket mode) or bucket (non unaware bucket mode).</td>
</tr>
<tr>
<td><h5>spill-compression</h5></td>
@@ -856,7 +856,7 @@
<td><h5>write-max-writers-to-spill</h5></td>
<td style="word-wrap: break-word;">5</td>
<td>Integer</td>
-<td>When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory. </td>
+<td>When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory.</td>
</tr>
<tr>
<td><h5>write-only</h5></td>
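
The three options touched in this file interact when configuring an append-only table. A small illustration of setting them as plain string table options; the keys are the documented option names above, while the bare options map (with no surrounding catalog or table API) is just for demonstration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PaimonOptionsSketch {
    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        // -1 selects dynamic bucket mode, or unaware bucket mode for an
        // append-only table without a bucket key; > 0 is fixed bucket mode
        options.put("bucket", "-1");
        // split granularity: per partition in unaware bucket mode,
        // per bucket otherwise
        options.put("source.split.target-size", "128 mb");
        // spill to disk once a batch append insert opens more than 5 writers
        options.put("write-max-writers-to-spill", "5");
        options.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```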
@@ -93,7 +93,7 @@ public class CoreOptions implements Serializable {
.text("Bucket number for file store.")
.linebreak()
.text(
"It should either be equal to -1 (dynamic bucket mode), or it must be greater than 0 (fixed bucket mode).")
"It should either be equal to -1 (dynamic bucket mode or unaware bucket mode), or it must be greater than 0 (fixed bucket mode).")
.build());

@Immutable
@@ -368,7 +368,8 @@ public class CoreOptions implements Serializable {
key("source.split.target-size")
.memoryType()
.defaultValue(MemorySize.ofMebiBytes(128))
.withDescription("Target size of a source split when scanning a bucket.");
.withDescription(
"Target size of a source split when scanning a partition (unaware bucket mode) or bucket (non unaware bucket mode).");

public static final ConfigOption<MemorySize> SOURCE_SPLIT_OPEN_FILE_COST =
key("source.split.open-file-cost")
@@ -412,7 +413,7 @@ public class CoreOptions implements Serializable {
.intType()
.defaultValue(5)
.withDescription(
"When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory. ");
"When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory.");

public static final ConfigOption<MemorySize> WRITE_MANIFEST_CACHE =
key("write-manifest-cache")
@@ -128,7 +128,7 @@ public FileIndexResult test(Predicate predicate) {
public FileIndexResult visit(LeafPredicate predicate) {
FileIndexResult compoundResult = REMAIN;
FieldRef fieldRef =
-new FieldRef(predicate.index(), predicate.fieldName(), predicate.type());
+new FieldRef(predicate.fieldIndex(), predicate.fieldName(), predicate.type());
for (FileIndexReader fileIndexReader : columnIndexReaders.get(predicate.fieldName())) {
compoundResult =
compoundResult.and(
@@ -69,7 +69,7 @@ public DataType type() {
return type;
}

-public int index() {
+public int fieldIndex() {
return fieldIndex;
}

@@ -347,7 +347,7 @@ public static Optional<Predicate> transformFieldMapping(
return Optional.of(new CompoundPredicate(compoundPredicate.function(), children));
} else {
LeafPredicate leafPredicate = (LeafPredicate) predicate;
-int mapped = fieldIdxMapping[leafPredicate.index()];
+int mapped = fieldIdxMapping[leafPredicate.fieldIndex()];
if (mapped >= 0) {
return Optional.of(
new LeafPredicate(
@@ -38,7 +38,7 @@ public PredicateProjectionConverter(int[] projection) {

@Override
public Optional<Predicate> visit(LeafPredicate predicate) {
-int index = predicate.index();
+int index = predicate.fieldIndex();
Integer adjusted = reversed.get(index);
if (adjusted == null) {
return Optional.empty();
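
The `reversed` map consulted above is the inverse of a projection: a predicate's field index in the full row either maps to its index in the projected row or, when the field is not projected, the predicate is dropped. A self-contained sketch of just that index math (Paimon's Predicate classes are elided):

```java
import java.util.HashMap;
import java.util.Map;

public class ProjectionRemapSketch {
    public static void main(String[] args) {
        // the projected row keeps full-row fields 3 and 1, in that order
        int[] projection = {3, 1};
        Map<Integer, Integer> reversed = new HashMap<>();
        for (int i = 0; i < projection.length; i++) {
            reversed.put(projection[i], i);
        }
        System.out.println(reversed.get(3)); // 0    -> predicate index adjusted to 0
        System.out.println(reversed.get(1)); // 1    -> adjusted to 1
        System.out.println(reversed.get(2)); // null -> predicate dropped entirely
    }
}
```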
@@ -30,10 +30,10 @@ public interface Filter<T> {
Filter<?> ALWAYS_TRUE = t -> true;

/**
-* Evaluates this predicate on the given argument.
+* Evaluates this filter on the given argument.
*
* @param t the input argument
-* @return {@code true} if the input argument matches the predicate, otherwise {@code false}
+* @return {@code true} if the input argument matches the filter, otherwise {@code false}
*/
boolean test(T t);

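
The javadoc fix matters because Paimon's `Filter` is its own functional interface rather than `java.util.function.Predicate`, so its documentation should say "filter". A minimal mirror of its shape, for illustration only (not the real Paimon type):

```java
public class FilterSketch {

    /** Mirrors the shape of the interface shown above, for demonstration. */
    @FunctionalInterface
    interface Filter<T> {
        Filter<?> ALWAYS_TRUE = t -> true;

        /** Evaluates this filter on the given argument. */
        boolean test(T t);
    }

    public static void main(String[] args) {
        Filter<String> nonEmpty = s -> !s.isEmpty();
        System.out.println(nonEmpty.test("paimon")); // true
        System.out.println(nonEmpty.test(""));       // false
    }
}
```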
@@ -108,7 +108,6 @@ public void testSetAndGet() throws IOException, ClassNotFoundException {

@Test
public void testWriter() {
-
int arity = 13;
BinaryRow row = new BinaryRow(arity);
BinaryRowWriter writer = new BinaryRowWriter(row, 20);
@@ -107,7 +107,7 @@ public AppendOnlyFileStoreWrite newWrite(
}

private AppendOnlyFileStoreScan newScan(boolean forWrite) {
-ScanBucketFilter bucketFilter =
+ScanBucketFilter bucketKeyFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
public void pushdown(Predicate predicate) {
@@ -132,7 +132,7 @@ public void pushdown(Predicate predicate) {

return new AppendOnlyFileStoreScan(
partitionType,
-bucketFilter,
+bucketKeyFilter,
snapshotManager(),
schemaManager,
schema,
@@ -204,7 +204,7 @@ private Map<String, FileStorePathFactory> format2PathFactory() {
}

private KeyValueFileStoreScan newScan(boolean forWrite) {
-ScanBucketFilter bucketFilter =
+ScanBucketFilter bucketKeyFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
public void pushdown(Predicate keyFilter) {
@@ -224,7 +224,7 @@ public void pushdown(Predicate keyFilter) {
};
return new KeyValueFileStoreScan(
partitionType,
-bucketFilter,
+bucketKeyFilter,
snapshotManager(),
schemaManager,
schema,
@@ -83,7 +83,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final FileIndexOptions fileIndexOptions;

private MemorySegmentPool memorySegmentPool;
-private MemorySize maxDiskSize;
+private final MemorySize maxDiskSize;

public AppendOnlyWriter(
FileIO fileIO,
@@ -69,7 +69,7 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
return new RecordLevelExpire(fieldIndex, (int) expireTime.getSeconds());
}

-public RecordLevelExpire(int timeField, int expireTime) {
+private RecordLevelExpire(int timeField, int expireTime) {
this.timeField = timeField;
this.expireTime = expireTime;
}
@@ -78,7 +78,7 @@ public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> readerFactor
return file -> wrap(readerFactory.createRecordReader(file));
}

-public RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
+private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
int currentTime = (int) (System.currentTimeMillis() / 1000);
return reader.filter(
kv -> {
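
Beyond the constructor and `wrap` visibility changes, the wrapped reader's job is to drop expired rows by comparing a seconds-precision time field against the current time. A sketch of that decision, assuming the time field holds epoch seconds (row access and Paimon's KeyValue type are simplified away):

```java
public class RecordExpireSketch {

    /** Keep a record when it has no time value or has not yet expired. */
    static boolean keep(Integer recordSeconds, int expireSeconds, int nowSeconds) {
        if (recordSeconds == null) {
            return true; // missing time field: never expires
        }
        return nowSeconds <= recordSeconds + expireSeconds;
    }

    public static void main(String[] args) {
        int now = (int) (System.currentTimeMillis() / 1000);
        System.out.println(keep(now - 10, 3600, now));   // true: 10s old, 1h TTL
        System.out.println(keep(now - 7200, 3600, now)); // false: 2h old, expired
        System.out.println(keep(null, 3600, now));       // true: no time field
    }
}
```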
@@ -145,6 +145,7 @@ public String toString() {
*/
public static Filter<InternalRow> createCacheRowFilter(
@Nullable ManifestCacheFilter manifestCacheFilter, int numOfBuckets) {
+// manifestCacheFilter is not null only when writing
if (manifestCacheFilter == null) {
return Filter.alwaysTrue();
}
@@ -246,7 +246,6 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
@Nullable Integer manifestReadParallelism)
throws Exception {
// 1. should trigger full compaction
-
List<ManifestFileMeta> base = new ArrayList<>();
long totalManifestSize = 0;
int i = 0;
@@ -276,14 +275,12 @@
}

// 2. do full compaction
-
LOG.info(
"Start Manifest File Full Compaction, pick the number of delete file: {}, total manifest file size: {}",
deltaDeleteFileNum,
totalManifestSize);

// 2.1. try to skip base files by partition filter
-
Map<Identifier, ManifestEntry> deltaMerged = new LinkedHashMap<>();
FileEntry.mergeEntries(manifestFile, delta, deltaMerged, manifestReadParallelism);

@@ -317,7 +314,6 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
}

// 2.2. try to skip base files by reading entries
-
Set<Identifier> deleteEntries = new HashSet<>();
deltaMerged.forEach(
(k, v) -> {
@@ -349,7 +345,6 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
}

// 2.3. merge
-
RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
manifestFile.createRollingWriter();
Exception exception = null;
@@ -298,6 +298,7 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan() {
List<ManifestEntry> files = new ArrayList<>();
long skippedByPartitionAndStats = startDataFiles - mergedEntries.size();
for (ManifestEntry file : mergedEntries) {
+// checkNumOfBuckets is true only when writing
if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
String partInfo =
partitionType.getFieldCount() > 0
@@ -487,6 +488,7 @@ private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
.read(
manifest.fileName(),
manifest.fileSize(),
+// ManifestEntry#createCacheRowFilter always returns true when reading
ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
ManifestEntry.createEntryRowFilter(
partitionFilter, bucketFilter, fileNameFilter, numOfBuckets));
@@ -502,6 +504,7 @@ private List<SimpleFileEntry> readSimpleEntries(ManifestFileMeta manifest) {
// use filter for ManifestEntry
// currently, projection is not pushed down to file format
// see SimpleFileEntrySerializer
+// ManifestEntry#createCacheRowFilter always returns true when reading
ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
ManifestEntry.createEntryRowFilter(
partitionFilter, bucketFilter, fileNameFilter, numOfBuckets));
@@ -53,7 +53,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {

public AppendOnlyFileStoreScan(
RowType partitionType,
-ScanBucketFilter bucketFilter,
+ScanBucketFilter bucketKeyFilter,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
TableSchema schema,
@@ -65,7 +65,7 @@
boolean fileIndexReadEnabled) {
super(
partitionType,
-bucketFilter,
+bucketKeyFilter,
snapshotManager,
schemaManager,
schema,
@@ -52,7 +52,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {

public KeyValueFileStoreScan(
RowType partitionType,
-ScanBucketFilter bucketFilter,
+ScanBucketFilter bucketKeyFilter,
SnapshotManager snapshotManager,
SchemaManager schemaManager,
TableSchema schema,
@@ -66,7 +66,7 @@
MergeEngine mergeEngine) {
super(
partitionType,
-bucketFilter,
+bucketKeyFilter,
snapshotManager,
schemaManager,
schema,
@@ -152,8 +152,8 @@ private List<ManifestEntry> filterWholeBucketPerFile(List<ManifestEntry> entries
}

private List<ManifestEntry> filterWholeBucketAllFiles(List<ManifestEntry> entries) {
-// entries come from the same bucket, if any of it doesn't meet the request, we could
-// filter the bucket.
+// entries come from the same bucket; if none of them meets the request, we can
+// filter out the whole bucket.
for (ManifestEntry entry : entries) {
if (filterByValueFilter(entry)) {
return entries;
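
The corrected comment describes bucket-level pruning: a bucket's entries are kept if at least one entry passes the value filter, and only when none do is the whole bucket dropped. A generic sketch of that rule, with plain types standing in for `ManifestEntry` and Paimon's filter:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

public class WholeBucketFilterSketch {

    static <T> List<T> filterWholeBucket(List<T> entries, Predicate<T> valueFilter) {
        for (T entry : entries) {
            if (valueFilter.test(entry)) {
                return entries; // one match keeps every entry in the bucket
            }
        }
        return Collections.emptyList(); // no match: prune the whole bucket
    }

    public static void main(String[] args) {
        Predicate<Integer> positive = v -> v > 0;
        System.out.println(filterWholeBucket(Arrays.asList(-1, 2, -3), positive)); // [-1, 2, -3]
        System.out.println(filterWholeBucket(Arrays.asList(-1, -2), positive));    // []
    }
}
```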
@@ -81,8 +81,8 @@ static Optional<ScanBucketSelector> create(Predicate bucketPredicate, RowType bu
for (Predicate orPredicate : splitOr(andPredicate)) {
if (orPredicate instanceof LeafPredicate) {
LeafPredicate leaf = (LeafPredicate) orPredicate;
-if (reference == null || reference == leaf.index()) {
-    reference = leaf.index();
+if (reference == null || reference == leaf.fieldIndex()) {
+    reference = leaf.fieldIndex();
if (leaf.function().equals(Equal.INSTANCE)
|| leaf.function().equals(In.INSTANCE)) {
values.addAll(
@@ -129,7 +129,7 @@ protected BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer() {
// filter: value = 1
// if we perform filter push down on values, data file 1 will be chosen, but data
// file 2 will be ignored, and the final result will be key = a, value = 1 while the
-// correct result is an empty set
+// correct result is an empty set.
List<Predicate> keyFilters =
pickTransformFieldMapping(
splitAnd(predicate),
@@ -83,14 +83,14 @@ public class AuditLogTable implements DataTable, ReadonlyTable {

public static final PredicateReplaceVisitor PREDICATE_CONVERTER =
p -> {
-if (p.index() == 0) {
+if (p.fieldIndex() == 0) {
return Optional.empty();
}
return Optional.of(
new LeafPredicate(
p.function(),
p.type(),
-p.index() - 1,
+p.fieldIndex() - 1,
p.fieldName(),
p.literals()));
};
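
The converter above exists because the audit-log view prepends a synthetic `rowkind` column: a predicate on `rowkind` itself (field index 0) cannot be pushed down to the underlying table, and every other field index must shift down by one. A tiny demonstration of just that index arithmetic (Paimon's LeafPredicate machinery is elided):

```java
import java.util.Optional;

public class AuditLogIndexShiftSketch {

    /** Map an audit-log field index to the underlying table's field index. */
    static Optional<Integer> toTableIndex(int auditLogIndex) {
        if (auditLogIndex == 0) {
            return Optional.empty(); // index 0 is the synthetic rowkind column
        }
        return Optional.of(auditLogIndex - 1); // skip past rowkind
    }

    public static void main(String[] args) {
        System.out.println(toTableIndex(0)); // Optional.empty
        System.out.println(toTableIndex(3)); // Optional[2]
    }
}
```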
@@ -123,6 +123,7 @@ private List<T> readWithIOException(
Filter<InternalRow> loadFilter,
Filter<InternalRow> readFilter)
throws IOException {
+// cache is always null when reading
if (cache != null) {
return cache.read(fileName, fileSize, loadFilter, readFilter);
}
@@ -133,7 +133,7 @@ protected ManifestFile createManifestFile(String pathStr) {
path,
getPartitionType(),
"default",
-CoreOptions.FILE_FORMAT.defaultValue().toString()),
+CoreOptions.FILE_FORMAT.defaultValue()),
Long.MAX_VALUE,
null)
.create();
@@ -166,7 +166,7 @@ protected void assertSameContent(

protected List<ManifestFileMeta> createBaseManifestFileMetas(boolean hasPartition) {
List<ManifestFileMeta> input = new ArrayList<>();
-// base with 3 partition ,16 entry each parition
+// base with 3 partition, 16 entry each partition
for (int j = 0; j < 3; j++) {
List<ManifestEntry> entrys = new ArrayList<>();
for (int i = 0; i < 16; i++) {
@@ -285,7 +285,7 @@ public void testCreateDataFilters() {
LeafPredicate child1 = (LeafPredicate) filters.get(0);
assertThat(child1.function()).isEqualTo(IsNull.INSTANCE);
assertThat(child1.fieldName()).isEqualTo("b");
-assertThat(child1.index()).isEqualTo(1);
+assertThat(child1.fieldIndex()).isEqualTo(1);
}

@Test
@@ -185,7 +185,6 @@ private void buildForDividedMode() {
}

private void buildForCombinedMode() {
-
ReadableConfig conf = env.getConfiguration();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
@@ -224,7 +223,6 @@ private void buildForTraditionalCompaction(
String fullName,
FileStoreTable table,
boolean isStreaming) {
-
CompactorSourceBuilder sourceBuilder = new CompactorSourceBuilder(fullName, table);
CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);

