Skip to content
This repository has been archived by the owner on May 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #25 from launchdarkly/jko/indirect-stream
Browse files Browse the repository at this point in the history
Support for indirect stream events
  • Loading branch information
jkodumal committed Nov 5, 2015
2 parents 6a8c04f + a3dbb7b commit 6c1d9b0
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 121 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ repositories {

allprojects {
group = 'com.launchdarkly'
version = "0.14.0"
version = "0.15.0-SNAPSHOT"
sourceCompatibility = 1.6
targetCompatibility = 1.6
}
Expand Down
158 changes: 158 additions & 0 deletions src/main/java/com/launchdarkly/client/FeatureRequestor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package com.launchdarkly.client;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.http.HttpStatus;
import org.apache.http.client.cache.CacheResponseStatus;
import org.apache.http.client.cache.HttpCacheContext;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.cache.CacheConfig;
import org.apache.http.impl.client.cache.CachingHttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Map;

class FeatureRequestor {

private final String apiKey;
private final LDConfig config;
private final CloseableHttpClient client;
private static final Logger logger = LoggerFactory.getLogger(FeatureRequestor.class);

FeatureRequestor(String apiKey, LDConfig config) {
this.apiKey = apiKey;
this.config = config;
this.client = createClient();
}

protected CloseableHttpClient createClient() {
CloseableHttpClient client;
PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager();
manager.setMaxTotal(100);
manager.setDefaultMaxPerRoute(20);

CacheConfig cacheConfig = CacheConfig.custom()
.setMaxCacheEntries(1000)
.setMaxObjectSize(131072)
.setSharedCache(false)
.build();

RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(config.connectTimeout)
.setSocketTimeout(config.socketTimeout)
.setProxy(config.proxyHost)
.build();
client = CachingHttpClients.custom()
.setCacheConfig(cacheConfig)
.setConnectionManager(manager)
.setDefaultRequestConfig(requestConfig)
.build();
return client;
}

Map<String, FeatureRep<?>> makeAllRequest(boolean latest) throws IOException {
Gson gson = new Gson();
HttpCacheContext context = HttpCacheContext.create();

String resource = latest ? "/api/eval/latest-features" : "/api/eval/features";

HttpGet request = config.getRequest(apiKey, resource);

CloseableHttpResponse response = null;
try {
response = client.execute(request, context);

logCacheResponse(context.getCacheResponseStatus());

handleResponseStatus(response.getStatusLine().getStatusCode(), null);

Type type = new TypeToken<Map<String, FeatureRep<?>>>() {}.getType();

Map<String, FeatureRep<?>> result = gson.fromJson(EntityUtils.toString(response.getEntity()), type);
return result;
}
finally {
try {
if (response != null) response.close();
} catch (IOException e) {
}
}
}

void logCacheResponse(CacheResponseStatus status) {
switch (status) {
case CACHE_HIT:
logger.debug("A response was generated from the cache with " +
"no requests sent upstream");
break;
case CACHE_MODULE_RESPONSE:
logger.debug("The response was generated directly by the " +
"caching module");
break;
case CACHE_MISS:
logger.debug("The response came from an upstream server");
break;
case VALIDATED:
logger.debug("The response was generated from the cache " +
"after validating the entry with the origin server");
break;
}
}

void handleResponseStatus(int status, String featureKey) throws IOException {

if (status != HttpStatus.SC_OK) {
if (status == HttpStatus.SC_UNAUTHORIZED) {
logger.error("Invalid API key");
} else if (status == HttpStatus.SC_NOT_FOUND) {
if (featureKey != null) {
logger.error("Unknown feature key: " + featureKey);
}
else {
logger.error("Resource not found");
}
} else {
logger.error("Unexpected status code: " + status);
}
throw new IOException("Failed to fetch flag");
}

}

<T> FeatureRep<T> makeRequest(String featureKey, boolean latest) throws IOException {
Gson gson = new Gson();
HttpCacheContext context = HttpCacheContext.create();

String resource = latest ? "/api/eval/latest-features/" : "/api/eval/features/";

HttpGet request = config.getRequest(apiKey,resource + featureKey);

CloseableHttpResponse response = null;
try {
response = client.execute(request, context);

logCacheResponse(context.getCacheResponseStatus());

handleResponseStatus(response.getStatusLine().getStatusCode(), featureKey);

Type type = new TypeToken<FeatureRep<T>>() {}.getType();

FeatureRep<T> result = gson.fromJson(EntityUtils.toString(response.getEntity()), type);
return result;
}
finally {
try {
if (response != null) response.close();
} catch (IOException e) {
}
}
}
}
117 changes: 14 additions & 103 deletions src/main/java/com/launchdarkly/client/LDClient.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,17 @@
package com.launchdarkly.client;


import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.reflect.TypeToken;
import org.apache.http.HttpStatus;
import org.apache.http.annotation.ThreadSafe;
import org.apache.http.client.cache.CacheResponseStatus;
import org.apache.http.client.cache.HttpCacheContext;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.cache.CacheConfig;
import org.apache.http.impl.client.cache.CachingHttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URL;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* A client for the LaunchDarkly API. Client instances are thread-safe. Applications should instantiate
Expand All @@ -37,10 +22,9 @@
public class LDClient implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(LDClient.class);
private final LDConfig config;
private final CloseableHttpClient client;
private final FeatureRequestor requestor;
private final EventProcessor eventProcessor;
private final StreamProcessor streamProcessor;
private final String apiKey;
protected static final String CLIENT_VERSION = getClientVersion();
private volatile boolean offline = false;

Expand All @@ -63,53 +47,32 @@ public LDClient(String apiKey) {
* @param config a client configuration object
*/
public LDClient(String apiKey, LDConfig config) {
this.apiKey = apiKey;
this.config = config;
this.client = createClient();
this.requestor = createFeatureRequestor(apiKey, config);
this.eventProcessor = createEventProcessor(apiKey, config);

if (config.stream) {
logger.debug("Enabling streaming API");
this.streamProcessor = createStreamProcessor(apiKey, config);
this.streamProcessor = createStreamProcessor(apiKey, config, requestor);
this.streamProcessor.subscribe();
} else {
logger.debug("Streaming API disabled");
this.streamProcessor = null;
}
}

protected FeatureRequestor createFeatureRequestor(String apiKey, LDConfig config) {
return new FeatureRequestor(apiKey, config);
}

protected EventProcessor createEventProcessor(String apiKey, LDConfig config) {
return new EventProcessor(apiKey, config);
}

protected StreamProcessor createStreamProcessor(String apiKey, LDConfig config) {
return new StreamProcessor(apiKey, config);
protected StreamProcessor createStreamProcessor(String apiKey, LDConfig config, FeatureRequestor requestor) {
return new StreamProcessor(apiKey, config, requestor);
}

protected CloseableHttpClient createClient() {
CloseableHttpClient client;
PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager();
manager.setMaxTotal(100);
manager.setDefaultMaxPerRoute(20);

CacheConfig cacheConfig = CacheConfig.custom()
.setMaxCacheEntries(1000)
.setMaxObjectSize(8192)
.setSharedCache(false)
.build();

RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(config.connectTimeout)
.setSocketTimeout(config.socketTimeout)
.setProxy(config.proxyHost)
.build();
client = CachingHttpClients.custom()
.setCacheConfig(cacheConfig)
.setConnectionManager(manager)
.setDefaultRequestConfig(requestConfig)
.build();
return client;
}

/**
* Tracks that a user performed an event.
Expand Down Expand Up @@ -191,13 +154,14 @@ public boolean toggle(String featureKey, LDUser user, boolean defaultValue) {
logger.debug("Using feature flag stored from streaming API");
result = (FeatureRep<Boolean>) this.streamProcessor.getFeature(featureKey);
if (config.debugStreaming) {
FeatureRep<Boolean> pollingResult = fetchFeature(featureKey);
FeatureRep<Boolean> pollingResult = requestor.makeRequest(featureKey, true);
if (!result.equals(pollingResult)) {
logger.warn("Mismatch between streaming and polling feature! Streaming: {} Polling: {}", result, pollingResult);
}
}
} else {
result = fetchFeature(featureKey);
// If streaming is enabled, always get the latest version of the feature while polling
result = requestor.makeRequest(featureKey, this.config.stream);
}
if (result == null) {
logger.warn("Unknown feature flag " + featureKey + "; returning default value");
Expand All @@ -221,60 +185,7 @@ public boolean toggle(String featureKey, LDUser user, boolean defaultValue) {
}
}

private FeatureRep<Boolean> fetchFeature(String featureKey) throws IOException {
Gson gson = new Gson();
HttpCacheContext context = HttpCacheContext.create();
HttpGet request = config.getRequest(apiKey, "/api/eval/features/" + featureKey);

CloseableHttpResponse response = null;
try {
response = client.execute(request, context);

CacheResponseStatus responseStatus = context.getCacheResponseStatus();

switch (responseStatus) {
case CACHE_HIT:
logger.debug("A response was generated from the cache with " +
"no requests sent upstream");
break;
case CACHE_MODULE_RESPONSE:
logger.debug("The response was generated directly by the " +
"caching module");
break;
case CACHE_MISS:
logger.debug("The response came from an upstream server");
break;
case VALIDATED:
logger.debug("The response was generated from the cache " +
"after validating the entry with the origin server");
break;
}

int status = response.getStatusLine().getStatusCode();

if (status != HttpStatus.SC_OK) {
if (status == HttpStatus.SC_UNAUTHORIZED) {
logger.error("Invalid API key");
} else if (status == HttpStatus.SC_NOT_FOUND) {
logger.error("Unknown feature key: " + featureKey);
} else {
logger.error("Unexpected status code: " + status);
}
throw new IOException("Failed to fetch flag");
}

Type boolType = new TypeToken<FeatureRep<Boolean>>() {}.getType();

FeatureRep<Boolean> result = gson.fromJson(EntityUtils.toString(response.getEntity()), boolType);
return result;
}
finally {
try {
if (response != null) response.close();
} catch (IOException e) {
}
}
}

/**
* Closes the LaunchDarkly client event processing thread and flushes all pending events. This should only
Expand Down
16 changes: 12 additions & 4 deletions src/main/java/com/launchdarkly/client/LDUser.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ public Builder email(String email) {
* @return the builder
*/
public Builder custom(String k, String v) {
custom.put(k, new JsonPrimitive(v));
if (key != null && v != null) {
custom.put(k, new JsonPrimitive(v));
}
return this;
}

Expand All @@ -265,7 +267,9 @@ public Builder custom(String k, String v) {
* @return the builder
*/
public Builder custom(String k, Number n) {
custom.put(k, new JsonPrimitive(n));
if (key != null && n != null) {
custom.put(k, new JsonPrimitive(n));
}
return this;
}

Expand All @@ -276,7 +280,9 @@ public Builder custom(String k, Number n) {
* @return the builder
*/
public Builder custom(String k, Boolean b) {
custom.put(k, new JsonPrimitive(b));
if (key != null && b != null) {
custom.put(k, new JsonPrimitive(b));
}
return this;
}

Expand All @@ -289,7 +295,9 @@ public Builder custom(String k, Boolean b) {
public Builder custom(String k, List<String> vs) {
JsonArray array = new JsonArray();
for (String v : vs) {
array.add(new JsonPrimitive(v));
if (v != null) {
array.add(new JsonPrimitive(v));
}
}
custom.put(k, array);
return this;
Expand Down
Loading

0 comments on commit 6c1d9b0

Please sign in to comment.