Skip to content

Commit

Permalink
prevent adding executionInterval & postCreateDelay
Browse files Browse the repository at this point in the history
After we wait for the opRepoPostCreateDelay time call to waiter.wake()
would cause waitForNewOperationAndExecutionInterval() to wake up as
we expect, however it was then waiting for the full time on
opRepoExecutionInterval as well.
To solve this we simply subtract the time we already waited from
opRepoExecutionInterval. With the current values this means we
fully skip opRepoExecutionInterval so we only wait
opRepoPostCreateDelay.
  • Loading branch information
jkasten2 committed Apr 19, 2024
1 parent fb10758 commit 1c2bf82
Showing 1 changed file with 14 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ internal class OperationRepo(
}
}

internal class LoopWaiterMessage(
val force: Boolean,
val previousWaitedTime: Long = 0,
)

private val executorsMap: Map<String, IOperationExecutor>
private val queue = mutableListOf<OperationQueueItem>()
private val waiter = WaiterWithValue<Boolean>()
private val waiter = WaiterWithValue<LoopWaiterMessage>()
private var paused = false
private var coroutineScope = CoroutineScope(newSingleThreadContext(name = "OpRepo"))

Expand Down Expand Up @@ -124,7 +129,7 @@ internal class OperationRepo(
}
}

waiter.wake(flush)
waiter.wake(LoopWaiterMessage(flush, 0))
}

/**
Expand Down Expand Up @@ -161,16 +166,16 @@ internal class OperationRepo(
*/
private suspend fun waitForNewOperationAndExecutionInterval() {
// 1. Wait for an operation to be enqueued
var force = waiter.waitForWake()
var wakeMessage = waiter.waitForWake()

// 2. Wait at least the time defined in opRepoExecutionInterval
// so operations can be grouped, unless one of them used
// flush=true (AKA force)
var lastTime = _time.currentTimeMillis
var remainingTime = _configModelStore.model.opRepoExecutionInterval
while (!force && remainingTime > 0) {
var remainingTime = _configModelStore.model.opRepoExecutionInterval - wakeMessage.previousWaitedTime
while (!wakeMessage.force && remainingTime > 0) {
withTimeoutOrNull(remainingTime) {
force = waiter.waitForWake()
wakeMessage = waiter.waitForWake()
}
remainingTime -= _time.currentTimeMillis - lastTime
lastTime = _time.currentTimeMillis
Expand Down Expand Up @@ -198,14 +203,10 @@ internal class OperationRepo(
}
response.idTranslations.values.forEach { _newRecordState.add(it) }
coroutineScope.launch {
delay(_configModelStore.model.opRepoPostCreateDelay)
val waitTime = _configModelStore.model.opRepoPostCreateDelay
delay(waitTime)
synchronized(queue) {
// NOTE: Even if the queue is not empty we may wake
// when not needed, as those operations may not have
// depended on these ids. This however should be very
// rare and the side-effect is only a bit less
// batching.
if (queue.isNotEmpty()) waiter.wake(false)
if (queue.isNotEmpty()) waiter.wake(LoopWaiterMessage(false, waitTime))
}
}
}
Expand Down

0 comments on commit 1c2bf82

Please sign in to comment.