Skip to content

Commit

Permalink
feat!: Use grpc intern reconnections for rpc event stream (#1112)
Browse files Browse the repository at this point in the history
Signed-off-by: Bernd Warmuth <bernd.warmuth@dynatrace.com>
Co-authored-by: Todd Baert <todd.baert@dynatrace.com>
  • Loading branch information
warber and toddbaert authored Jan 9, 2025
1 parent ee91441 commit d66adc9
Show file tree
Hide file tree
Showing 19 changed files with 924 additions and 1,126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public final class Config {

static final int DEFAULT_DEADLINE = 500;
static final int DEFAULT_STREAM_DEADLINE_MS = 10 * 60 * 1000;
static final int DEFAULT_STREAM_RETRY_GRACE_PERIOD = 5;
static final int DEFAULT_MAX_CACHE_SIZE = 1000;
static final long DEFAULT_KEEP_ALIVE = 0;

Expand All @@ -35,6 +36,7 @@ public final class Config {
static final String KEEP_ALIVE_MS_ENV_VAR_NAME_OLD = "FLAGD_KEEP_ALIVE_TIME";
static final String KEEP_ALIVE_MS_ENV_VAR_NAME = "FLAGD_KEEP_ALIVE_TIME_MS";
static final String TARGET_URI_ENV_VAR_NAME = "FLAGD_TARGET_URI";
static final String STREAM_RETRY_GRACE_PERIOD = "FLAGD_RETRY_GRACE_PERIOD";

static final String RESOLVER_RPC = "rpc";
static final String RESOLVER_IN_PROCESS = "in-process";
Expand All @@ -52,7 +54,6 @@ public final class Config {
public static final String LRU_CACHE = CacheType.LRU.getValue();
static final String DEFAULT_CACHE = LRU_CACHE;

static final int DEFAULT_MAX_EVENT_STREAM_RETRIES = 5;
static final int BASE_EVENT_STREAM_RETRY_BACKOFF_MS = 1000;

static String fallBackToEnvOrDefault(String key, String defaultValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,114 +13,147 @@
import lombok.Builder;
import lombok.Getter;

/** FlagdOptions is a builder to build flagd provider options. */
/**
* FlagdOptions is a builder to build flagd provider options.
*/
@Builder
@Getter
@SuppressWarnings("PMD.TooManyStaticImports")
public class FlagdOptions {

/** flagd resolving type. */
/**
* flagd resolving type.
*/
private Config.EvaluatorType resolverType;

/** flagd connection host. */
/**
* flagd connection host.
*/
@Builder.Default
private String host = fallBackToEnvOrDefault(Config.HOST_ENV_VAR_NAME, Config.DEFAULT_HOST);

/** flagd connection port. */
/**
* flagd connection port.
*/
private int port;

/** Use TLS connectivity. */
/**
* Use TLS connectivity.
*/
@Builder.Default
private boolean tls = Boolean.parseBoolean(fallBackToEnvOrDefault(Config.TLS_ENV_VAR_NAME, Config.DEFAULT_TLS));

/** TLS certificate overriding if TLS connectivity is used. */
/**
* TLS certificate overriding if TLS connectivity is used.
*/
@Builder.Default
private String certPath = fallBackToEnvOrDefault(Config.SERVER_CERT_PATH_ENV_VAR_NAME, null);

/** Unix socket path to flagd. */
/**
* Unix socket path to flagd.
*/
@Builder.Default
private String socketPath = fallBackToEnvOrDefault(Config.SOCKET_PATH_ENV_VAR_NAME, null);

/** Cache type to use. Supports - lru, disabled. */
/**
* Cache type to use. Supports - lru, disabled.
*/
@Builder.Default
private String cacheType = fallBackToEnvOrDefault(Config.CACHE_ENV_VAR_NAME, Config.DEFAULT_CACHE);

/** Max cache size. */
/**
* Max cache size.
*/
@Builder.Default
private int maxCacheSize =
fallBackToEnvOrDefault(Config.MAX_CACHE_SIZE_ENV_VAR_NAME, Config.DEFAULT_MAX_CACHE_SIZE);

/** Max event stream connection retries. */
@Builder.Default
private int maxEventStreamRetries = fallBackToEnvOrDefault(
Config.MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME, Config.DEFAULT_MAX_EVENT_STREAM_RETRIES);

/** Backoff interval in milliseconds. */
/**
* Backoff interval in milliseconds.
*/
@Builder.Default
private int retryBackoffMs = fallBackToEnvOrDefault(
Config.BASE_EVENT_STREAM_RETRY_BACKOFF_MS_ENV_VAR_NAME, Config.BASE_EVENT_STREAM_RETRY_BACKOFF_MS);

/**
* Connection deadline in milliseconds. For RPC resolving, this is the deadline to connect to
* flagd for flag evaluation. For in-process resolving, this is the deadline for sync stream
* termination.
* Connection deadline in milliseconds.
* For RPC resolving, this is the deadline to connect to flagd for flag
* evaluation.
* For in-process resolving, this is the deadline for sync stream termination.
*/
@Builder.Default
private int deadline = fallBackToEnvOrDefault(Config.DEADLINE_MS_ENV_VAR_NAME, Config.DEFAULT_DEADLINE);

/**
* Streaming connection deadline in milliseconds. Set to 0 to disable the deadline. Defaults to
* 600000 (10 minutes); recommended to prevent infrastructure from killing idle connections.
* Streaming connection deadline in milliseconds.
* Set to 0 to disable the deadline.
* Defaults to 600000 (10 minutes); recommended to prevent infrastructure from killing idle connections.
*/
@Builder.Default
private int streamDeadlineMs =
fallBackToEnvOrDefault(Config.STREAM_DEADLINE_MS_ENV_VAR_NAME, Config.DEFAULT_STREAM_DEADLINE_MS);

/** Selector to be used with flag sync gRPC contract. */
/**
* Grace time period in seconds before provider moves from STALE to ERROR.
* Defaults to 5
*/
@Builder.Default
private int retryGracePeriod =
fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD);
/**
* Selector to be used with flag sync gRPC contract.
**/
@Builder.Default
private String selector = fallBackToEnvOrDefault(Config.SOURCE_SELECTOR_ENV_VAR_NAME, null);

/** gRPC client KeepAlive in milliseconds. Disabled with 0. Defaults to 0 (disabled). */
/**
* gRPC client KeepAlive in milliseconds. Disabled with 0.
* Defaults to 0 (disabled).
**/
@Builder.Default
private long keepAlive = fallBackToEnvOrDefault(
Config.KEEP_ALIVE_MS_ENV_VAR_NAME,
fallBackToEnvOrDefault(Config.KEEP_ALIVE_MS_ENV_VAR_NAME_OLD, Config.DEFAULT_KEEP_ALIVE));

/**
* File source of flags to be used by offline mode. Setting this enables the offline mode of the
* in-process provider.
* File source of flags to be used by offline mode.
* Setting this enables the offline mode of the in-process provider.
*/
@Builder.Default
private String offlineFlagSourcePath = fallBackToEnvOrDefault(Config.OFFLINE_SOURCE_PATH, null);

/**
* gRPC custom target string.
*
* <p>Setting this will allow user to use custom gRPC name resolver at present we are supporting
* all core resolver along with a custom resolver for envoy proxy resolution. For more visit
* (https://grpc.io/docs/guides/custom-name-resolution/)
* <p>Setting this will allow user to use custom gRPC name resolver at present
* we are supporting all core resolver along with a custom resolver for envoy proxy
* resolution. For more visit (https://grpc.io/docs/guides/custom-name-resolution/)
*/
@Builder.Default
private String targetUri = fallBackToEnvOrDefault(Config.TARGET_URI_ENV_VAR_NAME, null);

/**
* Function providing an EvaluationContext to mix into every evaluations. The sync-metadata
* response
* Function providing an EvaluationContext to mix into every evaluations.
* The sync-metadata response
* (https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.GetMetadataResponse),
* represented as a {@link dev.openfeature.sdk.Structure}, is passed as an argument. This function
* runs every time the provider (re)connects, and its result is cached and used in every
* evaluation. By default, the entire sync response (converted to a Structure) is used.
* represented as a {@link dev.openfeature.sdk.Structure}, is passed as an
* argument.
* This function runs every time the provider (re)connects, and its result is cached and used in every evaluation.
* By default, the entire sync response (converted to a Structure) is used.
*/
@Builder.Default
private Function<Structure, EvaluationContext> contextEnricher =
(syncMetadata) -> new ImmutableContext(syncMetadata.asMap());

/** Inject a Custom Connector for fetching flags. */
/**
* Inject a Custom Connector for fetching flags.
*/
private Connector customConnector;

/**
* Inject OpenTelemetry for the library runtime. Providing sdk will initiate distributed tracing
* for flagd grpc connectivity.
* Inject OpenTelemetry for the library runtime. Providing sdk will initiate
* distributed tracing for flagd grpc
* connectivity.
*/
private OpenTelemetry openTelemetry;

Expand All @@ -139,11 +172,14 @@ public FlagdOptions build() {
};
}

/** Overload default lombok builder. */
/**
* Overload default lombok builder.
*/
public static class FlagdOptionsBuilder {
/**
* Enable OpenTelemetry instance extraction from GlobalOpenTelemetry. Note that, this is only
* useful if global configurations are registered.
* Enable OpenTelemetry instance extraction from GlobalOpenTelemetry. Note that,
* this is only useful if global
* configurations are registered.
*/
public FlagdOptionsBuilder withGlobalTelemetry(final boolean b) {
if (b) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;

/** OpenFeature provider for flagd. */
/**
* OpenFeature provider for flagd.
*/
@Slf4j
@SuppressWarnings({"PMD.TooManyStaticImports", "checkstyle:NoFinalizer"})
public class FlagdProvider extends EventProvider {
Expand All @@ -38,7 +40,9 @@ protected final void finalize() {
// DO NOT REMOVE, spotbugs: CT_CONSTRUCTOR_THROW
}

/** Create a new FlagdProvider instance with default options. */
/**
* Create a new FlagdProvider instance with default options.
*/
public FlagdProvider() {
this(FlagdOptions.builder().build());
}
Expand All @@ -55,10 +59,7 @@ public FlagdProvider(final FlagdOptions options) {
break;
case Config.RESOLVER_RPC:
this.flagResolver = new GrpcResolver(
options,
new Cache(options.getCacheType(), options.getMaxCacheSize()),
this::isConnected,
this::onConnectionEvent);
options, new Cache(options.getCacheType(), options.getMaxCacheSize()), this::onConnectionEvent);
break;
default:
throw new IllegalStateException(
Expand All @@ -80,7 +81,7 @@ public synchronized void initialize(EvaluationContext evaluationContext) throws
}

this.flagResolver.init();
this.initialized = true;
this.initialized = this.connected = true;
}

@Override
Expand Down Expand Up @@ -129,8 +130,10 @@ public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultVa
}

/**
* An unmodifiable view of a Structure representing the latest result of the SyncMetadata. Set on
* initial connection and updated with every reconnection. see:
* An unmodifiable view of a Structure representing the latest result of the
* SyncMetadata.
* Set on initial connection and updated with every reconnection.
* see:
* https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.FlagSyncService.GetMetadata
*
* @return Object map representing sync metadata
Expand All @@ -153,38 +156,42 @@ private boolean isConnected() {
}

private void onConnectionEvent(ConnectionEvent connectionEvent) {
boolean previous = connected;
boolean current = connected = connectionEvent.isConnected();
final boolean wasConnected = connected;
final boolean isConnected = connected = connectionEvent.isConnected();

syncMetadata = connectionEvent.getSyncMetadata();
enrichedContext = contextEnricher.apply(connectionEvent.getSyncMetadata());

// configuration changed
if (initialized && previous && current) {
log.debug("Configuration changed");
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("configuration changed")
.build();
this.emitProviderConfigurationChanged(details);
if (!initialized) {
return;
}
// there was an error
if (initialized && previous && !current) {
log.debug("There has been an error");

if (!wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.message("there has been an error")
.flagsChanged(connectionEvent.getFlagsChanged())
.message("connected to flagd")
.build();
this.emitProviderError(details);
this.emitProviderReady(details);
return;
}
// we recovered from an error
if (initialized && !previous && current) {
log.debug("Recovered from error");

if (wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.message("recovered from error")
.flagsChanged(connectionEvent.getFlagsChanged())
.message("configuration changed")
.build();
this.emitProviderReady(details);
this.emitProviderConfigurationChanged(details);
return;
}

if (connectionEvent.isStale()) {
this.emitProviderStale(ProviderEventDetails.builder()
.message("there has been an error")
.build());
} else {
this.emitProviderError(ProviderEventDetails.builder()
.message("there has been an error")
.build());
}
}
}
Loading

0 comments on commit d66adc9

Please sign in to comment.