Skip to content
This repository was archived by the owner on May 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request #227 from launchdarkly/eb/ch67913/data-source-status
Browse files Browse the repository at this point in the history
(#6) implement data source status monitoring
  • Loading branch information
eli-darkly authored May 12, 2020
2 parents 2349290 + 9690842 commit 1398c7c
Show file tree
Hide file tree
Showing 27 changed files with 1,280 additions and 241 deletions.
2 changes: 2 additions & 0 deletions src/main/java/com/launchdarkly/sdk/server/Components.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.launchdarkly.sdk.server.interfaces.ClientContext;
import com.launchdarkly.sdk.server.interfaces.DataSource;
import com.launchdarkly.sdk.server.interfaces.DataSourceFactory;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataSourceUpdates;
import com.launchdarkly.sdk.server.interfaces.DataStore;
import com.launchdarkly.sdk.server.interfaces.DataStoreFactory;
Expand Down Expand Up @@ -338,6 +339,7 @@ public DataSource createDataSource(ClientContext context, DataSourceUpdates data
} else {
LDClient.logger.info("LaunchDarkly client will not connect to Launchdarkly for feature flag data");
}
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
return NullDataSource.INSTANCE;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.launchdarkly.sdk.server;

import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;

import java.util.function.Supplier;

final class DataSourceStatusProviderImpl implements DataSourceStatusProvider {
private final EventBroadcasterImpl<DataSourceStatusProvider.StatusListener, DataSourceStatusProvider.Status> dataSourceStatusNotifier;
private final Supplier<DataSourceStatusProvider.Status> statusSupplier;

DataSourceStatusProviderImpl(EventBroadcasterImpl<StatusListener, Status> dataSourceStatusNotifier,
Supplier<Status> statusSupplier) {
this.dataSourceStatusNotifier = dataSourceStatusNotifier;
this.statusSupplier = statusSupplier;
}

@Override
public Status getStatus() {
return statusSupplier.get();
}

@Override
public void addStatusListener(StatusListener listener) {
dataSourceStatusNotifier.register(listener);
}

@Override
public void removeStatusListener(StatusListener listener) {
dataSourceStatusNotifier.unregister(listener);
}
}
107 changes: 88 additions & 19 deletions src/main/java/com/launchdarkly/sdk/server/DataSourceUpdatesImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,22 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.launchdarkly.sdk.server.DataModelDependencies.KindAndKey;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.ErrorInfo;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.ErrorKind;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.State;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.Status;
import com.launchdarkly.sdk.server.interfaces.DataSourceUpdates;
import com.launchdarkly.sdk.server.interfaces.DataStore;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.DataKind;
import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.FullDataSet;
import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.ItemDescriptor;
import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.KeyedItems;
import com.launchdarkly.sdk.server.interfaces.DataSourceUpdates;
import com.launchdarkly.sdk.server.interfaces.FlagChangeEvent;
import com.launchdarkly.sdk.server.interfaces.FlagChangeListener;

import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -33,34 +39,51 @@
final class DataSourceUpdatesImpl implements DataSourceUpdates {
private final DataStore store;
private final EventBroadcasterImpl<FlagChangeListener, FlagChangeEvent> flagChangeEventNotifier;
private final EventBroadcasterImpl<DataSourceStatusProvider.StatusListener, DataSourceStatusProvider.Status> dataSourceStatusNotifier;
private final DataModelDependencies.DependencyTracker dependencyTracker = new DataModelDependencies.DependencyTracker();
private final DataStoreStatusProvider dataStoreStatusProvider;

private volatile DataSourceStatusProvider.Status currentStatus;
private volatile boolean lastStoreUpdateFailed = false;

DataSourceUpdatesImpl(
DataStore store,
DataStoreStatusProvider dataStoreStatusProvider,
EventBroadcasterImpl<FlagChangeListener, FlagChangeEvent> flagChangeEventNotifier,
DataStoreStatusProvider dataStoreStatusProvider
EventBroadcasterImpl<DataSourceStatusProvider.StatusListener, DataSourceStatusProvider.Status> dataSourceStatusNotifier
) {
this.store = store;
this.flagChangeEventNotifier = flagChangeEventNotifier;
this.dataSourceStatusNotifier = dataSourceStatusNotifier;
this.dataStoreStatusProvider = dataStoreStatusProvider;

currentStatus = new DataSourceStatusProvider.Status(
DataSourceStatusProvider.State.INITIALIZING,
Instant.now(),
null
);
}

@Override
public void init(FullDataSet<ItemDescriptor> allData) {
public boolean init(FullDataSet<ItemDescriptor> allData) {
Map<DataKind, Map<String, ItemDescriptor>> oldData = null;

if (hasFlagChangeEventListeners()) {
// Query the existing data if any, so that after the update we can send events for whatever was changed
oldData = new HashMap<>();
for (DataKind kind: ALL_DATA_KINDS) {
KeyedItems<ItemDescriptor> items = store.getAll(kind);
oldData.put(kind, ImmutableMap.copyOf(items.getItems()));

try {
if (hasFlagChangeEventListeners()) {
// Query the existing data if any, so that after the update we can send events for whatever was changed
oldData = new HashMap<>();
for (DataKind kind: ALL_DATA_KINDS) {
KeyedItems<ItemDescriptor> items = store.getAll(kind);
oldData.put(kind, ImmutableMap.copyOf(items.getItems()));
}
}
store.init(DataModelDependencies.sortAllCollections(allData));
lastStoreUpdateFailed = false;
} catch (RuntimeException e) {
reportStoreFailure(e);
return false;
}

store.init(DataModelDependencies.sortAllCollections(allData));

// We must always update the dependency graph even if we don't currently have any event listeners, because if
// listeners are added later, we don't want to have to reread the whole data store to compute the graph
updateDependencyTrackerFromFullDataSet(allData);
Expand All @@ -70,11 +93,20 @@ public void init(FullDataSet<ItemDescriptor> allData) {
if (oldData != null) {
sendChangeEvents(computeChangedItemsForFullDataSet(oldData, fullDataSetToMap(allData)));
}

return true;
}

@Override
public void upsert(DataKind kind, String key, ItemDescriptor item) {
boolean successfullyUpdated = store.upsert(kind, key, item);
public boolean upsert(DataKind kind, String key, ItemDescriptor item) {
boolean successfullyUpdated;
try {
successfullyUpdated = store.upsert(kind, key, item);
lastStoreUpdateFailed = false;
} catch (RuntimeException e) {
reportStoreFailure(e);
return false;
}

if (successfullyUpdated) {
dependencyTracker.updateDependenciesFrom(kind, key, item);
Expand All @@ -84,21 +116,49 @@ public void upsert(DataKind kind, String key, ItemDescriptor item) {
sendChangeEvents(affectedItems);
}
}

return true;
}

@Override
public DataStoreStatusProvider getDataStoreStatusProvider() {
return dataStoreStatusProvider;
}

@Override
public void updateStatus(DataSourceStatusProvider.State newState, DataSourceStatusProvider.ErrorInfo newError) {
if (newState == null) {
return;
}
DataSourceStatusProvider.Status newStatus;
synchronized (this) {
if (newState == DataSourceStatusProvider.State.INTERRUPTED && currentStatus.getState() == DataSourceStatusProvider.State.INITIALIZING) {
newState = DataSourceStatusProvider.State.INITIALIZING; // see comment on updateStatus in the DataSourceUpdates interface
}
if (newState == currentStatus.getState() && newError == null) {
return;
}
currentStatus = new DataSourceStatusProvider.Status(
newState,
newState == currentStatus.getState() ? currentStatus.getStateSince() : Instant.now(),
newError == null ? currentStatus.getLastError() : newError
);
newStatus = currentStatus;
}
dataSourceStatusNotifier.broadcast(newStatus);
}

Status getLastStatus() {
synchronized (this) {
return currentStatus;
}
}

private boolean hasFlagChangeEventListeners() {
return flagChangeEventNotifier != null && flagChangeEventNotifier.hasListeners();
return flagChangeEventNotifier.hasListeners();
}

private void sendChangeEvents(Iterable<KindAndKey> affectedItems) {
if (flagChangeEventNotifier == null) {
return;
}
for (KindAndKey item: affectedItems) {
if (item.kind == FEATURES) {
flagChangeEventNotifier.broadcast(new FlagChangeEvent(item.key));
Expand Down Expand Up @@ -153,6 +213,15 @@ private Set<KindAndKey> computeChangedItemsForFullDataSet(Map<DataKind, Map<Stri
// version numbers are different, the higher one is the more recent version).
}
}
return affectedItems;
return affectedItems;
}

private void reportStoreFailure(RuntimeException e) {
if (!lastStoreUpdateFailed) {
LDClient.logger.warn("Unexpected data store error when trying to store an update received from the data source: {}", e.toString());
lastStoreUpdateFailed = true;
}
LDClient.logger.debug(e.toString(), e);
updateStatus(State.INTERRUPTED, ErrorInfo.fromException(ErrorKind.STORE_ERROR, e));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package com.launchdarkly.sdk.server;

import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
import com.launchdarkly.sdk.server.interfaces.FlagChangeEvent;
import com.launchdarkly.sdk.server.interfaces.FlagChangeListener;

import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -30,6 +35,20 @@ final class EventBroadcasterImpl<ListenerT, EventT> {
this.executor = executor;
}

static EventBroadcasterImpl<FlagChangeListener, FlagChangeEvent> forFlagChangeEvents(ExecutorService executor) {
return new EventBroadcasterImpl<>(FlagChangeListener::onFlagChange, executor);
}

static EventBroadcasterImpl<DataSourceStatusProvider.StatusListener, DataSourceStatusProvider.Status>
forDataSourceStatus(ExecutorService executor) {
return new EventBroadcasterImpl<>(DataSourceStatusProvider.StatusListener::dataSourceStatusChanged, executor);
}

static EventBroadcasterImpl<DataStoreStatusProvider.StatusListener, DataStoreStatusProvider.Status>
forDataStoreStatus(ExecutorService executor) {
return new EventBroadcasterImpl<>(DataStoreStatusProvider.StatusListener::dataStoreStatusChanged, executor);
}

/**
* Registers a listener for this type of event. This method is thread-safe.
*
Expand Down
24 changes: 17 additions & 7 deletions src/main/java/com/launchdarkly/sdk/server/LDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.launchdarkly.sdk.server.integrations.EventProcessorBuilder;
import com.launchdarkly.sdk.server.interfaces.DataSource;
import com.launchdarkly.sdk.server.interfaces.DataSourceFactory;
import com.launchdarkly.sdk.server.interfaces.DataSourceUpdates;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataStore;
import com.launchdarkly.sdk.server.interfaces.DataStoreFactory;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
Expand Down Expand Up @@ -65,6 +65,7 @@ public final class LDClient implements LDClientInterface {
final DataSource dataSource;
final DataStore dataStore;
private final DataStoreStatusProviderImpl dataStoreStatusProvider;
private final DataSourceStatusProviderImpl dataSourceStatusProvider;
private final EventBroadcasterImpl<FlagChangeListener, FlagChangeEvent> flagChangeEventNotifier;
private final ScheduledExecutorService sharedExecutor;

Expand Down Expand Up @@ -159,7 +160,7 @@ public LDClient(String sdkKey, LDConfig config) {
DataStoreFactory factory = config.dataStoreFactory == null ?
Components.inMemoryDataStore() : config.dataStoreFactory;
EventBroadcasterImpl<DataStoreStatusProvider.StatusListener, DataStoreStatusProvider.Status> dataStoreStatusNotifier =
new EventBroadcasterImpl<>(DataStoreStatusProvider.StatusListener::dataStoreStatusChanged, sharedExecutor);
EventBroadcasterImpl.forDataStoreStatus(sharedExecutor);
DataStoreUpdatesImpl dataStoreUpdates = new DataStoreUpdatesImpl(dataStoreStatusNotifier);
this.dataStore = factory.createDataStore(context, dataStoreUpdates);

Expand All @@ -173,19 +174,23 @@ public DataModel.Segment getSegment(String key) {
}
});

this.flagChangeEventNotifier = new EventBroadcasterImpl<>(FlagChangeListener::onFlagChange, sharedExecutor);
this.flagChangeEventNotifier = EventBroadcasterImpl.forFlagChangeEvents(sharedExecutor);

this.dataStoreStatusProvider = new DataStoreStatusProviderImpl(this.dataStore, dataStoreUpdates);

DataSourceFactory dataSourceFactory = config.dataSourceFactory == null ?
Components.streamingDataSource() : config.dataSourceFactory;
DataSourceUpdates dataSourceUpdates = new DataSourceUpdatesImpl(
EventBroadcasterImpl<DataSourceStatusProvider.StatusListener, DataSourceStatusProvider.Status> dataSourceStatusNotifier =
EventBroadcasterImpl.forDataSourceStatus(sharedExecutor);
DataSourceUpdatesImpl dataSourceUpdates = new DataSourceUpdatesImpl(
dataStore,
dataStoreStatusProvider,
flagChangeEventNotifier,
dataStoreStatusProvider
dataSourceStatusNotifier
);
this.dataSource = dataSourceFactory.createDataSource(context, dataSourceUpdates);

this.dataSource = dataSourceFactory.createDataSource(context, dataSourceUpdates);
this.dataSourceStatusProvider = new DataSourceStatusProviderImpl(dataSourceStatusNotifier, dataSourceUpdates::getLastStatus);

Future<Void> startFuture = dataSource.start();
if (!config.startWait.isZero() && !config.startWait.isNegative()) {
if (!(dataSource instanceof Components.NullDataSource)) {
Expand Down Expand Up @@ -460,6 +465,11 @@ public void unregisterFlagChangeListener(FlagChangeListener listener) {
public DataStoreStatusProvider getDataStoreStatusProvider() {
return dataStoreStatusProvider;
}

@Override
public DataSourceStatusProvider getDataSourceStatusProvider() {
return dataSourceStatusProvider;
}

@Override
public void close() throws IOException {
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/com/launchdarkly/sdk/server/LDClientInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.launchdarkly.sdk.EvaluationDetail;
import com.launchdarkly.sdk.LDUser;
import com.launchdarkly.sdk.LDValue;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
import com.launchdarkly.sdk.server.interfaces.FlagChangeListener;

Expand Down Expand Up @@ -263,6 +264,19 @@ public interface LDClientInterface extends Closeable {
* @since 5.0.0
*/
void unregisterFlagChangeListener(FlagChangeListener listener);

/**
* Returns an interface for tracking the status of the data source.
* <p>
* The data source is the mechanism that the SDK uses to get feature flag configurations, such as a
* streaming connection (the default) or poll requests. The {@link DataSourceStatusProvider} has methods
* for checking whether the data source is (as far as the SDK knows) currently operational and tracking
* changes in this status.
*
* @return a {@link DataSourceStatusProvider}
* @since 5.0.0
*/
DataSourceStatusProvider getDataSourceStatusProvider();

/**
* Returns an interface for tracking the status of a persistent data store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ public void run() {
};
synchronized (this) {
if (pollerFuture == null) {
pollerFuture = scheduler.scheduleAtFixedRate(pollerTask, POLL_INTERVAL_MS, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
pollerFuture = scheduler.scheduleAtFixedRate(
pollerTask,
POLL_INTERVAL_MS,
POLL_INTERVAL_MS,
TimeUnit.MILLISECONDS
);
}
}
}
Expand Down
Loading

0 comments on commit 1398c7c

Please sign in to comment.