Skip to content
This repository has been archived by the owner on May 30, 2024. It is now read-only.

Commit

Permalink
prepare 5.0.2 release (#198)
Browse files Browse the repository at this point in the history
  • Loading branch information
LaunchDarklyCI authored Jun 25, 2020
1 parent e098c4b commit 37af42c
Show file tree
Hide file tree
Showing 14 changed files with 294 additions and 465 deletions.
22 changes: 2 additions & 20 deletions src/main/java/com/launchdarkly/sdk/server/ComponentsImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,24 +139,9 @@ public DataSource createDataSource(ClientContext context, DataSourceUpdates data
Loggers.DATA_SOURCE.info("Enabling streaming API");

URI streamUri = baseURI == null ? LDConfig.DEFAULT_STREAM_URI : baseURI;
URI pollUri;
if (pollingBaseURI != null) {
pollUri = pollingBaseURI;
} else {
// If they have set a custom base URI, and they did *not* set a custom polling URI, then we can
// assume they're using Relay in which case both of those values are the same.
pollUri = baseURI == null ? LDConfig.DEFAULT_BASE_URI : baseURI;
}

DefaultFeatureRequestor requestor = new DefaultFeatureRequestor(
context.getHttp(),
pollUri,
false
);

return new StreamProcessor(
context.getHttp(),
requestor,
dataSourceUpdates,
null,
context.getBasic().getThreadPriority(),
Expand All @@ -170,9 +155,7 @@ public DataSource createDataSource(ClientContext context, DataSourceUpdates data
public LDValue describeConfiguration(BasicConfiguration basicConfiguration) {
return LDValue.buildObject()
.put(ConfigProperty.STREAMING_DISABLED.name, false)
.put(ConfigProperty.CUSTOM_BASE_URI.name,
(pollingBaseURI != null && !pollingBaseURI.equals(LDConfig.DEFAULT_BASE_URI)) ||
(pollingBaseURI == null && baseURI != null && !baseURI.equals(LDConfig.DEFAULT_STREAM_URI)))
.put(ConfigProperty.CUSTOM_BASE_URI.name, false)
.put(ConfigProperty.CUSTOM_STREAM_URI.name,
baseURI != null && !baseURI.equals(LDConfig.DEFAULT_STREAM_URI))
.put(ConfigProperty.RECONNECT_TIME_MILLIS.name, initialReconnectDelay.toMillis())
Expand All @@ -191,8 +174,7 @@ public DataSource createDataSource(ClientContext context, DataSourceUpdates data

DefaultFeatureRequestor requestor = new DefaultFeatureRequestor(
context.getHttp(),
baseURI == null ? LDConfig.DEFAULT_BASE_URI : baseURI,
true
baseURI == null ? LDConfig.DEFAULT_BASE_URI : baseURI
);
return new PollingProcessor(
requestor,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package com.launchdarkly.sdk.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
import com.launchdarkly.sdk.server.interfaces.HttpConfiguration;
import com.launchdarkly.sdk.server.interfaces.SerializationException;

import org.slf4j.Logger;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;

import static com.launchdarkly.sdk.server.Util.configureHttpClientBuilder;
import static com.launchdarkly.sdk.server.Util.getHeadersBuilderFor;
Expand All @@ -26,77 +26,67 @@
*/
final class DefaultFeatureRequestor implements FeatureRequestor {
private static final Logger logger = Loggers.DATA_SOURCE;
private static final String GET_LATEST_FLAGS_PATH = "/sdk/latest-flags";
private static final String GET_LATEST_SEGMENTS_PATH = "/sdk/latest-segments";
private static final String GET_LATEST_ALL_PATH = "/sdk/latest-all";
private static final long MAX_HTTP_CACHE_SIZE_BYTES = 10 * 1024 * 1024; // 10 MB

@VisibleForTesting final URI baseUri;
private final OkHttpClient httpClient;
private final URI pollingUri;
private final Headers headers;
private final boolean useCache;
private final Path cacheDir;

DefaultFeatureRequestor(HttpConfiguration httpConfig, URI baseUri, boolean useCache) {
DefaultFeatureRequestor(HttpConfiguration httpConfig, URI baseUri) {
this.baseUri = baseUri;
this.useCache = useCache;
this.pollingUri = baseUri.resolve(GET_LATEST_ALL_PATH);

OkHttpClient.Builder httpBuilder = new OkHttpClient.Builder();
configureHttpClientBuilder(httpConfig, httpBuilder);
this.headers = getHeadersBuilderFor(httpConfig).build();

// HTTP caching is used only for FeatureRequestor. However, when streaming is enabled, HTTP GETs
// made by FeatureRequester will always guarantee a new flag state, so we disable the cache.
if (useCache) {
File cacheDir = Files.createTempDir();
Cache cache = new Cache(cacheDir, MAX_HTTP_CACHE_SIZE_BYTES);
httpBuilder.cache(cache);
try {
cacheDir = Files.createTempDirectory("LaunchDarklySDK");
} catch (IOException e) {
throw new RuntimeException("unable to create cache directory for polling", e);
}
Cache cache = new Cache(cacheDir.toFile(), MAX_HTTP_CACHE_SIZE_BYTES);
httpBuilder.cache(cache);

httpClient = httpBuilder.build();
}

public void close() {
shutdownHttpClient(httpClient);
Util.deleteDirectory(cacheDir);
}

public DataModel.FeatureFlag getFlag(String featureKey) throws IOException, HttpErrorException, SerializationException {
String body = get(GET_LATEST_FLAGS_PATH + "/" + featureKey);
return JsonHelpers.deserialize(body, DataModel.FeatureFlag.class);
}

public DataModel.Segment getSegment(String segmentKey) throws IOException, HttpErrorException, SerializationException {
String body = get(GET_LATEST_SEGMENTS_PATH + "/" + segmentKey);
return JsonHelpers.deserialize(body, DataModel.Segment.class);
}

public AllData getAllData() throws IOException, HttpErrorException, SerializationException {
String body = get(GET_LATEST_ALL_PATH);
return JsonHelpers.deserialize(body, AllData.class);
}

private String get(String path) throws IOException, HttpErrorException {
public AllData getAllData(boolean returnDataEvenIfCached) throws IOException, HttpErrorException, SerializationException {
Request request = new Request.Builder()
.url(baseUri.resolve(path).toURL())
.url(pollingUri.toURL())
.headers(headers)
.get()
.build();

logger.debug("Making request: " + request);

try (Response response = httpClient.newCall(request).execute()) {
boolean wasCached = response.networkResponse() == null || response.networkResponse().code() == 304;
if (wasCached && !returnDataEvenIfCached) {
logger.debug("Get flag(s) got cached response, will not parse");
logger.debug("Cache hit count: " + httpClient.cache().hitCount() + " Cache network Count: " + httpClient.cache().networkCount());
return null;
}

String body = response.body().string();

if (!response.isSuccessful()) {
throw new HttpErrorException(response.code());
}
logger.debug("Get flag(s) response: " + response.toString() + " with body: " + body);
logger.debug("Network response: " + response.networkResponse());
if (useCache) {
logger.debug("Cache hit count: " + httpClient.cache().hitCount() + " Cache network Count: " + httpClient.cache().networkCount());
logger.debug("Cache response: " + response.cacheResponse());
}

return body;
logger.debug("Cache hit count: " + httpClient.cache().hitCount() + " Cache network Count: " + httpClient.cache().networkCount());
logger.debug("Cache response: " + response.cacheResponse());

return JsonHelpers.deserialize(body, AllData.class);
}
}
}
}
20 changes: 15 additions & 5 deletions src/main/java/com/launchdarkly/sdk/server/FeatureRequestor.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,22 @@
import static com.launchdarkly.sdk.server.DataModel.FEATURES;
import static com.launchdarkly.sdk.server.DataModel.SEGMENTS;

/**
* Internal abstraction for polling requests. Currently this is only used by PollingProcessor, and
* the only implementation is DefaultFeatureRequestor, but using an interface allows us to mock out
* the HTTP behavior and test the rest of PollingProcessor separately.
*/
interface FeatureRequestor extends Closeable {
DataModel.FeatureFlag getFlag(String featureKey) throws IOException, HttpErrorException;

DataModel.Segment getSegment(String segmentKey) throws IOException, HttpErrorException;

AllData getAllData() throws IOException, HttpErrorException;
/**
* Makes a request to the LaunchDarkly server-side SDK polling endpoint,
*
* @param returnDataEvenIfCached true if the method should return non-nil data no matter what;
* false if it should return {@code null} when the latest data is already in the cache
* @return the data, or {@code null} as above
* @throws IOException for network errors
* @throws HttpErrorException for HTTP error responses
*/
AllData getAllData(boolean returnDataEvenIfCached) throws IOException, HttpErrorException;

static class AllData {
final Map<String, DataModel.FeatureFlag> flags;
Expand Down
27 changes: 16 additions & 11 deletions src/main/java/com/launchdarkly/sdk/server/PollingProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,23 @@ public Future<Void> start() {
}

private void poll() {
FeatureRequestor.AllData allData = null;

try {
allData = requestor.getAllData();
// If we already obtained data earlier, and the poll request returns a cached response, then we don't
// want to bother parsing the data or reinitializing the data store. But if we never succeeded in
// storing any data, then we would still want to parse and try to store it even if it's cached.
boolean alreadyInited = initialized.get();
FeatureRequestor.AllData allData = requestor.getAllData(!alreadyInited);
if (allData == null) {
// This means it was cached, and alreadyInited was true
dataSourceUpdates.updateStatus(State.VALID, null);
} else {
if (dataSourceUpdates.init(allData.toFullDataSet())) {
dataSourceUpdates.updateStatus(State.VALID, null);
logger.info("Initialized LaunchDarkly client.");
initialized.getAndSet(true);
initFuture.complete(null);
}
}
} catch (HttpErrorException e) {
ErrorInfo errorInfo = ErrorInfo.fromHttpError(e.getStatus());
boolean recoverable = checkIfErrorIsRecoverableAndLog(logger, httpErrorDescription(e.getStatus()),
Expand All @@ -109,13 +122,5 @@ private void poll() {
logger.debug(e.toString(), e);
dataSourceUpdates.updateStatus(State.INTERRUPTED, ErrorInfo.fromException(ErrorKind.UNKNOWN, e));
}

if (allData != null && dataSourceUpdates.init(allData.toFullDataSet())) {
if (!initialized.getAndSet(true)) {
logger.info("Initialized LaunchDarkly client.");
dataSourceUpdates.updateStatus(State.VALID, null);
initFuture.complete(null);
}
}
}
}
49 changes: 0 additions & 49 deletions src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ final class StreamProcessor implements DataSource {
private static final String PUT = "put";
private static final String PATCH = "patch";
private static final String DELETE = "delete";
private static final String INDIRECT_PUT = "indirect/put";
private static final String INDIRECT_PATCH = "indirect/patch";
private static final Logger logger = Loggers.DATA_SOURCE;
private static final Duration DEAD_CONNECTION_INTERVAL = Duration.ofSeconds(300);
private static final String ERROR_CONTEXT_MESSAGE = "in stream connection";
Expand All @@ -83,7 +81,6 @@ final class StreamProcessor implements DataSource {
private final Headers headers;
@VisibleForTesting final URI streamUri;
@VisibleForTesting final Duration initialReconnectDelay;
@VisibleForTesting final FeatureRequestor requestor;
private final DiagnosticAccumulator diagnosticAccumulator;
private final EventSourceCreator eventSourceCreator;
private final int threadPriority;
Expand Down Expand Up @@ -121,7 +118,6 @@ static interface EventSourceCreator {

StreamProcessor(
HttpConfiguration httpConfig,
FeatureRequestor requestor,
DataSourceUpdates dataSourceUpdates,
EventSourceCreator eventSourceCreator,
int threadPriority,
Expand All @@ -131,7 +127,6 @@ static interface EventSourceCreator {
) {
this.dataSourceUpdates = dataSourceUpdates;
this.httpConfig = httpConfig;
this.requestor = requestor;
this.diagnosticAccumulator = diagnosticAccumulator;
this.eventSourceCreator = eventSourceCreator != null ? eventSourceCreator : this::defaultEventSourceCreator;
this.threadPriority = threadPriority;
Expand Down Expand Up @@ -232,7 +227,6 @@ public void close() throws IOException {
if (es != null) {
es.close();
}
requestor.close();
dataSourceUpdates.updateStatus(State.OFF, null);
}

Expand Down Expand Up @@ -271,14 +265,6 @@ public void onMessage(String name, MessageEvent event) throws Exception {
case DELETE:
handleDelete(event.getData());
break;

case INDIRECT_PUT:
handleIndirectPut();
break;

case INDIRECT_PATCH:
handleIndirectPatch(event.getData());
break;

default:
logger.warn("Unexpected event found in stream: " + name);
Expand Down Expand Up @@ -356,41 +342,6 @@ private void handleDelete(String eventData) throws StreamInputException, StreamS
}
}

private void handleIndirectPut() throws StreamInputException, StreamStoreException {
FeatureRequestor.AllData putData;
try {
putData = requestor.getAllData();
} catch (Exception e) {
throw new StreamInputException(e);
}
FullDataSet<ItemDescriptor> allData = putData.toFullDataSet();
if (!dataSourceUpdates.init(allData)) {
throw new StreamStoreException();
}
if (!initialized.getAndSet(true)) {
initFuture.complete(null);
logger.info("Initialized LaunchDarkly client.");
}
}

private void handleIndirectPatch(String path) throws StreamInputException, StreamStoreException {
Map.Entry<DataKind, String> kindAndKey = getKindAndKeyFromStreamApiPath(path);
DataKind kind = kindAndKey.getKey();
String key = kindAndKey.getValue();
VersionedData item;
try {
item = kind == SEGMENTS ? requestor.getSegment(key) : requestor.getFlag(key);
} catch (Exception e) {
throw new StreamInputException(e);
// In this case, StreamInputException doesn't necessarily represent malformed data from the service - it
// could be that the request to the polling endpoint failed in some other way. But either way, we must
// assume that we did not get valid data from LD so we have missed an update.
}
if (!dataSourceUpdates.upsert(kind, key, new ItemDescriptor(item.getVersion(), item))) {
throw new StreamStoreException();
}
}

@Override
public void onComment(String comment) {
logger.debug("Received a heartbeat");
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/com/launchdarkly/sdk/server/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
import org.slf4j.Logger;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -150,4 +155,26 @@ static String describeDuration(Duration d) {
}
return d.toMillis() + " milliseconds";
}

static void deleteDirectory(Path path) {
try {
Files.walkFileTree(path, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
try {
Files.delete(file);
} catch (IOException e) {}
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
try {
Files.delete(dir);
} catch (IOException e) {}
return FileVisitResult.CONTINUE;
}
});
} catch (IOException e) {}
}
}
Loading

0 comments on commit 37af42c

Please sign in to comment.