diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 532ca5a1a7e6..412e4143a037 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -116,6 +116,10 @@ public boolean getIsLoaded() { return isLoaded; } + public long getFileStartTime() { + return resource.getFileStartTime(); + } + /////////////////////////// EnrichedEvent /////////////////////////// @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java index 568c96b52636..63a4098cff9f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch; import org.apache.iotdb.db.pipe.resource.PipeResourceManager; @@ -138,7 +139,26 @@ private void extractTsFileInsertion(PipeRealtimeEvent event) { case USING_TSFILE: return TsFileEpoch.State.USING_TSFILE; case USING_TABLET: - return TsFileEpoch.State.USING_TABLET; + if (((PipeTsFileInsertionEvent) event.getEvent()).getFileStartTime() + < event.getTsFileEpoch().getInsertNodeMinTime()) { + // Some insert nodes in the tsfile epoch are not captured by pipe, so we should + // capture the tsfile event to make sure all data in the tsfile epoch can be + // extracted. + // + // The situation can be caused by the following operations: + // 1. PipeA: start historical data extraction with flush + // 2. Data insertion + // 3. PipeB: start realtime data extraction + // 4. PipeB: start historical data extraction without flush + // 5. Data inserted in the step2 is not captured by PipeB, and if its tsfile + // epoch's state is USING_TABLET, the tsfile event will be ignored, which + // will cause the data loss in the tsfile epoch. + return TsFileEpoch.State.USING_BOTH; + } else { + // All data in the tsfile epoch has been extracted in tablet mode, so we should + // simply keep the state of the tsfile epoch and discard the tsfile event. + return TsFileEpoch.State.USING_TABLET; + } case USING_BOTH: default: return TsFileEpoch.State.USING_BOTH; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java index 2d857bcad9b3..68f21ea283ab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java @@ -75,8 +75,14 @@ private void extractTabletInsertion(PipeRealtimeEvent event) { } private void extractTsFileInsertion(PipeRealtimeEvent event) { - if (!((PipeTsFileInsertionEvent) event.getEvent()).getIsLoaded()) { - // only loaded tsfile can be extracted by this extractor. Ignore this event. + final PipeTsFileInsertionEvent tsFileInsertionEvent = + (PipeTsFileInsertionEvent) event.getEvent(); + if (!(tsFileInsertionEvent.getIsLoaded() + // some insert nodes in the tsfile epoch are not captured by pipe + || tsFileInsertionEvent.getFileStartTime() + < event.getTsFileEpoch().getInsertNodeMinTime())) { + // All data in the tsfile epoch has been extracted in tablet mode, so we should + // simply ignore this event. event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(), false); return; } @@ -134,7 +140,7 @@ private void extractHeartbeat(PipeRealtimeEvent event) { @Override public boolean isNeedListenToTsFile() { - // Only listen to loaded tsFiles + // Only listen to tsFiles that can't be represented by insertNodes return true; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java index b0ec157f0166..1ccc4e979dcb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; public class TsFileEpoch { @@ -31,10 +32,12 @@ public class TsFileEpoch { private final String filePath; private final ConcurrentMap> dataRegionExtractor2State; + private final AtomicLong insertNodeMinTime; public TsFileEpoch(String filePath) { this.filePath = filePath; this.dataRegionExtractor2State = new ConcurrentHashMap<>(); + this.insertNodeMinTime = new AtomicLong(Long.MAX_VALUE); } public TsFileEpoch.State getState(PipeRealtimeDataRegionExtractor extractor) { @@ -57,6 +60,14 @@ public void setExtractorsRecentProcessedTsFileEpochState() { .setRecentProcessedTsFileEpochState(extractor.getTaskID(), state.get())); } + public void updateInsertNodeMinTime(long newComingMinTime) { + insertNodeMinTime.updateAndGet(recordedMinTime -> Math.min(recordedMinTime, newComingMinTime)); + } + + public long getInsertNodeMinTime() { + return insertNodeMinTime.get(); + } + @Override public String toString() { return "TsFileEpoch{" @@ -65,6 +76,9 @@ public String toString() { + '\'' + ", dataRegionExtractor2State=" + dataRegionExtractor2State + + '\'' + + ", insertNodeMinTime=" + + insertNodeMinTime.get() + '}'; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java index fdd2ad85c624..a1ad2f49ec92 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java @@ -69,9 +69,12 @@ public PipeRealtimeEvent bindPipeTsFileInsertionEvent( public PipeRealtimeEvent bindPipeInsertNodeTabletInsertionEvent( PipeInsertNodeTabletInsertionEvent event, InsertNode node, TsFileResource resource) { + final TsFileEpoch epoch = + filePath2Epoch.computeIfAbsent(resource.getTsFilePath(), TsFileEpoch::new); + epoch.updateInsertNodeMinTime(node.getMinTime()); return new PipeRealtimeEvent( event, - filePath2Epoch.computeIfAbsent(resource.getTsFilePath(), TsFileEpoch::new), + epoch, Collections.singletonMap(node.getDevicePath().getFullPath(), node.getMeasurements()), event.getPattern()); }