diff --git a/build.gradle b/build.gradle index c3f0b805c..541c54980 100644 --- a/build.gradle +++ b/build.gradle @@ -121,15 +121,13 @@ libraries.optional = [ // Add dependencies to "libraries.test" that are used only in unit tests. libraries.test = [ - // Note that the okhttp3 test deps must be kept in sync with the okhttp version used in okhttp-eventsource - "com.squareup.okhttp3:mockwebserver:${versions.okhttp}", - "com.squareup.okhttp3:okhttp-tls:${versions.okhttp}", "org.hamcrest:hamcrest-all:1.3", "org.easymock:easymock:3.4", "junit:junit:4.12", "ch.qos.logback:logback-classic:1.1.7", "com.fasterxml.jackson.core:jackson-core:${versions.jackson}", - "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}" + "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}", + "com.launchdarkly:test-helpers:1.0.0" ] configurations { diff --git a/src/main/java/com/launchdarkly/sdk/server/Components.java b/src/main/java/com/launchdarkly/sdk/server/Components.java index e2a4092f8..0b2694fff 100644 --- a/src/main/java/com/launchdarkly/sdk/server/Components.java +++ b/src/main/java/com/launchdarkly/sdk/server/Components.java @@ -187,6 +187,11 @@ public static PollingDataSourceBuilder pollingDataSource() { return new PollingDataSourceBuilderImpl(); } + // For testing only - allows us to override the minimum polling interval + static PollingDataSourceBuilderImpl pollingDataSourceInternal() { + return new PollingDataSourceBuilderImpl(); + } + /** * Returns a configuration object that disables a direct connection with LaunchDarkly for feature flag updates. *

diff --git a/src/main/java/com/launchdarkly/sdk/server/ComponentsImpl.java b/src/main/java/com/launchdarkly/sdk/server/ComponentsImpl.java index 370f1a1e3..33ddaa4fb 100644 --- a/src/main/java/com/launchdarkly/sdk/server/ComponentsImpl.java +++ b/src/main/java/com/launchdarkly/sdk/server/ComponentsImpl.java @@ -33,6 +33,7 @@ import java.net.InetSocketAddress; import java.net.Proxy; import java.net.URI; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; @@ -143,7 +144,6 @@ public DataSource createDataSource(ClientContext context, DataSourceUpdates data return new StreamProcessor( context.getHttp(), dataSourceUpdates, - null, context.getBasic().getThreadPriority(), ClientContextImpl.get(context).diagnosticAccumulator, streamUri, @@ -165,6 +165,12 @@ public LDValue describeConfiguration(BasicConfiguration basicConfiguration) { } static final class PollingDataSourceBuilderImpl extends PollingDataSourceBuilder implements DiagnosticDescription { + // for testing only + PollingDataSourceBuilderImpl pollIntervalWithNoMinimum(Duration pollInterval) { + this.pollInterval = pollInterval; + return this; + } + @Override public DataSource createDataSource(ClientContext context, DataSourceUpdates dataSourceUpdates) { // Note, we log startup messages under the LDClient class to keep logs more readable diff --git a/src/main/java/com/launchdarkly/sdk/server/PollingProcessor.java b/src/main/java/com/launchdarkly/sdk/server/PollingProcessor.java index 45f49cb09..31ad32a39 100644 --- a/src/main/java/com/launchdarkly/sdk/server/PollingProcessor.java +++ b/src/main/java/com/launchdarkly/sdk/server/PollingProcessor.java @@ -111,6 +111,10 @@ private void poll() { } else { dataSourceUpdates.updateStatus(State.OFF, errorInfo); initFuture.complete(null); // if client is initializing, make it stop waiting; has no effect if already inited + if (task != null) { + task.cancel(true); + task = null; + } } } catch (IOException e) { checkIfErrorIsRecoverableAndLog(logger, e.toString(), ERROR_CONTEXT_MESSAGE, 0, WILL_RETRY_MESSAGE); diff --git a/src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java b/src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java index 36c4e6cab..b0758eac4 100644 --- a/src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java +++ b/src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java @@ -84,7 +84,6 @@ final class StreamProcessor implements DataSource { @VisibleForTesting final URI streamUri; @VisibleForTesting final Duration initialReconnectDelay; private final DiagnosticAccumulator diagnosticAccumulator; - private final EventSourceCreator eventSourceCreator; private final int threadPriority; private final DataStoreStatusProvider.StatusListener statusListener; private volatile EventSource es; @@ -94,34 +93,9 @@ final class StreamProcessor implements DataSource { ConnectionErrorHandler connectionErrorHandler = createDefaultConnectionErrorHandler(); // exposed for testing - static final class EventSourceParams { - final EventHandler handler; - final URI streamUri; - final Duration initialReconnectDelay; - final ConnectionErrorHandler errorHandler; - final Headers headers; - final HttpConfiguration httpConfig; - - EventSourceParams(EventHandler handler, URI streamUri, Duration initialReconnectDelay, - ConnectionErrorHandler errorHandler, Headers headers, HttpConfiguration httpConfig) { - this.handler = handler; - this.streamUri = streamUri; - this.initialReconnectDelay = initialReconnectDelay; - this.errorHandler = errorHandler; - this.headers = headers; - this.httpConfig = httpConfig; - } - } - - @FunctionalInterface - static interface EventSourceCreator { - EventSource createEventSource(EventSourceParams params); - } - StreamProcessor( HttpConfiguration httpConfig, DataSourceUpdates dataSourceUpdates, - EventSourceCreator eventSourceCreator, int threadPriority, DiagnosticAccumulator diagnosticAccumulator, URI streamUri, @@ -130,7 +104,6 @@ static interface EventSourceCreator { this.dataSourceUpdates = dataSourceUpdates; this.httpConfig = httpConfig; this.diagnosticAccumulator = diagnosticAccumulator; - this.eventSourceCreator = eventSourceCreator != null ? eventSourceCreator : this::defaultEventSourceCreator; this.threadPriority = threadPriority; this.streamUri = streamUri; this.initialReconnectDelay = initialReconnectDelay; @@ -202,13 +175,26 @@ public Future start() { }; EventHandler handler = new StreamEventHandler(initFuture); - - es = eventSourceCreator.createEventSource(new EventSourceParams(handler, - concatenateUriPath(streamUri, STREAM_URI_PATH), - initialReconnectDelay, - wrappedConnectionErrorHandler, - headers, - httpConfig)); + URI endpointUri = concatenateUriPath(streamUri, STREAM_URI_PATH); + + EventSource.Builder builder = new EventSource.Builder(handler, endpointUri) + .threadPriority(threadPriority) + .loggerBaseName(Loggers.DATA_SOURCE_LOGGER_NAME) + .clientBuilderActions(new EventSource.Builder.ClientConfigurer() { + public void configure(OkHttpClient.Builder builder) { + configureHttpClientBuilder(httpConfig, builder); + } + }) + .connectionErrorHandler(wrappedConnectionErrorHandler) + .headers(headers) + .reconnectTime(initialReconnectDelay) + .readTimeout(DEAD_CONNECTION_INTERVAL); + // Note that this is not the same read timeout that can be set in LDConfig. We default to a smaller one + // there because we don't expect long delays within any *non*-streaming response that the LD client gets. + // A read timeout on the stream will result in the connection being cycled, so we set this to be slightly + // more than the expected interval between heartbeat signals. + + es = builder.build(); esStarted = System.currentTimeMillis(); es.start(); return initFuture; @@ -356,27 +342,6 @@ public void onError(Throwable throwable) { } } - private EventSource defaultEventSourceCreator(EventSourceParams params) { - EventSource.Builder builder = new EventSource.Builder(params.handler, params.streamUri) - .threadPriority(threadPriority) - .loggerBaseName(Loggers.DATA_SOURCE_LOGGER_NAME) - .clientBuilderActions(new EventSource.Builder.ClientConfigurer() { - public void configure(OkHttpClient.Builder builder) { - configureHttpClientBuilder(params.httpConfig, builder); - } - }) - .connectionErrorHandler(params.errorHandler) - .headers(params.headers) - .reconnectTime(params.initialReconnectDelay) - .readTimeout(DEAD_CONNECTION_INTERVAL); - // Note that this is not the same read timeout that can be set in LDConfig. We default to a smaller one - // there because we don't expect long delays within any *non*-streaming response that the LD client gets. - // A read timeout on the stream will result in the connection being cycled, so we set this to be slightly - // more than the expected interval between heartbeat signals. - - return builder.build(); - } - private static Map.Entry getKindAndKeyFromStreamApiPath(String path) throws StreamInputException { if (path == null) { throw new StreamInputException("missing item path"); diff --git a/src/main/java/com/launchdarkly/sdk/server/integrations/FileData.java b/src/main/java/com/launchdarkly/sdk/server/integrations/FileData.java index 630b53c26..1c7c60fb1 100644 --- a/src/main/java/com/launchdarkly/sdk/server/integrations/FileData.java +++ b/src/main/java/com/launchdarkly/sdk/server/integrations/FileData.java @@ -1,7 +1,5 @@ package com.launchdarkly.sdk.server.integrations; -import com.launchdarkly.sdk.server.LDConfig; - /** * Integration between the LaunchDarkly SDK and file data. *

@@ -59,9 +57,9 @@ public enum DuplicateKeysHandling { * This will cause the client not to connect to LaunchDarkly to get feature flags. The * client may still make network connections to send analytics events, unless you have disabled * this with {@link com.launchdarkly.sdk.server.Components#noEvents()}. IMPORTANT: Do not - * set {@link LDConfig.Builder#offline(boolean)} to {@code true}; doing so would not just put the - * SDK "offline" with regard to LaunchDarkly, but will completely turn off all flag data sources - * to the SDK including the file data source. + * set {@link com.launchdarkly.sdk.server.LDConfig.Builder#offline(boolean)} to {@code true}; doing so + * would not just put the SDK "offline" with regard to LaunchDarkly, but will completely turn off + * all flag data sources to the SDK including the file data source. *

* Flag data files can be either JSON or YAML. They contain an object with three possible * properties: diff --git a/src/test/java/com/launchdarkly/sdk/server/DataStoreTestTypes.java b/src/test/java/com/launchdarkly/sdk/server/DataStoreTestTypes.java index 5d728d018..4a4a28595 100644 --- a/src/test/java/com/launchdarkly/sdk/server/DataStoreTestTypes.java +++ b/src/test/java/com/launchdarkly/sdk/server/DataStoreTestTypes.java @@ -3,6 +3,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.launchdarkly.sdk.LDValue; +import com.launchdarkly.sdk.ObjectBuilder; import com.launchdarkly.sdk.server.DataModel.VersionedData; import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.DataKind; import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.FullDataSet; @@ -137,6 +139,15 @@ private static ItemDescriptor deserializeTestItem(String s) { public static class DataBuilder { private Map> data = new HashMap<>(); + public static DataBuilder forStandardTypes() { + // This just ensures that we use realistic-looking data sets in our tests when simulating + // an LD service response, which will always include "flags" and "segments" even if empty. + DataBuilder ret = new DataBuilder(); + ret.add(DataModel.FEATURES); + ret.add(DataModel.SEGMENTS); + return ret; + } + public DataBuilder add(DataKind kind, TestItem... items) { return addAny(kind, items); } @@ -182,5 +193,19 @@ public FullDataSet buildSerialized() { ) ).entrySet()); } + + public LDValue buildJson() { + FullDataSet allData = buildSerialized(); + ObjectBuilder allBuilder = LDValue.buildObject(); + for (Map.Entry> coll: allData.getData()) { + String namespace = coll.getKey().getName().equals("features") ? "flags" : coll.getKey().getName(); + ObjectBuilder itemsBuilder = LDValue.buildObject(); + for (Map.Entry item: coll.getValue().getItems()) { + itemsBuilder.put(item.getKey(), LDValue.parse(item.getValue().getSerializedItem())); + } + allBuilder.put(namespace, itemsBuilder.build()); + } + return allBuilder.build(); + } } } diff --git a/src/test/java/com/launchdarkly/sdk/server/DefaultEventSenderTest.java b/src/test/java/com/launchdarkly/sdk/server/DefaultEventSenderTest.java index b67a3ce40..a24d1d002 100644 --- a/src/test/java/com/launchdarkly/sdk/server/DefaultEventSenderTest.java +++ b/src/test/java/com/launchdarkly/sdk/server/DefaultEventSenderTest.java @@ -4,6 +4,10 @@ import com.launchdarkly.sdk.server.interfaces.EventSender; import com.launchdarkly.sdk.server.interfaces.EventSenderFactory; import com.launchdarkly.sdk.server.interfaces.HttpConfiguration; +import com.launchdarkly.testhelpers.httptest.Handler; +import com.launchdarkly.testhelpers.httptest.Handlers; +import com.launchdarkly.testhelpers.httptest.HttpServer; +import com.launchdarkly.testhelpers.httptest.RequestInfo; import org.junit.Test; @@ -13,31 +17,22 @@ import java.util.Date; import java.util.Map; import java.util.UUID; -import java.util.concurrent.TimeUnit; import static com.launchdarkly.sdk.server.TestComponents.clientContext; -import static com.launchdarkly.sdk.server.TestUtil.makeSocketFactorySingleHost; -import static com.launchdarkly.sdk.server.TestHttpUtil.httpsServerWithSelfSignedCert; -import static com.launchdarkly.sdk.server.TestHttpUtil.makeStartedServer; import static com.launchdarkly.sdk.server.interfaces.EventSender.EventDataKind.ANALYTICS; import static com.launchdarkly.sdk.server.interfaces.EventSender.EventDataKind.DIAGNOSTICS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.equalToIgnoringCase; import static org.hamcrest.Matchers.isA; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import okhttp3.HttpUrl; -import okhttp3.mockwebserver.MockResponse; -import okhttp3.mockwebserver.MockWebServer; -import okhttp3.mockwebserver.RecordedRequest; - @SuppressWarnings("javadoc") public class DefaultEventSenderTest { private static final String SDK_KEY = "SDK_KEY"; @@ -56,10 +51,6 @@ private static EventSender makeEventSender(LDConfig config) { ); } - private static URI getBaseUri(MockWebServer server) { - return server.url("/").uri(); - } - @Test public void factoryCreatesDefaultSenderWithDefaultRetryDelay() throws Exception { EventSenderFactory f = new DefaultEventSender.Factory(); @@ -80,35 +71,35 @@ public void constructorUsesDefaultRetryDelayIfNotSpecified() throws Exception { @Test public void analyticsDataIsDelivered() throws Exception { - try (MockWebServer server = makeStartedServer(eventsSuccessResponse())) { + try (HttpServer server = HttpServer.start(eventsSuccessResponse())) { try (EventSender es = makeEventSender()) { - EventSender.Result result = es.sendEventData(ANALYTICS, FAKE_DATA, 1, getBaseUri(server)); + EventSender.Result result = es.sendEventData(ANALYTICS, FAKE_DATA, 1, server.getUri()); assertTrue(result.isSuccess()); assertFalse(result.isMustShutDown()); } - RecordedRequest req = server.takeRequest(); + RequestInfo req = server.getRecorder().requireRequest(); assertEquals("/bulk", req.getPath()); - assertEquals("application/json; charset=utf-8", req.getHeader("content-type")); - assertEquals(FAKE_DATA, req.getBody().readUtf8()); + assertThat(req.getHeader("content-type"), equalToIgnoringCase("application/json; charset=utf-8")); + assertEquals(FAKE_DATA, req.getBody()); } } @Test public void diagnosticDataIsDelivered() throws Exception { - try (MockWebServer server = makeStartedServer(eventsSuccessResponse())) { + try (HttpServer server = HttpServer.start(eventsSuccessResponse())) { try (EventSender es = makeEventSender()) { - EventSender.Result result = es.sendEventData(DIAGNOSTICS, FAKE_DATA, 1, getBaseUri(server)); + EventSender.Result result = es.sendEventData(DIAGNOSTICS, FAKE_DATA, 1, server.getUri()); assertTrue(result.isSuccess()); assertFalse(result.isMustShutDown()); } - RecordedRequest req = server.takeRequest(); + RequestInfo req = server.getRecorder().requireRequest(); assertEquals("/diagnostic", req.getPath()); - assertEquals("application/json; charset=utf-8", req.getHeader("content-type")); - assertEquals(FAKE_DATA, req.getBody().readUtf8()); + assertThat(req.getHeader("content-type"), equalToIgnoringCase("application/json; charset=utf-8")); + assertEquals(FAKE_DATA, req.getBody()); } } @@ -116,12 +107,12 @@ public void diagnosticDataIsDelivered() throws Exception { public void defaultHeadersAreSentForAnalytics() throws Exception { HttpConfiguration httpConfig = clientContext(SDK_KEY, LDConfig.DEFAULT).getHttp(); - try (MockWebServer server = makeStartedServer(eventsSuccessResponse())) { + try (HttpServer server = HttpServer.start(eventsSuccessResponse())) { try (EventSender es = makeEventSender()) { - es.sendEventData(ANALYTICS, FAKE_DATA, 1, getBaseUri(server)); + es.sendEventData(ANALYTICS, FAKE_DATA, 1, server.getUri()); } - RecordedRequest req = server.takeRequest(); + RequestInfo req = server.getRecorder().requireRequest(); for (Map.Entry kv: httpConfig.getDefaultHeaders()) { assertThat(req.getHeader(kv.getKey()), equalTo(kv.getValue())); } @@ -132,12 +123,12 @@ public void defaultHeadersAreSentForAnalytics() throws Exception { public void defaultHeadersAreSentForDiagnostics() throws Exception { HttpConfiguration httpConfig = clientContext(SDK_KEY, LDConfig.DEFAULT).getHttp(); - try (MockWebServer server = makeStartedServer(eventsSuccessResponse())) { + try (HttpServer server = HttpServer.start(eventsSuccessResponse())) { try (EventSender es = makeEventSender()) { - es.sendEventData(DIAGNOSTICS, FAKE_DATA, 1, getBaseUri(server)); + es.sendEventData(DIAGNOSTICS, FAKE_DATA, 1, server.getUri()); } - RecordedRequest req = server.takeRequest(); + RequestInfo req = server.getRecorder().requireRequest(); for (Map.Entry kv: httpConfig.getDefaultHeaders()) { assertThat(req.getHeader(kv.getKey()), equalTo(kv.getValue())); } @@ -146,24 +137,24 @@ public void defaultHeadersAreSentForDiagnostics() throws Exception { @Test public void eventSchemaIsSentForAnalytics() throws Exception { - try (MockWebServer server = makeStartedServer(eventsSuccessResponse())) { + try (HttpServer server = HttpServer.start(eventsSuccessResponse())) { try (EventSender es = makeEventSender()) { - es.sendEventData(ANALYTICS, FAKE_DATA, 1, getBaseUri(server)); + es.sendEventData(ANALYTICS, FAKE_DATA, 1, server.getUri()); } - RecordedRequest req = server.takeRequest(); + RequestInfo req = server.getRecorder().requireRequest(); assertThat(req.getHeader("X-LaunchDarkly-Event-Schema"), equalTo("3")); } } @Test public void eventPayloadIdIsSentForAnalytics() throws Exception { - try (MockWebServer server = makeStartedServer(eventsSuccessResponse())) { + try (HttpServer server = HttpServer.start(eventsSuccessResponse())) { try (EventSender es = makeEventSender()) { - es.sendEventData(ANALYTICS, FAKE_DATA, 1, getBaseUri(server)); + es.sendEventData(ANALYTICS, FAKE_DATA, 1, server.getUri()); } - RecordedRequest req = server.takeRequest(); + RequestInfo req = server.getRecorder().requireRequest(); String payloadHeaderValue = req.getHeader("X-LaunchDarkly-Payload-ID"); assertThat(payloadHeaderValue, notNullValue(String.class)); assertThat(UUID.fromString(payloadHeaderValue), notNullValue(UUID.class)); @@ -172,23 +163,24 @@ public void eventPayloadIdIsSentForAnalytics() throws Exception { @Test public void eventPayloadIdReusedOnRetry() throws Exception { - MockResponse errorResponse = new MockResponse().setResponseCode(429); + Handler errorResponse = Handlers.status(429); + Handler errorThenSuccess = Handlers.sequential(errorResponse, eventsSuccessResponse(), eventsSuccessResponse()); - try (MockWebServer server = makeStartedServer(errorResponse, eventsSuccessResponse(), eventsSuccessResponse())) { + try (HttpServer server = HttpServer.start(errorThenSuccess)) { try (EventSender es = makeEventSender()) { - es.sendEventData(ANALYTICS, FAKE_DATA, 1, getBaseUri(server)); - es.sendEventData(ANALYTICS, FAKE_DATA, 1, getBaseUri(server)); + es.sendEventData(ANALYTICS, FAKE_DATA, 1, server.getUri()); + es.sendEventData(ANALYTICS, FAKE_DATA, 1, server.getUri()); } // Failed response request - RecordedRequest req = server.takeRequest(0, TimeUnit.SECONDS); + RequestInfo req = server.getRecorder().requireRequest(); String payloadId = req.getHeader("X-LaunchDarkly-Payload-ID"); // Retry request has same payload ID as failed request - req = server.takeRequest(0, TimeUnit.SECONDS); + req = server.getRecorder().requireRequest(); String retryId = req.getHeader("X-LaunchDarkly-Payload-ID"); assertThat(retryId, equalTo(payloadId)); // Second request has different payload ID from first request - req = server.takeRequest(0, TimeUnit.SECONDS); + req = server.getRecorder().requireRequest(); payloadId = req.getHeader("X-LaunchDarkly-Payload-ID"); assertThat(retryId, not(equalTo(payloadId))); } @@ -196,12 +188,12 @@ public void eventPayloadIdReusedOnRetry() throws Exception { @Test public void eventSchemaNotSetOnDiagnosticEvents() throws Exception { - try (MockWebServer server = makeStartedServer(eventsSuccessResponse())) { + try (HttpServer server = HttpServer.start(eventsSuccessResponse())) { try (EventSender es = makeEventSender()) { - es.sendEventData(DIAGNOSTICS, FAKE_DATA, 1, getBaseUri(server)); + es.sendEventData(DIAGNOSTICS, FAKE_DATA, 1, server.getUri()); } - RecordedRequest req = server.takeRequest(); + RequestInfo req = server.getRecorder().requireRequest(); assertNull(req.getHeader("X-LaunchDarkly-Event-Schema")); } } @@ -241,11 +233,11 @@ public void http500ErrorIsRecoverable() throws Exception { @Test public void serverDateIsParsed() throws Exception { long fakeTime = ((new Date().getTime() - 100000) / 1000) * 1000; // don't expect millisecond precision - MockResponse resp = addDateHeader(eventsSuccessResponse(), new Date(fakeTime)); + Handler resp = Handlers.all(eventsSuccessResponse(), addDateHeader(new Date(fakeTime))); - try (MockWebServer server = makeStartedServer(resp)) { + try (HttpServer server = HttpServer.start(resp)) { try (EventSender es = makeEventSender()) { - EventSender.Result result = es.sendEventData(DIAGNOSTICS, FAKE_DATA, 1, getBaseUri(server)); + EventSender.Result result = es.sendEventData(DIAGNOSTICS, FAKE_DATA, 1, server.getUri()); assertNotNull(result.getTimeFromServer()); assertEquals(fakeTime, result.getTimeFromServer().getTime()); @@ -255,183 +247,153 @@ public void serverDateIsParsed() throws Exception { @Test public void invalidServerDateIsIgnored() throws Exception { - MockResponse resp = eventsSuccessResponse().addHeader("Date", "not a date"); + Handler resp = Handlers.all(eventsSuccessResponse(), Handlers.header("Date", "not a date")); - try (MockWebServer server = makeStartedServer(resp)) { + try (HttpServer server = HttpServer.start(resp)) { try (EventSender es = makeEventSender()) { - EventSender.Result result = es.sendEventData(DIAGNOSTICS, FAKE_DATA, 1, getBaseUri(server)); + EventSender.Result result = es.sendEventData(DIAGNOSTICS, FAKE_DATA, 1, server.getUri()); assertTrue(result.isSuccess()); assertNull(result.getTimeFromServer()); } } } - - @Test - public void httpClientDoesNotAllowSelfSignedCertByDefault() throws Exception { - try (TestHttpUtil.ServerWithCert serverWithCert = httpsServerWithSelfSignedCert(eventsSuccessResponse())) { - try (EventSender es = makeEventSender()) { - EventSender.Result result = es.sendEventData(ANALYTICS, FAKE_DATA, 1, serverWithCert.uri()); - - assertFalse(result.isSuccess()); - assertFalse(result.isMustShutDown()); - } - assertEquals(0, serverWithCert.server.getRequestCount()); - } - } - @Test - public void httpClientCanUseCustomTlsConfig() throws Exception { - try (TestHttpUtil.ServerWithCert serverWithCert = httpsServerWithSelfSignedCert(eventsSuccessResponse())) { - LDConfig config = new LDConfig.Builder() - .http(Components.httpConfiguration() - .sslSocketFactory(serverWithCert.socketFactory, serverWithCert.trustManager) - // allows us to trust the self-signed cert - ) - .build(); - - try (EventSender es = makeEventSender(config)) { - EventSender.Result result = es.sendEventData(ANALYTICS, FAKE_DATA, 1, serverWithCert.uri()); + public void testSpecialHttpConfigurations() throws Exception { + Handler handler = eventsSuccessResponse(); + + TestHttpUtil.testWithSpecialHttpConfigurations(handler, + (targetUri, goodHttpConfig) -> { + LDConfig config = new LDConfig.Builder().http(goodHttpConfig).build(); + + try (EventSender es = makeEventSender(config)) { + EventSender.Result result = es.sendEventData(ANALYTICS, FAKE_DATA, 1, targetUri); + + assertTrue(result.isSuccess()); + assertFalse(result.isMustShutDown()); + } + }, - assertTrue(result.isSuccess()); - assertFalse(result.isMustShutDown()); - } - - assertEquals(1, serverWithCert.server.getRequestCount()); - } - } - - @Test - public void httpClientCanUseCustomSocketFactory() throws Exception { - try (MockWebServer server = makeStartedServer(eventsSuccessResponse())) { - HttpUrl serverUrl = server.url("/"); - LDConfig config = new LDConfig.Builder() - .http(Components.httpConfiguration().socketFactory(makeSocketFactorySingleHost(serverUrl.host(), serverUrl.port()))) - .build(); - - URI uriWithWrongPort = URI.create("http://localhost:1"); - try (EventSender es = makeEventSender(config)) { - EventSender.Result result = es.sendEventData(DIAGNOSTICS, FAKE_DATA, 1, uriWithWrongPort); + (targetUri, badHttpConfig) -> { + LDConfig config = new LDConfig.Builder().http(badHttpConfig).build(); + + try (EventSender es = makeEventSender(config)) { + EventSender.Result result = es.sendEventData(ANALYTICS, FAKE_DATA, 1, targetUri); - assertTrue(result.isSuccess()); - assertFalse(result.isMustShutDown()); + assertFalse(result.isSuccess()); + assertFalse(result.isMustShutDown()); + } } - - assertEquals(1, server.getRequestCount()); - } + ); } - + @Test public void baseUriDoesNotNeedTrailingSlash() throws Exception { - try (MockWebServer server = makeStartedServer(eventsSuccessResponse())) { + try (HttpServer server = HttpServer.start(eventsSuccessResponse())) { try (EventSender es = makeEventSender()) { - URI uriWithoutSlash = URI.create(server.url("/").toString().replaceAll("/$", "")); + URI uriWithoutSlash = URI.create(server.getUri().toString().replaceAll("/$", "")); EventSender.Result result = es.sendEventData(ANALYTICS, FAKE_DATA, 1, uriWithoutSlash); assertTrue(result.isSuccess()); assertFalse(result.isMustShutDown()); } - RecordedRequest req = server.takeRequest(); + RequestInfo req = server.getRecorder().requireRequest(); assertEquals("/bulk", req.getPath()); - assertEquals("application/json; charset=utf-8", req.getHeader("content-type")); - assertEquals(FAKE_DATA, req.getBody().readUtf8()); + assertThat(req.getHeader("content-type"), equalToIgnoringCase("application/json; charset=utf-8")); + assertEquals(FAKE_DATA, req.getBody()); } } @Test public void baseUriCanHaveContextPath() throws Exception { - try (MockWebServer server = makeStartedServer(eventsSuccessResponse())) { + try (HttpServer server = HttpServer.start(eventsSuccessResponse())) { try (EventSender es = makeEventSender()) { - URI baseUri = URI.create(server.url("/context/path").toString()); + URI baseUri = server.getUri().resolve("/context/path"); EventSender.Result result = es.sendEventData(ANALYTICS, FAKE_DATA, 1, baseUri); assertTrue(result.isSuccess()); assertFalse(result.isMustShutDown()); } - RecordedRequest req = server.takeRequest(); + RequestInfo req = server.getRecorder().requireRequest(); assertEquals("/context/path/bulk", req.getPath()); - assertEquals("application/json; charset=utf-8", req.getHeader("content-type")); - assertEquals(FAKE_DATA, req.getBody().readUtf8()); + assertThat(req.getHeader("content-type"), equalToIgnoringCase("application/json; charset=utf-8")); + assertEquals(FAKE_DATA, req.getBody()); } } @Test public void nothingIsSentForNullData() throws Exception { - try (MockWebServer server = makeStartedServer(eventsSuccessResponse())) { + try (HttpServer server = HttpServer.start(eventsSuccessResponse())) { try (EventSender es = makeEventSender()) { - EventSender.Result result1 = es.sendEventData(ANALYTICS, null, 0, getBaseUri(server)); - EventSender.Result result2 = es.sendEventData(DIAGNOSTICS, null, 0, getBaseUri(server)); + EventSender.Result result1 = es.sendEventData(ANALYTICS, null, 0, server.getUri()); + EventSender.Result result2 = es.sendEventData(DIAGNOSTICS, null, 0, server.getUri()); assertTrue(result1.isSuccess()); assertTrue(result2.isSuccess()); - assertEquals(0, server.getRequestCount()); + assertEquals(0, server.getRecorder().count()); } } } @Test public void nothingIsSentForEmptyData() throws Exception { - try (MockWebServer server = makeStartedServer(eventsSuccessResponse())) { + try (HttpServer server = HttpServer.start(eventsSuccessResponse())) { try (EventSender es = makeEventSender()) { - EventSender.Result result1 = es.sendEventData(ANALYTICS, "", 0, getBaseUri(server)); - EventSender.Result result2 = es.sendEventData(DIAGNOSTICS, "", 0, getBaseUri(server)); + EventSender.Result result1 = es.sendEventData(ANALYTICS, "", 0, server.getUri()); + EventSender.Result result2 = es.sendEventData(DIAGNOSTICS, "", 0, server.getUri()); assertTrue(result1.isSuccess()); assertTrue(result2.isSuccess()); - assertEquals(0, server.getRequestCount()); + assertEquals(0, server.getRecorder().count()); } } } private void testUnrecoverableHttpError(int status) throws Exception { - MockResponse errorResponse = new MockResponse().setResponseCode(status); + Handler errorResponse = Handlers.status(status); - try (MockWebServer server = makeStartedServer(errorResponse)) { + try (HttpServer server = HttpServer.start(errorResponse)) { try (EventSender es = makeEventSender()) { - EventSender.Result result = es.sendEventData(DIAGNOSTICS, FAKE_DATA, 1, getBaseUri(server)); + EventSender.Result result = es.sendEventData(DIAGNOSTICS, FAKE_DATA, 1, server.getUri()); assertFalse(result.isSuccess()); assertTrue(result.isMustShutDown()); } - RecordedRequest req = server.takeRequest(0, TimeUnit.SECONDS); - assertThat(req, notNullValue(RecordedRequest.class)); // this was the initial request that received the error + server.getRecorder().requireRequest(); - // it does not retry after this type of error, so there are no more requests - assertThat(server.takeRequest(0, TimeUnit.SECONDS), nullValue(RecordedRequest.class)); + // it does not retry after this type of error, so there are no more requests + server.getRecorder().requireNoRequests(Duration.ofMillis(100)); } } private void testRecoverableHttpError(int status) throws Exception { - MockResponse errorResponse = new MockResponse().setResponseCode(status); - + Handler errorResponse = Handlers.status(status); + Handler errorsThenSuccess = Handlers.sequential(errorResponse, errorResponse, eventsSuccessResponse()); // send two errors in a row, because the flush will be retried one time - try (MockWebServer server = makeStartedServer(errorResponse, errorResponse, eventsSuccessResponse())) { + + try (HttpServer server = HttpServer.start(errorsThenSuccess)) { try (EventSender es = makeEventSender()) { - EventSender.Result result = es.sendEventData(DIAGNOSTICS, FAKE_DATA, 1, getBaseUri(server)); + EventSender.Result result = es.sendEventData(DIAGNOSTICS, FAKE_DATA, 1, server.getUri()); assertFalse(result.isSuccess()); assertFalse(result.isMustShutDown()); } - RecordedRequest req = server.takeRequest(0, TimeUnit.SECONDS); - assertThat(req, notNullValue(RecordedRequest.class)); - req = server.takeRequest(0, TimeUnit.SECONDS); - assertThat(req, notNullValue(RecordedRequest.class)); - req = server.takeRequest(0, TimeUnit.SECONDS); - assertThat(req, nullValue(RecordedRequest.class)); // only 2 requests total + server.getRecorder().requireRequest(); + server.getRecorder().requireRequest(); + server.getRecorder().requireNoRequests(Duration.ofMillis(100)); // only 2 requests total } } - private MockResponse eventsSuccessResponse() { - return new MockResponse().setResponseCode(202); + private Handler eventsSuccessResponse() { + return Handlers.status(202); } - private MockResponse addDateHeader(MockResponse response, Date date) { - return response.addHeader("Date", httpDateFormat.format(date)); + private Handler addDateHeader(Date date) { + return Handlers.header("Date", httpDateFormat.format(date)); } - } diff --git a/src/test/java/com/launchdarkly/sdk/server/DefaultFeatureRequestorTest.java b/src/test/java/com/launchdarkly/sdk/server/DefaultFeatureRequestorTest.java index 4b254e556..2c3f52587 100644 --- a/src/test/java/com/launchdarkly/sdk/server/DefaultFeatureRequestorTest.java +++ b/src/test/java/com/launchdarkly/sdk/server/DefaultFeatureRequestorTest.java @@ -2,20 +2,17 @@ import com.launchdarkly.sdk.server.interfaces.BasicConfiguration; import com.launchdarkly.sdk.server.interfaces.HttpConfiguration; +import com.launchdarkly.testhelpers.httptest.Handler; +import com.launchdarkly.testhelpers.httptest.Handlers; +import com.launchdarkly.testhelpers.httptest.HttpServer; +import com.launchdarkly.testhelpers.httptest.RequestInfo; import org.junit.Test; import java.net.URI; import java.util.Map; -import javax.net.SocketFactory; -import javax.net.ssl.SSLHandshakeException; - import static com.launchdarkly.sdk.server.TestComponents.clientContext; -import static com.launchdarkly.sdk.server.TestUtil.makeSocketFactorySingleHost; -import static com.launchdarkly.sdk.server.TestHttpUtil.httpsServerWithSelfSignedCert; -import static com.launchdarkly.sdk.server.TestHttpUtil.jsonResponse; -import static com.launchdarkly.sdk.server.TestHttpUtil.makeStartedServer; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; @@ -23,11 +20,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; -import okhttp3.HttpUrl; -import okhttp3.mockwebserver.MockResponse; -import okhttp3.mockwebserver.MockWebServer; -import okhttp3.mockwebserver.RecordedRequest; - @SuppressWarnings("javadoc") public class DefaultFeatureRequestorTest { private static final String sdkKey = "sdk-key"; @@ -39,15 +31,14 @@ public class DefaultFeatureRequestorTest { private static final String segmentsJson = "{\"" + segment1Key + "\":" + segment1Json + "}"; private static final String allDataJson = "{\"flags\":" + flagsJson + ",\"segments\":" + segmentsJson + "}"; - private DefaultFeatureRequestor makeRequestor(MockWebServer server) { + private DefaultFeatureRequestor makeRequestor(HttpServer server) { return makeRequestor(server, LDConfig.DEFAULT); // We can always use LDConfig.DEFAULT unless we need to modify HTTP properties, since DefaultFeatureRequestor // no longer uses the deprecated LDConfig.baseUri property. } - private DefaultFeatureRequestor makeRequestor(MockWebServer server, LDConfig config) { - URI uri = server.url("/").uri(); - return new DefaultFeatureRequestor(makeHttpConfig(config), uri); + private DefaultFeatureRequestor makeRequestor(HttpServer server, LDConfig config) { + return new DefaultFeatureRequestor(makeHttpConfig(config), server.getUri()); } private HttpConfiguration makeHttpConfig(LDConfig config) { @@ -66,13 +57,13 @@ private void verifyExpectedData(FeatureRequestor.AllData data) { @Test public void requestAllData() throws Exception { - MockResponse resp = jsonResponse(allDataJson); + Handler resp = Handlers.bodyJson(allDataJson); - try (MockWebServer server = makeStartedServer(resp)) { + try (HttpServer server = HttpServer.start(resp)) { try (DefaultFeatureRequestor r = makeRequestor(server)) { FeatureRequestor.AllData data = r.getAllData(true); - RecordedRequest req = server.takeRequest(); + RequestInfo req = server.getRecorder().requireRequest(); assertEquals("/sdk/latest-all", req.getPath()); verifyHeaders(req); @@ -83,17 +74,20 @@ public void requestAllData() throws Exception { @Test public void responseIsCached() throws Exception { - MockResponse cacheableResp = jsonResponse(allDataJson) - .setHeader("ETag", "aaa") - .setHeader("Cache-Control", "max-age=0"); - MockResponse cachedResp = new MockResponse().setResponseCode(304); + Handler cacheableResp = Handlers.all( + Handlers.header("ETag", "aaa"), + Handlers.header("Cache-Control", "max-age=0"), + Handlers.bodyJson(allDataJson) + ); + Handler cachedResp = Handlers.status(304); + Handler cacheableThenCached = Handlers.sequential(cacheableResp, cachedResp); - try (MockWebServer server = makeStartedServer(cacheableResp, cachedResp)) { + try (HttpServer server = HttpServer.start(cacheableThenCached)) { try (DefaultFeatureRequestor r = makeRequestor(server)) { FeatureRequestor.AllData data1 = r.getAllData(true); verifyExpectedData(data1); - RecordedRequest req1 = server.takeRequest(); + RequestInfo req1 = server.getRecorder().requireRequest(); assertEquals("/sdk/latest-all", req1.getPath()); verifyHeaders(req1); assertNull(req1.getHeader("If-None-Match")); @@ -101,7 +95,7 @@ public void responseIsCached() throws Exception { FeatureRequestor.AllData data2 = r.getAllData(false); assertNull(data2); - RecordedRequest req2 = server.takeRequest(); + RequestInfo req2 = server.getRecorder().requireRequest(); assertEquals("/sdk/latest-all", req2.getPath()); verifyHeaders(req2); assertEquals("aaa", req2.getHeader("If-None-Match")); @@ -111,17 +105,20 @@ public void responseIsCached() throws Exception { @Test public void responseIsCachedButWeWantDataAnyway() throws Exception { - MockResponse cacheableResp = jsonResponse(allDataJson) - .setHeader("ETag", "aaa") - .setHeader("Cache-Control", "max-age=0"); - MockResponse cachedResp = new MockResponse().setResponseCode(304); + Handler cacheableResp = Handlers.all( + Handlers.header("ETag", "aaa"), + Handlers.header("Cache-Control", "max-age=0"), + Handlers.bodyJson(allDataJson) + ); + Handler cachedResp = Handlers.status(304); + Handler cacheableThenCached = Handlers.sequential(cacheableResp, cachedResp); - try (MockWebServer server = makeStartedServer(cacheableResp, cachedResp)) { + try (HttpServer server = HttpServer.start(cacheableThenCached)) { try (DefaultFeatureRequestor r = makeRequestor(server)) { FeatureRequestor.AllData data1 = r.getAllData(true); verifyExpectedData(data1); - RecordedRequest req1 = server.takeRequest(); + RequestInfo req1 = server.getRecorder().requireRequest(); assertEquals("/sdk/latest-all", req1.getPath()); verifyHeaders(req1); assertNull(req1.getHeader("If-None-Match")); @@ -129,94 +126,53 @@ public void responseIsCachedButWeWantDataAnyway() throws Exception { FeatureRequestor.AllData data2 = r.getAllData(true); verifyExpectedData(data2); - RecordedRequest req2 = server.takeRequest(); + RequestInfo req2 = server.getRecorder().requireRequest(); assertEquals("/sdk/latest-all", req2.getPath()); verifyHeaders(req2); assertEquals("aaa", req2.getHeader("If-None-Match")); } } } - - @Test - public void httpClientDoesNotAllowSelfSignedCertByDefault() throws Exception { - MockResponse resp = jsonResponse(allDataJson); - - try (TestHttpUtil.ServerWithCert serverWithCert = httpsServerWithSelfSignedCert(resp)) { - try (DefaultFeatureRequestor r = makeRequestor(serverWithCert.server)) { - try { - r.getAllData(false); - fail("expected exception"); - } catch (SSLHandshakeException e) { - } - - assertEquals(0, serverWithCert.server.getRequestCount()); - } - } - } - - @Test - public void httpClientCanUseCustomTlsConfig() throws Exception { - MockResponse resp = jsonResponse(allDataJson); - - try (TestHttpUtil.ServerWithCert serverWithCert = httpsServerWithSelfSignedCert(resp)) { - LDConfig config = new LDConfig.Builder() - .http(Components.httpConfiguration().sslSocketFactory(serverWithCert.socketFactory, serverWithCert.trustManager)) - // allows us to trust the self-signed cert - .build(); - - try (DefaultFeatureRequestor r = makeRequestor(serverWithCert.server, config)) { - FeatureRequestor.AllData data = r.getAllData(false); - verifyExpectedData(data); - } - } - } - - @Test - public void httpClientCanUseCustomSocketFactory() throws Exception { - try (MockWebServer server = makeStartedServer(jsonResponse(allDataJson))) { - HttpUrl serverUrl = server.url("/"); - LDConfig config = new LDConfig.Builder() - .http(Components.httpConfiguration().socketFactory(makeSocketFactorySingleHost(serverUrl.host(), serverUrl.port()))) - .build(); - URI uriWithWrongPort = URI.create("http://localhost:1"); - try (DefaultFeatureRequestor r = new DefaultFeatureRequestor(makeHttpConfig(config), uriWithWrongPort)) { - FeatureRequestor.AllData data = r.getAllData(false); - verifyExpectedData(data); - - assertEquals(1, server.getRequestCount()); - } - } - } - @Test - public void httpClientCanUseProxyConfig() throws Exception { - URI fakeBaseUri = URI.create("http://not-a-real-host"); - try (MockWebServer server = makeStartedServer(jsonResponse(allDataJson))) { - HttpUrl serverUrl = server.url("/"); - LDConfig config = new LDConfig.Builder() - .http(Components.httpConfiguration().proxyHostAndPort(serverUrl.host(), serverUrl.port())) - .build(); - - try (DefaultFeatureRequestor r = new DefaultFeatureRequestor(makeHttpConfig(config), fakeBaseUri)) { - FeatureRequestor.AllData data = r.getAllData(false); - verifyExpectedData(data); + public void testSpecialHttpConfigurations() throws Exception { + Handler handler = Handlers.bodyJson(allDataJson); + + TestHttpUtil.testWithSpecialHttpConfigurations(handler, + (targetUri, goodHttpConfig) -> { + LDConfig config = new LDConfig.Builder().http(goodHttpConfig).build(); + try (DefaultFeatureRequestor r = new DefaultFeatureRequestor(makeHttpConfig(config), targetUri)) { + try { + FeatureRequestor.AllData data = r.getAllData(false); + verifyExpectedData(data); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }, - assertEquals(1, server.getRequestCount()); - } - } + (targetUri, badHttpConfig) -> { + LDConfig config = new LDConfig.Builder().http(badHttpConfig).build(); + try (DefaultFeatureRequestor r = new DefaultFeatureRequestor(makeHttpConfig(config), targetUri)) { + try { + r.getAllData(false); + fail("expected exception"); + } catch (Exception e) { + } + } + } + ); } @Test public void baseUriDoesNotNeedTrailingSlash() throws Exception { - MockResponse resp = jsonResponse(allDataJson); + Handler resp = Handlers.bodyJson(allDataJson); - try (MockWebServer server = makeStartedServer(resp)) { - URI uri = server.url("").uri(); - try (DefaultFeatureRequestor r = new DefaultFeatureRequestor(makeHttpConfig(LDConfig.DEFAULT), uri)) { + try (HttpServer server = HttpServer.start(resp)) { + try (DefaultFeatureRequestor r = new DefaultFeatureRequestor(makeHttpConfig(LDConfig.DEFAULT), server.getUri())) { FeatureRequestor.AllData data = r.getAllData(true); - RecordedRequest req = server.takeRequest(); + RequestInfo req = server.getRecorder().requireRequest(); assertEquals("/sdk/latest-all", req.getPath()); verifyHeaders(req); @@ -227,14 +183,15 @@ public void baseUriDoesNotNeedTrailingSlash() throws Exception { @Test public void baseUriCanHaveContextPath() throws Exception { - MockResponse resp = jsonResponse(allDataJson); + Handler resp = Handlers.bodyJson(allDataJson); - try (MockWebServer server = makeStartedServer(resp)) { - URI uri = server.url("/context/path").uri(); + try (HttpServer server = HttpServer.start(resp)) { + URI uri = server.getUri().resolve("/context/path"); + try (DefaultFeatureRequestor r = new DefaultFeatureRequestor(makeHttpConfig(LDConfig.DEFAULT), uri)) { FeatureRequestor.AllData data = r.getAllData(true); - RecordedRequest req = server.takeRequest(); + RequestInfo req = server.getRecorder().requireRequest(); assertEquals("/context/path/sdk/latest-all", req.getPath()); verifyHeaders(req); @@ -243,7 +200,7 @@ public void baseUriCanHaveContextPath() throws Exception { } } - private void verifyHeaders(RecordedRequest req) { + private void verifyHeaders(RequestInfo req) { HttpConfiguration httpConfig = clientContext(sdkKey, LDConfig.DEFAULT).getHttp(); for (Map.Entry kv: httpConfig.getDefaultHeaders()) { assertThat(req.getHeader(kv.getKey()), equalTo(kv.getValue())); diff --git a/src/test/java/com/launchdarkly/sdk/server/LDClientEndToEndTest.java b/src/test/java/com/launchdarkly/sdk/server/LDClientEndToEndTest.java index cb1dd8aea..37faffa5d 100644 --- a/src/test/java/com/launchdarkly/sdk/server/LDClientEndToEndTest.java +++ b/src/test/java/com/launchdarkly/sdk/server/LDClientEndToEndTest.java @@ -5,6 +5,11 @@ import com.launchdarkly.sdk.LDUser; import com.launchdarkly.sdk.LDValue; import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; +import com.launchdarkly.sdk.server.interfaces.HttpConfigurationFactory; +import com.launchdarkly.testhelpers.httptest.Handler; +import com.launchdarkly.testhelpers.httptest.Handlers; +import com.launchdarkly.testhelpers.httptest.HttpServer; +import com.launchdarkly.testhelpers.httptest.RequestInfo; import org.junit.Test; @@ -12,28 +17,18 @@ import java.time.Duration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.BiFunction; import static com.launchdarkly.sdk.server.Components.externalUpdatesOnly; import static com.launchdarkly.sdk.server.Components.noEvents; import static com.launchdarkly.sdk.server.ModelBuilders.flagBuilder; -import static com.launchdarkly.sdk.server.TestHttpUtil.basePollingConfig; -import static com.launchdarkly.sdk.server.TestHttpUtil.baseStreamingConfig; -import static com.launchdarkly.sdk.server.TestHttpUtil.httpsServerWithSelfSignedCert; -import static com.launchdarkly.sdk.server.TestHttpUtil.jsonResponse; -import static com.launchdarkly.sdk.server.TestHttpUtil.makeStartedServer; +import static com.launchdarkly.testhelpers.httptest.Handlers.bodyJson; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import okhttp3.HttpUrl; -import okhttp3.mockwebserver.MockResponse; -import okhttp3.mockwebserver.MockWebServer; -import okhttp3.mockwebserver.RecordedRequest; - @SuppressWarnings("javadoc") public class LDClientEndToEndTest { private static final Gson gson = new Gson(); @@ -44,13 +39,30 @@ public class LDClientEndToEndTest { .build(); private static final LDUser user = new LDUser("user-key"); + private static Handler makePollingSuccessResponse() { + return bodyJson(makeAllDataJson()); + } + + private static Handler makeStreamingSuccessResponse() { + String streamData = "event: put\n" + + "data: {\"data\":" + makeAllDataJson() + "}"; + return Handlers.all(Handlers.SSE.start(), + Handlers.SSE.event(streamData), Handlers.SSE.leaveOpen()); + } + + private static Handler makeInvalidSdkKeyResponse() { + return Handlers.status(401); + } + + private static Handler makeServiceUnavailableResponse() { + return Handlers.status(503); + } + @Test public void clientStartsInPollingMode() throws Exception { - MockResponse resp = jsonResponse(makeAllDataJson()); - - try (MockWebServer server = makeStartedServer(resp)) { + try (HttpServer server = HttpServer.start(makePollingSuccessResponse())) { LDConfig config = new LDConfig.Builder() - .dataSource(basePollingConfig(server)) + .dataSource(Components.pollingDataSource().baseURI(server.getUri())) .events(noEvents()) .build(); @@ -62,50 +74,65 @@ public void clientStartsInPollingMode() throws Exception { } @Test - public void clientFailsInPollingModeWith401Error() throws Exception { - MockResponse resp = new MockResponse().setResponseCode(401); - - try (MockWebServer server = makeStartedServer(resp)) { + public void clientStartsInPollingModeAfterRecoverableError() throws Exception { + Handler errorThenSuccess = Handlers.sequential( + makeServiceUnavailableResponse(), + makePollingSuccessResponse() + ); + + try (HttpServer server = HttpServer.start(errorThenSuccess)) { LDConfig config = new LDConfig.Builder() - .dataSource(basePollingConfig(server)) + .dataSource(Components.pollingDataSourceInternal() + .pollIntervalWithNoMinimum(Duration.ofMillis(5)) // use small interval because we expect it to retry + .baseURI(server.getUri())) .events(noEvents()) .build(); try (LDClient client = new LDClient(sdkKey, config)) { - assertFalse(client.isInitialized()); - assertFalse(client.boolVariation(flagKey, user, false)); + assertTrue(client.isInitialized()); + assertTrue(client.boolVariation(flagKey, user, false)); } } } @Test - public void clientStartsInPollingModeWithSelfSignedCert() throws Exception { - MockResponse resp = jsonResponse(makeAllDataJson()); - - try (TestHttpUtil.ServerWithCert serverWithCert = httpsServerWithSelfSignedCert(resp)) { + public void clientFailsInPollingModeWith401Error() throws Exception { + try (HttpServer server = HttpServer.start(makeInvalidSdkKeyResponse())) { LDConfig config = new LDConfig.Builder() - .dataSource(basePollingConfig(serverWithCert.server)) + .dataSource(Components.pollingDataSourceInternal() + .pollIntervalWithNoMinimum(Duration.ofMillis(5)) // use small interval so we'll know if it does not stop permanently + .baseURI(server.getUri())) .events(noEvents()) - .http(Components.httpConfiguration().sslSocketFactory(serverWithCert.socketFactory, serverWithCert.trustManager)) - // allows us to trust the self-signed cert .build(); try (LDClient client = new LDClient(sdkKey, config)) { - assertTrue(client.isInitialized()); - assertTrue(client.boolVariation(flagKey, user, false)); + assertFalse(client.isInitialized()); + assertFalse(client.boolVariation(flagKey, user, false)); + + server.getRecorder().requireRequest(); + server.getRecorder().requireNoRequests(Duration.ofMillis(100)); } } } @Test - public void clientStartsInStreamingMode() throws Exception { - String streamData = "event: put\n" + - "data: {\"data\":" + makeAllDataJson() + "}\n\n"; - MockResponse resp = TestHttpUtil.eventStreamResponse(streamData); - - try (MockWebServer server = makeStartedServer(resp)) { + public void testPollingModeSpecialHttpConfigurations() throws Exception { + testWithSpecialHttpConfigurations( + makePollingSuccessResponse(), + (serverUri, httpConfig) -> + new LDConfig.Builder() + .dataSource(Components.pollingDataSource().baseURI(serverUri)) + .events(noEvents()) + .http(httpConfig) + .startWait(Duration.ofMillis(100)) + .build()); + } + + @Test + public void clientStartsInStreamingMode() throws Exception { + try (HttpServer server = HttpServer.start(makeStreamingSuccessResponse())) { LDConfig config = new LDConfig.Builder() - .dataSource(baseStreamingConfig(server)) + .dataSource(Components.streamingDataSource().baseURI(server.getUri())) .events(noEvents()) .build(); @@ -118,32 +145,35 @@ public void clientStartsInStreamingMode() throws Exception { @Test public void clientStartsInStreamingModeAfterRecoverableError() throws Exception { - MockResponse errorResp = new MockResponse().setResponseCode(503); - - String streamData = "event: put\n" + - "data: {\"data\":" + makeAllDataJson() + "}\n\n"; - MockResponse streamResp = TestHttpUtil.eventStreamResponse(streamData); + Handler errorThenStream = Handlers.sequential( + makeServiceUnavailableResponse(), + makeStreamingSuccessResponse() + ); - try (MockWebServer server = makeStartedServer(errorResp, streamResp)) { + try (HttpServer server = HttpServer.start(errorThenStream)) { LDConfig config = new LDConfig.Builder() - .dataSource(baseStreamingConfig(server)) + .dataSource(Components.streamingDataSource().baseURI(server.getUri()).initialReconnectDelay(Duration.ZERO)) + // use zero reconnect delay so we'll know if it does not stop permanently .events(noEvents()) .build(); try (LDClient client = new LDClient(sdkKey, config)) { assertTrue(client.isInitialized()); assertTrue(client.boolVariation(flagKey, user, false)); + + server.getRecorder().requireRequest(); + server.getRecorder().requireRequest(); + server.getRecorder().requireNoRequests(Duration.ofMillis(100)); } } } @Test public void clientFailsInStreamingModeWith401Error() throws Exception { - MockResponse resp = new MockResponse().setResponseCode(401); - - try (MockWebServer server = makeStartedServer(resp, resp, resp)) { + try (HttpServer server = HttpServer.start(makeInvalidSdkKeyResponse())) { LDConfig config = new LDConfig.Builder() - .dataSource(baseStreamingConfig(server).initialReconnectDelay(Duration.ZERO)) + .dataSource(Components.streamingDataSource().baseURI(server.getUri()).initialReconnectDelay(Duration.ZERO)) + // use zero reconnect delay so we'll know if it does not stop permanently .events(noEvents()) .build(); @@ -163,94 +193,34 @@ public void clientFailsInStreamingModeWith401Error() throws Exception { assertThat(statuses.take().getState(), equalTo(DataSourceStatusProvider.State.OFF)); } assertThat(statuses.isEmpty(), equalTo(true)); - assertThat(server.getRequestCount(), equalTo(1)); // no retries - } - } - } - - @Test - public void clientStartsInStreamingModeWithSelfSignedCert() throws Exception { - String streamData = "event: put\n" + - "data: {\"data\":" + makeAllDataJson() + "}\n\n"; - MockResponse resp = TestHttpUtil.eventStreamResponse(streamData); - - try (TestHttpUtil.ServerWithCert serverWithCert = httpsServerWithSelfSignedCert(resp)) { - LDConfig config = new LDConfig.Builder() - .dataSource(baseStreamingConfig(serverWithCert.server)) - .events(noEvents()) - .http(Components.httpConfiguration().sslSocketFactory(serverWithCert.socketFactory, serverWithCert.trustManager)) - // allows us to trust the self-signed cert - .build(); - - try (LDClient client = new LDClient(sdkKey, config)) { - assertTrue(client.isInitialized()); - assertTrue(client.boolVariation(flagKey, user, false)); - } - } - } - - @Test - public void clientUsesProxy() throws Exception { - URI fakeBaseUri = URI.create("http://not-a-real-host"); - MockResponse resp = jsonResponse(makeAllDataJson()); - - try (MockWebServer server = makeStartedServer(resp)) { - HttpUrl serverUrl = server.url("/"); - LDConfig config = new LDConfig.Builder() - .http(Components.httpConfiguration() - .proxyHostAndPort(serverUrl.host(), serverUrl.port())) - .dataSource(Components.pollingDataSource().baseURI(fakeBaseUri)) - .events(Components.noEvents()) - .build(); - - try (LDClient client = new LDClient(sdkKey, config)) { - assertTrue(client.isInitialized()); - RecordedRequest req = server.takeRequest(); - assertThat(req.getRequestLine(), startsWith("GET " + fakeBaseUri + "/sdk/latest-all")); - assertThat(req.getHeader("Proxy-Authorization"), nullValue()); + server.getRecorder().requireRequest(); + server.getRecorder().requireNoRequests(Duration.ofMillis(100)); } } } @Test - public void clientUsesProxyWithBasicAuth() throws Exception { - URI fakeBaseUri = URI.create("http://not-a-real-host"); - MockResponse challengeResp = new MockResponse().setResponseCode(407).setHeader("Proxy-Authenticate", "Basic realm=x"); - MockResponse resp = jsonResponse(makeAllDataJson()); - - try (MockWebServer server = makeStartedServer(challengeResp, resp)) { - HttpUrl serverUrl = server.url("/"); - LDConfig config = new LDConfig.Builder() - .http(Components.httpConfiguration() - .proxyHostAndPort(serverUrl.host(), serverUrl.port()) - .proxyAuth(Components.httpBasicAuthentication("user", "pass"))) - .dataSource(Components.pollingDataSource().baseURI(fakeBaseUri)) - .events(Components.noEvents()) - .build(); - - try (LDClient client = new LDClient(sdkKey, config)) { - assertTrue(client.isInitialized()); - - RecordedRequest req1 = server.takeRequest(); - assertThat(req1.getRequestLine(), startsWith("GET " + fakeBaseUri + "/sdk/latest-all")); - assertThat(req1.getHeader("Proxy-Authorization"), nullValue()); - - RecordedRequest req2 = server.takeRequest(); - assertThat(req2.getRequestLine(), equalTo(req1.getRequestLine())); - assertThat(req2.getHeader("Proxy-Authorization"), equalTo("Basic dXNlcjpwYXNz")); - } - } + public void testStreamingModeSpecialHttpConfigurations() throws Exception { + testWithSpecialHttpConfigurations( + makeStreamingSuccessResponse(), + (serverUri, httpConfig) -> + new LDConfig.Builder() + .dataSource(Components.streamingDataSource().baseURI(serverUri)) + .events(noEvents()) + .http(httpConfig) + .startWait(Duration.ofMillis(100)) + .build()); } @Test public void clientSendsAnalyticsEvent() throws Exception { - MockResponse resp = new MockResponse().setResponseCode(202); + Handler resp = Handlers.status(202); - try (MockWebServer server = makeStartedServer(resp)) { + try (HttpServer server = HttpServer.start(resp)) { LDConfig config = new LDConfig.Builder() .dataSource(externalUpdatesOnly()) - .events(Components.sendEvents().baseURI(server.url("/").uri())) + .events(Components.sendEvents().baseURI(server.getUri())) .diagnosticOptOut(true) .build(); @@ -259,31 +229,50 @@ public void clientSendsAnalyticsEvent() throws Exception { client.identify(new LDUser("userkey")); } - RecordedRequest req = server.takeRequest(); + RequestInfo req = server.getRecorder().requireRequest(); assertEquals("/bulk", req.getPath()); } } @Test public void clientSendsDiagnosticEvent() throws Exception { - MockResponse resp = new MockResponse().setResponseCode(202); + Handler resp = Handlers.status(202); - try (MockWebServer server = makeStartedServer(resp)) { + try (HttpServer server = HttpServer.start(resp)) { LDConfig config = new LDConfig.Builder() .dataSource(externalUpdatesOnly()) - .events(Components.sendEvents().baseURI(server.url("/").uri())) + .events(Components.sendEvents().baseURI(server.getUri())) .build(); try (LDClient client = new LDClient(sdkKey, config)) { assertTrue(client.isInitialized()); - RecordedRequest req = server.takeRequest(); + RequestInfo req = server.getRecorder().requireRequest(); assertEquals("/diagnostic", req.getPath()); } } } - public String makeAllDataJson() { + private static void testWithSpecialHttpConfigurations(Handler handler, + BiFunction makeConfig) throws Exception { + TestHttpUtil.testWithSpecialHttpConfigurations(handler, + (serverUri, httpConfig) -> { + LDConfig config = makeConfig.apply(serverUri, httpConfig); + try (LDClient client = new LDClient(sdkKey, config)) { + assertTrue(client.isInitialized()); + assertTrue(client.boolVariation(flagKey, user, false)); + } + }, + (serverUri, httpConfig) -> { + LDConfig config = makeConfig.apply(serverUri, httpConfig); + try (LDClient client = new LDClient(sdkKey, config)) { + assertFalse(client.isInitialized()); + } + } + ); + } + + private static String makeAllDataJson() { JsonObject flagsData = new JsonObject(); flagsData.add(flagKey, gson.toJsonTree(flag)); JsonObject allData = new JsonObject(); diff --git a/src/test/java/com/launchdarkly/sdk/server/PollingProcessorTest.java b/src/test/java/com/launchdarkly/sdk/server/PollingProcessorTest.java index 584f6bef4..cf8bba201 100644 --- a/src/test/java/com/launchdarkly/sdk/server/PollingProcessorTest.java +++ b/src/test/java/com/launchdarkly/sdk/server/PollingProcessorTest.java @@ -1,6 +1,7 @@ package com.launchdarkly.sdk.server; import com.launchdarkly.sdk.server.DataModel.FeatureFlag; +import com.launchdarkly.sdk.server.DataStoreTestTypes.DataBuilder; import com.launchdarkly.sdk.server.TestComponents.MockDataSourceUpdates; import com.launchdarkly.sdk.server.TestComponents.MockDataStoreStatusProvider; import com.launchdarkly.sdk.server.TestUtil.ActionCanThrowAnyException; @@ -12,42 +13,36 @@ import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.Status; import com.launchdarkly.sdk.server.interfaces.DataStore; import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider; -import com.launchdarkly.sdk.server.interfaces.SerializationException; +import com.launchdarkly.testhelpers.httptest.Handler; +import com.launchdarkly.testhelpers.httptest.Handlers; +import com.launchdarkly.testhelpers.httptest.HttpServer; +import com.launchdarkly.testhelpers.httptest.RequestContext; -import org.hamcrest.MatcherAssert; import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.net.URI; import java.time.Duration; -import java.util.Collections; -import java.util.HashMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static com.launchdarkly.sdk.server.TestComponents.clientContext; import static com.launchdarkly.sdk.server.TestComponents.dataStoreThatThrowsException; +import static com.launchdarkly.sdk.server.TestComponents.defaultHttpConfiguration; import static com.launchdarkly.sdk.server.TestComponents.sharedExecutor; -import static com.launchdarkly.sdk.server.TestUtil.awaitValue; -import static com.launchdarkly.sdk.server.TestUtil.expectNoMoreValues; +import static com.launchdarkly.sdk.server.TestUtil.assertDataSetEquals; import static com.launchdarkly.sdk.server.TestUtil.requireDataSourceStatus; import static com.launchdarkly.sdk.server.TestUtil.requireDataSourceStatusEventually; import static com.launchdarkly.sdk.server.TestUtil.shouldNotTimeOut; -import static com.launchdarkly.sdk.server.TestUtil.shouldTimeOut; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; @SuppressWarnings("javadoc") public class PollingProcessorTest { @@ -56,23 +51,45 @@ public class PollingProcessorTest { private static final Duration BRIEF_INTERVAL = Duration.ofMillis(20); private MockDataSourceUpdates dataSourceUpdates; - private MockFeatureRequestor requestor; @Before public void setup() { DataStore store = new InMemoryDataStore(); dataSourceUpdates = TestComponents.dataSourceUpdates(store, new MockDataStoreStatusProvider()); - requestor = new MockFeatureRequestor(); } - private PollingProcessor makeProcessor() { - return makeProcessor(LENGTHY_INTERVAL); - } - - private PollingProcessor makeProcessor(Duration pollInterval) { + private PollingProcessor makeProcessor(URI baseUri, Duration pollInterval) { + FeatureRequestor requestor = new DefaultFeatureRequestor(defaultHttpConfiguration(), baseUri); return new PollingProcessor(requestor, dataSourceUpdates, sharedExecutor, pollInterval); } + private static class TestPollHandler implements Handler { + private final String data; + private volatile int errorStatus; + + public TestPollHandler() { + this(DataBuilder.forStandardTypes()); + } + + public TestPollHandler(DataBuilder data) { + this.data = data.buildJson().toJsonString(); + } + + @Override + public void apply(RequestContext context) { + int err = errorStatus; + if (err == 0) { + Handlers.bodyJson(data).apply(context); + } else { + context.setStatus(err); + } + } + + public void setError(int status) { + this.errorStatus = status; + } + } + @Test public void builderHasDefaultConfiguration() throws Exception { DataSourceFactory f = Components.pollingDataSource(); @@ -98,59 +115,60 @@ public void builderCanSpecifyConfiguration() throws Exception { public void successfulPolls() throws Exception { FeatureFlag flagv1 = ModelBuilders.flagBuilder("flag").version(1).build(); FeatureFlag flagv2 = ModelBuilders.flagBuilder(flagv1.getKey()).version(2).build(); - FeatureRequestor.AllData datav1 = new FeatureRequestor.AllData(Collections.singletonMap(flagv1.getKey(), flagv1), - Collections.emptyMap()); - FeatureRequestor.AllData datav2 = new FeatureRequestor.AllData(Collections.singletonMap(flagv1.getKey(), flagv2), - Collections.emptyMap()); + DataBuilder datav1 = DataBuilder.forStandardTypes().addAny(DataModel.FEATURES, flagv1); + DataBuilder datav2 = DataBuilder.forStandardTypes().addAny(DataModel.FEATURES, flagv2); - requestor.gate = new Semaphore(0); - requestor.allData = datav1; - BlockingQueue statuses = new LinkedBlockingQueue<>(); dataSourceUpdates.statusBroadcaster.register(statuses::add); - try (PollingProcessor pollingProcessor = makeProcessor(Duration.ofMillis(100))) { - Future initFuture = pollingProcessor.start(); - - // allow first poll to complete - requestor.gate.release(); - - initFuture.get(1000, TimeUnit.MILLISECONDS); - - assertTrue(pollingProcessor.isInitialized()); - assertEquals(datav1.toFullDataSet(), dataSourceUpdates.awaitInit()); - - // allow second poll to complete - should return new data - requestor.allData = datav2; - requestor.gate.release(); - - requireDataSourceStatus(statuses, State.VALID); + Semaphore allowSecondPollToProceed = new Semaphore(0); + + Handler pollingHandler = Handlers.sequential( + new TestPollHandler(datav1), + Handlers.all( + Handlers.waitFor(allowSecondPollToProceed), + new TestPollHandler(datav2) + ), + Handlers.hang() // we don't want any more polls to complete after the second one + ); + + try (HttpServer server = HttpServer.start(pollingHandler)) { + try (PollingProcessor pollingProcessor = makeProcessor(server.getUri(), Duration.ofMillis(100))) { + Future initFuture = pollingProcessor.start(); + shouldNotTimeOut(initFuture, Duration.ofSeconds(1)); + + assertTrue(pollingProcessor.isInitialized()); + assertDataSetEquals(datav1.build(), dataSourceUpdates.awaitInit()); - assertEquals(datav2.toFullDataSet(), dataSourceUpdates.awaitInit()); + allowSecondPollToProceed.release(); + + assertDataSetEquals(datav2.build(), dataSourceUpdates.awaitInit()); + } } } @Test - public void testConnectionProblem() throws Exception { - requestor.ioException = new IOException("This exception is part of a test and yes you should be seeing it."); - + public void testTimeoutFromConnectionProblem() throws Exception { BlockingQueue statuses = new LinkedBlockingQueue<>(); dataSourceUpdates.statusBroadcaster.register(statuses::add); - try (PollingProcessor pollingProcessor = makeProcessor()) { - Future initFuture = pollingProcessor.start(); - try { - initFuture.get(200L, TimeUnit.MILLISECONDS); - fail("Expected Timeout, instead initFuture.get() returned."); - } catch (TimeoutException ignored) { + Handler errorThenSuccess = Handlers.sequential( + Handlers.malformedResponse(), // this will cause an IOException + new TestPollHandler() // it should time out before reaching this + ); + + try (HttpServer server = HttpServer.start(errorThenSuccess)) { + try (PollingProcessor pollingProcessor = makeProcessor(server.getUri(), LENGTHY_INTERVAL)) { + Future initFuture = pollingProcessor.start(); + TestUtil.shouldTimeOut(initFuture, Duration.ofMillis(200)); + assertFalse(initFuture.isDone()); + assertFalse(pollingProcessor.isInitialized()); + assertEquals(0, dataSourceUpdates.receivedInits.size()); + + Status status = requireDataSourceStatus(statuses, State.INITIALIZING); + assertNotNull(status.getLastError()); + assertEquals(ErrorKind.NETWORK_ERROR, status.getLastError().getKind()); } - assertFalse(initFuture.isDone()); - assertFalse(pollingProcessor.isInitialized()); - assertEquals(0, dataSourceUpdates.receivedInits.size()); - - Status status = requireDataSourceStatus(statuses, State.INITIALIZING); - assertNotNull(status.getLastError()); - assertEquals(ErrorKind.NETWORK_ERROR, status.getLastError().getKind()); } } @@ -160,76 +178,56 @@ public void testDataStoreFailure() throws Exception { DataStoreStatusProvider badStoreStatusProvider = new MockDataStoreStatusProvider(false); dataSourceUpdates = TestComponents.dataSourceUpdates(badStore, badStoreStatusProvider); - requestor.allData = new FeatureRequestor.AllData(new HashMap<>(), new HashMap<>()); - BlockingQueue statuses = new LinkedBlockingQueue<>(); dataSourceUpdates.statusBroadcaster.register(statuses::add); - try (PollingProcessor pollingProcessor = makeProcessor()) { - pollingProcessor.start(); - - assertEquals(requestor.allData.toFullDataSet(), dataSourceUpdates.awaitInit()); + try (HttpServer server = HttpServer.start(new TestPollHandler())) { + try (PollingProcessor pollingProcessor = makeProcessor(server.getUri(), LENGTHY_INTERVAL)) { + pollingProcessor.start(); + + assertDataSetEquals(DataBuilder.forStandardTypes().build(), dataSourceUpdates.awaitInit()); - assertFalse(pollingProcessor.isInitialized()); - - Status status = requireDataSourceStatus(statuses, State.INITIALIZING); - assertNotNull(status.getLastError()); - assertEquals(ErrorKind.STORE_ERROR, status.getLastError().getKind()); + assertFalse(pollingProcessor.isInitialized()); + + Status status = requireDataSourceStatus(statuses, State.INITIALIZING); + assertNotNull(status.getLastError()); + assertEquals(ErrorKind.STORE_ERROR, status.getLastError().getKind()); + } } } @Test public void testMalformedData() throws Exception { - requestor.runtimeException = new SerializationException(new Exception("the JSON was displeasing")); + Handler badDataHandler = Handlers.bodyJson("{bad"); BlockingQueue statuses = new LinkedBlockingQueue<>(); dataSourceUpdates.statusBroadcaster.register(statuses::add); - try (PollingProcessor pollingProcessor = makeProcessor()) { - pollingProcessor.start(); - - Status status = requireDataSourceStatus(statuses, State.INITIALIZING); - assertNotNull(status.getLastError()); - assertEquals(ErrorKind.INVALID_DATA, status.getLastError().getKind()); - assertEquals(requestor.runtimeException.toString(), status.getLastError().getMessage()); - - assertFalse(pollingProcessor.isInitialized()); + try (HttpServer server = HttpServer.start(badDataHandler)) { + try (PollingProcessor pollingProcessor = makeProcessor(server.getUri(), LENGTHY_INTERVAL)) { + pollingProcessor.start(); + + Status status = requireDataSourceStatus(statuses, State.INITIALIZING); + assertNotNull(status.getLastError()); + assertEquals(ErrorKind.INVALID_DATA, status.getLastError().getKind()); + + assertFalse(pollingProcessor.isInitialized()); + } } } - @Test - public void testUnknownException() throws Exception { - requestor.runtimeException = new RuntimeException("everything is displeasing"); - - BlockingQueue statuses = new LinkedBlockingQueue<>(); - dataSourceUpdates.statusBroadcaster.register(statuses::add); - - try (PollingProcessor pollingProcessor = makeProcessor()) { - pollingProcessor.start(); - - Status status = requireDataSourceStatus(statuses, State.INITIALIZING); - assertNotNull(status.getLastError()); - assertEquals(ErrorKind.UNKNOWN, status.getLastError().getKind()); - assertEquals(requestor.runtimeException.toString(), status.getLastError().getMessage()); - - assertFalse(pollingProcessor.isInitialized()); - } - } - @Test public void startingWhenAlreadyStartedDoesNothing() throws Exception { - requestor.allData = new FeatureRequestor.AllData(new HashMap<>(), new HashMap<>()); - - try (PollingProcessor pollingProcessor = makeProcessor(Duration.ofMillis(500))) { - Future initFuture1 = pollingProcessor.start(); - - awaitValue(requestor.queries, Duration.ofMillis(100)); // a poll request was made - - Future initFuture2 = pollingProcessor.start(); - assertSame(initFuture1, initFuture2); - - - expectNoMoreValues(requestor.queries, Duration.ofMillis(100)); // we did NOT start another polling task + try (HttpServer server = HttpServer.start(new TestPollHandler())) { + try (PollingProcessor pollingProcessor = makeProcessor(server.getUri(), LENGTHY_INTERVAL)) { + Future initFuture1 = pollingProcessor.start(); + shouldNotTimeOut(initFuture1, Duration.ofSeconds(1)); + server.getRecorder().requireRequest(); + + Future initFuture2 = pollingProcessor.start(); + assertSame(initFuture1, initFuture2); + server.getRecorder().requireNoRequests(Duration.ofMillis(100)); + } } } @@ -264,43 +262,51 @@ public void http500ErrorIsRecoverable() throws Exception { } private void testUnrecoverableHttpError(int statusCode) throws Exception { - HttpErrorException httpError = new HttpErrorException(statusCode); + TestPollHandler handler = new TestPollHandler(); // Test a scenario where the very first request gets this error + handler.setError(statusCode); withStatusQueue(statuses -> { - requestor.httpException = httpError; - - try (PollingProcessor pollingProcessor = makeProcessor()) { - long startTime = System.currentTimeMillis(); - Future initFuture = pollingProcessor.start(); - - shouldNotTimeOut(initFuture, Duration.ofSeconds(2)); - assertTrue((System.currentTimeMillis() - startTime) < 9000); - assertTrue(initFuture.isDone()); - assertFalse(pollingProcessor.isInitialized()); - - verifyHttpErrorCausedShutdown(statuses, statusCode); + try (HttpServer server = HttpServer.start(handler)) { + try (PollingProcessor pollingProcessor = makeProcessor(server.getUri(), BRIEF_INTERVAL)) { + long startTime = System.currentTimeMillis(); + Future initFuture = pollingProcessor.start(); + + shouldNotTimeOut(initFuture, Duration.ofSeconds(2)); + assertTrue((System.currentTimeMillis() - startTime) < 9000); + assertTrue(initFuture.isDone()); + assertFalse(pollingProcessor.isInitialized()); + + verifyHttpErrorCausedShutdown(statuses, statusCode); + + server.getRecorder().requireRequest(); + server.getRecorder().requireNoRequests(Duration.ofMillis(100)); + } } }); - // Now test a scenario where we have a successful startup, but the next poll gets the error + // Now test a scenario where we have a successful startup, but a subsequent poll gets the error + handler.setError(0); dataSourceUpdates = TestComponents.dataSourceUpdates(new InMemoryDataStore(), new MockDataStoreStatusProvider()); withStatusQueue(statuses -> { - requestor = new MockFeatureRequestor(); - requestor.allData = new FeatureRequestor.AllData(new HashMap<>(), new HashMap<>()); - - try (PollingProcessor pollingProcessor = makeProcessor(BRIEF_INTERVAL)) { - Future initFuture = pollingProcessor.start(); - - shouldNotTimeOut(initFuture, Duration.ofSeconds(20000)); - assertTrue(initFuture.isDone()); - assertTrue(pollingProcessor.isInitialized()); - requireDataSourceStatusEventually(statuses, State.VALID, State.INITIALIZING); - - // cause the next poll to get an error - requestor.httpException = httpError; - - verifyHttpErrorCausedShutdown(statuses, statusCode); + try (HttpServer server = HttpServer.start(handler)) { + try (PollingProcessor pollingProcessor = makeProcessor(server.getUri(), BRIEF_INTERVAL)) { + Future initFuture = pollingProcessor.start(); + + shouldNotTimeOut(initFuture, Duration.ofSeconds(20000)); + assertTrue(initFuture.isDone()); + assertTrue(pollingProcessor.isInitialized()); + requireDataSourceStatus(statuses, State.VALID); + + // now make it so polls fail + handler.setError(statusCode); + + verifyHttpErrorCausedShutdown(statuses, statusCode); + while (server.getRecorder().count() > 0) { + server.getRecorder().requireRequest(); + } + server.getRecorder().requireNoRequests(Duration.ofMillis(100)); + } } }); } @@ -313,87 +319,65 @@ private void verifyHttpErrorCausedShutdown(BlockingQueue { - requestor.httpException = httpError; - - try (PollingProcessor pollingProcessor = makeProcessor(BRIEF_INTERVAL)) { - Future initFuture = pollingProcessor.start(); - - // first poll gets an error - shouldTimeOut(initFuture, Duration.ofMillis(200)); - assertFalse(initFuture.isDone()); - assertFalse(pollingProcessor.isInitialized()); - - Status status0 = requireDataSourceStatus(statuses, State.INITIALIZING); - assertNotNull(status0.getLastError()); - assertEquals(ErrorKind.ERROR_RESPONSE, status0.getLastError().getKind()); - assertEquals(statusCode, status0.getLastError().getStatusCode()); - - verifyHttpErrorWasRecoverable(statuses, statusCode, false); - - shouldNotTimeOut(initFuture, Duration.ofSeconds(2)); - assertTrue(initFuture.isDone()); - assertTrue(pollingProcessor.isInitialized()); + try (HttpServer server = HttpServer.start(handler)) { + try (PollingProcessor pollingProcessor = makeProcessor(server.getUri(), BRIEF_INTERVAL)) { + Future initFuture = pollingProcessor.start(); + + // make sure it's done a couple of polls (which will have failed) + server.getRecorder().requireRequest(); + server.getRecorder().requireRequest(); + + // now make it so polls will succeed + handler.setError(0); + + shouldNotTimeOut(initFuture, Duration.ofSeconds(1)); + + // verify that it got the error + Status status0 = requireDataSourceStatus(statuses, State.INITIALIZING); + assertNotNull(status0.getLastError()); + assertEquals(ErrorKind.ERROR_RESPONSE, status0.getLastError().getKind()); + assertEquals(statusCode, status0.getLastError().getStatusCode()); + + // and then that it succeeded + requireDataSourceStatusEventually(statuses, State.VALID, State.INITIALIZING); + } } }); - // Now test a scenario where we have a successful startup, but the next poll gets the error + // Now test a scenario where we have a successful startup, but then it gets the error. + // The result is a bit different because it will report an INTERRUPTED state. + handler.setError(0); dataSourceUpdates = TestComponents.dataSourceUpdates(new InMemoryDataStore(), new MockDataStoreStatusProvider()); withStatusQueue(statuses -> { - requestor = new MockFeatureRequestor(); - requestor.allData = new FeatureRequestor.AllData(new HashMap<>(), new HashMap<>()); - - try (PollingProcessor pollingProcessor = makeProcessor(BRIEF_INTERVAL)) { - Future initFuture = pollingProcessor.start(); - - shouldNotTimeOut(initFuture, Duration.ofSeconds(20000)); - assertTrue(initFuture.isDone()); - assertTrue(pollingProcessor.isInitialized()); - requireDataSourceStatusEventually(statuses, State.VALID, State.INITIALIZING); - - // cause the next poll to get an error - requestor.httpException = httpError; - - Status status0 = requireDataSourceStatusEventually(statuses, State.INTERRUPTED, State.VALID); - assertEquals(ErrorKind.ERROR_RESPONSE, status0.getLastError().getKind()); - assertEquals(statusCode, status0.getLastError().getStatusCode()); - - verifyHttpErrorWasRecoverable(statuses, statusCode, true); + try (HttpServer server = HttpServer.start(handler)) { + try (PollingProcessor pollingProcessor = makeProcessor(server.getUri(), BRIEF_INTERVAL)) { + Future initFuture = pollingProcessor.start(); + shouldNotTimeOut(initFuture, Duration.ofSeconds(1)); + assertTrue(pollingProcessor.isInitialized()); + + // first poll succeeded + requireDataSourceStatus(statuses, State.VALID); + + // now make it so polls will fail + handler.setError(statusCode); + + Status status1 = requireDataSourceStatus(statuses, State.INTERRUPTED); + assertEquals(ErrorKind.ERROR_RESPONSE, status1.getLastError().getKind()); + assertEquals(statusCode, status1.getLastError().getStatusCode()); + + // and then succeed again + handler.setError(0); + requireDataSourceStatusEventually(statuses, State.VALID, State.INTERRUPTED); + } } }); } - private void verifyHttpErrorWasRecoverable( - BlockingQueue statuses, - int statusCode, - boolean didAlreadyConnect - ) throws Exception { - long startTime = System.currentTimeMillis(); - - // first make it so the requestor will succeed after the previous error - requestor.allData = new FeatureRequestor.AllData(new HashMap<>(), new HashMap<>()); - requestor.httpException = null; - - // status should now be VALID (although there might have been more failed polls before that) - Status status1 = requireDataSourceStatusEventually(statuses, State.VALID, - didAlreadyConnect ? State.INTERRUPTED : State.INITIALIZING); - assertNotNull(status1.getLastError()); - assertEquals(ErrorKind.ERROR_RESPONSE, status1.getLastError().getKind()); - assertEquals(statusCode, status1.getLastError().getStatusCode()); - - // simulate another error of the same kind - the state will be INTERRUPTED - requestor.httpException = new HttpErrorException(statusCode); - - Status status2 = requireDataSourceStatusEventually(statuses, State.INTERRUPTED, State.VALID); - assertNotNull(status2.getLastError()); - assertEquals(ErrorKind.ERROR_RESPONSE, status2.getLastError().getKind()); - assertEquals(statusCode, status2.getLastError().getStatusCode()); - MatcherAssert.assertThat(status2.getLastError().getTime().toEpochMilli(), greaterThanOrEqualTo(startTime)); - } - private void withStatusQueue(ActionCanThrowAnyException> action) throws Exception { BlockingQueue statuses = new LinkedBlockingQueue<>(); DataSourceStatusProvider.StatusListener addStatus = statuses::add; @@ -404,34 +388,4 @@ private void withStatusQueue(ActionCanThrowAnyException queries = new LinkedBlockingQueue<>(); - - public void close() throws IOException {} - - public AllData getAllData(boolean returnDataEvenIfCached) throws IOException, HttpErrorException { - queries.add(true); - if (gate != null) { - try { - gate.acquire(); - } catch (InterruptedException e) {} - } - if (httpException != null) { - throw httpException; - } - if (ioException != null) { - throw ioException; - } - if (runtimeException != null) { - throw runtimeException; - } - return allData; - } - } } diff --git a/src/test/java/com/launchdarkly/sdk/server/StreamProcessorTest.java b/src/test/java/com/launchdarkly/sdk/server/StreamProcessorTest.java index 8a66319a5..fa096b1c7 100644 --- a/src/test/java/com/launchdarkly/sdk/server/StreamProcessorTest.java +++ b/src/test/java/com/launchdarkly/sdk/server/StreamProcessorTest.java @@ -1,103 +1,136 @@ package com.launchdarkly.sdk.server; import com.launchdarkly.eventsource.ConnectionErrorHandler; -import com.launchdarkly.eventsource.EventHandler; -import com.launchdarkly.eventsource.EventSource; import com.launchdarkly.eventsource.MessageEvent; -import com.launchdarkly.eventsource.UnsuccessfulResponseException; +import com.launchdarkly.sdk.server.DataModel.FeatureFlag; +import com.launchdarkly.sdk.server.DataModel.Segment; import com.launchdarkly.sdk.server.DataModel.VersionedData; -import com.launchdarkly.sdk.server.StreamProcessor.EventSourceParams; +import com.launchdarkly.sdk.server.DataStoreTestTypes.DataBuilder; +import com.launchdarkly.sdk.server.TestComponents.DelegatingDataStore; import com.launchdarkly.sdk.server.TestComponents.MockDataSourceUpdates; +import com.launchdarkly.sdk.server.TestComponents.MockDataSourceUpdates.UpsertParams; import com.launchdarkly.sdk.server.TestComponents.MockDataStoreStatusProvider; -import com.launchdarkly.sdk.server.TestComponents.MockEventSourceCreator; import com.launchdarkly.sdk.server.integrations.StreamingDataSourceBuilder; import com.launchdarkly.sdk.server.interfaces.DataSourceFactory; import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.ErrorKind; import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.State; import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.Status; -import com.launchdarkly.sdk.server.interfaces.DataSourceUpdates; -import com.launchdarkly.sdk.server.interfaces.DataStore; import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider; import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.DataKind; import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.ItemDescriptor; import com.launchdarkly.sdk.server.interfaces.HttpConfiguration; +import com.launchdarkly.testhelpers.httptest.Handler; +import com.launchdarkly.testhelpers.httptest.Handlers; +import com.launchdarkly.testhelpers.httptest.HttpServer; +import com.launchdarkly.testhelpers.httptest.RequestInfo; -import org.easymock.EasyMockSupport; +import org.hamcrest.MatcherAssert; import org.junit.Before; import org.junit.Test; import java.io.EOFException; -import java.io.IOException; import java.net.URI; import java.time.Duration; import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; - -import javax.net.ssl.SSLHandshakeException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; import static com.launchdarkly.sdk.server.DataModel.FEATURES; import static com.launchdarkly.sdk.server.DataModel.SEGMENTS; -import static com.launchdarkly.sdk.server.JsonHelpers.gsonInstance; import static com.launchdarkly.sdk.server.ModelBuilders.flagBuilder; import static com.launchdarkly.sdk.server.ModelBuilders.segmentBuilder; import static com.launchdarkly.sdk.server.TestComponents.clientContext; import static com.launchdarkly.sdk.server.TestComponents.dataSourceUpdates; -import static com.launchdarkly.sdk.server.TestComponents.dataStoreThatThrowsException; -import static com.launchdarkly.sdk.server.TestHttpUtil.eventStreamResponse; -import static com.launchdarkly.sdk.server.TestHttpUtil.makeStartedServer; -import static com.launchdarkly.sdk.server.TestUtil.makeSocketFactorySingleHost; import static com.launchdarkly.sdk.server.TestUtil.requireDataSourceStatus; import static com.launchdarkly.sdk.server.TestUtil.shouldNotTimeOut; -import static com.launchdarkly.sdk.server.TestUtil.shouldTimeOut; -import static com.launchdarkly.sdk.server.integrations.StreamingDataSourceBuilder.DEFAULT_INITIAL_RECONNECT_DELAY; -import static org.easymock.EasyMock.expectLastCall; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import okhttp3.HttpUrl; -import okhttp3.mockwebserver.MockWebServer; - @SuppressWarnings("javadoc") -public class StreamProcessorTest extends EasyMockSupport { +public class StreamProcessorTest { private static final String SDK_KEY = "sdk_key"; - private static final URI STREAM_URI = URI.create("http://stream.test.com/"); - private static final URI STREAM_URI_WITHOUT_SLASH = URI.create("http://stream.test.com"); + private static final Duration BRIEF_RECONNECT_DELAY = Duration.ofMillis(10); private static final String FEATURE1_KEY = "feature1"; private static final int FEATURE1_VERSION = 11; private static final DataModel.FeatureFlag FEATURE = flagBuilder(FEATURE1_KEY).version(FEATURE1_VERSION).build(); private static final String SEGMENT1_KEY = "segment1"; private static final int SEGMENT1_VERSION = 22; private static final DataModel.Segment SEGMENT = segmentBuilder(SEGMENT1_KEY).version(SEGMENT1_VERSION).build(); - private static final String STREAM_RESPONSE_WITH_EMPTY_DATA = - "event: put\n" + - "data: {\"data\":{\"flags\":{},\"segments\":{}}}\n\n"; + private static final String EMPTY_DATA_EVENT = makePutEvent(new DataBuilder().addAny(FEATURES).addAny(SEGMENTS)); private InMemoryDataStore dataStore; private MockDataSourceUpdates dataSourceUpdates; private MockDataStoreStatusProvider dataStoreStatusProvider; - private EventSource mockEventSource; - private MockEventSourceCreator mockEventSourceCreator; + private static Handler streamResponse(String data) { + return Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(data), + Handlers.SSE.leaveOpen() + ); + } + + private static Handler closableStreamResponse(String data, Semaphore closeSignal) { + return Handlers.all( + Handlers.SSE.start(), + Handlers.SSE.event(data), + Handlers.waitFor(closeSignal) + ); + } + + private static Handler streamResponseFromQueue(BlockingQueue events) { + return Handlers.all( + Handlers.SSE.start(), + ctx -> { + while (true) { + try { + String event = events.take(); + Handlers.SSE.event(event).apply(ctx); + } catch (InterruptedException e) { + break; + } + } + } + ); + } + + private static String makeEvent(String type, String data) { + return "event: " + type + "\ndata: " + data; + } + + private static String makePutEvent(DataBuilder data) { + return makeEvent("put", "{\"data\":" + data.buildJson().toJsonString() + "}"); + } + + private static String makePatchEvent(String path, DataKind kind, VersionedData item) { + String json = kind.serialize(new ItemDescriptor(item.getVersion(), item)); + return makeEvent("patch", "{\"path\":\"" + path + "\",\"data\":" + json + "}"); + } + + private static String makeDeleteEvent(String path, int version) { + return makeEvent("delete", "{\"path\":\"" + path + "\",\"version\":" + version + "}"); + } + @Before public void setup() { dataStore = new InMemoryDataStore(); dataStoreStatusProvider = new MockDataStoreStatusProvider(); dataSourceUpdates = TestComponents.dataSourceUpdates(dataStore, dataStoreStatusProvider); - mockEventSource = createMock(EventSource.class); - mockEventSourceCreator = new MockEventSourceCreator(mockEventSource); } @Test @@ -124,118 +157,128 @@ public void builderCanSpecifyConfiguration() throws Exception { } @Test - public void streamUriHasCorrectEndpoint() { - createStreamProcessor(STREAM_URI).start(); - assertEquals(URI.create(STREAM_URI.toString() + "all"), - mockEventSourceCreator.getNextReceivedParams().streamUri); + public void verifyStreamRequestProperties() throws Exception { + HttpConfiguration httpConfig = clientContext(SDK_KEY, LDConfig.DEFAULT).getHttp(); + + try (HttpServer server = HttpServer.start(streamResponse(EMPTY_DATA_EVENT))) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + sp.start(); + + RequestInfo req = server.getRecorder().requireRequest(); + assertThat(req.getMethod(), equalTo("GET")); + assertThat(req.getPath(), equalTo("/all")); + + for (Map.Entry kv: httpConfig.getDefaultHeaders()) { + assertThat(req.getHeader(kv.getKey()), equalTo(kv.getValue())); + } + assertThat(req.getHeader("Accept"), equalTo("text/event-stream")); + } + } } @Test - public void streamBaseUriDoesNotNeedTrailingSlash() { - createStreamProcessor(STREAM_URI_WITHOUT_SLASH).start(); - assertEquals(URI.create(STREAM_URI_WITHOUT_SLASH.toString() + "/all"), - mockEventSourceCreator.getNextReceivedParams().streamUri); + public void streamBaseUriDoesNotNeedTrailingSlash() throws Exception { + try (HttpServer server = HttpServer.start(streamResponse(EMPTY_DATA_EVENT))) { + URI baseUri = server.getUri(); + MatcherAssert.assertThat(baseUri.toString(), endsWith("/")); + URI trimmedUri = URI.create(server.getUri().toString().substring(0, server.getUri().toString().length() - 1)); + try (StreamProcessor sp = createStreamProcessor(null, trimmedUri)) { + sp.start(); + + RequestInfo req = server.getRecorder().requireRequest(); + assertThat(req.getPath(), equalTo("/all")); + } + } } @Test - public void streamBaseUriCanHaveContextPath() { - createStreamProcessor(URI.create(STREAM_URI.toString() + "/context/path")).start(); - assertEquals(URI.create(STREAM_URI.toString() + "/context/path/all"), - mockEventSourceCreator.getNextReceivedParams().streamUri); - } - - @Test - public void basicHeadersAreSent() { - HttpConfiguration httpConfig = clientContext(SDK_KEY, LDConfig.DEFAULT).getHttp(); - - createStreamProcessor(STREAM_URI).start(); - EventSourceParams params = mockEventSourceCreator.getNextReceivedParams(); - - for (Map.Entry kv: httpConfig.getDefaultHeaders()) { - assertThat(params.headers.get(kv.getKey()), equalTo(kv.getValue())); + public void streamBaseUriCanHaveContextPath() throws Exception { + try (HttpServer server = HttpServer.start(streamResponse(EMPTY_DATA_EVENT))) { + URI baseUri = server.getUri().resolve("/context/path"); + try (StreamProcessor sp = createStreamProcessor(null, baseUri)) { + sp.start(); + + RequestInfo req = server.getRecorder().requireRequest(); + assertThat(req.getPath(), equalTo("/context/path/all")); + } } } - @Test - public void headersHaveAccept() { - createStreamProcessor(STREAM_URI).start(); - assertEquals("text/event-stream", - mockEventSourceCreator.getNextReceivedParams().headers.get("Accept")); - } - @Test public void putCausesFeatureToBeStored() throws Exception { - expectNoStreamRestart(); - - createStreamProcessor(STREAM_URI).start(); - EventHandler handler = mockEventSourceCreator.getNextReceivedParams().handler; - - MessageEvent event = new MessageEvent("{\"data\":{\"flags\":{\"" + - FEATURE1_KEY + "\":" + featureJson(FEATURE1_KEY, FEATURE1_VERSION) + "}," + - "\"segments\":{}}}"); - handler.onMessage("put", event); + FeatureFlag flag = flagBuilder(FEATURE1_KEY).version(FEATURE1_VERSION).build(); + DataBuilder data = new DataBuilder().addAny(FEATURES, flag).addAny(SEGMENTS); + Handler streamHandler = streamResponse(makePutEvent(data)); - assertFeatureInStore(FEATURE); + try (HttpServer server = HttpServer.start(streamHandler)) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + sp.start(); + + dataSourceUpdates.awaitInit(); + assertFeatureInStore(flag); + } + } } @Test public void putCausesSegmentToBeStored() throws Exception { - expectNoStreamRestart(); - - createStreamProcessor(STREAM_URI).start(); - EventHandler handler = mockEventSourceCreator.getNextReceivedParams().handler; - - MessageEvent event = new MessageEvent("{\"data\":{\"flags\":{},\"segments\":{\"" + - SEGMENT1_KEY + "\":" + segmentJson(SEGMENT1_KEY, SEGMENT1_VERSION) + "}}}"); - handler.onMessage("put", event); - - assertSegmentInStore(SEGMENT); + Segment segment = ModelBuilders.segmentBuilder(SEGMENT1_KEY).version(SEGMENT1_VERSION).build(); + DataBuilder data = new DataBuilder().addAny(FEATURES).addAny(SEGMENTS, segment); + Handler streamHandler = streamResponse(makePutEvent(data)); + + try (HttpServer server = HttpServer.start(streamHandler)) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + sp.start(); + + dataSourceUpdates.awaitInit(); + assertSegmentInStore(SEGMENT); + } + } } @Test public void storeNotInitializedByDefault() throws Exception { - createStreamProcessor(STREAM_URI).start(); - assertFalse(dataStore.isInitialized()); - } - - @Test - public void putCausesStoreToBeInitialized() throws Exception { - createStreamProcessor(STREAM_URI).start(); - EventHandler handler = mockEventSourceCreator.getNextReceivedParams().handler; - handler.onMessage("put", emptyPutEvent()); - assertTrue(dataStore.isInitialized()); + try (HttpServer server = HttpServer.start(streamResponse(""))) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + sp.start(); + assertFalse(dataStore.isInitialized()); + } + } } @Test public void processorNotInitializedByDefault() throws Exception { - StreamProcessor sp = createStreamProcessor(STREAM_URI); - sp.start(); - assertFalse(sp.isInitialized()); - } - - @Test - public void putCausesProcessorToBeInitialized() throws Exception { - StreamProcessor sp = createStreamProcessor(STREAM_URI); - sp.start(); - EventHandler handler = mockEventSourceCreator.getNextReceivedParams().handler; - handler.onMessage("put", emptyPutEvent()); - assertTrue(sp.isInitialized()); + try (HttpServer server = HttpServer.start(streamResponse(""))) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + sp.start(); + assertFalse(sp.isInitialized()); + } + } } @Test public void futureIsNotSetByDefault() throws Exception { - StreamProcessor sp = createStreamProcessor(STREAM_URI); - Future future = sp.start(); - assertFalse(future.isDone()); + try (HttpServer server = HttpServer.start(streamResponse(""))) { + try (StreamProcessor sp = createStreamProcessor(server.getUri())) { + Future future = sp.start(); + assertFalse(future.isDone()); + } + } } @Test - public void putCausesFutureToBeSet() throws Exception { - StreamProcessor sp = createStreamProcessor(STREAM_URI); - Future future = sp.start(); - EventHandler handler = mockEventSourceCreator.getNextReceivedParams().handler; - handler.onMessage("put", emptyPutEvent()); - assertTrue(future.isDone()); + public void putCausesStoreAndProcessorToBeInitialized() throws Exception { + try (HttpServer server = HttpServer.start(streamResponse(EMPTY_DATA_EVENT))) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + Future future = sp.start(); + + dataSourceUpdates.awaitInit(); + shouldNotTimeOut(future, Duration.ofSeconds(1)); + assertTrue(dataStore.isInitialized()); + assertTrue(sp.isInitialized()); + assertTrue(future.isDone()); + } + } } @Test @@ -249,19 +292,26 @@ public void patchUpdatesSegment() throws Exception { } private void doPatchSuccessTest(DataKind kind, VersionedData item, String path) throws Exception { - expectNoStreamRestart(); - - createStreamProcessor(STREAM_URI).start(); - EventHandler handler = mockEventSourceCreator.getNextReceivedParams().handler; - handler.onMessage("put", emptyPutEvent()); - - String json = kind.serialize(new ItemDescriptor(item.getVersion(), item)); - MessageEvent event = new MessageEvent("{\"path\":\"" + path + "\",\"data\":" + json + "}"); - handler.onMessage("patch", event); + BlockingQueue events = new LinkedBlockingQueue<>(); + events.add(EMPTY_DATA_EVENT); - ItemDescriptor result = dataStore.get(kind, item.getKey()); - assertNotNull(result.getItem()); - assertEquals(item.getVersion(), result.getVersion()); + try (HttpServer server = HttpServer.start(streamResponseFromQueue(events))) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + sp.start(); + dataSourceUpdates.awaitInit(); + + events.add(makePatchEvent(path, kind, item)); + UpsertParams gotUpsert = dataSourceUpdates.awaitUpsert(); + + assertThat(gotUpsert.kind, equalTo(kind)); + assertThat(gotUpsert.key, equalTo(item.getKey())); + assertThat(gotUpsert.item.getVersion(), equalTo(item.getVersion())); + + ItemDescriptor result = dataStore.get(kind, item.getKey()); + assertNotNull(result.getItem()); + assertEquals(item.getVersion(), result.getVersion()); + } + } } @Test @@ -275,106 +325,100 @@ public void deleteDeletesSegment() throws Exception { } private void doDeleteSuccessTest(DataKind kind, VersionedData item, String path) throws Exception { - expectNoStreamRestart(); - - createStreamProcessor(STREAM_URI).start(); - EventHandler handler = mockEventSourceCreator.getNextReceivedParams().handler; - handler.onMessage("put", emptyPutEvent()); - dataStore.upsert(kind, item.getKey(), new ItemDescriptor(item.getVersion(), item)); + BlockingQueue events = new LinkedBlockingQueue<>(); + events.add(EMPTY_DATA_EVENT); - MessageEvent event = new MessageEvent("{\"path\":\"" + path + "\",\"version\":" + - (item.getVersion() + 1) + "}"); - handler.onMessage("delete", event); - - assertEquals(ItemDescriptor.deletedItem(item.getVersion() + 1), dataStore.get(kind, item.getKey())); + try (HttpServer server = HttpServer.start(streamResponseFromQueue(events))) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + sp.start(); + dataSourceUpdates.awaitInit(); + + dataStore.upsert(kind, item.getKey(), new ItemDescriptor(item.getVersion(), item)); + + events.add(makeDeleteEvent(path, item.getVersion() + 1)); + UpsertParams gotUpsert = dataSourceUpdates.awaitUpsert(); + + assertThat(gotUpsert.kind, equalTo(kind)); + assertThat(gotUpsert.key, equalTo(item.getKey())); + assertThat(gotUpsert.item.getVersion(), equalTo(item.getVersion() + 1)); + + assertEquals(ItemDescriptor.deletedItem(item.getVersion() + 1), dataStore.get(kind, item.getKey())); + } + } } @Test - public void unknownEventTypeDoesNotThrowException() throws Exception { - createStreamProcessor(STREAM_URI).start(); - EventHandler handler = mockEventSourceCreator.getNextReceivedParams().handler; - handler.onMessage("what", new MessageEvent("")); + public void unknownEventTypeDoesNotCauseError() throws Exception { + verifyEventCausesNoStreamRestart("what", ""); } @Test public void streamWillReconnectAfterGeneralIOException() throws Exception { - createStreamProcessor(STREAM_URI).start(); - ConnectionErrorHandler errorHandler = mockEventSourceCreator.getNextReceivedParams().errorHandler; - ConnectionErrorHandler.Action action = errorHandler.onConnectionError(new IOException()); - assertEquals(ConnectionErrorHandler.Action.PROCEED, action); + Handler errorHandler = Handlers.malformedResponse(); + Handler streamHandler = streamResponse(EMPTY_DATA_EVENT); + Handler errorThenSuccess = Handlers.sequential(errorHandler, streamHandler); - assertNotNull(dataSourceUpdates.getLastStatus().getLastError()); - assertEquals(ErrorKind.NETWORK_ERROR, dataSourceUpdates.getLastStatus().getLastError().getKind()); - } - - @Test - public void streamWillReconnectAfterHttpError() throws Exception { - createStreamProcessor(STREAM_URI).start(); - ConnectionErrorHandler errorHandler = mockEventSourceCreator.getNextReceivedParams().errorHandler; - ConnectionErrorHandler.Action action = errorHandler.onConnectionError(new UnsuccessfulResponseException(500)); - assertEquals(ConnectionErrorHandler.Action.PROCEED, action); - - assertNotNull(dataSourceUpdates.getLastStatus().getLastError()); - assertEquals(ErrorKind.ERROR_RESPONSE, dataSourceUpdates.getLastStatus().getLastError().getKind()); - assertEquals(500, dataSourceUpdates.getLastStatus().getLastError().getStatusCode()); - } + try (HttpServer server = HttpServer.start(errorThenSuccess)) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + startAndWait(sp); - @Test - public void streamWillReconnectAfterUnknownError() throws Exception { - createStreamProcessor(STREAM_URI).start(); - ConnectionErrorHandler errorHandler = mockEventSourceCreator.getNextReceivedParams().errorHandler; - ConnectionErrorHandler.Action action = errorHandler.onConnectionError(new RuntimeException("what?")); - assertEquals(ConnectionErrorHandler.Action.PROCEED, action); - - assertNotNull(dataSourceUpdates.getLastStatus().getLastError()); - assertEquals(ErrorKind.UNKNOWN, dataSourceUpdates.getLastStatus().getLastError().getKind()); + assertThat(server.getRecorder().count(), equalTo(2)); + assertThat(dataSourceUpdates.getLastStatus().getLastError(), notNullValue()); + assertThat(dataSourceUpdates.getLastStatus().getLastError().getKind(), equalTo(ErrorKind.NETWORK_ERROR)); + } + } } @Test public void streamInitDiagnosticRecordedOnOpen() throws Exception { DiagnosticAccumulator acc = new DiagnosticAccumulator(new DiagnosticId(SDK_KEY)); long startTime = System.currentTimeMillis(); - createStreamProcessor(LDConfig.DEFAULT, STREAM_URI, acc).start(); - EventHandler handler = mockEventSourceCreator.getNextReceivedParams().handler; - handler.onMessage("put", emptyPutEvent()); - long timeAfterOpen = System.currentTimeMillis(); - DiagnosticEvent.Statistics event = acc.createEventAndReset(0, 0); - assertEquals(1, event.streamInits.size()); - DiagnosticEvent.StreamInit init = event.streamInits.get(0); - assertFalse(init.failed); - assertThat(init.timestamp, greaterThanOrEqualTo(startTime)); - assertThat(init.timestamp, lessThanOrEqualTo(timeAfterOpen)); - assertThat(init.durationMillis, lessThanOrEqualTo(timeAfterOpen - startTime)); + + try (HttpServer server = HttpServer.start(streamResponse(EMPTY_DATA_EVENT))) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri(), acc)) { + startAndWait(sp); + + long timeAfterOpen = System.currentTimeMillis(); + DiagnosticEvent.Statistics event = acc.createEventAndReset(0, 0); + assertEquals(1, event.streamInits.size()); + DiagnosticEvent.StreamInit init = event.streamInits.get(0); + assertFalse(init.failed); + assertThat(init.timestamp, greaterThanOrEqualTo(startTime)); + assertThat(init.timestamp, lessThanOrEqualTo(timeAfterOpen)); + assertThat(init.durationMillis, lessThanOrEqualTo(timeAfterOpen - startTime)); + } + } } @Test public void streamInitDiagnosticRecordedOnErrorDuringInit() throws Exception { DiagnosticAccumulator acc = new DiagnosticAccumulator(new DiagnosticId(SDK_KEY)); long startTime = System.currentTimeMillis(); - createStreamProcessor(LDConfig.DEFAULT, STREAM_URI, acc).start(); - ConnectionErrorHandler errorHandler = mockEventSourceCreator.getNextReceivedParams().errorHandler; - errorHandler.onConnectionError(new IOException()); - long timeAfterOpen = System.currentTimeMillis(); - DiagnosticEvent.Statistics event = acc.createEventAndReset(0, 0); - assertEquals(1, event.streamInits.size()); - DiagnosticEvent.StreamInit init = event.streamInits.get(0); - assertTrue(init.failed); - assertThat(init.timestamp, greaterThanOrEqualTo(startTime)); - assertThat(init.timestamp, lessThanOrEqualTo(timeAfterOpen)); - assertThat(init.durationMillis, lessThanOrEqualTo(timeAfterOpen - startTime)); - } - - @Test - public void streamInitDiagnosticNotRecordedOnErrorAfterInit() throws Exception { - DiagnosticAccumulator acc = new DiagnosticAccumulator(new DiagnosticId(SDK_KEY)); - createStreamProcessor(LDConfig.DEFAULT, STREAM_URI, acc).start(); - StreamProcessor.EventSourceParams params = mockEventSourceCreator.getNextReceivedParams(); - params.handler.onMessage("put", emptyPutEvent()); - // Drop first stream init from stream open - acc.createEventAndReset(0, 0); - params.errorHandler.onConnectionError(new IOException()); - DiagnosticEvent.Statistics event = acc.createEventAndReset(0, 0); - assertEquals(0, event.streamInits.size()); + + Handler errorHandler = Handlers.status(503); + Handler streamHandler = streamResponse(EMPTY_DATA_EVENT); + Handler errorThenSuccess = Handlers.sequential(errorHandler, streamHandler); + + try (HttpServer server = HttpServer.start(errorThenSuccess)) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri(), acc)) { + startAndWait(sp); + + long timeAfterOpen = System.currentTimeMillis(); + DiagnosticEvent.Statistics event = acc.createEventAndReset(0, 0); + + assertEquals(2, event.streamInits.size()); + DiagnosticEvent.StreamInit init0 = event.streamInits.get(0); + assertTrue(init0.failed); + assertThat(init0.timestamp, greaterThanOrEqualTo(startTime)); + assertThat(init0.timestamp, lessThanOrEqualTo(timeAfterOpen)); + assertThat(init0.durationMillis, lessThanOrEqualTo(timeAfterOpen - startTime)); + + DiagnosticEvent.StreamInit init1 = event.streamInits.get(1); + assertFalse(init1.failed); + assertThat(init1.timestamp, greaterThanOrEqualTo(init0.timestamp)); + assertThat(init1.timestamp, lessThanOrEqualTo(timeAfterOpen)); + } + } } @Test @@ -409,22 +453,22 @@ public void http500ErrorIsRecoverable() throws Exception { @Test public void putEventWithInvalidJsonCausesStreamRestart() throws Exception { - verifyInvalidDataEvent("put", "{sorry"); + verifyEventCausesStreamRestart("put", "{sorry", ErrorKind.INVALID_DATA); } @Test public void putEventWithWellFormedJsonButInvalidDataCausesStreamRestart() throws Exception { - verifyInvalidDataEvent("put", "{\"data\":{\"flags\":3}}"); + verifyEventCausesStreamRestart("put", "{\"data\":{\"flags\":3}}", ErrorKind.INVALID_DATA); } @Test public void patchEventWithInvalidJsonCausesStreamRestart() throws Exception { - verifyInvalidDataEvent("patch", "{sorry"); + verifyEventCausesStreamRestart("patch", "{sorry", ErrorKind.INVALID_DATA); } @Test public void patchEventWithWellFormedJsonButInvalidDataCausesStreamRestart() throws Exception { - verifyInvalidDataEvent("patch", "{\"path\":\"/flags/flagkey\", \"data\":{\"rules\":3}}"); + verifyEventCausesStreamRestart("patch", "{\"path\":\"/flags/flagkey\", \"data\":{\"rules\":3}}", ErrorKind.INVALID_DATA); } @Test @@ -434,12 +478,12 @@ public void patchEventWithInvalidPathCausesNoStreamRestart() throws Exception { @Test public void patchEventWithNullPathCausesStreamRestart() throws Exception { - verifyInvalidDataEvent("patch", "{\"path\":null, \"data\":{\"key\":\"flagkey\"}}"); + verifyEventCausesStreamRestart("patch", "{\"path\":null, \"data\":{\"key\":\"flagkey\"}}", ErrorKind.INVALID_DATA); } @Test public void deleteEventWithInvalidJsonCausesStreamRestart() throws Exception { - verifyInvalidDataEvent("delete", "{sorry"); + verifyEventCausesStreamRestart("delete", "{sorry", ErrorKind.INVALID_DATA); } @Test @@ -454,263 +498,168 @@ public void indirectPatchEventWithInvalidPathDoesNotCauseStreamRestart() throws @Test public void restartsStreamIfStoreNeedsRefresh() throws Exception { - CompletableFuture restarted = new CompletableFuture<>(); - mockEventSource.start(); - expectLastCall(); - mockEventSource.restart(); - expectLastCall().andAnswer(() -> { - restarted.complete(null); - return null; - }); - mockEventSource.close(); - expectLastCall(); - - replayAll(); - - try (StreamProcessor sp = createStreamProcessor(STREAM_URI)) { - sp.start(); - - dataStoreStatusProvider.updateStatus(new DataStoreStatusProvider.Status(false, false)); - dataStoreStatusProvider.updateStatus(new DataStoreStatusProvider.Status(true, true)); + try (HttpServer server = HttpServer.start(streamResponse(EMPTY_DATA_EVENT))) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + startAndWait(sp); + dataSourceUpdates.awaitInit(); + server.getRecorder().requireRequest(); + + dataStoreStatusProvider.updateStatus(new DataStoreStatusProvider.Status(false, false)); + dataStoreStatusProvider.updateStatus(new DataStoreStatusProvider.Status(true, true)); - restarted.get(); + dataSourceUpdates.awaitInit(); + server.getRecorder().requireRequest(); + server.getRecorder().requireNoRequests(Duration.ofMillis(100)); + } } } @Test public void doesNotRestartStreamIfStoreHadOutageButDoesNotNeedRefresh() throws Exception { - CompletableFuture restarted = new CompletableFuture<>(); - mockEventSource.start(); - expectLastCall(); - mockEventSource.restart(); - expectLastCall().andAnswer(() -> { - restarted.complete(null); - return null; - }); - mockEventSource.close(); - expectLastCall(); - - replayAll(); - - try (StreamProcessor sp = createStreamProcessor(STREAM_URI)) { - sp.start(); - - dataStoreStatusProvider.updateStatus(new DataStoreStatusProvider.Status(false, false)); - dataStoreStatusProvider.updateStatus(new DataStoreStatusProvider.Status(true, false)); - - Thread.sleep(500); - assertFalse(restarted.isDone()); + try (HttpServer server = HttpServer.start(streamResponse(EMPTY_DATA_EVENT))) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + startAndWait(sp); + dataSourceUpdates.awaitInit(); + server.getRecorder().requireRequest(); + + dataStoreStatusProvider.updateStatus(new DataStoreStatusProvider.Status(false, false)); + dataStoreStatusProvider.updateStatus(new DataStoreStatusProvider.Status(true, false)); + + server.getRecorder().requireNoRequests(Duration.ofMillis(100)); + } } } + private void verifyStoreErrorCausesStreamRestart(String eventName, String eventData) throws Exception { + AtomicInteger updateCount = new AtomicInteger(0); + Runnable preUpdateHook = () -> { + int count = updateCount.incrementAndGet(); + if (count == 2) { + // only fail on the 2nd update - the first is the one caused by the initial "put" in the test setup + throw new RuntimeException("sorry"); + } + }; + DelegatingDataStore delegatingStore = new DelegatingDataStore(dataStore, preUpdateHook); + dataStoreStatusProvider = new MockDataStoreStatusProvider(false); // false = the store does not provide status monitoring + dataSourceUpdates = TestComponents.dataSourceUpdates(delegatingStore, dataStoreStatusProvider); + + verifyEventCausesStreamRestart(eventName, eventData, ErrorKind.STORE_ERROR); + } + @Test public void storeFailureOnPutCausesStreamRestart() throws Exception { - MockDataSourceUpdates badUpdates = dataSourceUpdatesThatMakesUpdatesFailAndDoesNotSupportStatusMonitoring(); - expectStreamRestart(); - replayAll(); - - try (StreamProcessor sp = createStreamProcessorWithStoreUpdates(badUpdates)) { - sp.start(); - EventHandler handler = mockEventSourceCreator.getNextReceivedParams().handler; - handler.onMessage("put", emptyPutEvent()); - - assertNotNull(badUpdates.getLastStatus().getLastError()); - assertEquals(ErrorKind.STORE_ERROR, badUpdates.getLastStatus().getLastError().getKind()); - } - verifyAll(); + verifyStoreErrorCausesStreamRestart("put", emptyPutEvent().getData()); } @Test public void storeFailureOnPatchCausesStreamRestart() throws Exception { - MockDataSourceUpdates badUpdates = dataSourceUpdatesThatMakesUpdatesFailAndDoesNotSupportStatusMonitoring(); - expectStreamRestart(); - replayAll(); - - try (StreamProcessor sp = createStreamProcessorWithStoreUpdates(badUpdates)) { - sp.start(); - EventHandler handler = mockEventSourceCreator.getNextReceivedParams().handler; - handler.onMessage("patch", - new MessageEvent("{\"path\":\"/flags/flagkey\",\"data\":{\"key\":\"flagkey\",\"version\":1}}")); - } - verifyAll(); + String patchData = "{\"path\":\"/flags/flagkey\",\"data\":{\"key\":\"flagkey\",\"version\":1}}"; + verifyStoreErrorCausesStreamRestart("patch", patchData); } @Test public void storeFailureOnDeleteCausesStreamRestart() throws Exception { - MockDataSourceUpdates badUpdates = dataSourceUpdatesThatMakesUpdatesFailAndDoesNotSupportStatusMonitoring(); - expectStreamRestart(); - replayAll(); - - try (StreamProcessor sp = createStreamProcessorWithStoreUpdates(badUpdates)) { - sp.start(); - EventHandler handler = mockEventSourceCreator.getNextReceivedParams().handler; - handler.onMessage("delete", - new MessageEvent("{\"path\":\"/flags/flagkey\",\"version\":1}")); - } - verifyAll(); - } - - @Test - public void onCommentIsIgnored() throws Exception { - // This just verifies that we are not doing anything with comment data, by passing a null instead of a string - try (StreamProcessor sp = createStreamProcessor(STREAM_URI)) { - sp.start(); - EventHandler handler = mockEventSourceCreator.getNextReceivedParams().handler; - handler.onComment(null); - } + String deleteData = "{\"path\":\"/flags/flagkey\",\"version\":1}"; + verifyStoreErrorCausesStreamRestart("delete", deleteData); } @Test - public void onErrorIsIgnored() throws Exception { - expectNoStreamRestart(); - replayAll(); + public void sseCommentIsIgnored() throws Exception { + BlockingQueue events = new LinkedBlockingQueue<>(); + events.add(EMPTY_DATA_EVENT); - // EventSource won't call our onError() method because we are using a ConnectionErrorHandler instead. - try (StreamProcessor sp = createStreamProcessor(STREAM_URI)) { - sp.start(); - EventHandler handler = mockEventSourceCreator.getNextReceivedParams().handler; - handler.onError(new Exception("sorry")); + try (HttpServer server = HttpServer.start(streamResponseFromQueue(events))) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + startAndWait(sp); + + events.add(": this is a comment"); + + // Do something after the comment, just to verify that the stream is still working + events.add(makePatchEvent("/flags/" + FEATURE.getKey(), FEATURES, FEATURE)); + dataSourceUpdates.awaitUpsert(); + } + assertThat(server.getRecorder().count(), equalTo(1)); // did not restart + assertThat(dataSourceUpdates.getLastStatus().getLastError(), nullValue()); } } - - private MockDataSourceUpdates dataSourceUpdatesThatMakesUpdatesFailAndDoesNotSupportStatusMonitoring() { - DataStore badStore = dataStoreThatThrowsException(new RuntimeException("sorry")); - DataStoreStatusProvider badStoreStatusProvider = new MockDataStoreStatusProvider(false); - return TestComponents.dataSourceUpdates(badStore, badStoreStatusProvider); - } private void verifyEventCausesNoStreamRestart(String eventName, String eventData) throws Exception { - expectNoStreamRestart(); - verifyEventBehavior(eventName, eventData); - } - - private void verifyEventCausesStreamRestartWithInMemoryStore(String eventName, String eventData) throws Exception { - expectStreamRestart(); - verifyEventBehavior(eventName, eventData); - } - - private void verifyEventBehavior(String eventName, String eventData) throws Exception { - replayAll(); - try (StreamProcessor sp = createStreamProcessor(LDConfig.DEFAULT, STREAM_URI, null)) { - sp.start(); - EventHandler handler = mockEventSourceCreator.getNextReceivedParams().handler; - handler.onMessage(eventName, new MessageEvent(eventData)); - } - verifyAll(); - } - - private void verifyInvalidDataEvent(String eventName, String eventData) throws Exception { - BlockingQueue statuses = new LinkedBlockingQueue<>(); - dataSourceUpdates.statusBroadcaster.register(statuses::add); + BlockingQueue events = new LinkedBlockingQueue<>(); + events.add(EMPTY_DATA_EVENT); - verifyEventCausesStreamRestartWithInMemoryStore(eventName, eventData); - - // We did not allow the stream to successfully process an event before causing the error, so the - // state will still be INITIALIZING, but we should be able to see that an error happened. - Status status = requireDataSourceStatus(statuses, State.INITIALIZING); - assertNotNull(status.getLastError()); - assertEquals(ErrorKind.INVALID_DATA, status.getLastError().getKind()); - } - - private void expectNoStreamRestart() throws Exception { - mockEventSource.start(); - expectLastCall().times(1); - mockEventSource.close(); - expectLastCall().times(1); - } - - private void expectStreamRestart() throws Exception { - mockEventSource.start(); - expectLastCall().times(1); - mockEventSource.restart(); - expectLastCall().times(1); - mockEventSource.close(); - expectLastCall().times(1); - } - - // There are already end-to-end tests against an HTTP server in okhttp-eventsource, so we won't retest the - // basic stream mechanism in detail. However, we do want to make sure that the LDConfig options are correctly - // applied to the EventSource for things like TLS configuration. - - @Test - public void httpClientDoesNotAllowSelfSignedCertByDefault() throws Exception { - final ConnectionErrorSink errorSink = new ConnectionErrorSink(); - try (TestHttpUtil.ServerWithCert server = new TestHttpUtil.ServerWithCert()) { - server.server.enqueue(eventStreamResponse(STREAM_RESPONSE_WITH_EMPTY_DATA)); - - try (StreamProcessor sp = createStreamProcessorWithRealHttp(LDConfig.DEFAULT, server.uri())) { - sp.connectionErrorHandler = errorSink; - Future ready = sp.start(); - ready.get(); + try (HttpServer server = HttpServer.start(streamResponseFromQueue(events))) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + startAndWait(sp); + + events.add(makeEvent(eventName, eventData)); - Throwable error = errorSink.errors.peek(); - assertNotNull(error); - assertEquals(SSLHandshakeException.class, error.getClass()); + // Do something after the test event, just to verify that the stream is still working + events.add(makePatchEvent("/flags/" + FEATURE.getKey(), FEATURES, FEATURE)); + dataSourceUpdates.awaitUpsert(); } + assertThat(server.getRecorder().count(), equalTo(1)); // did not restart + assertThat(dataSourceUpdates.getLastStatus().getLastError(), nullValue()); } } - - @Test - public void httpClientCanUseCustomTlsConfig() throws Exception { - final ConnectionErrorSink errorSink = new ConnectionErrorSink(); - try (TestHttpUtil.ServerWithCert server = new TestHttpUtil.ServerWithCert()) { - server.server.enqueue(eventStreamResponse(STREAM_RESPONSE_WITH_EMPTY_DATA)); - - LDConfig config = new LDConfig.Builder() - .http(Components.httpConfiguration().sslSocketFactory(server.socketFactory, server.trustManager)) - // allows us to trust the self-signed cert - .build(); - - try (StreamProcessor sp = createStreamProcessorWithRealHttp(config, server.uri())) { - sp.connectionErrorHandler = errorSink; - Future ready = sp.start(); - ready.get(); + + private void verifyEventCausesStreamRestart(String eventName, String eventData, ErrorKind expectedError) throws Exception { + BlockingQueue statuses = new LinkedBlockingQueue<>(); + dataSourceUpdates.statusBroadcaster.register(statuses::add); + + BlockingQueue events = new LinkedBlockingQueue<>(); + events.add(EMPTY_DATA_EVENT); + + try (HttpServer server = HttpServer.start(streamResponseFromQueue(events))) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + sp.start(); + dataSourceUpdates.awaitInit(); + server.getRecorder().requireRequest(); + + requireDataSourceStatus(statuses, State.VALID); + + events.add(makeEvent(eventName, eventData)); + events.add(EMPTY_DATA_EVENT); - assertNull(errorSink.errors.peek()); + server.getRecorder().requireRequest(); + dataSourceUpdates.awaitInit(); + + Status status = requireDataSourceStatus(statuses, State.INTERRUPTED); + assertThat(status.getLastError(), notNullValue()); + assertThat(status.getLastError().getKind(), equalTo(expectedError)); + + requireDataSourceStatus(statuses, State.VALID); } } } @Test - public void httpClientCanUseCustomSocketFactory() throws Exception { - final ConnectionErrorSink errorSink = new ConnectionErrorSink(); - try (MockWebServer server = makeStartedServer(eventStreamResponse(STREAM_RESPONSE_WITH_EMPTY_DATA))) { - HttpUrl serverUrl = server.url("/"); - LDConfig config = new LDConfig.Builder() - .http(Components.httpConfiguration().socketFactory(makeSocketFactorySingleHost(serverUrl.host(), serverUrl.port()))) - .build(); - - URI uriWithWrongPort = URI.create("http://localhost:1"); - try (StreamProcessor sp = createStreamProcessorWithRealHttp(config, uriWithWrongPort)) { - sp.connectionErrorHandler = errorSink; - Future ready = sp.start(); - ready.get(); + public void testSpecialHttpConfigurations() throws Exception { + Handler handler = streamResponse(EMPTY_DATA_EVENT); + + TestHttpUtil.testWithSpecialHttpConfigurations(handler, + (targetUri, goodHttpConfig) -> { + LDConfig config = new LDConfig.Builder().http(goodHttpConfig).build(); + ConnectionErrorSink errorSink = new ConnectionErrorSink(); - assertNull(errorSink.errors.peek()); - assertEquals(1, server.getRequestCount()); - } - } - } - - @Test - public void httpClientCanUseProxyConfig() throws Exception { - final ConnectionErrorSink errorSink = new ConnectionErrorSink(); - URI fakeStreamUri = URI.create("http://not-a-real-host"); - try (MockWebServer server = makeStartedServer(eventStreamResponse(STREAM_RESPONSE_WITH_EMPTY_DATA))) { - HttpUrl serverUrl = server.url("/"); - LDConfig config = new LDConfig.Builder() - .http(Components.httpConfiguration().proxyHostAndPort(serverUrl.host(), serverUrl.port())) - .build(); - - try (StreamProcessor sp = createStreamProcessorWithRealHttp(config, fakeStreamUri)) { - sp.connectionErrorHandler = errorSink; - Future ready = sp.start(); - ready.get(); - - assertNull(errorSink.errors.peek()); - assertEquals(1, server.getRequestCount()); - } - } + try (StreamProcessor sp = createStreamProcessor(config, targetUri)) { + sp.connectionErrorHandler = errorSink; + startAndWait(sp); + assertNull(errorSink.errors.peek()); + } + }, + (targetUri, badHttpConfig) -> { + LDConfig config = new LDConfig.Builder().http(badHttpConfig).build(); + ConnectionErrorSink errorSink = new ConnectionErrorSink(); + + try (StreamProcessor sp = createStreamProcessor(config, targetUri)) { + sp.connectionErrorHandler = errorSink; + startAndWait(sp); + + Throwable error = errorSink.errors.peek(); + assertNotNull(error); + } + } + ); } static class ConnectionErrorSink implements ConnectionErrorHandler { @@ -725,118 +674,100 @@ public Action onConnectionError(Throwable t) { } private void testUnrecoverableHttpError(int statusCode) throws Exception { - UnsuccessfulResponseException e = new UnsuccessfulResponseException(statusCode); - long startTime = System.currentTimeMillis(); - StreamProcessor sp = createStreamProcessor(STREAM_URI); - Future initFuture = sp.start(); + Handler errorResp = Handlers.status(statusCode); BlockingQueue statuses = new LinkedBlockingQueue<>(); dataSourceUpdates.statusBroadcaster.register(statuses::add); - ConnectionErrorHandler errorHandler = mockEventSourceCreator.getNextReceivedParams().errorHandler; - ConnectionErrorHandler.Action action = errorHandler.onConnectionError(e); - assertEquals(ConnectionErrorHandler.Action.SHUTDOWN, action); - - shouldNotTimeOut(initFuture, Duration.ofSeconds(2)); - assertTrue((System.currentTimeMillis() - startTime) < 9000); - assertTrue(initFuture.isDone()); - assertFalse(sp.isInitialized()); - - Status newStatus = requireDataSourceStatus(statuses, State.OFF); - assertEquals(ErrorKind.ERROR_RESPONSE, newStatus.getLastError().getKind()); - assertEquals(statusCode, newStatus.getLastError().getStatusCode()); + try (HttpServer server = HttpServer.start(errorResp)) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + Future initFuture = sp.start(); + shouldNotTimeOut(initFuture, Duration.ofSeconds(2)); + + assertFalse(sp.isInitialized()); + + Status newStatus = requireDataSourceStatus(statuses, State.OFF); + assertEquals(ErrorKind.ERROR_RESPONSE, newStatus.getLastError().getKind()); + assertEquals(statusCode, newStatus.getLastError().getStatusCode()); + + server.getRecorder().requireRequest(); + server.getRecorder().requireNoRequests(Duration.ofMillis(50)); + } + } } private void testRecoverableHttpError(int statusCode) throws Exception { - UnsuccessfulResponseException e = new UnsuccessfulResponseException(statusCode); - long startTime = System.currentTimeMillis(); - StreamProcessor sp = createStreamProcessor(STREAM_URI); - Future initFuture = sp.start(); + Semaphore closeFirstStreamSignal = new Semaphore(0); + Handler errorResp = Handlers.status(statusCode); + Handler stream1Resp = closableStreamResponse(EMPTY_DATA_EVENT, closeFirstStreamSignal); + Handler stream2Resp = streamResponse(EMPTY_DATA_EVENT); + + // Set up the sequence of responses that we'll receive below. + Handler seriesOfResponses = Handlers.sequential(errorResp, stream1Resp, errorResp, stream2Resp); BlockingQueue statuses = new LinkedBlockingQueue<>(); dataSourceUpdates.statusBroadcaster.register(statuses::add); - // simulate error - EventSourceParams eventSourceParams = mockEventSourceCreator.getNextReceivedParams(); - ConnectionErrorHandler errorHandler = eventSourceParams.errorHandler; - ConnectionErrorHandler.Action action = errorHandler.onConnectionError(e); - assertEquals(ConnectionErrorHandler.Action.PROCEED, action); - - shouldTimeOut(initFuture, Duration.ofMillis(200)); - assertTrue((System.currentTimeMillis() - startTime) >= 200); - assertFalse(initFuture.isDone()); - assertFalse(sp.isInitialized()); - - Status failureStatus1 = requireDataSourceStatus(statuses, State.INITIALIZING); - assertEquals(ErrorKind.ERROR_RESPONSE, failureStatus1.getLastError().getKind()); - assertEquals(statusCode, failureStatus1.getLastError().getStatusCode()); - - // simulate successful retry - eventSourceParams.handler.onMessage("put", emptyPutEvent()); - - shouldNotTimeOut(initFuture, Duration.ofSeconds(2)); - assertTrue(initFuture.isDone()); - assertTrue(sp.isInitialized()); - - Status successStatus = requireDataSourceStatus(statuses, State.VALID); - assertSame(failureStatus1.getLastError(), successStatus.getLastError()); - - // simulate another error of the same kind - the difference is now the state will be INTERRUPTED - action = errorHandler.onConnectionError(e); - assertEquals(ConnectionErrorHandler.Action.PROCEED, action); - - Status failureStatus2 = requireDataSourceStatus(statuses, State.INTERRUPTED); - assertEquals(ErrorKind.ERROR_RESPONSE, failureStatus2.getLastError().getKind()); - assertEquals(statusCode, failureStatus2.getLastError().getStatusCode()); - assertNotSame(failureStatus2.getLastError(), failureStatus1.getLastError()); // a new instance of the same kind of error + try (HttpServer server = HttpServer.start(seriesOfResponses)) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + Future initFuture = sp.start(); + shouldNotTimeOut(initFuture, Duration.ofSeconds(2)); + + assertTrue(sp.isInitialized()); + + // The first stream request receives an error response (errorResp). + Status failureStatus1 = requireDataSourceStatus(statuses, State.INITIALIZING); + assertEquals(ErrorKind.ERROR_RESPONSE, failureStatus1.getLastError().getKind()); + assertEquals(statusCode, failureStatus1.getLastError().getStatusCode()); + + // It tries to reconnect, and gets a valid response (stream1Resp). Now the stream is active. + Status successStatus1 = requireDataSourceStatus(statuses, State.VALID); + assertSame(failureStatus1.getLastError(), successStatus1.getLastError()); + + // Now we'll trigger a disconnection of the stream. The SDK detects that as a + // NETWORK_ERROR. The state changes to INTERRUPTED because it was previously connected. + closeFirstStreamSignal.release(); + Status failureStatus2 = requireDataSourceStatus(statuses, State.INTERRUPTED); + assertEquals(ErrorKind.NETWORK_ERROR, failureStatus2.getLastError().getKind()); + + // It tries to reconnect, and gets another errorResp. The state is still INTERRUPTED. + Status failureStatus3 = requireDataSourceStatus(statuses, State.INTERRUPTED); + assertEquals(ErrorKind.ERROR_RESPONSE, failureStatus3.getLastError().getKind()); + assertEquals(statusCode, failureStatus3.getLastError().getStatusCode()); + + // It tries again, and finally gets a valid response (stream2Resp). + Status successStatus2 = requireDataSourceStatus(statuses, State.VALID); + assertSame(failureStatus3.getLastError(), successStatus2.getLastError()); + } + } } private StreamProcessor createStreamProcessor(URI streamUri) { return createStreamProcessor(LDConfig.DEFAULT, streamUri, null); } - private StreamProcessor createStreamProcessor(LDConfig config, URI streamUri, DiagnosticAccumulator diagnosticAccumulator) { + private StreamProcessor createStreamProcessor(LDConfig config, URI streamUri, DiagnosticAccumulator acc) { return new StreamProcessor( - clientContext(SDK_KEY, config).getHttp(), + clientContext(SDK_KEY, config == null ? LDConfig.DEFAULT : config).getHttp(), dataSourceUpdates, - mockEventSourceCreator, Thread.MIN_PRIORITY, - diagnosticAccumulator, + acc, streamUri, - DEFAULT_INITIAL_RECONNECT_DELAY + BRIEF_RECONNECT_DELAY ); } - private StreamProcessor createStreamProcessorWithRealHttp(LDConfig config, URI streamUri) { - return new StreamProcessor( - clientContext(SDK_KEY, config).getHttp(), - dataSourceUpdates, - null, - Thread.MIN_PRIORITY, - null, - streamUri, - DEFAULT_INITIAL_RECONNECT_DELAY - ); - } - - private StreamProcessor createStreamProcessorWithStoreUpdates(DataSourceUpdates storeUpdates) { - return new StreamProcessor( - clientContext(SDK_KEY, LDConfig.DEFAULT).getHttp(), - storeUpdates, - mockEventSourceCreator, - Thread.MIN_PRIORITY, - null, - STREAM_URI, - DEFAULT_INITIAL_RECONNECT_DELAY - ); + private StreamProcessor createStreamProcessor(LDConfig config, URI streamUri) { + return createStreamProcessor(config, streamUri, null); } - private String featureJson(String key, int version) { - return gsonInstance().toJson(flagBuilder(key).version(version).build()); - } - - private String segmentJson(String key, int version) { - return gsonInstance().toJson(ModelBuilders.segmentBuilder(key).version(version).build()); + private static void startAndWait(StreamProcessor sp) { + Future ready = sp.start(); + try { + ready.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } } private MessageEvent emptyPutEvent() { diff --git a/src/test/java/com/launchdarkly/sdk/server/TestComponents.java b/src/test/java/com/launchdarkly/sdk/server/TestComponents.java index 342500bac..2c2519d9d 100644 --- a/src/test/java/com/launchdarkly/sdk/server/TestComponents.java +++ b/src/test/java/com/launchdarkly/sdk/server/TestComponents.java @@ -1,7 +1,6 @@ package com.launchdarkly.sdk.server; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.launchdarkly.eventsource.EventSource; import com.launchdarkly.sdk.UserAttribute; import com.launchdarkly.sdk.server.integrations.EventProcessorBuilder; import com.launchdarkly.sdk.server.interfaces.ClientContext; @@ -25,6 +24,7 @@ import com.launchdarkly.sdk.server.interfaces.EventProcessorFactory; import com.launchdarkly.sdk.server.interfaces.FlagChangeEvent; import com.launchdarkly.sdk.server.interfaces.FlagChangeListener; +import com.launchdarkly.sdk.server.interfaces.HttpConfiguration; import com.launchdarkly.sdk.server.interfaces.PersistentDataStore; import com.launchdarkly.sdk.server.interfaces.PersistentDataStoreFactory; @@ -55,6 +55,10 @@ public static ClientContext clientContext(final String sdkKey, final LDConfig co return new ClientContextImpl(sdkKey, config, sharedExecutor, diagnosticAccumulator); } + public static HttpConfiguration defaultHttpConfiguration() { + return clientContext("", LDConfig.DEFAULT).getHttp(); + } + public static DataStore dataStoreThatThrowsException(RuntimeException e) { return new DataStoreThatThrowsException(e); } @@ -149,12 +153,26 @@ public void close() throws IOException { }; public static class MockDataSourceUpdates implements DataSourceUpdates { + public static class UpsertParams { + public final DataKind kind; + public final String key; + public final ItemDescriptor item; + + UpsertParams(DataKind kind, String key, ItemDescriptor item) { + super(); + this.kind = kind; + this.key = key; + this.item = item; + } + } + private final DataSourceUpdatesImpl wrappedInstance; private final DataStoreStatusProvider dataStoreStatusProvider; public final EventBroadcasterImpl flagChangeEventBroadcaster; public final EventBroadcasterImpl statusBroadcaster; public final BlockingQueue> receivedInits = new LinkedBlockingQueue<>(); + public final BlockingQueue receivedUpserts = new LinkedBlockingQueue<>(); public MockDataSourceUpdates(DataStore store, DataStoreStatusProvider dataStoreStatusProvider) { this.dataStoreStatusProvider = dataStoreStatusProvider; @@ -172,13 +190,16 @@ public MockDataSourceUpdates(DataStore store, DataStoreStatusProvider dataStoreS @Override public boolean init(FullDataSet allData) { + boolean result = wrappedInstance.init(allData); receivedInits.add(allData); - return wrappedInstance.init(allData); + return result; } @Override public boolean upsert(DataKind kind, String key, ItemDescriptor item) { - return wrappedInstance.upsert(kind, key, item); + boolean result = wrappedInstance.upsert(kind, key, item); + receivedUpserts.add(new UpsertParams(kind, key, item)); + return result; } @Override @@ -209,6 +230,16 @@ public FullDataSet awaitInit() { } catch (InterruptedException e) {} throw new RuntimeException("did not receive expected init call"); } + + public UpsertParams awaitUpsert() { + try { + UpsertParams value = receivedUpserts.poll(5, TimeUnit.SECONDS); + if (value != null) { + return value; + } + } catch (InterruptedException e) {} + throw new RuntimeException("did not receive expected upsert call"); + } } public static class DataStoreFactoryThatExposesUpdater implements DataStoreFactory { @@ -264,6 +295,62 @@ public CacheStats getCacheStats() { } } + public static class DelegatingDataStore implements DataStore { + private final DataStore store; + private final Runnable preUpdateHook; + + public DelegatingDataStore(DataStore store, Runnable preUpdateHook) { + this.store = store; + this.preUpdateHook = preUpdateHook; + } + + @Override + public void close() throws IOException { + store.close(); + } + + @Override + public void init(FullDataSet allData) { + if (preUpdateHook != null) { + preUpdateHook.run(); + } + store.init(allData); + } + + @Override + public ItemDescriptor get(DataKind kind, String key) { + return store.get(kind, key); + } + + @Override + public KeyedItems getAll(DataKind kind) { + return store.getAll(kind); + } + + @Override + public boolean upsert(DataKind kind, String key, ItemDescriptor item) { + if (preUpdateHook != null) { + preUpdateHook.run(); + } + return store.upsert(kind, key, item); + } + + @Override + public boolean isInitialized() { + return store.isInitialized(); + } + + @Override + public boolean isStatusMonitoringEnabled() { + return store.isStatusMonitoringEnabled(); + } + + @Override + public CacheStats getCacheStats() { + return store.getCacheStats(); + } + } + public static class MockDataStoreStatusProvider implements DataStoreStatusProvider { public final EventBroadcasterImpl statusBroadcaster; private final AtomicReference lastStatus; @@ -314,22 +401,4 @@ public CacheStats getCacheStats() { return null; } } - - public static class MockEventSourceCreator implements StreamProcessor.EventSourceCreator { - private final EventSource eventSource; - private final BlockingQueue receivedParams = new LinkedBlockingQueue<>(); - - MockEventSourceCreator(EventSource eventSource) { - this.eventSource = eventSource; - } - - public EventSource createEventSource(StreamProcessor.EventSourceParams params) { - receivedParams.add(params); - return eventSource; - } - - public StreamProcessor.EventSourceParams getNextReceivedParams() { - return receivedParams.poll(); - } - } } diff --git a/src/test/java/com/launchdarkly/sdk/server/TestHttpUtil.java b/src/test/java/com/launchdarkly/sdk/server/TestHttpUtil.java index fa45ea59d..a8d23b2e9 100644 --- a/src/test/java/com/launchdarkly/sdk/server/TestHttpUtil.java +++ b/src/test/java/com/launchdarkly/sdk/server/TestHttpUtil.java @@ -1,94 +1,157 @@ package com.launchdarkly.sdk.server; -import com.launchdarkly.sdk.server.Components; -import com.launchdarkly.sdk.server.integrations.PollingDataSourceBuilder; -import com.launchdarkly.sdk.server.integrations.StreamingDataSourceBuilder; +import com.launchdarkly.sdk.server.interfaces.HttpConfigurationFactory; +import com.launchdarkly.testhelpers.httptest.Handler; +import com.launchdarkly.testhelpers.httptest.HttpServer; +import com.launchdarkly.testhelpers.httptest.RequestInfo; +import com.launchdarkly.testhelpers.httptest.ServerTLSConfiguration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.IOException; -import java.math.BigInteger; -import java.net.InetAddress; import java.net.URI; -import java.security.GeneralSecurityException; - -import javax.net.ssl.SSLSocketFactory; -import javax.net.ssl.X509TrustManager; -import okhttp3.mockwebserver.MockResponse; -import okhttp3.mockwebserver.MockWebServer; -import okhttp3.tls.HandshakeCertificates; -import okhttp3.tls.HeldCertificate; -import okhttp3.tls.internal.TlsUtil; +import static com.launchdarkly.sdk.server.TestUtil.makeSocketFactorySingleHost; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; class TestHttpUtil { - static MockWebServer makeStartedServer(MockResponse... responses) throws IOException { - MockWebServer server = new MockWebServer(); - for (MockResponse r: responses) { - server.enqueue(r); - } - server.start(); - return server; - } + static Logger logger = LoggerFactory.getLogger(TestHttpUtil.class); - static ServerWithCert httpsServerWithSelfSignedCert(MockResponse... responses) throws IOException, GeneralSecurityException { - ServerWithCert ret = new ServerWithCert(); - for (MockResponse r: responses) { - ret.server.enqueue(r); - } - ret.server.start(); - return ret; - } - - static StreamingDataSourceBuilder baseStreamingConfig(MockWebServer server) { - return Components.streamingDataSource().baseURI(server.url("").uri()); + // Used for testWithSpecialHttpConfigurations + static interface HttpConfigurationTestAction { + void accept(URI targetUri, HttpConfigurationFactory httpConfig) throws IOException; } - static PollingDataSourceBuilder basePollingConfig(MockWebServer server) { - return Components.pollingDataSource().baseURI(server.url("").uri()); + /** + * A test suite for all SDK components that support our standard HTTP configuration options. + *

+ * Although all of our supported HTTP behaviors are implemented in shared code, there is no + * guarantee that all of our components are using that code, or using it correctly. So we + * should run this test suite on each component that can be affected by HttpConfigurationBuilder + * properties. It works as follows: + *

+ * + * @param handler the response that the server should provide for all requests + * @param testActionShouldSucceed an action that asserts that the component works + * @param testActionShouldFail an action that asserts that the component does not work + * @throws IOException + */ + static void testWithSpecialHttpConfigurations( + Handler handler, + HttpConfigurationTestAction testActionShouldSucceed, + HttpConfigurationTestAction testActionShouldFail + ) throws IOException { + + testHttpClientDoesNotAllowSelfSignedCertByDefault(handler, testActionShouldFail); + testHttpClientCanBeConfiguredToAllowSelfSignedCert(handler, testActionShouldSucceed); + testHttpClientCanUseCustomSocketFactory(handler, testActionShouldSucceed); + testHttpClientCanUseProxy(handler, testActionShouldSucceed); + testHttpClientCanUseProxyWithBasicAuth(handler, testActionShouldSucceed); } - static MockResponse jsonResponse(String body) { - return new MockResponse() - .setHeader("Content-Type", "application/json") - .setBody(body); + static void testHttpClientDoesNotAllowSelfSignedCertByDefault(Handler handler, + HttpConfigurationTestAction testActionShouldFail) { + logger.warn("testHttpClientDoesNotAllowSelfSignedCertByDefault"); + try { + ServerTLSConfiguration tlsConfig = ServerTLSConfiguration.makeSelfSignedCertificate(); + try (HttpServer secureServer = HttpServer.startSecure(tlsConfig, handler)) { + testActionShouldFail.accept(secureServer.getUri(), Components.httpConfiguration()); + assertThat(secureServer.getRecorder().count(), equalTo(0)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } } - - static MockResponse eventStreamResponse(String data) { - return new MockResponse() - .setHeader("Content-Type", "text/event-stream") - .setChunkedBody(data, 1000); + + static void testHttpClientCanBeConfiguredToAllowSelfSignedCert(Handler handler, + HttpConfigurationTestAction testActionShouldSucceed) { + logger.warn("testHttpClientCanBeConfiguredToAllowSelfSignedCert"); + try { + ServerTLSConfiguration tlsConfig = ServerTLSConfiguration.makeSelfSignedCertificate(); + HttpConfigurationFactory httpConfig = Components.httpConfiguration() + .sslSocketFactory(tlsConfig.getSocketFactory(), tlsConfig.getTrustManager()); + try (HttpServer secureServer = HttpServer.startSecure(tlsConfig, handler)) { + testActionShouldSucceed.accept(secureServer.getUri(), httpConfig); + assertThat(secureServer.getRecorder().count(), equalTo(1)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } } - - static class ServerWithCert implements Closeable { - final MockWebServer server; - final HeldCertificate cert; - final SSLSocketFactory socketFactory; - final X509TrustManager trustManager; - - public ServerWithCert() throws IOException, GeneralSecurityException { - String hostname = InetAddress.getByName("localhost").getCanonicalHostName(); - - cert = new HeldCertificate.Builder() - .serialNumber(BigInteger.ONE) - .certificateAuthority(1) - .commonName(hostname) - .addSubjectAlternativeName(hostname) - .build(); - HandshakeCertificates hc = TlsUtil.localhost(); - socketFactory = hc.sslSocketFactory(); - trustManager = hc.trustManager(); - - server = new MockWebServer(); - server.useHttps(socketFactory, false); + static void testHttpClientCanUseCustomSocketFactory(Handler handler, + HttpConfigurationTestAction testActionShouldSucceed) { + logger.warn("testHttpClientCanUseCustomSocketFactory"); + try { + try (HttpServer server = HttpServer.start(handler)) { + HttpConfigurationFactory httpConfig = Components.httpConfiguration() + .socketFactory(makeSocketFactorySingleHost(server.getUri().getHost(), server.getPort())); + + URI uriWithWrongPort = URI.create("http://localhost:1"); + testActionShouldSucceed.accept(uriWithWrongPort, httpConfig); + assertThat(server.getRecorder().count(), equalTo(1)); + } + } catch (IOException e) { + throw new RuntimeException(e); } - - public URI uri() { - return server.url("/").uri(); + } + + static void testHttpClientCanUseProxy(Handler handler, + HttpConfigurationTestAction testActionShouldSucceed) { + logger.warn("testHttpClientCanUseProxy"); + try { + try (HttpServer server = HttpServer.start(handler)) { + HttpConfigurationFactory httpConfig = Components.httpConfiguration() + .proxyHostAndPort(server.getUri().getHost(), server.getPort()); + + URI fakeBaseUri = URI.create("http://not-a-real-host"); + testActionShouldSucceed.accept(fakeBaseUri, httpConfig); + assertThat(server.getRecorder().count(), equalTo(1)); + } + } catch (IOException e) { + throw new RuntimeException(e); } - - public void close() throws IOException { - server.close(); + } + + static void testHttpClientCanUseProxyWithBasicAuth(Handler handler, + HttpConfigurationTestAction testActionShouldSucceed) { + logger.warn("testHttpClientCanUseProxyWithBasicAuth"); + Handler proxyHandler = ctx -> { + if (ctx.getRequest().getHeader("Proxy-Authorization") == null) { + ctx.setStatus(407); + ctx.setHeader("Proxy-Authenticate", "Basic realm=x"); + } else { + handler.apply(ctx); + } + }; + try { + try (HttpServer server = HttpServer.start(proxyHandler)) { + HttpConfigurationFactory httpConfig = Components.httpConfiguration() + .proxyHostAndPort(server.getUri().getHost(), server.getPort()) + .proxyAuth(Components.httpBasicAuthentication("user", "pass")); + + URI fakeBaseUri = URI.create("http://not-a-real-host"); + testActionShouldSucceed.accept(fakeBaseUri, httpConfig); + + assertThat(server.getRecorder().count(), equalTo(2)); + RequestInfo req1 = server.getRecorder().requireRequest(); + assertThat(req1.getHeader("Proxy-Authorization"), nullValue()); + RequestInfo req2 = server.getRecorder().requireRequest(); + assertThat(req2.getHeader("Proxy-Authorization"), equalTo("Basic dXNlcjpwYXNz")); + } + } catch (IOException e) { + throw new RuntimeException(e); } } } diff --git a/src/test/java/com/launchdarkly/sdk/server/TestUtil.java b/src/test/java/com/launchdarkly/sdk/server/TestUtil.java index 9c647926a..d63ffe8f6 100644 --- a/src/test/java/com/launchdarkly/sdk/server/TestUtil.java +++ b/src/test/java/com/launchdarkly/sdk/server/TestUtil.java @@ -1,7 +1,10 @@ package com.launchdarkly.sdk.server; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.gson.Gson; +import com.google.gson.JsonElement; import com.launchdarkly.sdk.EvaluationReason; import com.launchdarkly.sdk.LDValue; import com.launchdarkly.sdk.LDValueType; @@ -9,6 +12,8 @@ import com.launchdarkly.sdk.server.DataModel.Segment; import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; import com.launchdarkly.sdk.server.interfaces.DataStore; +import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.DataKind; +import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.FullDataSet; import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.ItemDescriptor; import com.launchdarkly.sdk.server.interfaces.FlagChangeEvent; @@ -37,6 +42,7 @@ import static com.launchdarkly.sdk.server.DataModel.FEATURES; import static com.launchdarkly.sdk.server.DataModel.SEGMENTS; +import static com.launchdarkly.sdk.server.DataStoreTestTypes.toDataMap; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; @@ -161,6 +167,26 @@ public static DataSourceStatusProvider.Status requireDataSourceStatusEventually( }); } + public static void assertDataSetEquals(FullDataSet expected, FullDataSet actual) { + JsonElement expectedJson = TEST_GSON_INSTANCE.toJsonTree(toDataMap(expected)); + JsonElement actualJson = TEST_GSON_INSTANCE.toJsonTree(toDataMap(actual)); + assertEquals(expectedJson, actualJson); + } + + public static String describeDataSet(FullDataSet data) { + return Joiner.on(", ").join( + Iterables.transform(data.getData(), entry -> { + DataKind kind = entry.getKey(); + return "{" + kind + ": [" + + Joiner.on(", ").join( + Iterables.transform(entry.getValue().getItems(), item -> + kind.serialize(item.getValue()) + ) + ) + + "]}"; + })); + } + public static interface ActionCanThrowAnyException { void apply(T param) throws Exception; } diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml index 6be0de84e..f27284e9d 100644 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -10,4 +10,7 @@ + + +