Skip to content

Commit

Permalink
Pipe: fixed potential lose point bug caused by cancelled flush of his…
Browse files Browse the repository at this point in the history
…torical extractor (#12056)

The data loss situation can be caused by the following operations:
 1. PipeA: start historical data extraction with flush
 2. Data insertion
 3. PipeB: start realtime data extraction
 4. PipeB: start historical data extraction without flush
 5. Data inserted in the step2 is not captured by PipeB, and if its tsfile epoch's state is USING_TABLET, the tsfile event will be ignored, which will cause the data loss in the tsfile epoch.

---------

Co-authored-by: Steve Yurong Su <rong@apache.org>
  • Loading branch information
Caideyipi and SteveYurongSu authored Feb 21, 2024
1 parent 24638f6 commit a67b0c0
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ public boolean getIsLoaded() {
return isLoaded;
}

public long getFileStartTime() {
return resource.getFileStartTime();
}

/////////////////////////// EnrichedEvent ///////////////////////////

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
Expand Down Expand Up @@ -138,7 +139,26 @@ private void extractTsFileInsertion(PipeRealtimeEvent event) {
case USING_TSFILE:
return TsFileEpoch.State.USING_TSFILE;
case USING_TABLET:
return TsFileEpoch.State.USING_TABLET;
if (((PipeTsFileInsertionEvent) event.getEvent()).getFileStartTime()
< event.getTsFileEpoch().getInsertNodeMinTime()) {
// Some insert nodes in the tsfile epoch are not captured by pipe, so we should
// capture the tsfile event to make sure all data in the tsfile epoch can be
// extracted.
//
// The situation can be caused by the following operations:
// 1. PipeA: start historical data extraction with flush
// 2. Data insertion
// 3. PipeB: start realtime data extraction
// 4. PipeB: start historical data extraction without flush
// 5. Data inserted in the step2 is not captured by PipeB, and if its tsfile
// epoch's state is USING_TABLET, the tsfile event will be ignored, which
// will cause the data loss in the tsfile epoch.
return TsFileEpoch.State.USING_BOTH;
} else {
// All data in the tsfile epoch has been extracted in tablet mode, so we should
// simply keep the state of the tsfile epoch and discard the tsfile event.
return TsFileEpoch.State.USING_TABLET;
}
case USING_BOTH:
default:
return TsFileEpoch.State.USING_BOTH;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,14 @@ private void extractTabletInsertion(PipeRealtimeEvent event) {
}

private void extractTsFileInsertion(PipeRealtimeEvent event) {
if (!((PipeTsFileInsertionEvent) event.getEvent()).getIsLoaded()) {
// only loaded tsfile can be extracted by this extractor. Ignore this event.
final PipeTsFileInsertionEvent tsFileInsertionEvent =
(PipeTsFileInsertionEvent) event.getEvent();
if (!(tsFileInsertionEvent.getIsLoaded()
// some insert nodes in the tsfile epoch are not captured by pipe
|| tsFileInsertionEvent.getFileStartTime()
< event.getTsFileEpoch().getInsertNodeMinTime())) {
// All data in the tsfile epoch has been extracted in tablet mode, so we should
// simply ignore this event.
event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName(), false);
return;
}
Expand Down Expand Up @@ -134,7 +140,7 @@ private void extractHeartbeat(PipeRealtimeEvent event) {

@Override
public boolean isNeedListenToTsFile() {
// Only listen to loaded tsFiles
// Only listen to tsFiles that can't be represented by insertNodes
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,20 @@

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class TsFileEpoch {

private final String filePath;
private final ConcurrentMap<PipeRealtimeDataRegionExtractor, AtomicReference<State>>
dataRegionExtractor2State;
private final AtomicLong insertNodeMinTime;

public TsFileEpoch(String filePath) {
this.filePath = filePath;
this.dataRegionExtractor2State = new ConcurrentHashMap<>();
this.insertNodeMinTime = new AtomicLong(Long.MAX_VALUE);
}

public TsFileEpoch.State getState(PipeRealtimeDataRegionExtractor extractor) {
Expand All @@ -57,6 +60,14 @@ public void setExtractorsRecentProcessedTsFileEpochState() {
.setRecentProcessedTsFileEpochState(extractor.getTaskID(), state.get()));
}

public void updateInsertNodeMinTime(long newComingMinTime) {
insertNodeMinTime.updateAndGet(recordedMinTime -> Math.min(recordedMinTime, newComingMinTime));
}

public long getInsertNodeMinTime() {
return insertNodeMinTime.get();
}

@Override
public String toString() {
return "TsFileEpoch{"
Expand All @@ -65,6 +76,9 @@ public String toString() {
+ '\''
+ ", dataRegionExtractor2State="
+ dataRegionExtractor2State
+ '\''
+ ", insertNodeMinTime="
+ insertNodeMinTime.get()
+ '}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,12 @@ public PipeRealtimeEvent bindPipeTsFileInsertionEvent(

public PipeRealtimeEvent bindPipeInsertNodeTabletInsertionEvent(
PipeInsertNodeTabletInsertionEvent event, InsertNode node, TsFileResource resource) {
final TsFileEpoch epoch =
filePath2Epoch.computeIfAbsent(resource.getTsFilePath(), TsFileEpoch::new);
epoch.updateInsertNodeMinTime(node.getMinTime());
return new PipeRealtimeEvent(
event,
filePath2Epoch.computeIfAbsent(resource.getTsFilePath(), TsFileEpoch::new),
epoch,
Collections.singletonMap(node.getDevicePath().getFullPath(), node.getMeasurements()),
event.getPattern());
}
Expand Down

0 comments on commit a67b0c0

Please sign in to comment.