Keep all null rows in table model compaction (apache#13523)
* keep all null rows in table model compaction

* add ut

* add deletion ut

* add ut

* revert some ut

* fix ut

* add context in cross space compaction task selection

* fix bug

* spotless

* modify repair task

* update tsfile version
shuwenwei authored Sep 20, 2024
1 parent 68eb91a commit 598cabe
Showing 56 changed files with 1,175 additions and 533 deletions.
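The behavioral core of the change: in the tree model, an aligned row whose value columns are all null carries no information and is dropped during compaction; in the table model, such a row is still a valid table row and must survive. A minimal sketch of the per-row policy that the new ignoreAllNullRows flag encodes (hypothetical helper for illustration, not an IoTDB API):

public class KeepRowSketch {
  // Hypothetical illustration of the policy threaded through compaction
  // as "ignoreAllNullRows"; not a real IoTDB method.
  static boolean keepRow(Object[] valueColumns, boolean ignoreAllNullRows) {
    if (!ignoreAllNullRows) {
      return true; // table model: an all-null row is still a row
    }
    for (Object value : valueColumns) {
      if (value != null) {
        return true; // tree model: keep rows with at least one non-null value
      }
    }
    return false; // tree model: drop the all-null row
  }

  public static void main(String[] args) {
    Object[] allNullRow = {null, null};
    System.out.println(keepRow(allNullRow, true));  // false: tree model drops it
    System.out.println(keepRow(allNullRow, false)); // true: table model keeps it
  }
}

The diffs below thread this flag from the data region (which knows its model) through task selection and the compaction performers down to the readers.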
AlignedSeriesScanOperator.java
@@ -64,8 +64,7 @@ public AlignedSeriesScanOperator(
         seriesScanOptions,
         context.getInstanceContext(),
         queryAllSensors,
-        dataTypes,
-        true);
+        dataTypes);
     this.valueColumnCount = seriesPath.getColumnNum();
     this.maxReturnSize =
         Math.min(
AlignedSeriesScanUtil.java
@@ -59,7 +59,7 @@ public AlignedSeriesScanUtil(
       Ordering scanOrder,
       SeriesScanOptions scanOptions,
       FragmentInstanceContext context) {
-    this(seriesPath, scanOrder, scanOptions, context, false, null, true);
+    this(seriesPath, scanOrder, scanOptions, context, false, null);
   }

   public AlignedSeriesScanUtil(
@@ -68,8 +68,7 @@ public AlignedSeriesScanUtil(
       SeriesScanOptions scanOptions,
       FragmentInstanceContext context,
       boolean queryAllSensors,
-      List<TSDataType> givenDataTypes,
-      boolean ignoreAllNullRows) {
+      List<TSDataType> givenDataTypes) {
     super(seriesPath, scanOrder, scanOptions, context);
     isAligned = true;
     this.dataTypes =
@@ -79,7 +78,7 @@ public AlignedSeriesScanUtil(
             .map(IMeasurementSchema::getType)
             .collect(Collectors.toList());
     this.queryAllSensors = queryAllSensors;
-    this.ignoreAllNullRows = ignoreAllNullRows;
+    this.ignoreAllNullRows = context.isIgnoreAllNullRows();
   }

   @Override
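AlignedSeriesScanUtil no longer takes ignoreAllNullRows as a constructor argument; it reads the flag from the shared FragmentInstanceContext, so a single setting on the context governs every scan built from it. A reduced sketch of the pattern, with hypothetical stand-in types (the real context's default value is not visible in this excerpt):

class FragmentContextSketch {
  private boolean ignoreAllNullRows = true; // assumed default

  void setIgnoreAllNullRows(boolean ignoreAllNullRows) {
    this.ignoreAllNullRows = ignoreAllNullRows;
  }

  boolean isIgnoreAllNullRows() {
    return ignoreAllNullRows;
  }
}

class ScanUtilSketch {
  private final boolean ignoreAllNullRows;

  ScanUtilSketch(FragmentContextSketch context) {
    // Formerly a constructor parameter; now sourced from the context,
    // mirroring `this.ignoreAllNullRows = context.isIgnoreAllNullRows();` above.
    this.ignoreAllNullRows = context.isIgnoreAllNullRows();
  }
}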
@@ -306,8 +306,7 @@ private AlignedSeriesScanUtil constructAlignedSeriesScanUtil(DeviceEntry deviceE
         seriesScanOptions,
         operatorContext.getInstanceContext(),
         true,
-        measurementColumnTSDataTypes,
-        false);
+        measurementColumnTSDataTypes);
   }

   public static AlignedFullPath constructAlignedPath(
DataRegion.java
@@ -313,6 +313,8 @@ public class DataRegion implements IDataRegionForQuery {

   private final AtomicBoolean isCompactionSelecting = new AtomicBoolean(false);

+  private boolean isTreeModel;
+
   private static final QueryResourceMetricSet QUERY_RESOURCE_METRIC_SET =
       QueryResourceMetricSet.getInstance();

@@ -333,6 +335,7 @@ public DataRegion(
     this.dataRegionId = dataRegionId;
     this.databaseName = databaseName;
     this.fileFlushPolicy = fileFlushPolicy;
+    this.isTreeModel = this.databaseName.startsWith("root.");
     acquireDirectBufferMemory();

     dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId);
@@ -383,6 +386,7 @@ public DataRegion(String databaseName, String id) {
     this.dataRegionId = id;
     this.tsFileManager = new TsFileManager(databaseName, id, "");
     this.partitionMaxFileVersions = new HashMap<>();
+    this.isTreeModel = this.databaseName.startsWith("root.");
     partitionMaxFileVersions.put(0L, 0L);
   }

@@ -2765,7 +2769,7 @@ public int executeCompaction() throws InterruptedException {
     // Sort the time partition from largest to smallest
     timePartitions.sort(Comparator.reverseOrder());

-    CompactionScheduleContext context = new CompactionScheduleContext();
+    CompactionScheduleContext context = new CompactionScheduleContext(this.isTreeModel);

     // schedule insert compaction
     trySubmitCount += executeInsertionCompaction(timePartitions, context);
@@ -2808,7 +2812,7 @@ public int executeTTLCheck() throws InterruptedException {
     logger.info("[TTL] {}-{} Start ttl checking.", databaseName, dataRegionId);
     int trySubmitCount = 0;
     try {
-      CompactionScheduleContext context = new CompactionScheduleContext();
+      CompactionScheduleContext context = new CompactionScheduleContext(this.isTreeModel);
       List<Long> timePartitions = new ArrayList<>(tsFileManager.getTimePartitions());
       // Sort the time partition from smallest to largest
       Collections.sort(timePartitions);
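DataRegion infers the model from the database name (tree-model databases live under "root.") and passes it to the schedule context so selected tasks know which policy to apply. A sketch of that wiring, assuming CompactionScheduleContext simply carries the flag (its internals are outside this excerpt):

class ScheduleContextSketch {
  private final boolean isTreeModel;

  ScheduleContextSketch(boolean isTreeModel) {
    this.isTreeModel = isTreeModel;
  }

  boolean isIgnoreAllNullRows() {
    return isTreeModel; // tree model keeps legacy behavior: drop all-null rows
  }

  public static void main(String[] args) {
    // "root.sg1" and "sales" are made-up example database names.
    System.out.println(
        new ScheduleContextSketch("root.sg1".startsWith("root.")).isIgnoreAllNullRows()); // true
    System.out.println(
        new ScheduleContextSketch("sales".startsWith("root.")).isIgnoreAllNullRows()); // false
  }
}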
ICompactionPerformer.java
@@ -40,6 +40,8 @@ public interface ICompactionPerformer {

   void setSummary(CompactionTaskSummary summary);

+  void setIgnoreAllNullRows(boolean ignoreAllNullRows);
+
   default void setSourceFiles(List<TsFileResource> files) {
     throw new IllegalSourceFileTypeException(
         "Cannot set single type of source files to this kind of performer");
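setIgnoreAllNullRows is added as an abstract method rather than a default, so every performer must implement it explicitly. The three performers touched by this commit all follow the same pattern: a field defaulting to true (the legacy tree-model behavior) plus a plain setter, roughly:

class PerformerFlagPattern {
  // Defaulting to true preserves tree-model behavior for any caller
  // that never calls the setter.
  private boolean ignoreAllNullRows = true;

  public void setIgnoreAllNullRows(boolean ignoreAllNullRows) {
    this.ignoreAllNullRows = ignoreAllNullRows;
  }
}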
FastCompactionPerformer.java
@@ -91,6 +91,7 @@ public class FastCompactionPerformer
       modificationCache = new ConcurrentHashMap<>();

   private final boolean isCrossCompaction;
+  private boolean ignoreAllNullRows = true;

   public FastCompactionPerformer(
       List<TsFileResource> seqFiles,
@@ -115,7 +116,7 @@ public FastCompactionPerformer(boolean isCrossCompaction) {
   public void perform() throws Exception {
     this.subTaskSummary.setTemporalFileNum(targetFiles.size());
     try (MultiTsFileDeviceIterator deviceIterator =
-            new MultiTsFileDeviceIterator(seqFiles, unseqFiles, readerCacheMap);
+            new MultiTsFileDeviceIterator(seqFiles, unseqFiles, readerCacheMap, ignoreAllNullRows);
         AbstractCompactionWriter compactionWriter =
             isCrossCompaction
                 ? new FastCrossCompactionWriter(targetFiles, seqFiles, readerCacheMap)
@@ -192,7 +193,7 @@ private void compactAlignedSeries(
     // end offset of each timeseries metadata, in order to facilitate the reading of chunkMetadata
     // directly by this offset later. Instead of deserializing chunk metadata later, we need to
     // deserialize chunk metadata here to get the schemas of all value measurements, because we
-    // should get schemas of all value measurement to startMeasruement() and compaction process is
+    // should get schemas of all value measurement to startMeasurement() and compaction process is
     // to read a batch of overlapped files each time, and we cannot make sure if the first batch of
     // overlapped tsfiles contain all the value measurements.
     for (Map.Entry<String, Pair<MeasurementSchema, Map<TsFileResource, Pair<Long, Long>>>> entry :
@@ -210,7 +211,8 @@ private void compactAlignedSeries(
                 sortedSourceFiles,
                 measurementSchemas,
                 deviceId,
-                taskSummary)
+                taskSummary,
+                ignoreAllNullRows)
             .call();
     subTaskSummary.increase(taskSummary);
   }
@@ -296,6 +298,11 @@ public void setSummary(CompactionTaskSummary summary) {
     this.subTaskSummary = (FastCompactionTaskSummary) summary;
   }

+  @Override
+  public void setIgnoreAllNullRows(boolean ignoreAllNullRows) {
+    this.ignoreAllNullRows = ignoreAllNullRows;
+  }
+
   @Override
   public void setSourceFiles(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles) {
     this.seqFiles = seqFiles;
ReadChunkCompactionPerformer.java
@@ -55,6 +55,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer {
   private CompactionTsFileWriter currentWriter;
   private long endedFileSize = 0;
   private int currentTargetFileIndex = 0;
+  private boolean ignoreAllNullRows = true;
   // memory budget for file writer is 5% of per compaction task memory budget
   private final long memoryBudgetForFileWriter =
       (long)
@@ -86,7 +87,8 @@ public void perform()
           InterruptedException,
           StorageEngineException,
           PageException {
-    try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(seqFiles)) {
+    try (MultiTsFileDeviceIterator deviceIterator =
+        new MultiTsFileDeviceIterator(seqFiles, ignoreAllNullRows)) {
       schema =
           CompactionTableSchemaCollector.collectSchema(seqFiles, deviceIterator.getReaderMap());
       while (deviceIterator.hasNextDevice()) {
@@ -172,6 +174,11 @@ public void setSummary(CompactionTaskSummary summary) {
     this.summary = summary;
   }

+  @Override
+  public void setIgnoreAllNullRows(boolean ignoreAllNullRows) {
+    this.ignoreAllNullRows = ignoreAllNullRows;
+  }
+
   private void compactAlignedSeries(
       IDeviceID device,
       TsFileResource targetResource,
@@ -187,7 +194,7 @@ private void compactAlignedSeries(
     writer.startChunkGroup(device);
     BatchedReadChunkAlignedSeriesCompactionExecutor compactionExecutor =
         new BatchedReadChunkAlignedSeriesCompactionExecutor(
-            device, targetResource, readerAndChunkMetadataList, writer, summary);
+            device, targetResource, readerAndChunkMetadataList, writer, summary, ignoreAllNullRows);
     compactionExecutor.execute();
     for (ChunkMetadata chunkMetadata : writer.getChunkMetadataListOfCurrentDeviceInMemory()) {
       if (chunkMetadata.getMeasurementUid().isEmpty()) {
ReadPointCompactionPerformer.java
@@ -79,6 +79,7 @@ public class ReadPointCompactionPerformer
   private CompactionTaskSummary summary;

   protected List<TsFileResource> targetFiles = Collections.emptyList();
+  protected boolean ignoreAllNullRows = true;

   public ReadPointCompactionPerformer(
       List<TsFileResource> seqFiles,
@@ -112,7 +113,7 @@ public void perform() throws Exception {
             getCompactionWriter(seqFiles, unseqFiles, targetFiles)) {
       // Do not close device iterator, because tsfile reader is managed by FileReaderManager.
       MultiTsFileDeviceIterator deviceIterator =
-          new MultiTsFileDeviceIterator(seqFiles, unseqFiles);
+          new MultiTsFileDeviceIterator(seqFiles, unseqFiles, ignoreAllNullRows);
       List<Schema> schemas =
           CompactionTableSchemaCollector.collectSchema(
               seqFiles, unseqFiles, deviceIterator.getReaderMap());
@@ -152,6 +153,11 @@ public void setSummary(CompactionTaskSummary summary) {
     this.summary = summary;
   }

+  @Override
+  public void setIgnoreAllNullRows(boolean ignoreAllNullRows) {
+    this.ignoreAllNullRows = ignoreAllNullRows;
+  }
+
   private void compactAlignedSeries(
       IDeviceID device,
       MultiTsFileDeviceIterator deviceIterator,
@@ -169,6 +175,8 @@ private void compactAlignedSeries(
         measurementSchemas.stream()
             .map(IMeasurementSchema::getMeasurementId)
             .collect(Collectors.toList());
+
+    fragmentInstanceContext.setIgnoreAllNullRows(ignoreAllNullRows);
     IDataBlockReader dataBlockReader =
         constructReader(
             device,
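The read-point performer reuses the query engine's readers, so it bridges the two layers by pushing its flag into the FragmentInstanceContext before building the data-block reader; AlignedSeriesScanUtil then reads it back (see its hunk above). A condensed sketch of that handshake, reusing the stand-in context type from the earlier sketch:

class ReadPointHandshakeSketch {
  private boolean ignoreAllNullRows = true;

  void compactAlignedSeries(FragmentContextSketch context) {
    // Push the compaction policy into the per-query context...
    context.setIgnoreAllNullRows(ignoreAllNullRows);
    // ...so scan utilities constructed afterwards pick it up via
    // context.isIgnoreAllNullRows().
  }
}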
RepairUnsortedFileCompactionPerformer.java
@@ -36,9 +36,10 @@ public class RepairUnsortedFileCompactionPerformer extends ReadPointCompactionPe

   private final boolean rewriteFile;

-  public RepairUnsortedFileCompactionPerformer(boolean rewriteFile) {
+  public RepairUnsortedFileCompactionPerformer(boolean rewriteFile, boolean ignoreAllNullRows) {
     super();
     this.rewriteFile = rewriteFile;
+    this.ignoreAllNullRows = ignoreAllNullRows;
   }

   @Override
RepairUnsortedFileCompactionTask.java
@@ -63,13 +63,14 @@ public RepairUnsortedFileCompactionTask(
       TsFileManager tsFileManager,
       TsFileResource sourceFile,
       boolean sequence,
-      long serialId) {
+      long serialId,
+      boolean ignoreAllNullRows) {
     super(
         timePartition,
         tsFileManager,
         Collections.singletonList(sourceFile),
         sequence,
-        new RepairUnsortedFileCompactionPerformer(true),
+        new RepairUnsortedFileCompactionPerformer(true, ignoreAllNullRows),
         serialId);
     this.sourceFile = sourceFile;
     this.innerSpaceEstimator = new RepairUnsortedFileCompactionEstimator();
@@ -82,13 +83,14 @@ public RepairUnsortedFileCompactionTask(
       TsFileResource sourceFile,
       boolean sequence,
       boolean rewriteFile,
-      long serialId) {
+      long serialId,
+      boolean ignoreAllNullRows) {
     super(
         timePartition,
         tsFileManager,
         Collections.singletonList(sourceFile),
         sequence,
-        new RepairUnsortedFileCompactionPerformer(rewriteFile),
+        new RepairUnsortedFileCompactionPerformer(rewriteFile, ignoreAllNullRows),
         serialId);
     this.sourceFile = sourceFile;
     if (rewriteFile) {
@@ -103,13 +105,14 @@ public RepairUnsortedFileCompactionTask(
       TsFileResource sourceFile,
       CountDownLatch latch,
       boolean sequence,
-      long serialId) {
+      long serialId,
+      boolean ignoreAllNullRows) {
     super(
         timePartition,
         tsFileManager,
         Collections.singletonList(sourceFile),
         sequence,
-        new RepairUnsortedFileCompactionPerformer(true),
+        new RepairUnsortedFileCompactionPerformer(true, ignoreAllNullRows),
         serialId);
     this.sourceFile = sourceFile;
     this.innerSpaceEstimator = new RepairUnsortedFileCompactionEstimator();
@@ -124,13 +127,14 @@ public RepairUnsortedFileCompactionTask(
       CountDownLatch latch,
       boolean sequence,
       boolean rewriteFile,
-      long serialId) {
+      long serialId,
+      boolean ignoreAllNullRows) {
     super(
         timePartition,
         tsFileManager,
         Collections.singletonList(sourceFile),
         sequence,
-        new RepairUnsortedFileCompactionPerformer(rewriteFile),
+        new RepairUnsortedFileCompactionPerformer(rewriteFile, ignoreAllNullRows),
         serialId);
     this.sourceFile = sourceFile;
     if (rewriteFile) {
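Every RepairUnsortedFileCompactionTask constructor now demands the flag, so repair scheduling cannot silently fall back to tree-model filtering. A hypothetical call site (variable names assumed, not taken from this diff):

RepairUnsortedFileCompactionTask task =
    new RepairUnsortedFileCompactionTask(
        timePartition,    // assumed in scope
        tsFileManager,    // assumed in scope
        unsortedFile,     // the TsFileResource to repair
        true,             // sequence
        serialId,         // assumed in scope
        false);           // ignoreAllNullRows: false for a table-model region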
FastCompactionPerformerSubTask.java
@@ -67,6 +67,8 @@ public class FastCompactionPerformerSubTask implements Callable<Void> {

   private final boolean isAligned;

+  private final boolean ignoreAllNullRows;
+
   private IDeviceID deviceId;

   private List<String> measurements;
@@ -96,6 +98,7 @@ public FastCompactionPerformerSubTask(
     this.sortedSourceFiles = sortedSourceFiles;
     this.measurements = measurements;
     this.summary = summary;
+    this.ignoreAllNullRows = true;
   }

   /** Used for aligned timeseries. */
@@ -108,7 +111,8 @@ public FastCompactionPerformerSubTask(
       List<TsFileResource> sortedSourceFiles,
       List<IMeasurementSchema> measurementSchemas,
       IDeviceID deviceId,
-      FastCompactionTaskSummary summary) {
+      FastCompactionTaskSummary summary,
+      boolean ignoreAllNullRows) {
     this.compactionWriter = compactionWriter;
     this.subTaskId = 0;
     this.timeseriesMetadataOffsetMap = timeseriesMetadataOffsetMap;
@@ -119,6 +123,7 @@ public FastCompactionPerformerSubTask(
     this.sortedSourceFiles = sortedSourceFiles;
     this.measurementSchemas = measurementSchemas;
     this.summary = summary;
+    this.ignoreAllNullRows = ignoreAllNullRows;
   }

   @Override
@@ -154,7 +159,8 @@ public Void call()
               deviceId,
               subTaskId,
               measurementSchemas,
-              summary);
+              summary,
+              ignoreAllNullRows);
     } else {
       seriesCompactionExecutor =
           new FastAlignedSeriesCompactionExecutor(
@@ -166,7 +172,8 @@ public Void call()
               deviceId,
               subTaskId,
               measurementSchemas,
-              summary);
+              summary,
+              ignoreAllNullRows);
     }
     seriesCompactionExecutor.execute();
   }
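Note that the non-aligned constructor pins ignoreAllNullRows to true: a non-aligned series stores a single value per timestamp, so an "all-null row" across value columns cannot occur there, which presumably is why only the aligned constructor accepts the flag.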