Skip to content

Commit

Permalink
Pipe: Fix startup failure of PipeHistoricalDataRegionTsFileExtractor …
Browse files Browse the repository at this point in the history
…due to unprepared StorageEngine (#13526)

Co-authored-by: Steve Yurong Su <rong@apache.org>
  • Loading branch information
luoluoyuyu and SteveYurongSu authored Sep 18, 2024
1 parent 324275f commit 07f1475
Showing 1 changed file with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
private boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
private boolean isTerminateSignalSent = false;

private volatile boolean hasBeenStarted = false;

private Queue<TsFileResource> pendingQueue;

@Override
Expand Down Expand Up @@ -368,12 +370,18 @@ private void flushDataRegionAllTsFiles() {

@Override
public synchronized void start() {
if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
if (!shouldExtractInsertion) {
hasBeenStarted = true;
return;
}
if (!shouldExtractInsertion) {
if (!StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
LOGGER.info(
"Pipe {}@{}: failed to start to extract historical TsFile, storage engine is not ready. Will retry later.",
pipeName,
dataRegionId);
return;
}
hasBeenStarted = true;

final DataRegion dataRegion =
StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId));
Expand Down Expand Up @@ -567,6 +575,10 @@ private boolean isTsFileGeneratedAfterExtractionTimeLowerBound(final TsFileResou

@Override
public synchronized Event supply() {
if (!hasBeenStarted && StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
start();
}

if (Objects.isNull(pendingQueue)) {
return null;
}
Expand Down Expand Up @@ -636,9 +648,10 @@ public synchronized Event supply() {
public synchronized boolean hasConsumedAll() {
// If the pendingQueue is null when the function is called, it implies that the extractor only
// extracts deletion thus the historical event has nothing to consume.
return Objects.isNull(pendingQueue)
|| pendingQueue.isEmpty()
&& (!shouldTerminatePipeOnAllHistoricalEventsConsumed || isTerminateSignalSent);
return hasBeenStarted
&& (Objects.isNull(pendingQueue)
|| pendingQueue.isEmpty()
&& (!shouldTerminatePipeOnAllHistoricalEventsConsumed || isTerminateSignalSent));
}

@Override
Expand Down

0 comments on commit 07f1475

Please sign in to comment.