Skip to content
This repository has been archived by the owner on May 30, 2024. It is now read-only.

Support for indirect stream events #25

Merged
merged 6 commits into from
Nov 5, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ repositories {

allprojects {
group = 'com.launchdarkly'
version = "0.14.0"
version = "0.15.0-SNAPSHOT"
sourceCompatibility = 1.6
targetCompatibility = 1.6
}
Expand Down
158 changes: 158 additions & 0 deletions src/main/java/com/launchdarkly/client/FeatureRequestor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package com.launchdarkly.client;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.http.HttpStatus;
import org.apache.http.client.cache.CacheResponseStatus;
import org.apache.http.client.cache.HttpCacheContext;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.cache.CacheConfig;
import org.apache.http.impl.client.cache.CachingHttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Map;

class FeatureRequestor {

private final String apiKey;
private final LDConfig config;
private final CloseableHttpClient client;
private static final Logger logger = LoggerFactory.getLogger(FeatureRequestor.class);

FeatureRequestor(String apiKey, LDConfig config) {
this.apiKey = apiKey;
this.config = config;
this.client = createClient();
}

protected CloseableHttpClient createClient() {
CloseableHttpClient client;
PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager();
manager.setMaxTotal(100);
manager.setDefaultMaxPerRoute(20);

CacheConfig cacheConfig = CacheConfig.custom()
.setMaxCacheEntries(1000)
.setMaxObjectSize(131072)
.setSharedCache(false)
.build();

RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(config.connectTimeout)
.setSocketTimeout(config.socketTimeout)
.setProxy(config.proxyHost)
.build();
client = CachingHttpClients.custom()
.setCacheConfig(cacheConfig)
.setConnectionManager(manager)
.setDefaultRequestConfig(requestConfig)
.build();
return client;
}

Map<String, FeatureRep<?>> makeAllRequest(boolean latest) throws IOException {
Gson gson = new Gson();
HttpCacheContext context = HttpCacheContext.create();

String resource = latest ? "/api/eval/latest-features" : "/api/eval/features";

HttpGet request = config.getRequest(apiKey, resource);

CloseableHttpResponse response = null;
try {
response = client.execute(request, context);

logCacheResponse(context.getCacheResponseStatus());

handleResponseStatus(response.getStatusLine().getStatusCode(), null);

Type type = new TypeToken<Map<String, FeatureRep<?>>>() {}.getType();

Map<String, FeatureRep<?>> result = gson.fromJson(EntityUtils.toString(response.getEntity()), type);
return result;
}
finally {
try {
if (response != null) response.close();
} catch (IOException e) {
}
}
}

void logCacheResponse(CacheResponseStatus status) {
switch (status) {
case CACHE_HIT:
logger.debug("A response was generated from the cache with " +
"no requests sent upstream");
break;
case CACHE_MODULE_RESPONSE:
logger.debug("The response was generated directly by the " +
"caching module");
break;
case CACHE_MISS:
logger.debug("The response came from an upstream server");
break;
case VALIDATED:
logger.debug("The response was generated from the cache " +
"after validating the entry with the origin server");
break;
}
}

void handleResponseStatus(int status, String featureKey) throws IOException {

if (status != HttpStatus.SC_OK) {
if (status == HttpStatus.SC_UNAUTHORIZED) {
logger.error("Invalid API key");
} else if (status == HttpStatus.SC_NOT_FOUND) {
if (featureKey != null) {
logger.error("Unknown feature key: " + featureKey);
}
else {
logger.error("Resource not found");
}
} else {
logger.error("Unexpected status code: " + status);
}
throw new IOException("Failed to fetch flag");
}

}

<T> FeatureRep<T> makeRequest(String featureKey, boolean latest) throws IOException {
Gson gson = new Gson();
HttpCacheContext context = HttpCacheContext.create();

String resource = latest ? "/api/eval/latest-features/" : "/api/eval/features/";

HttpGet request = config.getRequest(apiKey,resource + featureKey);

CloseableHttpResponse response = null;
try {
response = client.execute(request, context);

logCacheResponse(context.getCacheResponseStatus());

handleResponseStatus(response.getStatusLine().getStatusCode(), featureKey);

Type type = new TypeToken<FeatureRep<T>>() {}.getType();

FeatureRep<T> result = gson.fromJson(EntityUtils.toString(response.getEntity()), type);
return result;
}
finally {
try {
if (response != null) response.close();
} catch (IOException e) {
}
}
}
}
117 changes: 14 additions & 103 deletions src/main/java/com/launchdarkly/client/LDClient.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,17 @@
package com.launchdarkly.client;


import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.reflect.TypeToken;
import org.apache.http.HttpStatus;
import org.apache.http.annotation.ThreadSafe;
import org.apache.http.client.cache.CacheResponseStatus;
import org.apache.http.client.cache.HttpCacheContext;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.cache.CacheConfig;
import org.apache.http.impl.client.cache.CachingHttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URL;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* A client for the LaunchDarkly API. Client instances are thread-safe. Applications should instantiate
Expand All @@ -37,10 +22,9 @@
public class LDClient implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(LDClient.class);
private final LDConfig config;
private final CloseableHttpClient client;
private final FeatureRequestor requestor;
private final EventProcessor eventProcessor;
private final StreamProcessor streamProcessor;
private final String apiKey;
protected static final String CLIENT_VERSION = getClientVersion();
private volatile boolean offline = false;

Expand All @@ -63,53 +47,32 @@ public LDClient(String apiKey) {
* @param config a client configuration object
*/
public LDClient(String apiKey, LDConfig config) {
this.apiKey = apiKey;
this.config = config;
this.client = createClient();
this.requestor = createFeatureRequestor(apiKey, config);
this.eventProcessor = createEventProcessor(apiKey, config);

if (config.stream) {
logger.debug("Enabling streaming API");
this.streamProcessor = createStreamProcessor(apiKey, config);
this.streamProcessor = createStreamProcessor(apiKey, config, requestor);
this.streamProcessor.subscribe();
} else {
logger.debug("Streaming API disabled");
this.streamProcessor = null;
}
}

protected FeatureRequestor createFeatureRequestor(String apiKey, LDConfig config) {
return new FeatureRequestor(apiKey, config);
}

protected EventProcessor createEventProcessor(String apiKey, LDConfig config) {
return new EventProcessor(apiKey, config);
}

protected StreamProcessor createStreamProcessor(String apiKey, LDConfig config) {
return new StreamProcessor(apiKey, config);
protected StreamProcessor createStreamProcessor(String apiKey, LDConfig config, FeatureRequestor requestor) {
return new StreamProcessor(apiKey, config, requestor);
}

protected CloseableHttpClient createClient() {
CloseableHttpClient client;
PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager();
manager.setMaxTotal(100);
manager.setDefaultMaxPerRoute(20);

CacheConfig cacheConfig = CacheConfig.custom()
.setMaxCacheEntries(1000)
.setMaxObjectSize(8192)
.setSharedCache(false)
.build();

RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(config.connectTimeout)
.setSocketTimeout(config.socketTimeout)
.setProxy(config.proxyHost)
.build();
client = CachingHttpClients.custom()
.setCacheConfig(cacheConfig)
.setConnectionManager(manager)
.setDefaultRequestConfig(requestConfig)
.build();
return client;
}

/**
* Tracks that a user performed an event.
Expand Down Expand Up @@ -191,13 +154,14 @@ public boolean toggle(String featureKey, LDUser user, boolean defaultValue) {
logger.debug("Using feature flag stored from streaming API");
result = (FeatureRep<Boolean>) this.streamProcessor.getFeature(featureKey);
if (config.debugStreaming) {
FeatureRep<Boolean> pollingResult = fetchFeature(featureKey);
FeatureRep<Boolean> pollingResult = requestor.makeRequest(featureKey, true);
if (!result.equals(pollingResult)) {
logger.warn("Mismatch between streaming and polling feature! Streaming: {} Polling: {}", result, pollingResult);
}
}
} else {
result = fetchFeature(featureKey);
// If streaming is enabled, always get the latest version of the feature while polling
result = requestor.makeRequest(featureKey, this.config.stream);
}
if (result == null) {
logger.warn("Unknown feature flag " + featureKey + "; returning default value");
Expand All @@ -221,60 +185,7 @@ public boolean toggle(String featureKey, LDUser user, boolean defaultValue) {
}
}

private FeatureRep<Boolean> fetchFeature(String featureKey) throws IOException {
Gson gson = new Gson();
HttpCacheContext context = HttpCacheContext.create();
HttpGet request = config.getRequest(apiKey, "/api/eval/features/" + featureKey);

CloseableHttpResponse response = null;
try {
response = client.execute(request, context);

CacheResponseStatus responseStatus = context.getCacheResponseStatus();

switch (responseStatus) {
case CACHE_HIT:
logger.debug("A response was generated from the cache with " +
"no requests sent upstream");
break;
case CACHE_MODULE_RESPONSE:
logger.debug("The response was generated directly by the " +
"caching module");
break;
case CACHE_MISS:
logger.debug("The response came from an upstream server");
break;
case VALIDATED:
logger.debug("The response was generated from the cache " +
"after validating the entry with the origin server");
break;
}

int status = response.getStatusLine().getStatusCode();

if (status != HttpStatus.SC_OK) {
if (status == HttpStatus.SC_UNAUTHORIZED) {
logger.error("Invalid API key");
} else if (status == HttpStatus.SC_NOT_FOUND) {
logger.error("Unknown feature key: " + featureKey);
} else {
logger.error("Unexpected status code: " + status);
}
throw new IOException("Failed to fetch flag");
}

Type boolType = new TypeToken<FeatureRep<Boolean>>() {}.getType();

FeatureRep<Boolean> result = gson.fromJson(EntityUtils.toString(response.getEntity()), boolType);
return result;
}
finally {
try {
if (response != null) response.close();
} catch (IOException e) {
}
}
}

/**
* Closes the LaunchDarkly client event processing thread and flushes all pending events. This should only
Expand Down
16 changes: 12 additions & 4 deletions src/main/java/com/launchdarkly/client/LDUser.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ public Builder email(String email) {
* @return the builder
*/
public Builder custom(String k, String v) {
custom.put(k, new JsonPrimitive(v));
if (key != null && v != null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated bug fixes

custom.put(k, new JsonPrimitive(v));
}
return this;
}

Expand All @@ -265,7 +267,9 @@ public Builder custom(String k, String v) {
* @return the builder
*/
public Builder custom(String k, Number n) {
custom.put(k, new JsonPrimitive(n));
if (key != null && n != null) {
custom.put(k, new JsonPrimitive(n));
}
return this;
}

Expand All @@ -276,7 +280,9 @@ public Builder custom(String k, Number n) {
* @return the builder
*/
public Builder custom(String k, Boolean b) {
custom.put(k, new JsonPrimitive(b));
if (key != null && b != null) {
custom.put(k, new JsonPrimitive(b));
}
return this;
}

Expand All @@ -289,7 +295,9 @@ public Builder custom(String k, Boolean b) {
public Builder custom(String k, List<String> vs) {
JsonArray array = new JsonArray();
for (String v : vs) {
array.add(new JsonPrimitive(v));
if (v != null) {
array.add(new JsonPrimitive(v));
}
}
custom.put(k, array);
return this;
Expand Down
Loading