Skip to content

Commit

Permalink
PipeConsensus: always execute flush for historical data extraction of…
Browse files Browse the repository at this point in the history
… consensus pipe to reduce data sync delay (apache#14132)
  • Loading branch information
VGalaxies authored Nov 21, 2024
1 parent 4420f21 commit 43ed865
Showing 1 changed file with 17 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
Expand Down Expand Up @@ -502,6 +503,22 @@ public synchronized void start() {
private void flushTsFilesForExtraction(
DataRegion dataRegion, final long startHistoricalExtractionTime) {
LOGGER.info("Pipe {}@{}: start to flush data region", pipeName, dataRegionId);

// Consider the scenario: a consensus pipe comes to the same region, followed by another pipe
// **immediately**, the latter pipe will skip the flush operation.
// Since a large number of consensus pipes are not created at the same time, resulting in no
// serious waiting for locks. Therefore, the flush operation is always performed for the
// consensus pipe, and the lastFlushed timestamp is not updated here.
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
dataRegion.syncCloseAllWorkingTsFileProcessors();
LOGGER.info(
"Pipe {}@{}: finish to flush data region, took {} ms",
pipeName,
dataRegionId,
System.currentTimeMillis() - startHistoricalExtractionTime);
return;
}

synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
final long lastFlushedByPipeTime = DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
if (System.currentTimeMillis() - lastFlushedByPipeTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
Expand Down

0 comments on commit 43ed865

Please sign in to comment.