diff --git a/src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java b/src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java index 030859965..bd78bd6c5 100644 --- a/src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java +++ b/src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java @@ -91,6 +91,7 @@ final class StreamProcessor implements DataSource { private final DataStoreStatusProvider.StatusListener statusListener; private volatile EventSource es; private final AtomicBoolean initialized = new AtomicBoolean(false); + private final AtomicBoolean closed = new AtomicBoolean(false); private volatile long esStarted = 0; private volatile boolean lastStoreUpdateFailed = false; private final LDLogger logger; @@ -183,11 +184,25 @@ public Future start() { // EventSource will start the stream connection either way, but if we called start(), it // would swallow any FaultEvents that happened during the initial conection attempt; we // want to know about those. - for (StreamEvent event: es.anyEvents()) { - if (!handleEvent(event, initFuture)) { - // handleEvent returns false if we should fall through and end the thread - break; + try { + for (StreamEvent event: es.anyEvents()) { + if (!handleEvent(event, initFuture)) { + // handleEvent returns false if we should fall through and end the thread + break; + } + } + } catch (Exception e) { + // Any uncaught runtime exception at this point would be coming from es.anyEvents(). + // That's not expected-- all deliberate EventSource exceptions are checked exceptions. + // So we have to assume something is wrong that we can't recover from at this point, + // and just let the thread terminate. That's better than having the thread be killed + // by an uncaught exception. + if (closed.get()) { + return; // ignore any exception that's just a side effect of stopping the EventSource } + logger.error("Stream thread has ended due to unexpected exception: {}", LogValues.exceptionSummary(e)); + // deliberately log stacktrace at error level since this is an unusual circumstance + logger.error(LogValues.exceptionTrace(e)); } }); thread.setName("LaunchDarkly-streaming"); @@ -206,6 +221,9 @@ private void recordStreamInit(boolean failed) { @Override public void close() throws IOException { + if (closed.getAndSet(true)) { + return; // was already closed + } logger.info("Closing LaunchDarkly StreamProcessor"); if (statusListener != null) { dataSourceUpdates.getDataStoreStatusProvider().removeStatusListener(statusListener); @@ -224,6 +242,9 @@ public boolean isInitialized() { // Handles a single StreamEvent and returns true if we should keep the stream alive, // or false if we should shut down permanently. private boolean handleEvent(StreamEvent event, CompletableFuture initFuture) { + if (closed.get()) { + return false; + } logger.debug("Received StreamEvent: {}", event); if (event instanceof MessageEvent) { handleMessage((MessageEvent)event, initFuture); diff --git a/src/test/java/com/launchdarkly/sdk/server/StreamProcessorTest.java b/src/test/java/com/launchdarkly/sdk/server/StreamProcessorTest.java index 1078b653c..945dae0e2 100644 --- a/src/test/java/com/launchdarkly/sdk/server/StreamProcessorTest.java +++ b/src/test/java/com/launchdarkly/sdk/server/StreamProcessorTest.java @@ -1,6 +1,8 @@ package com.launchdarkly.sdk.server; import com.launchdarkly.eventsource.MessageEvent; +import com.launchdarkly.logging.LDLogLevel; +import com.launchdarkly.logging.LogCapture; import com.launchdarkly.sdk.server.DataModel.FeatureFlag; import com.launchdarkly.sdk.server.DataModel.Segment; import com.launchdarkly.sdk.server.DataModel.VersionedData; @@ -46,6 +48,7 @@ import static com.launchdarkly.sdk.server.TestComponents.dataSourceUpdates; import static com.launchdarkly.sdk.server.TestUtil.requireDataSourceStatus; import static com.launchdarkly.testhelpers.ConcurrentHelpers.assertFutureIsCompleted; +import static com.launchdarkly.testhelpers.ConcurrentHelpers.assertNoMoreValues; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; @@ -53,6 +56,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -685,6 +689,39 @@ public void testSpecialHttpConfigurations() throws Exception { ); } + @Test + public void closingStreamProcessorDoesNotLogNetworkError() throws Exception { + // This verifies that we're not generating misleading log output or status updates + // due to simply seeing a broken connection when we have already decided to shut down. + BlockingQueue statuses = new LinkedBlockingQueue<>(); + dataSourceUpdates.statusBroadcaster.register(statuses::add); + + try (HttpServer server = HttpServer.start(streamResponse(EMPTY_DATA_EVENT))) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + sp.start(); + dataSourceUpdates.awaitInit(); + requireDataSourceStatus(statuses, State.VALID); + + while (logCapture.awaitMessage(10) != null) {} // drain captured logs + + sp.close(); + + requireDataSourceStatus(statuses, State.OFF); // should not see INTERRUPTED + assertNoMoreValues(statuses, 100, TimeUnit.MILLISECONDS); + + assertThat(logCapture.requireMessage(10).getText(), startsWith("Closing LaunchDarkly")); + // There shouldn't be any other log output other than debugging + for (;;) { + LogCapture.Message message = logCapture.awaitMessage(10); + if (message == null) { + break; + } + assertThat(message.getLevel(), equalTo(LDLogLevel.DEBUG)); + } + } + } + } + private void testUnrecoverableHttpError(int statusCode) throws Exception { Handler errorResp = Handlers.status(statusCode);