Merge pull request #49 from embrace-io/lucas/refactor_delivery_retry
Move Delivery Retry logic outside EmbraceApiService.
lucaslabari authored Nov 7, 2023
2 parents 5b97785 + 4dae99c commit eded592
Showing 8 changed files with 586 additions and 424 deletions.
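
In short, the retry queue, its persistence, and the backoff scheduling that previously lived inside EmbraceApiService are handed off to a new DeliveryRetryManager collaborator; the service keeps only thin executePost/executeRawPost helpers and registers executePost with the manager. A minimal sketch of the new wiring, using only names that appear in the diff below (the concrete DeliveryRetryManager implementation is part of the PR but not shown in this excerpt):

// Illustrative sketch of the post-refactor shape of EmbraceApiService, not the full class.
internal class EmbraceApiService(
    private val apiClient: ApiClient,
    private val deliveryRetryManager: DeliveryRetryManager,
    // other constructor parameters unchanged
) {

    init {
        // Tell the manager which function to invoke when it retries a queued call.
        deliveryRetryManager.setPostExecutor(this::executePost)
    }

    // Send path: failures are queued with the manager instead of a local retry queue.
    private fun postOnExecutor(request: ApiRequest, payload: ByteArray) {
        try {
            executePost(request, payload)
        } catch (ex: Exception) {
            deliveryRetryManager.scheduleForRetry(request, payload)
            throw ex
        }
    }

    private fun executePost(request: ApiRequest, payload: ByteArray) {
        apiClient.post(request, payload)
    }
}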
@@ -6,8 +6,7 @@ import io.embrace.android.embracesdk.EmbraceEvent
import io.embrace.android.embracesdk.capture.connectivity.NetworkConnectivityListener
import io.embrace.android.embracesdk.capture.connectivity.NetworkConnectivityService
import io.embrace.android.embracesdk.comms.delivery.DeliveryCacheManager
import io.embrace.android.embracesdk.comms.delivery.DeliveryFailedApiCall
import io.embrace.android.embracesdk.comms.delivery.DeliveryFailedApiCalls
import io.embrace.android.embracesdk.comms.delivery.DeliveryRetryManager
import io.embrace.android.embracesdk.comms.delivery.NetworkStatus
import io.embrace.android.embracesdk.config.remote.RemoteConfig
import io.embrace.android.embracesdk.internal.EmbraceSerializer
@@ -20,11 +19,8 @@ import io.embrace.android.embracesdk.utils.exceptions.Unchecked
import java.io.StringReader
import java.net.HttpURLConnection
import java.util.concurrent.Future
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import kotlin.math.max

internal class EmbraceApiService(
private val apiClient: ApiClient,
@@ -35,22 +31,18 @@ internal class EmbraceApiService(
private val scheduledExecutorService: ScheduledExecutorService,
networkConnectivityService: NetworkConnectivityService,
private val cacheManager: DeliveryCacheManager,
private val deliveryRetryManager: DeliveryRetryManager,
private val lazyDeviceId: Lazy<String>,
private val appId: String
) : ApiService, NetworkConnectivityListener {

private val retryQueue: DeliveryFailedApiCalls by lazy { cacheManager.loadFailedApiCalls() }
private var lastRetryTask: ScheduledFuture<*>? = null
private var lastNetworkStatus: NetworkStatus = NetworkStatus.UNKNOWN

init {
logger.logDeveloper(TAG, "start")

networkConnectivityService.addNetworkConnectivityListener(this)
lastNetworkStatus = networkConnectivityService.getCurrentNetworkStatus()
scheduledExecutorService.submit(
this::scheduleFailedApiCallsRetry
)
deliveryRetryManager.setPostExecutor(this::executePost)
}

/**
@@ -118,27 +110,6 @@

override fun onNetworkConnectivityStatusChanged(status: NetworkStatus) {
lastNetworkStatus = status
logger.logDebug("Network status is now: $lastNetworkStatus")
when (status) {
NetworkStatus.UNKNOWN,
NetworkStatus.WIFI,
NetworkStatus.WAN -> {
scheduleFailedApiCallsRetry()
}

NetworkStatus.NOT_REACHABLE -> {
synchronized(this) {
lastRetryTask?.let { task ->
if (task.cancel(false)) {
logger.logDebug("Failed Calls Retry Action was stopped because there is no connection. ")
lastRetryTask = null
} else {
logger.logError("Failed Calls Retry Action could not be stopped.")
}
}
}
}
}
}

/**
@@ -263,19 +234,6 @@
return postOnExecutor(sessionPayload, request, true, onFinish)
}

/**
* Returns true if there is an active pending retry task
*/
fun isRetryTaskActive(): Boolean =
lastRetryTask?.let { task ->
!task.isCancelled && !task.isDone
} ?: false

/**
* Returns the number of failed API calls that will be retried
*/
fun pendingRetriesCount() = retryQueue.size

private fun createRequest(eventMessage: EventMessage): ApiRequest {
logger.logDeveloper(TAG, "sendEvent")
checkNotNull(eventMessage.event) { "event must be set" }
@@ -355,17 +313,17 @@
try {
if (lastNetworkStatus != NetworkStatus.NOT_REACHABLE) {
if (compress) {
apiClient.post(request, payload)
executePost(request, payload)
} else {
apiClient.rawPost(request, payload)
executeRawPost(request, payload)
}
} else {
scheduleForRetry(request, payload)
deliveryRetryManager.scheduleForRetry(request, payload)
logger.logWarning("No connection available. Request was queued to retry later.")
}
} catch (ex: Exception) {
logger.logWarning("Failed to post Embrace API call. Will retry.", ex)
scheduleForRetry(request, payload)
deliveryRetryManager.scheduleForRetry(request, payload)
throw ex
} finally {
onComplete?.invoke()
@@ -400,136 +358,14 @@
return "$abbreviation:$stories"
}

private fun scheduleForRetry(request: ApiRequest, payload: ByteArray) {
logger.logDeveloper(TAG, "Scheduling api call for retry")
if (pendingRetriesCount() < MAX_FAILED_CALLS) {
val scheduleJob = retryQueue.isEmpty()
val cachedPayloadName = cacheManager.savePayload(payload)
val failedApiCall = DeliveryFailedApiCall(request, cachedPayloadName)
retryQueue.add(failedApiCall)
cacheManager.saveFailedApiCalls(retryQueue)

// By default there are no scheduled retry jobs pending. If the retry queue was initially empty, try to schedule a retry.
if (scheduleJob) {
scheduleFailedApiCallsRetry(RETRY_PERIOD)
}
}
}

/**
* Return true if the conditions are met that a retry should be scheduled
*/
private fun shouldScheduleRetry(): Boolean {
return !isRetryTaskActive() && retryQueue.isNotEmpty()
}

/**
* Schedules an action to retry failed API calls. If the retry doesn't send all the failed API requests, it will recursively schedule
* itself with an exponential backoff delay, starting with [RETRY_PERIOD], doubling after that until
* [MAX_EXPONENTIAL_RETRY_PERIOD] is reached, after which case it stops trying until the next cold start.
*/
private fun scheduleFailedApiCallsRetry(delayInSeconds: Long = 0L) {
try {
synchronized(this) {
if (shouldScheduleRetry()) {
lastRetryTask = scheduledExecutorService.schedule(
{
var noFailedRetries = true
if (lastNetworkStatus != NetworkStatus.NOT_REACHABLE) {
try {
logger.logInfo("Retrying failed API calls")
logger.logDeveloper(TAG, "Retrying failed API calls")
val retries = pendingRetriesCount()
repeat(retries) {
retryQueue.poll()?.let { failedApiCall ->
val callSucceeded = retryFailedApiCall(failedApiCall)
if (callSucceeded) {
// if the retry succeeded, save the modified queue in cache.
cacheManager.saveFailedApiCalls(retryQueue)
} else {
// if the retry failed, add the call back to the queue.
retryQueue.add(failedApiCall)
noFailedRetries = false
}
}
}
} catch (ex: Exception) {
logger.logDebug("Error when retrying failed API call", ex)
}
if (retryQueue.isNotEmpty()) {
scheduledExecutorService.submit {
scheduleNextFailedApiCallsRetry(
noFailedRetries,
delayInSeconds
)
}
}
} else {
logger.logInfo(
"Did not retry network calls as scheduled because the network is not reachable"
)
}
},
delayInSeconds,
TimeUnit.SECONDS
)
logger.logInfo(
"Scheduled failed API calls to retry ${if (delayInSeconds == 0L) "now" else "in $delayInSeconds seconds"}"
)
}
}
} catch (e: RejectedExecutionException) {
// This happens if the executor has shutdown previous to the schedule call
logger.logError("Cannot schedule retry failed calls.", e)
}
private fun executePost(request: ApiRequest, payload: ByteArray) {
apiClient.post(request, payload)
}

/**
* Executes the network call for a DeliveryFailedApiCall.
*/
private fun retryFailedApiCall(call: DeliveryFailedApiCall): Boolean {
val payload = cacheManager.loadPayload(call.cachedPayload)
if (payload != null) {
try {
logger.logDeveloper(TAG, "Retrying failed API call")
apiClient.post(call.apiRequest, payload)
cacheManager.deletePayload(call.cachedPayload)
} catch (ex: Exception) {
logger.logDeveloper(
TAG,
"retried call but fail again, scheduling to retry later",
ex
)
return false
}
} else {
logger.logError("Could not retrieve cached api payload")
// If payload is null, the file could have been removed.
// We don't need to retry sending in the future as we'd get the same result.
// That's the reason for returning true.
}
return true
}

/**
* Schedules the next call to retry sending the failed_api_calls again. The delay will be extended if the previous retry yielded
* at least one failed request.
*/
private fun scheduleNextFailedApiCallsRetry(noFailedRetries: Boolean, delay: Long) {
val nextDelay = if (noFailedRetries) {
RETRY_PERIOD
} else {
// if a network call failed, the retries will use exponential backoff
max(RETRY_PERIOD, delay * 2)
}
if (nextDelay <= MAX_EXPONENTIAL_RETRY_PERIOD) {
scheduleFailedApiCallsRetry(nextDelay)
}
private fun executeRawPost(request: ApiRequest, payload: ByteArray) {
apiClient.rawPost(request, payload)
}
}

private const val TAG = "DeliveryNetworkManager"
private const val TAG = "EmbraceApiService"
private const val CRASH_TIMEOUT = 1L // Seconds to wait before timing out when sending a crash
private const val RETRY_PERIOD = 120L // In seconds
private const val MAX_EXPONENTIAL_RETRY_PERIOD = 3600 // In seconds
private const val MAX_FAILED_CALLS = 200 // Max number of failed calls that will be cached for retry
@@ -0,0 +1,19 @@
package io.embrace.android.embracesdk.comms.delivery

import io.embrace.android.embracesdk.comms.api.ApiRequest

/**
* Manages the retrying of failed API calls.
*/
internal interface DeliveryRetryManager {

/**
* Schedules a failed API call for retry.
*/
fun scheduleForRetry(request: ApiRequest, payload: ByteArray)

/**
* Sets the executor that will be used to retry failed API calls.
*/
fun setPostExecutor(postExecutor: (request: ApiRequest, payload: ByteArray) -> Unit)
}
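
The concrete implementation added by this PR is not included in this excerpt; judging by the logic removed from EmbraceApiService above, it persists the retry queue via DeliveryCacheManager and retries with exponential backoff. The hypothetical, in-memory implementation below is only meant to illustrate how the two interface methods fit together; the class name, constructor, and fixed retry period are assumptions, not the shipped code.

package io.embrace.android.embracesdk.comms.delivery

import io.embrace.android.embracesdk.comms.api.ApiRequest
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit

// Hypothetical, simplified implementation for illustration only.
internal class SimpleDeliveryRetryManager(
    private val scheduledExecutorService: ScheduledExecutorService,
    private val retryPeriodSeconds: Long = 120L
) : DeliveryRetryManager {

    private val queue = ConcurrentLinkedQueue<Pair<ApiRequest, ByteArray>>()

    @Volatile
    private var postExecutor: ((ApiRequest, ByteArray) -> Unit)? = null

    override fun setPostExecutor(postExecutor: (request: ApiRequest, payload: ByteArray) -> Unit) {
        this.postExecutor = postExecutor
    }

    override fun scheduleForRetry(request: ApiRequest, payload: ByteArray) {
        queue.add(request to payload)
        // Try again after a fixed delay; a production implementation would back off exponentially.
        scheduledExecutorService.schedule(Runnable { retryPending() }, retryPeriodSeconds, TimeUnit.SECONDS)
    }

    // Attempts each queued call once; anything that fails again stays in the queue.
    private fun retryPending() {
        val executor = postExecutor ?: return
        repeat(queue.size) {
            queue.poll()?.let { (request, payload) ->
                try {
                    executor(request, payload)
                } catch (ex: Exception) {
                    queue.add(request to payload)
                }
            }
        }
    }
}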