Skip to content

Commit

Permalink
Do not throw exceptions resulting from persisting datafeed timing sta…
Browse files Browse the repository at this point in the history
…ts. (#49044) (#49051)
  • Loading branch information
przemekwitek authored Nov 13, 2019
1 parent 1cc9878 commit 457939e
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,13 @@ public Integer getMaxEmptySearches() {
}

public void finishReportingTimingStats() {
timingStatsReporter.finishReporting();
try {
timingStatsReporter.finishReporting();
} catch (Exception e) {
// We don't want the exception to propagate out of this method as it can leave the datafeed in the "stopping" state forever.
// Since persisting datafeed timing stats is not critical, we just log a warning here.
LOGGER.warn("[{}] Datafeed timing stats could not be reported due to: {}", jobId, e);
}
}

Long runLookBack(long startTime, Long endTime) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ public void stop(String source, TimeValue timeout, Exception e, boolean autoClos
acquired = datafeedJobLock.tryLock(timeout.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
} finally {
} finally { // It is crucial that none of the calls this "finally" block makes throws an exception for minor problems.
logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", source, datafeedId,
datafeedJob.getJobId(), acquired);
runningDatafeedsOnThisNode.remove(allocationId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -60,6 +61,7 @@
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -454,6 +456,15 @@ public void testFlushAnalysisProblemIsConflict() {
assertThat(analysisProblemException.shouldStop, is(true));
}

public void testFinishReportingTimingStats() {
doThrow(new EsRejectedExecutionException()).when(timingStatsReporter).finishReporting();

long frequencyMs = 100;
long queryDelayMs = 1000;
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1, randomBoolean());
datafeedJob.finishReportingTimingStats();
}

private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs,
long latestRecordTimeMs, boolean haveSeenDataPreviously) {
Supplier<Long> currentTimeSupplier = () -> currentTime;
Expand Down

0 comments on commit 457939e

Please sign in to comment.