Skip to content

Commit

Permalink
feat: SSE support
Browse files Browse the repository at this point in the history
  • Loading branch information
ptitFicus committed Apr 18, 2024
1 parent 9e3b602 commit b406d8d
Show file tree
Hide file tree
Showing 16 changed files with 2,091 additions and 233 deletions.
36 changes: 36 additions & 0 deletions src/main/java/fr/maif/FeatureCacheConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,21 @@ public class FeatureCacheConfiguration {
*/
public final Duration refreshInterval;

/**
* Wether izanami client will received feature updates from remote server via SSE.
*/
public final boolean useServerSentEvent;

/**
* Maximum time between two Izanami heartbeats on SSE connection. Used only when {@link FeatureCacheConfiguration#useServerSentEvent} is true.
*/
public final Duration serverSentEventKeepAliveInterval;

private FeatureCacheConfiguration(Builder builder) {
enabled = builder.enabled;
useServerSentEvent = builder.useServerSentEvent;
refreshInterval = builder.refreshInterval;
serverSentEventKeepAliveInterval = builder.serverSentEventKeepAliveInterval;
}

public static Builder newBuilder() {
Expand All @@ -27,6 +39,8 @@ public static Builder newBuilder() {
public static final class Builder {
private boolean enabled = false;
private Duration refreshInterval = Duration.ofMinutes(10L);
private boolean useServerSentEvent = false;
public Duration serverSentEventKeepAliveInterval = Duration.ofSeconds(25L);

private Builder() {
}
Expand All @@ -49,6 +63,28 @@ public Builder withRefreshInterval(Duration val) {
return this;
}

/**
* @param val wether client should use SSE instead of polling to keep cache up to date. When using SSE,
* Izanami client will keep an http connection opened with Izanami backend, and get notified as soon
* as a feature is created / updated / deleted.
* @return updated builder
*/
public Builder shouldUseServerSentEvent(boolean val) {
this.useServerSentEvent = val;
return this;
}

/**
* @param val wether client should use SSE instead of polling to keep cache up to date. When using SSE,
* Izanami client will keep an http connection opened with Izanami backend, and get notified as soon
* as a feature is created / updated / deleted.
* @return updated builder
*/
public Builder withServerSentEventKeepAliveInterval(Duration val) {
this.serverSentEventKeepAliveInterval = val;
return this;
}

/**
* Build actual cache configuration
* @return a new FeatureCacheConfiguration with this builder values
Expand Down
39 changes: 31 additions & 8 deletions src/main/java/fr/maif/IzanamiClient.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package fr.maif;

import fr.maif.http.IzanamiHttpClient;
import fr.maif.requests.FeatureRequest;
import fr.maif.requests.FeatureService;
import fr.maif.requests.IzanamiConnectionInformation;
import fr.maif.requests.SingleFeatureRequest;
import fr.maif.requests.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;
Expand All @@ -16,6 +15,7 @@
* This should be instantiated only once by application.
*/
public class IzanamiClient {
private static final Logger LOGGER = LoggerFactory.getLogger(IzanamiClient.class);
private final ClientConfiguration configuration;
private final FeatureService featureService;
private CompletableFuture<Void> loader;
Expand All @@ -36,12 +36,35 @@ public IzanamiClient(
duration.orElse(Duration.ofSeconds(10L))
);

this.featureService = new FeatureService(configuration);
if(this.configuration.cacheConfiguration.useServerSentEvent) {
LOGGER.info("Izanami client will use SSE to keep in sync");
var service = new SSEFeatureService(configuration);
if(Objects.nonNull(idsToPreload) && !idsToPreload.isEmpty()) {
this.loader = service.featureStates(FeatureRequest.newFeatureRequest().withFeatures(idsToPreload)).thenApply(osef -> null);
} else {
this.loader = CompletableFuture.completedFuture(null);
}
this.featureService = service;
} else {
if(configuration.cacheConfiguration.enabled) {
LOGGER.info("Izanami client will use polling to keep in sync");
} else {
LOGGER.info("Cache is disabled, Izanami client will query remote instance every time");
}
this.featureService = new FetchFeatureService(configuration);
if(Objects.nonNull(idsToPreload) && !idsToPreload.isEmpty()) {
this.loader = featureService.featureStates(FeatureRequest.newFeatureRequest().withFeatures(idsToPreload)).thenAccept(v -> {});
} else {
this.loader = CompletableFuture.completedFuture(null);
}
}
}

if(Objects.nonNull(idsToPreload) && !idsToPreload.isEmpty()) {
this.loader = featureService.featureStates(FeatureRequest.newFeatureRequest().withFeatures(idsToPreload)).thenAccept(v -> {});
public CompletableFuture<Void> close() {
if(this.featureService instanceof SSEFeatureService) {
return ((SSEFeatureService)this.featureService).disconnect();
} else {
this.loader = CompletableFuture.completedFuture(null);
return CompletableFuture.completedFuture(null);
}
}

Expand Down
19 changes: 13 additions & 6 deletions src/main/java/fr/maif/http/HttpRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
public final class HttpRequester {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpRequester.class);

static String url(ClientConfiguration configuration, FeatureRequest request) {
var url = configuration.connectionInformation.url + "/v2/features";

public static TreeMap<String, String> queryParametersAsMap(FeatureRequest request) {
var maybeFeatures = request.getFeatures().stream().sorted(String::compareTo).collect(Collectors.joining(","));

var params = new TreeMap<>();
params.put("conditions", true);
var params = new TreeMap<String, String>();
params.put("conditions", "true");
if(!maybeFeatures.isBlank()) {
params.put("features", maybeFeatures);
}
Expand All @@ -33,9 +32,18 @@ static String url(ClientConfiguration configuration, FeatureRequest request) {
.map(user -> params.put("user", user));


String searchPart = params.entrySet().stream()
return params;
}
public static String queryParameters(FeatureRequest request) {
return queryParametersAsMap(request).entrySet().stream()
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.joining("&"));
}

static String url(ClientConfiguration configuration, FeatureRequest request) {
var url = configuration.connectionInformation.url + "/v2/features";

String searchPart = queryParameters(request);

url = !searchPart.isBlank() ? (url + "?" + searchPart) : url;

Expand All @@ -48,7 +56,6 @@ static <T> CompletableFuture<Result<T>> performCall(
Function<String, Result<T>> responseMapper
) {
return configuration.httpClient.apply(request)
// TODO handle error
.thenApply(resp -> responseMapper.apply(resp.body));
}
public static CompletableFuture<Result<Map<String, Feature>>> performRequest(
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/fr/maif/http/ResponseUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public static Result<Map<String, Boolean>> parseBooleanResponse(String json) {
.map(Result::new)
.orElseGet(() -> new Result<>("Failed to parse response"));
} catch (JsonMappingException e) {
return new Result<>("Unexpected format received from Izanami");
return new Result<>("Unexpected format received from Izanami: " + json);
} catch (JsonProcessingException e) {
return new Result<>("Invalid JSON received from Izanami");
return new Result<>("Invalid JSON received from Izanami: " + json);
}
}

Expand All @@ -60,9 +60,9 @@ public static Result<Map<String, Feature>> parseFeatureResponse(String json) {
.map(Result::new)
.orElseGet(() -> new Result<>("Failed to parse response"));
} catch (JsonMappingException e) {
return new Result<>("Unexpected format received from Izanami");
return new Result<>("Unexpected format received from Izanami: " + json);
} catch (JsonProcessingException e) {
return new Result<>("Invalid JSON received from Izanami");
return new Result<>("Invalid JSON received from Izanami: " + json);
}
}

Expand All @@ -83,7 +83,7 @@ static FeatureOverload parseOverload(ObjectNode json) {
}
}

static Optional<Feature> parseFeature(String id, ObjectNode json) {
public static Optional<Feature> parseFeature(String id, ObjectNode json) {
if(json.isNull()) {
return Optional.empty();
}
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/fr/maif/requests/FeatureRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,28 @@ public static SingleFeatureRequest newSingleFeatureRequest(String feature) {
return new SingleFeatureRequest(feature);
}


public FeatureRequest copy() {
var req = new FeatureRequest();
req.features = new HashMap<>(this.features);
req.payload = this.payload;
req.user = this.user;
req.callTimeout = this.callTimeout;
req.context = this.context;
req.errorStrategy= this.errorStrategy;
req.ignoreCache = this.ignoreCache;

return req;
}

/**
* Remove all features for this request, usefull after cpoying a query, to keep parameters but change features
*/
public FeatureRequest clearFeatures() {
this.features = new HashMap<>();
return this;
}

/**
* Add or update user for this request
* @param val user to use for this request
Expand Down
154 changes: 5 additions & 149 deletions src/main/java/fr/maif/requests/FeatureService.java
Original file line number Diff line number Diff line change
@@ -1,158 +1,14 @@
package fr.maif.requests;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import fr.maif.ClientConfiguration;
import fr.maif.errors.IzanamiError;
import fr.maif.features.Feature;
import fr.maif.http.HttpRequester;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static fr.maif.requests.FeatureRequest.newFeatureRequest;

public class FeatureService {
protected ClientConfiguration configuration;
private static final Logger LOGGER = LoggerFactory.getLogger(FeatureService.class);
private final Cache<String, Feature> cache;

public FeatureService(ClientConfiguration configuration) {
this.configuration = configuration;
this.cache = Caffeine.newBuilder()
.build();


if(configuration.cacheConfiguration.enabled) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(
this::refreshCache,
0,
configuration.cacheConfiguration.refreshInterval.getSeconds(), TimeUnit.SECONDS
);
}
}

private void putValuesInCache(Map<String, Feature> features) {
features.forEach(cache::put);
}

private void refreshCache() {
Set<String> features = cache.asMap().keySet();
LOGGER.debug("Refreshing cache for {}", String.join(",", features));
if(features.isEmpty()) {
return;
}
var request = new FeatureRequest().withFeatures(features);

HttpRequester
.performRequest(configuration, request)
.thenAccept(result -> {
if(!result.isError()) {
LOGGER.debug("Received following features for cache refresh {}", String.join("," + result.value.entrySet()));
cache.invalidateAll();
putValuesInCache(result.value);
} else {
LOGGER.error("Failed to refresh cache : {}", result.error.get());
}
});
}

public CompletableFuture<Map<String, Boolean>> featureStates(
public interface FeatureService {
CompletableFuture<Map<String, Boolean>> featureStates(
FeatureRequest request
) {
LOGGER.debug("Feature activation request for {}", String.join(",", request.features.keySet()));
Set<SpecificFeatureRequest> missingFeatures = new HashSet<>();
Map<String, Boolean> activation = new HashMap<>();

request.features.values()
.forEach(f -> {
boolean shouldIgnoreCache = request.isCacheIgnoredFor(f.feature)
.orElseGet(() -> !configuration.cacheConfiguration.enabled);
if(shouldIgnoreCache) {
missingFeatures.add(f);
} else {
Optional<Boolean> maybeActivation = Optional.ofNullable(cache.getIfPresent(f.feature))
.flatMap(cachedFeature -> cachedFeature.active(request.context.orElse(null), request.user));
maybeActivation.ifPresentOrElse(active -> activation.put(f.feature, active), () -> missingFeatures.add(f));
}
});

if(missingFeatures.isEmpty()) {
return CompletableFuture
.completedFuture(activation);
} else {
var missingRequest = newFeatureRequest().withSpecificFeatures(missingFeatures)
.withErrorStrategy(request.errorStrategy.orElse(null))
.withCallTimeout(request.getTimeout().orElse(null))
.withUser(request.user)
.withContext(request.context.orElse(null))
.withPayload(request.payload.orElse(null));
return HttpRequester.performRequest(configuration, missingRequest)
.thenApply(featureResponse -> {
if(featureResponse.isError()) {
String errorMsg = featureResponse.error.get();
LOGGER.error("Failed to retrieve features : {}", errorMsg);
missingFeatures.forEach(f -> {
var errorStrategy = missingRequest.errorStrategyFor(f.feature).orElseGet(() -> configuration.errorStrategy);
if(!errorStrategy.lastKnownFallbackAllowed) {
activation.put(f.feature, errorStrategy.handleError(new IzanamiError(errorMsg)).join());
} else {
Boolean active = Optional.ofNullable(cache.getIfPresent(f.feature))
.flatMap(feat -> feat.active(missingRequest.context.orElse(null), missingRequest.user))
.orElseGet(() -> errorStrategy.handleError(new IzanamiError(errorMsg)).join());
activation.put(f.feature, active);
}
});
} else {
Map<String, Feature> featuresById = featureResponse.value;
missingFeatures.forEach(f -> {
if(featuresById.containsKey(f.feature)) {
var feature = featuresById.get(f.feature);
cache.put(f.feature, feature);
activation.put(f.feature, feature.active);
} else {
// TODO deduplicate this
var errorStrategy = request.errorStrategyFor(f.feature).orElseGet(() -> configuration.errorStrategy);
String errorMessage = "Missing feature in Izanami response : " + f.feature +". Either this feature has been deleted or your key is not authorized for it.";
if(!errorStrategy.lastKnownFallbackAllowed) {
activation.put(f.feature, errorStrategy.handleError(new IzanamiError(errorMessage)).join());
} else {
Boolean active = Optional.ofNullable(cache.getIfPresent(f.feature))
.flatMap(feat -> feat.active(request.context.orElse(null), request.user))
.orElseGet(() -> errorStrategy.handleError(new IzanamiError(errorMessage)).join());
activation.put(f.feature, active);
}
}
});
}
return activation;
}).exceptionally(ex -> {
LOGGER.error("Failed to query remote Izanami", ex);
missingFeatures.forEach(f -> {
// TODO deduplicate this
var errorStrategy = request.errorStrategyFor(f.feature).orElseGet(() -> configuration.errorStrategy);
String errorMessage = "Missing feature in Izanami response : " + f.feature +". Either this feature has been deleted or your key is not authorized for it.";
if(!errorStrategy.lastKnownFallbackAllowed) {
activation.put(f.feature, errorStrategy.handleError(new IzanamiError(errorMessage)).join());
} else {
Boolean active = Optional.ofNullable(cache.getIfPresent(f.feature))
.flatMap(feat -> feat.active(request.context.orElse(null), request.user))
.orElseGet(() -> errorStrategy.handleError(new IzanamiError(errorMessage)).join());
activation.put(f.feature, active);
}
});
return activation;
});
}
}
);

public CompletableFuture<Boolean> featureStates(SingleFeatureRequest request) {
default CompletableFuture<Boolean> featureStates(SingleFeatureRequest request) {
return featureStates(request.toActivationRequest())
.thenApply(resp -> resp.get(request.feature));
}
Expand Down
Loading

0 comments on commit b406d8d

Please sign in to comment.