diff --git a/src/main/kotlin/deployment/DeploymentRunner.kt b/src/main/kotlin/deployment/DeploymentRunner.kt index 5997df8..52b5471 100644 --- a/src/main/kotlin/deployment/DeploymentRunner.kt +++ b/src/main/kotlin/deployment/DeploymentRunner.kt @@ -20,7 +20,8 @@ import java.util.concurrent.Executors import java.util.concurrent.TimeUnit private const val MIN_COHORT_POLLING_INTERVAL = 60000L -private const val FLAG_POLLING_JITTER = 1000L +private const val FLAG_STREAMING_RETRY_DELAY = 15000L +private const val FLAG_RETRY_JITTER = 1000L internal class DeploymentRunner( private val config: LocalEvaluationConfig, @@ -51,14 +52,21 @@ internal class DeploymentRunner( FlagConfigFallbackRetryWrapper( FlagConfigStreamer(flagConfigStreamApi, flagConfigStorage, cohortLoader, cohortStorage, metrics), amplitudeFlagConfigPoller, - FLAG_POLLING_JITTER + FLAG_STREAMING_RETRY_DELAY, + FLAG_RETRY_JITTER, + config.flagConfigPollerIntervalMillis, + 0, ) else amplitudeFlagConfigPoller private val flagConfigUpdater = if (flagConfigProxyApi != null) FlagConfigFallbackRetryWrapper( FlagConfigPoller(flagConfigProxyApi, flagConfigStorage, cohortLoader, cohortStorage, config, metrics), - amplitudeFlagConfigPoller + amplitudeFlagConfigPoller, + config.flagConfigPollerIntervalMillis, + 0, + if (flagConfigStreamApi != null) FLAG_STREAMING_RETRY_DELAY else config.flagConfigPollerIntervalMillis, + if (flagConfigStreamApi != null) FLAG_RETRY_JITTER else 0, ) else amplitudeFlagConfigUpdater diff --git a/src/main/kotlin/flag/FlagConfigUpdater.kt b/src/main/kotlin/flag/FlagConfigUpdater.kt index 0412a48..c402386 100644 --- a/src/main/kotlin/flag/FlagConfigUpdater.kt +++ b/src/main/kotlin/flag/FlagConfigUpdater.kt @@ -122,7 +122,7 @@ internal class FlagConfigPoller( ) { private val lock: ReentrantLock = ReentrantLock() private val pool = Executors.newScheduledThreadPool(1, daemonFactory) - private var scheduledFuture: ScheduledFuture<*>? = null // @GuardedBy(lock) + private var scheduledFuture: ScheduledFuture<*>? = null /** * Start will fetch once, then start poller to poll flag configs. @@ -130,7 +130,7 @@ internal class FlagConfigPoller( override fun start(onError: (() -> Unit)?) { refresh() lock.withLock { - stopInternal() + stop() scheduledFuture = pool.scheduleWithFixedDelay( { try { @@ -148,16 +148,11 @@ internal class FlagConfigPoller( } } - // @GuardedBy(lock) - private fun stopInternal() { - // Pause only stop the task scheduled. It doesn't stop the executor. - scheduledFuture?.cancel(true) - scheduledFuture = null - } - override fun stop() { lock.withLock { - stopInternal() + // Pause only stop the task scheduled. It doesn't stop the executor. + scheduledFuture?.cancel(true) + scheduledFuture = null } } @@ -227,24 +222,21 @@ internal class FlagConfigStreamer( private const val RETRY_DELAY_MILLIS_DEFAULT = 15 * 1000L private const val MAX_JITTER_MILLIS_DEFAULT = 2000L -/** - * This is a wrapper class around flag config updaters. - * This provides retry capability in case errors encountered during update asynchronously, as well as fallbacks when an updater failed. - * - * `mainUpdater` cannot be a FlagConfigFallbackRetryWrapper. - * The developer should restructure arguments to make sure `mainUpdater` is never a `FlagConfigFallbackRetryWrapper`. - * All retry and fallback structures can be normalized into `mainUpdater`s not being `FlagConfigFallbackRetryWrapper`s. - */ internal class FlagConfigFallbackRetryWrapper( private val mainUpdater: FlagConfigUpdater, private val fallbackUpdater: FlagConfigUpdater?, retryDelayMillis: Long = RETRY_DELAY_MILLIS_DEFAULT, maxJitterMillis: Long = MAX_JITTER_MILLIS_DEFAULT, + fallbackStartRetryDelayMillis: Long = RETRY_DELAY_MILLIS_DEFAULT, + fallbackMaxJitterMillis: Long = MAX_JITTER_MILLIS_DEFAULT, ) : FlagConfigUpdater { private val lock: ReentrantLock = ReentrantLock() private val reconnIntervalRange = max(0, retryDelayMillis - maxJitterMillis)..(min(retryDelayMillis, Long.MAX_VALUE - maxJitterMillis) + maxJitterMillis) - private val executor = Executors.newScheduledThreadPool(1, daemonFactory) - private var retryTask: ScheduledFuture<*>? = null // @GuardedBy(lock) + private val fallbackReconnIntervalRange = max(0, fallbackStartRetryDelayMillis - fallbackMaxJitterMillis)..(min(fallbackStartRetryDelayMillis, Long.MAX_VALUE - fallbackMaxJitterMillis) + fallbackMaxJitterMillis) + private val executor = Executors.newScheduledThreadPool(2, daemonFactory) + private var retryTask: ScheduledFuture<*>? = null + private var fallbackRetryTask: ScheduledFuture<*>? = null + private var isRunning = false /** * Since the wrapper retries for mainUpdater, so there will never be error case. Thus, onError will never be called. @@ -254,8 +246,9 @@ internal class FlagConfigFallbackRetryWrapper( * If main start failed, fallback updater tries to start. * If fallback start failed as well, throws exception. * If fallback start success, start success, main enters retry loop. - * After started, if main failed, fallback is started and main enters retry loop. - * Fallback success or failures status is not monitored. It's suggested to wrap fallback into a retry wrapper. + * After started, if main failed, main enters retry loop and fallback will start. + * If fallback start failed, fallback will enter start retry loop until it's successfully started. + * If fallback start success, but failed later, it's not monitored. It's recommended to wrap fallback with FlagConfigFallbackRetryWrapper. */ override fun start(onError: (() -> Unit)?) { if (mainUpdater is FlagConfigFallbackRetryWrapper) { @@ -268,58 +261,97 @@ internal class FlagConfigFallbackRetryWrapper( try { mainUpdater.start { lock.withLock { - scheduleRetry() // Don't care if poller start error or not, always retry. - try { - fallbackUpdater?.start() - } catch (_: Throwable) { + if (isRunning) { + scheduleRetry() // Don't care if poller start error or not, always retry. + fallbackStart() } } } - fallbackUpdater?.stop() + fallbackStop() } catch (t: Throwable) { - Logger.e("Primary flag configs start failed, start fallback. Error: ", t) if (fallbackUpdater == null) { // No fallback, main start failed is wrapper start fail + Logger.e("Main flag configs start failed, no fallback. Error: ", t) throw t } + Logger.w("Main flag configs start failed, starting fallback. Error: ", t) fallbackUpdater.start() scheduleRetry() } + isRunning = true } } override fun stop() { lock.withLock { - mainUpdater.stop() - fallbackUpdater?.stop() + isRunning = false retryTask?.cancel(true) + fallbackStop() + mainUpdater.stop() } } override fun shutdown() { lock.withLock { + isRunning = false + retryTask?.cancel(true) + fallbackStop() mainUpdater.shutdown() fallbackUpdater?.shutdown() - retryTask?.cancel(true) } } - // @GuardedBy(lock) private fun scheduleRetry() { - retryTask = executor.schedule({ - try { - mainUpdater.start { - scheduleRetry() // Don't care if poller start error or not, always retry stream. - try { - fallbackUpdater?.start() - } catch (_: Throwable) { + lock.withLock { + retryTask = executor.schedule( + { + lock.withLock { + if (!isRunning) { + return@schedule + } + try { + mainUpdater.start { + lock.withLock { + if (isRunning) { + scheduleRetry() // Don't care if poller start error or not, always retry. + fallbackStart() + } + } + } + fallbackStop() + } catch (_: Throwable) { + scheduleRetry() + } } - } - fallbackUpdater?.stop() + }, + reconnIntervalRange.random(), + TimeUnit.MILLISECONDS + ) + } + } + + private fun fallbackStart() { + lock.withLock { + try { + fallbackUpdater?.start() } catch (_: Throwable) { - scheduleRetry() + if (isRunning) { + fallbackRetryTask = executor.schedule( + { + fallbackStart() + }, + fallbackReconnIntervalRange.random(), + TimeUnit.MILLISECONDS + ) + } else {} } - }, reconnIntervalRange.random(), TimeUnit.MILLISECONDS) } } - \ No newline at end of file + + private fun fallbackStop() { + lock.withLock { + fallbackUpdater?.stop() + fallbackRetryTask?.cancel(true) + } + } +} diff --git a/src/test/kotlin/flag/FlagConfigUpdaterTest.kt b/src/test/kotlin/flag/FlagConfigUpdaterTest.kt index a0db63c..05fa273 100644 --- a/src/test/kotlin/flag/FlagConfigUpdaterTest.kt +++ b/src/test/kotlin/flag/FlagConfigUpdaterTest.kt @@ -220,7 +220,7 @@ class FlagConfigFallbackRetryWrapperTest { @Test fun `Test FallbackRetryWrapper main success no fallback updater`() { - val wrapper = FlagConfigFallbackRetryWrapper(mainUpdater, null, 1000, 0) + val wrapper = FlagConfigFallbackRetryWrapper(mainUpdater, null, 1000, 0, 1100, 0) // Main starts wrapper.start() @@ -241,7 +241,7 @@ class FlagConfigFallbackRetryWrapperTest { @Test fun `Test FallbackRetryWrapper main start error and retries with no fallback updater`() { - val wrapper = FlagConfigFallbackRetryWrapper(mainUpdater, null, 1000, 0) + val wrapper = FlagConfigFallbackRetryWrapper(mainUpdater, null, 1000, 0, 1100, 0) every { mainUpdater.start(capture(mainOnErrorCapture)) } answers { throw Error() } @@ -253,7 +253,7 @@ class FlagConfigFallbackRetryWrapperTest { verify(exactly = 1) { mainUpdater.start(any()) } // Start errors no retry - Thread.sleep(1100) + Thread.sleep(1200) verify(exactly = 1) { mainUpdater.start(any()) } wrapper.shutdown() @@ -261,7 +261,7 @@ class FlagConfigFallbackRetryWrapperTest { @Test fun `Test FallbackRetryWrapper main error callback and retries with no fallback updater`() { - val wrapper = FlagConfigFallbackRetryWrapper(mainUpdater, null, 1000, 0) + val wrapper = FlagConfigFallbackRetryWrapper(mainUpdater, null, 1000, 0, 1100, 0) // Main start success wrapper.start() @@ -273,24 +273,41 @@ class FlagConfigFallbackRetryWrapperTest { // Retry fail after 1s every { mainUpdater.start(capture(mainOnErrorCapture)) } answers { throw Error() } - Thread.sleep(1100) + Thread.sleep(1200) verify(exactly = 2) { mainUpdater.start(any()) } // Retry success after 1s justRun { mainUpdater.start(capture(mainOnErrorCapture)) } - Thread.sleep(1100) + Thread.sleep(1200) verify(exactly = 3) { mainUpdater.start(any()) } // No more start - Thread.sleep(1100) + Thread.sleep(1200) verify(exactly = 3) { mainUpdater.start(any()) } wrapper.shutdown() } + @Test + fun `Test FallbackRetryWrapper onError callback doesn't start stopped wrapper`() { + val wrapper = FlagConfigFallbackRetryWrapper(mainUpdater, null, 1000, 0) + + // Main start no error + wrapper.start() + verify(exactly = 1) { mainUpdater.start(any()) } + wrapper.stop() + + // Call onError + mainOnErrorCapture.captured() + Thread.sleep(1100) + verify(exactly = 1) { mainUpdater.start(any()) } + + wrapper.shutdown() + } + @Test fun `Test FallbackRetryWrapper main updater all success`() { - val wrapper = FlagConfigFallbackRetryWrapper(mainUpdater, fallbackUpdater, 1000, 0) + val wrapper = FlagConfigFallbackRetryWrapper(mainUpdater, fallbackUpdater, 1000, 0, 1100, 0) // Main starts wrapper.start() @@ -317,7 +334,7 @@ class FlagConfigFallbackRetryWrapperTest { @Test fun `Test FallbackRetryWrapper main and fallback start error`() { - val wrapper = FlagConfigFallbackRetryWrapper(mainUpdater, fallbackUpdater, 1000, 0) + val wrapper = FlagConfigFallbackRetryWrapper(mainUpdater, fallbackUpdater, 1000, 0, 1100, 0) every { mainUpdater.start(capture(mainOnErrorCapture)) } answers { throw Error() } every { fallbackUpdater.start() } answers { throw Error() } @@ -340,7 +357,7 @@ class FlagConfigFallbackRetryWrapperTest { @Test fun `Test FallbackRetryWrapper main start error and retries`() { - val wrapper = FlagConfigFallbackRetryWrapper(mainUpdater, fallbackUpdater, 1000, 0) + val wrapper = FlagConfigFallbackRetryWrapper(mainUpdater, fallbackUpdater, 1000, 0, 1100, 0) every { mainUpdater.start(capture(mainOnErrorCapture)) } answers { throw Error() } @@ -350,7 +367,7 @@ class FlagConfigFallbackRetryWrapperTest { verify(exactly = 1) { fallbackUpdater.start(any()) } // Retries start - Thread.sleep(1100) + Thread.sleep(1200) verify(exactly = 2) { mainUpdater.start(any()) } verify(exactly = 1) { fallbackUpdater.start(any()) } @@ -359,7 +376,7 @@ class FlagConfigFallbackRetryWrapperTest { @Test fun `Test FallbackRetryWrapper main error callback and retries`() { - val wrapper = FlagConfigFallbackRetryWrapper(mainUpdater, fallbackUpdater, 1000, 0) + val wrapper = FlagConfigFallbackRetryWrapper(mainUpdater, fallbackUpdater, 1000, 0, 1100, 0) // Main start success wrapper.start() @@ -373,21 +390,21 @@ class FlagConfigFallbackRetryWrapperTest { // Retry fail after 1s every { mainUpdater.start(capture(mainOnErrorCapture)) } answers { throw Error() } - Thread.sleep(1100) + Thread.sleep(1200) verify(exactly = 2) { mainUpdater.start(any()) } verify(exactly = 1) { fallbackUpdater.start(any()) } // Retry success justRun { mainUpdater.start(capture(mainOnErrorCapture)) } verify(exactly = 1) { fallbackUpdater.stop() } - Thread.sleep(1100) + Thread.sleep(1200) verify(exactly = 3) { mainUpdater.start(any()) } verify(exactly = 1) { fallbackUpdater.start(any()) } verify(exactly = 0) { mainUpdater.stop() } verify(exactly = 2) { fallbackUpdater.stop() } // No more start - Thread.sleep(1100) + Thread.sleep(1200) verify(exactly = 3) { mainUpdater.start(any()) } verify(exactly = 1) { fallbackUpdater.start(any()) } verify(exactly = 0) { mainUpdater.stop() } @@ -396,9 +413,50 @@ class FlagConfigFallbackRetryWrapperTest { wrapper.shutdown() } + @Test + fun `Test FallbackRetryWrapper main error callback and retries, fallback start error and retries`() { + val wrapper = FlagConfigFallbackRetryWrapper(mainUpdater, fallbackUpdater, 1000, 0, 1100, 0) + every { fallbackUpdater.start() } answers { throw Error() } + + // Main start success + wrapper.start() + verify(exactly = 1) { mainUpdater.start(any()) } + verify(exactly = 0) { fallbackUpdater.start(any()) } + verify(exactly = 1) { fallbackUpdater.stop() } + + // Signal error + mainOnErrorCapture.captured() + verify(exactly = 1) { mainUpdater.start(any()) } + verify(exactly = 1) { fallbackUpdater.start(any()) } + + // Retry fail after 1s + every { mainUpdater.start(capture(mainOnErrorCapture)) } answers { throw Error() } + Thread.sleep(1200) + verify(exactly = 2) { mainUpdater.start(any()) } + verify(exactly = 2) { fallbackUpdater.start(any()) } + + // Retry success + justRun { mainUpdater.start(capture(mainOnErrorCapture)) } + verify(exactly = 1) { fallbackUpdater.stop() } + Thread.sleep(1200) + verify(exactly = 3) { mainUpdater.start(any()) } + verify(exactly = 2) { fallbackUpdater.start(any()) } + verify(exactly = 0) { mainUpdater.stop() } + verify(exactly = 2) { fallbackUpdater.stop() } + + // No more start + Thread.sleep(1200) + verify(exactly = 3) { mainUpdater.start(any()) } + verify(exactly = 2) { fallbackUpdater.start(any()) } + verify(exactly = 0) { mainUpdater.stop() } + verify(exactly = 2) { fallbackUpdater.stop() } + + wrapper.shutdown() + } + @Test fun `Test FallbackRetryWrapper main updater cannot be FlagConfigFallbackRetryWrapper`() { - val wrapper = FlagConfigFallbackRetryWrapper(FlagConfigFallbackRetryWrapper(mainUpdater, null), null, 1000, 0) + val wrapper = FlagConfigFallbackRetryWrapper(FlagConfigFallbackRetryWrapper(mainUpdater, null), null, 1000, 0, 1000, 0) try { wrapper.start() fail("Did not throw")