fix: fix flag push fallback (#37)
* fix stream fallback, fix stream retry delay

* added start retry wrapper

* fixed onError cb restarts wrapper

* added tests

* remove test code

* reorder stop order

* lint
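
The net behavior these commits aim for: when the main updater (the stream) fails, fall back to polling and keep retrying the main updater until it recovers. Below is a minimal sketch of that loop, assuming only the JDK scheduler; the names and delays are illustrative, not the SDK's API.

import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

fun main() {
    val executor = Executors.newScheduledThreadPool(1)
    var attempts = 0

    // Stand-in for the main (streaming) updater: fails twice, then succeeds.
    fun mainStarts(): Boolean = ++attempts > 2

    fun startWithRetry() {
        if (mainStarts()) {
            println("main updater started after $attempts attempt(s); stopping fallback")
            executor.shutdown()
        } else {
            println("main updater failed; fallback keeps serving flags")
            executor.schedule({ startWithRetry() }, 100L, TimeUnit.MILLISECONDS)
        }
    }

    startWithRetry()
}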
zhukaihan authored Nov 1, 2024
1 parent cc10582 commit 2cf5b04
Showing 3 changed files with 161 additions and 63 deletions.
src/main/kotlin/deployment/DeploymentRunner.kt (14 changes: 11 additions & 3 deletions)
@@ -20,7 +20,8 @@ import java.util.concurrent.Executors
 import java.util.concurrent.TimeUnit

 private const val MIN_COHORT_POLLING_INTERVAL = 60000L
-private const val FLAG_POLLING_JITTER = 1000L
+private const val FLAG_STREAMING_RETRY_DELAY = 15000L
+private const val FLAG_RETRY_JITTER = 1000L

 internal class DeploymentRunner(
     private val config: LocalEvaluationConfig,
@@ -51,14 +52,21 @@ internal class DeploymentRunner(
             FlagConfigFallbackRetryWrapper(
                 FlagConfigStreamer(flagConfigStreamApi, flagConfigStorage, cohortLoader, cohortStorage, metrics),
                 amplitudeFlagConfigPoller,
-                FLAG_POLLING_JITTER
+                FLAG_STREAMING_RETRY_DELAY,
+                FLAG_RETRY_JITTER,
+                config.flagConfigPollerIntervalMillis,
+                0,
             )
         else amplitudeFlagConfigPoller
     private val flagConfigUpdater =
         if (flagConfigProxyApi != null)
             FlagConfigFallbackRetryWrapper(
                 FlagConfigPoller(flagConfigProxyApi, flagConfigStorage, cohortLoader, cohortStorage, config, metrics),
-                amplitudeFlagConfigPoller
+                amplitudeFlagConfigPoller,
+                config.flagConfigPollerIntervalMillis,
+                0,
+                if (flagConfigStreamApi != null) FLAG_STREAMING_RETRY_DELAY else config.flagConfigPollerIntervalMillis,
+                if (flagConfigStreamApi != null) FLAG_RETRY_JITTER else 0,
             )
         else
             amplitudeFlagConfigUpdater
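
A hedged reading of the wiring above, mapping the wrapper's positional arguments onto the parameter names introduced in flag/FlagConfigUpdater.kt below. The stub types and the 60-second poller interval are illustrative assumptions, not the SDK's types or defaults.

// Illustrative stubs; the real FlagConfigStreamer/FlagConfigPoller are internal to the SDK.
interface Updater
object Streamer : Updater
object AmplitudePoller : Updater

// The wrapper's positional arguments, by name:
data class RetryWrapper(
    val mainUpdater: Updater,
    val fallbackUpdater: Updater?,
    val retryDelayMillis: Long,              // delay before retrying the main updater
    val maxJitterMillis: Long,               // jitter applied to that delay
    val fallbackStartRetryDelayMillis: Long, // delay before retrying a failed fallback start
    val fallbackMaxJitterMillis: Long,       // jitter applied to the fallback delay
)

fun main() {
    val pollerIntervalMillis = 60_000L // stand-in for config.flagConfigPollerIntervalMillis
    // Streamer falls back to the poller: retry the stream every 15 s +/- 1 s,
    // and retry a failed poller start on the poller interval with no jitter.
    val streamChain = RetryWrapper(Streamer, AmplitudePoller, 15_000L, 1_000L, pollerIntervalMillis, 0L)
    println(streamChain)
}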
src/main/kotlin/flag/FlagConfigUpdater.kt (120 changes: 76 additions & 44 deletions)
@@ -122,15 +122,15 @@ internal class FlagConfigPoller(
 ) {
     private val lock: ReentrantLock = ReentrantLock()
     private val pool = Executors.newScheduledThreadPool(1, daemonFactory)
-    private var scheduledFuture: ScheduledFuture<*>? = null // @GuardedBy(lock)
+    private var scheduledFuture: ScheduledFuture<*>? = null

     /**
      * Start will fetch once, then start the poller to poll flag configs.
      */
     override fun start(onError: (() -> Unit)?) {
         refresh()
         lock.withLock {
-            stopInternal()
+            stop()
             scheduledFuture = pool.scheduleWithFixedDelay(
                 {
                     try {
@@ -148,16 +148,11 @@
         }
     }

-    // @GuardedBy(lock)
-    private fun stopInternal() {
-        // Pause only stop the task scheduled. It doesn't stop the executor.
-        scheduledFuture?.cancel(true)
-        scheduledFuture = null
-    }
-
     override fun stop() {
         lock.withLock {
-            stopInternal()
+            // This only cancels the scheduled task; it doesn't stop the executor.
+            scheduledFuture?.cancel(true)
+            scheduledFuture = null
         }
     }

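One reason stopInternal() could fold into stop(): the poller guards itself with a ReentrantLock, so start() can call stop() while already holding the lock. A minimal sketch of that lifecycle, assuming the same lock-plus-scheduler shape as the poller above (illustrative, not the SDK code):

import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

fun main() {
    val lock = ReentrantLock()
    val pool = Executors.newScheduledThreadPool(1)
    var scheduledFuture: ScheduledFuture<*>? = null

    fun stop() = lock.withLock {
        // Cancels the scheduled task only; the executor itself keeps running.
        scheduledFuture?.cancel(true)
        scheduledFuture = null
    }

    fun start() = lock.withLock {
        stop() // re-entrant acquisition, so no deadlock
        scheduledFuture = pool.scheduleWithFixedDelay({ println("poll") }, 0L, 50L, TimeUnit.MILLISECONDS)
    }

    start()
    Thread.sleep(120L) // let a few polls run
    stop()
    pool.shutdown()
}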
@@ -227,24 +222,21 @@ internal class FlagConfigStreamer(
 private const val RETRY_DELAY_MILLIS_DEFAULT = 15 * 1000L
 private const val MAX_JITTER_MILLIS_DEFAULT = 2000L

 /**
  * This is a wrapper class around flag config updaters.
  * It provides retries when an updater hits errors asynchronously, as well as a fallback when an updater fails.
  *
  * `mainUpdater` cannot be a FlagConfigFallbackRetryWrapper.
  * The developer should restructure arguments to make sure `mainUpdater` is never a `FlagConfigFallbackRetryWrapper`.
  * All retry and fallback structures can be normalized so that no `mainUpdater` is a `FlagConfigFallbackRetryWrapper`.
  */
 internal class FlagConfigFallbackRetryWrapper(
     private val mainUpdater: FlagConfigUpdater,
     private val fallbackUpdater: FlagConfigUpdater?,
     retryDelayMillis: Long = RETRY_DELAY_MILLIS_DEFAULT,
     maxJitterMillis: Long = MAX_JITTER_MILLIS_DEFAULT,
+    fallbackStartRetryDelayMillis: Long = RETRY_DELAY_MILLIS_DEFAULT,
+    fallbackMaxJitterMillis: Long = MAX_JITTER_MILLIS_DEFAULT,
 ) : FlagConfigUpdater {
     private val lock: ReentrantLock = ReentrantLock()
     private val reconnIntervalRange = max(0, retryDelayMillis - maxJitterMillis)..(min(retryDelayMillis, Long.MAX_VALUE - maxJitterMillis) + maxJitterMillis)
-    private val executor = Executors.newScheduledThreadPool(1, daemonFactory)
-    private var retryTask: ScheduledFuture<*>? = null // @GuardedBy(lock)
+    private val fallbackReconnIntervalRange = max(0, fallbackStartRetryDelayMillis - fallbackMaxJitterMillis)..(min(fallbackStartRetryDelayMillis, Long.MAX_VALUE - fallbackMaxJitterMillis) + fallbackMaxJitterMillis)
+    private val executor = Executors.newScheduledThreadPool(2, daemonFactory)
+    private var retryTask: ScheduledFuture<*>? = null
+    private var fallbackRetryTask: ScheduledFuture<*>? = null
+    private var isRunning = false

     /**
      * Since the wrapper retries the mainUpdater, there is never an error case; thus, onError will never be called.
@@ -254,8 +246,9 @@ internal class FlagConfigFallbackRetryWrapper(
      * If the main updater fails to start, the fallback updater tries to start.
      * If the fallback fails to start as well, start throws an exception.
      * If the fallback starts, the start succeeds and main enters a retry loop.
-     * After started, if main failed, fallback is started and main enters retry loop.
-     * Fallback success or failures status is not monitored. It's suggested to wrap fallback into a retry wrapper.
+     * After start, if main fails, main enters the retry loop and the fallback is started.
+     * If the fallback fails to start, it enters its own start retry loop until it starts successfully.
+     * If the fallback starts but fails later, that failure is not monitored; it's recommended to wrap the fallback in a FlagConfigFallbackRetryWrapper.
      */
     override fun start(onError: (() -> Unit)?) {
         if (mainUpdater is FlagConfigFallbackRetryWrapper) {
@@ -268,58 +261,97 @@
             try {
                 mainUpdater.start {
                     lock.withLock {
-                        scheduleRetry() // Don't care if poller start error or not, always retry.
-                        try {
-                            fallbackUpdater?.start()
-                        } catch (_: Throwable) {
+                        if (isRunning) {
+                            scheduleRetry() // Don't care whether the poller start errors or not; always retry.
+                            fallbackStart()
                         }
                     }
                 }
-                fallbackUpdater?.stop()
+                fallbackStop()
             } catch (t: Throwable) {
-                Logger.e("Primary flag configs start failed, start fallback. Error: ", t)
                 if (fallbackUpdater == null) {
+                    // No fallback: a failed main start fails the wrapper start.
+                    Logger.e("Main flag configs start failed, no fallback. Error: ", t)
                     throw t
                 }
+                Logger.w("Main flag configs start failed, starting fallback. Error: ", t)
                 fallbackUpdater.start()
                 scheduleRetry()
             }
+            isRunning = true
         }
     }

     override fun stop() {
         lock.withLock {
-            mainUpdater.stop()
-            fallbackUpdater?.stop()
+            isRunning = false
             retryTask?.cancel(true)
+            fallbackStop()
+            mainUpdater.stop()
         }
     }

     override fun shutdown() {
         lock.withLock {
+            isRunning = false
+            retryTask?.cancel(true)
+            fallbackStop()
             mainUpdater.shutdown()
             fallbackUpdater?.shutdown()
-            retryTask?.cancel(true)
         }
     }

-    // @GuardedBy(lock)
     private fun scheduleRetry() {
-        retryTask = executor.schedule({
-            try {
-                mainUpdater.start {
-                    scheduleRetry() // Don't care if poller start error or not, always retry stream.
-                    try {
-                        fallbackUpdater?.start()
-                    } catch (_: Throwable) {
-                    }
-                }
-                fallbackUpdater?.stop()
-            } catch (_: Throwable) {
-                scheduleRetry()
-            }
-        }, reconnIntervalRange.random(), TimeUnit.MILLISECONDS)
+        lock.withLock {
+            retryTask = executor.schedule(
+                {
+                    lock.withLock {
+                        if (!isRunning) {
+                            return@schedule
+                        }
+                        try {
+                            mainUpdater.start {
+                                lock.withLock {
+                                    if (isRunning) {
+                                        scheduleRetry() // Don't care whether the poller start errors or not; always retry.
+                                        fallbackStart()
+                                    }
+                                }
+                            }
+                            fallbackStop()
+                        } catch (_: Throwable) {
+                            scheduleRetry()
+                        }
+                    }
+                },
+                reconnIntervalRange.random(),
+                TimeUnit.MILLISECONDS
+            )
+        }
     }

+    private fun fallbackStart() {
+        lock.withLock {
+            try {
+                fallbackUpdater?.start()
+            } catch (_: Throwable) {
+                if (isRunning) {
+                    fallbackRetryTask = executor.schedule(
+                        {
+                            fallbackStart()
+                        },
+                        fallbackReconnIntervalRange.random(),
+                        TimeUnit.MILLISECONDS
+                    )
+                }
+            }
+        }
+    }
+
+    private fun fallbackStop() {
+        lock.withLock {
+            fallbackUpdater?.stop()
+            fallbackRetryTask?.cancel(true)
+        }
+    }
 }
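
The retry delays above are drawn from jittered ranges. A worked example of the reconnIntervalRange arithmetic using this commit's constants; the max/min clamps guard against a negative lower bound and Long overflow (jitterRange is a hypothetical helper name):

import kotlin.math.max
import kotlin.math.min

fun jitterRange(delayMillis: Long, jitterMillis: Long): LongRange =
    max(0L, delayMillis - jitterMillis)..(min(delayMillis, Long.MAX_VALUE - jitterMillis) + jitterMillis)

fun main() {
    println(jitterRange(15_000L, 2_000L)) // 13000..17000 (wrapper defaults)
    println(jitterRange(15_000L, 1_000L)) // 14000..16000 (FLAG_STREAMING_RETRY_DELAY with FLAG_RETRY_JITTER)
    println(jitterRange(60_000L, 0L))     // 60000..60000 (a poller interval with no jitter)
    println(jitterRange(15_000L, 1_000L).random()) // one jittered delay, e.g. 15342
}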