Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Obey HTTP Status codes from upstream. #221

Merged
merged 15 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pull_requests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
version: [8,11,17]
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Setup Java
uses: actions/setup-java@v3
with:
Expand Down
14 changes: 10 additions & 4 deletions src/main/java/io/getunleash/metric/DefaultHttpMetricsSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,32 @@ public DefaultHttpMetricsSender(UnleashConfig unleashConfig) {
.create();
}

public void registerClient(ClientRegistration registration) {
public int registerClient(ClientRegistration registration) {
if (!unleashConfig.isDisableMetrics()) {
try {
post(clientRegistrationURL, registration);
int statusCode = post(clientRegistrationURL, registration);
eventDispatcher.dispatch(registration);
return statusCode;
} catch (UnleashException ex) {
eventDispatcher.dispatch(ex);
return -1;
}
}
return -1;
}

public void sendMetrics(ClientMetrics metrics) {
public int sendMetrics(ClientMetrics metrics) {
if (!unleashConfig.isDisableMetrics()) {
try {
post(clientMetricsURL, metrics);
int statusCode = post(clientMetricsURL, metrics);
eventDispatcher.dispatch(metrics);
return statusCode;
} catch (UnleashException ex) {
eventDispatcher.dispatch(ex);
return -1;
}
}
return -1;
}

private int post(URL url, Object o) throws UnleashException {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/getunleash/metric/MetricSender.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.getunleash.metric;

public interface MetricSender {
void registerClient(ClientRegistration registration);
int registerClient(ClientRegistration registration);

void sendMetrics(ClientMetrics metrics);
int sendMetrics(ClientMetrics metrics);
}
9 changes: 6 additions & 3 deletions src/main/java/io/getunleash/metric/OkHttpMetricsSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,21 @@ public OkHttpMetricsSender(UnleashConfig config) {
}

@Override
public void registerClient(ClientRegistration registration) {
public int registerClient(ClientRegistration registration) {
if (!config.isDisableMetrics()) {
try {
post(clientRegistrationUrl, registration);
int statusCode = post(clientRegistrationUrl, registration);
eventDispatcher.dispatch(registration);
return statusCode;
} catch (UnleashException ex) {
eventDispatcher.dispatch(ex);
}
}
return -1;
}

@Override
public void sendMetrics(ClientMetrics metrics) {
public int sendMetrics(ClientMetrics metrics) {
if (!config.isDisableMetrics()) {
try {
post(clientMetricsUrl, metrics);
Expand All @@ -81,6 +83,7 @@ public void sendMetrics(ClientMetrics metrics) {
eventDispatcher.dispatch(ex);
}
}
return -1;
}

private int post(HttpUrl url, Object o) {
Expand Down
96 changes: 91 additions & 5 deletions src/main/java/io/getunleash/metric/UnleashMetricServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@

import io.getunleash.util.UnleashConfig;
import io.getunleash.util.UnleashScheduledExecutor;
import java.net.HttpURLConnection;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnleashMetricServiceImpl implements UnleashMetricService {
private static final Logger LOGGER = LoggerFactory.getLogger(UnleashMetricServiceImpl.class);
private final LocalDateTime started;
private final UnleashConfig unleashConfig;

Expand All @@ -15,6 +20,10 @@ public class UnleashMetricServiceImpl implements UnleashMetricService {
// mutable
private volatile MetricsBucket currentMetricsBucket;

private final int maxSkips;
private final AtomicInteger failures = new AtomicInteger();
private final AtomicInteger skips = new AtomicInteger();

public UnleashMetricServiceImpl(
UnleashConfig unleashConfig, UnleashScheduledExecutor executor) {
this(unleashConfig, unleashConfig.getMetricSenderFactory().apply(unleashConfig), executor);
Expand All @@ -28,6 +37,8 @@ public UnleashMetricServiceImpl(
this.started = LocalDateTime.now(ZoneId.of("UTC"));
this.unleashConfig = unleashConfig;
this.metricSender = metricSender;
this.maxSkips =
Integer.max(20, 300 / Integer.max((int) unleashConfig.getSendMetricsInterval(), 1));
long metricsInterval = unleashConfig.getSendMetricsInterval();
executor.setInterval(sendMetrics(), metricsInterval, metricsInterval);
}
Expand All @@ -51,11 +62,86 @@ public void countVariant(String toggleName, String variantName) {

private Runnable sendMetrics() {
return () -> {
MetricsBucket metricsBucket = this.currentMetricsBucket;
this.currentMetricsBucket = new MetricsBucket();
metricsBucket.end();
ClientMetrics metrics = new ClientMetrics(unleashConfig, metricsBucket);
metricSender.sendMetrics(metrics);
if (skips.get() == 0) {
MetricsBucket metricsBucket = this.currentMetricsBucket;
this.currentMetricsBucket = new MetricsBucket();
metricsBucket.end();
ClientMetrics metrics = new ClientMetrics(unleashConfig, metricsBucket);
int statusCode = metricSender.sendMetrics(metrics);
if (statusCode >= 200 && statusCode < 400) {
decrementFailureCountAndResetSkips();
}
if (statusCode >= 400) {
handleHttpErrorCodes(statusCode);
}
} else {
skips.decrementAndGet();
}
};
}

/**
 * Records one successful call: lowers the failure count by one, never below zero, and sets the
 * skip count to the resulting failure count. If we had 10 failures in a row, this reduces the
 * skips down to 9, so that we gradually start posting more often, instead of doing max load
 * immediately after a sequence of errors.
 *
 * <p>The failure count is clamped at zero on purpose. Without the clamp every success during
 * normal operation pushes the counter further below zero; a later failure would then store a
 * negative skip count, and because metrics are only sent when the skip count is exactly zero,
 * sending would stall instead of backing off.
 */
private void decrementFailureCountAndResetSkips() {
    // Atomically step the failure count towards zero without ever crossing it.
    final int remainingFailures = failures.updateAndGet(count -> count > 0 ? count - 1 : 0);
    skips.set(remainingFailures);
}

/**
 * Registers one more failure after being told to back off (usually a 429 or a 50x) and sets
 * the skip count to the new failure count, capped at {@code maxSkips}. Successive failures
 * therefore produce a steadily longer backoff until the cap is reached.
 */
private void increaseSkipCount() {
    final int failureCount = failures.incrementAndGet();
    final int cappedSkips = failureCount <= maxSkips ? failureCount : maxSkips;
    skips.set(cappedSkips);
}

/**
 * Jumps straight to the maximum backoff and counts one more failure. Used for error codes that
 * are not expected to change on retry, to avoid hammering a server that just told us we did
 * something wrong and to avoid flooding the logs.
 */
private void maximizeSkips() {
    skips.set(maxSkips);
    // Only the side effect matters here; the returned value is intentionally unused.
    failures.getAndIncrement();
}

/**
 * Updates the backoff counters and logs a message for an HTTP error status returned when
 * posting metrics.
 *
 * <ul>
 *   <li>404, 401 and 403: jump to the maximum backoff and log at ERROR level.
 *   <li>429 and any status of 500 or above: increase the backoff by one step and log at INFO
 *       level.
 *   <li>Any other status: no counters change and nothing is logged.
 * </ul>
 *
 * @param responseCode the HTTP status code received from the server
 */
private void handleHttpErrorCodes(int responseCode) {
    if (responseCode == HttpURLConnection.HTTP_NOT_FOUND) {
        maximizeSkips();
        LOGGER.error(
                "Server said that the Metrics receiving endpoint at {} does not exist. Backing off to {} times our poll interval to avoid overloading server",
                unleashConfig.getUnleashURLs().getClientMetricsURL(),
                maxSkips);
        return;
    }

    // 429 Too Many Requests has no constant in HttpURLConnection.
    if (responseCode == 429) {
        increaseSkipCount();
        LOGGER.info(
                "Client Metrics was RATE LIMITED for the {}. time. Further backing off. Current backoff at {} times our metrics post interval",
                failures.get(),
                skips.get());
        return;
    }

    final boolean rejectedCredentials =
            responseCode == HttpURLConnection.HTTP_UNAUTHORIZED
                    || responseCode == HttpURLConnection.HTTP_FORBIDDEN;
    if (rejectedCredentials) {
        maximizeSkips();
        LOGGER.error(
                "Client was not authorized to post metrics to the Unleash API at {}. Backing off to {} times our poll interval to avoid overloading server",
                unleashConfig.getUnleashURLs().getClientMetricsURL(),
                maxSkips);
        return;
    }

    if (responseCode >= HttpURLConnection.HTTP_INTERNAL_ERROR) {
        increaseSkipCount();
        LOGGER.info(
                "Server failed with a {} status code. Backing off. Current backoff at {} times our poll interval",
                responseCode,
                skips.get());
    }
}

/**
 * Returns the current value of the skip counter.
 *
 * @return the number of remaining scheduled runs that will be skipped
 */
protected int getSkips() {
    final int remainingSkips = skips.get();
    return remainingSkips;
}

/**
 * Returns the current value of the failure counter.
 *
 * @return the failure count that drives the backoff
 */
protected int getFailures() {
    final int failureCount = failures.get();
    return failureCount;
}
}
98 changes: 93 additions & 5 deletions src/main/java/io/getunleash/repository/FeatureRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@
import io.getunleash.lang.Nullable;
import io.getunleash.util.UnleashConfig;
import io.getunleash.util.UnleashScheduledExecutor;
import java.net.HttpURLConnection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FeatureRepository implements IFeatureRepository {

private static final Logger LOGGER = LoggerFactory.getLogger(FeatureRepository.class);
private final UnleashConfig unleashConfig;
private final BackupHandler<FeatureCollection> featureBackupHandler;
private final FeatureBootstrapHandler featureBootstrapHandler;
Expand All @@ -24,6 +28,10 @@ public class FeatureRepository implements IFeatureRepository {
private FeatureCollection featureCollection;
private boolean ready;

private AtomicInteger failures = new AtomicInteger(0);
private AtomicInteger skips = new AtomicInteger(0);
private final Integer maxSkips;

public FeatureRepository(UnleashConfig unleashConfig) {
this(unleashConfig, new FeatureBackupHandlerFile(unleashConfig));
}
Expand All @@ -36,7 +44,7 @@ public FeatureRepository(
this.featureFetcher = unleashConfig.getUnleashFeatureFetcherFactory().apply(unleashConfig);
this.featureBootstrapHandler = new FeatureBootstrapHandler(unleashConfig);
this.eventDispatcher = new EventDispatcher(unleashConfig);

this.maxSkips = this.calculateMaxSkips((int) unleashConfig.getFetchTogglesInterval());
this.initCollections(unleashConfig.getScheduledExecutor());
}

Expand All @@ -52,6 +60,7 @@ protected FeatureRepository(
this.featureFetcher = featureFetcher;
this.featureBootstrapHandler = featureBootstrapHandler;
this.eventDispatcher = eventDispatcher;
this.maxSkips = this.calculateMaxSkips((int) unleashConfig.getFetchTogglesInterval());
this.initCollections(unleashConfig.getScheduledExecutor());
}

Expand All @@ -66,6 +75,7 @@ protected FeatureRepository(
this.featureFetcher = featureFetcher;
this.featureBootstrapHandler = featureBootstrapHandler;
this.eventDispatcher = new EventDispatcher(unleashConfig);
this.maxSkips = this.calculateMaxSkips((int) unleashConfig.getFetchTogglesInterval());
this.initCollections(executor);
}

Expand All @@ -90,8 +100,16 @@ private void initCollections(UnleashScheduledExecutor executor) {
}
}

/**
 * Computes the upper bound for the skip counter from the configured fetch interval: 300
 * divided by the interval, but never less than 20.
 *
 * @param fetchTogglesInterval the fetch interval; values below 1 are treated as 1 so the
 *     division cannot fail
 * @return the maximum number of scheduled runs that may be skipped while backing off
 */
private Integer calculateMaxSkips(int fetchTogglesInterval) {
    final int safeInterval = Math.max(fetchTogglesInterval, 1);
    final int intervalBasedSkips = 300 / safeInterval;
    return Math.max(20, intervalBasedSkips);
}

private Runnable updateFeatures(@Nullable final Consumer<UnleashException> handler) {
return () -> {
return () -> updateFeaturesInternal(handler);
}

private void updateFeaturesInternal(@Nullable final Consumer<UnleashException> handler) {
if (skips.get() <= 0L) {
try {
ClientFeaturesResponse response = featureFetcher.fetchFeatures();
eventDispatcher.dispatch(response);
Expand All @@ -105,8 +123,11 @@ private Runnable updateFeatures(@Nullable final Consumer<UnleashException> handl
: new SegmentCollection(Collections.emptyList()));

featureBackupHandler.write(featureCollection);
} else if (response.getStatus() == ClientFeaturesResponse.Status.UNAVAILABLE) {
handleHttpErrorCodes(response.getHttpStatusCode());
return;
}

decrementFailureCountAndResetSkips();
if (!ready) {
eventDispatcher.dispatch(new UnleashReady());
ready = true;
Expand All @@ -118,7 +139,66 @@ private Runnable updateFeatures(@Nullable final Consumer<UnleashException> handl
throw e;
}
}
};
} else {
skips.decrementAndGet(); // We didn't do anything this iteration, just reduce the count
}
}

/**
 * Records one successful call: lowers the failure count by one, never below zero, and sets the
 * skip count to the resulting failure count. If we had 10 failures in a row, this reduces the
 * skips down to 9, so that we gradually start polling more often, instead of doing max load
 * immediately after a sequence of errors.
 *
 * <p>The failure count is clamped at zero on purpose. Without the clamp every successful poll
 * during normal operation pushes the counter further below zero, and later failures would
 * first have to climb back above zero before any backoff is applied.
 */
private void decrementFailureCountAndResetSkips() {
    // Atomically step the failure count towards zero without ever crossing it.
    final int remainingFailures = failures.updateAndGet(count -> count > 0 ? count - 1 : 0);
    skips.set(remainingFailures);
}

/**
 * Registers one more failure after being told to back off (usually a 429 or a 50x) and sets
 * the skip count to the new failure count, capped at {@code maxSkips}. Successive failures
 * therefore produce a steadily longer backoff until the cap is reached.
 */
private void increaseSkipCount() {
    final int failureCount = failures.incrementAndGet();
    final int cappedSkips = failureCount <= maxSkips ? failureCount : maxSkips;
    skips.set(cappedSkips);
}

/**
 * Jumps straight to the maximum backoff and counts one more failure. Used for error codes that
 * are not expected to change on retry, to avoid hammering a server that just told us we did
 * something wrong and to avoid flooding the logs.
 */
private void maximizeSkips() {
    skips.set(maxSkips);
    // Only the side effect matters here; the returned value is intentionally unused.
    failures.getAndIncrement();
}

/**
 * Updates the backoff counters and logs a message for an HTTP error status returned when
 * fetching features.
 *
 * <ul>
 *   <li>404, 401 and 403: jump to the maximum backoff and log at ERROR level.
 *   <li>429 and any status of 500 or above: increase the backoff by one step and log at INFO
 *       level.
 *   <li>Any other status: no counters change and nothing is logged.
 * </ul>
 *
 * @param responseCode the HTTP status code received from the server
 */
private void handleHttpErrorCodes(int responseCode) {
    if (responseCode == HttpURLConnection.HTTP_NOT_FOUND) {
        maximizeSkips();
        LOGGER.error(
                "Server said that the API at {} does not exist. Backing off to {} times our poll interval to avoid overloading server",
                unleashConfig.getUnleashAPI(),
                maxSkips);
        return;
    }

    // 429 Too Many Requests has no constant in HttpURLConnection.
    if (responseCode == 429) {
        increaseSkipCount();
        LOGGER.info(
                "Client was RATE LIMITED for the {}. time. Further backing off. Current backoff at {} times our poll interval",
                failures.get(),
                skips.get());
        return;
    }

    final boolean rejectedCredentials =
            responseCode == HttpURLConnection.HTTP_UNAUTHORIZED
                    || responseCode == HttpURLConnection.HTTP_FORBIDDEN;
    if (rejectedCredentials) {
        maximizeSkips();
        LOGGER.error(
                "Client failed to authenticate to the Unleash API at {}. Backing off to {} times our poll interval to avoid overloading server",
                unleashConfig.getUnleashAPI(),
                maxSkips);
        return;
    }

    if (responseCode >= HttpURLConnection.HTTP_INTERNAL_ERROR) {
        increaseSkipCount();
        LOGGER.info(
                "Server failed with a {} status code. Backing off. Current backoff at {} times our poll interval",
                responseCode,
                skips.get());
    }
}

@Override
Expand All @@ -137,4 +217,12 @@ public List<String> getFeatureNames() {
public Segment getSegment(Integer id) {
return featureCollection.getSegmentCollection().getSegment(id);
}

/**
 * Returns the current value of the failure counter.
 *
 * @return the failure count that drives the backoff, boxed as an {@code Integer}
 */
public Integer getFailures() {
    final int failureCount = failures.get();
    return Integer.valueOf(failureCount);
}

/**
 * Returns the current value of the skip counter.
 *
 * @return the number of remaining scheduled runs that will be skipped, boxed as an {@code
 *     Integer}
 */
public Integer getSkips() {
    final int remainingSkips = skips.get();
    return Integer.valueOf(remainingSkips);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class FeatureToggleResponse implements UnleashEvent {
public enum Status {
NOT_CHANGED,
CHANGED,
UNAVAILABLE
UNAVAILABLE,
}

private final Status status;
Expand Down
Loading