Skip to content

Commit

Permalink
Merge pull request #46 from soil-kt/improve-prefetch-query
Browse files Browse the repository at this point in the history
Improve the results of synchronously executed Queries.
  • Loading branch information
ogaclejapan authored Jul 28, 2024
2 parents 1bf6e0d + 48b947e commit a105983
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ suspend fun <T, S> QueryCommand.Context<QueryChunks<T, S>>.revalidate(
*/
suspend inline fun <T, S> QueryCommand.Context<QueryChunks<T, S>>.dispatchFetchChunksResult(
key: InfiniteQueryKey<T, S>,
variable: S
variable: S,
noinline callback: QueryCallback<QueryChunks<T, S>>? = null
) {
fetch(key, variable)
.map { QueryChunk(it, variable) }
Expand All @@ -86,6 +87,7 @@ suspend inline fun <T, S> QueryCommand.Context<QueryChunks<T, S>>.dispatchFetchC
.onSuccess(::dispatchFetchSuccess)
.onFailure(::dispatchFetchFailure)
.onFailure { options.onError?.invoke(it, state, key.id) }
.also { callback?.invoke(it) }
}

/**
Expand All @@ -99,11 +101,13 @@ suspend inline fun <T, S> QueryCommand.Context<QueryChunks<T, S>>.dispatchFetchC
*/
suspend inline fun <T, S> QueryCommand.Context<QueryChunks<T, S>>.dispatchRevalidateChunksResult(
key: InfiniteQueryKey<T, S>,
chunks: QueryChunks<T, S>
chunks: QueryChunks<T, S>,
noinline callback: QueryCallback<QueryChunks<T, S>>? = null
) {
revalidate(key, chunks)
.run { key.onRecoverData()?.let(::recoverCatching) ?: this }
.onSuccess(::dispatchFetchSuccess)
.onFailure(::dispatchFetchFailure)
.onFailure { options.onError?.invoke(it, state, key.id) }
.also { callback?.invoke(it) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package soil.query

import soil.query.internal.vvv
import kotlin.coroutines.cancellation.CancellationException

/**
* Query command for [InfiniteQueryKey].
Expand All @@ -23,19 +24,21 @@ sealed class InfiniteQueryCommands<T, S> : QueryCommand<QueryChunks<T, S>> {
*/
data class Connect<T, S>(
val key: InfiniteQueryKey<T, S>,
val revision: String? = null
val revision: String? = null,
val callback: QueryCallback<QueryChunks<T, S>>? = null
) : InfiniteQueryCommands<T, S>() {
override suspend fun handle(ctx: QueryCommand.Context<QueryChunks<T, S>>) {
if (!ctx.shouldFetch(revision)) {
ctx.options.vvv(key.id) { "skip fetch(shouldFetch=false)" }
callback?.invoke(Result.failure(CancellationException("skip fetch")))
return
}
ctx.dispatch(QueryAction.Fetching())
val chunks = ctx.state.data
if (chunks.isNullOrEmpty() || ctx.state.isPlaceholderData) {
ctx.dispatchFetchChunksResult(key, key.initialParam())
ctx.dispatchFetchChunksResult(key, key.initialParam(), callback)
} else {
ctx.dispatchRevalidateChunksResult(key, chunks)
ctx.dispatchRevalidateChunksResult(key, chunks, callback)
}
}
}
Expand All @@ -50,19 +53,21 @@ sealed class InfiniteQueryCommands<T, S> : QueryCommand<QueryChunks<T, S>> {
*/
data class Invalidate<T, S>(
val key: InfiniteQueryKey<T, S>,
val revision: String
val revision: String,
val callback: QueryCallback<QueryChunks<T, S>>? = null
) : InfiniteQueryCommands<T, S>() {
override suspend fun handle(ctx: QueryCommand.Context<QueryChunks<T, S>>) {
if (ctx.state.revision != revision) {
ctx.options.vvv(key.id) { "skip fetch(revision is not matched)" }
callback?.invoke(Result.failure(CancellationException("skip fetch")))
return
}
ctx.dispatch(QueryAction.Fetching(isInvalidated = true))
val chunks = ctx.state.data
if (chunks.isNullOrEmpty() || ctx.state.isPlaceholderData) {
ctx.dispatchFetchChunksResult(key, key.initialParam())
ctx.dispatchFetchChunksResult(key, key.initialParam(), callback)
} else {
ctx.dispatchRevalidateChunksResult(key, chunks)
ctx.dispatchRevalidateChunksResult(key, chunks, callback)
}
}
}
Expand All @@ -75,17 +80,19 @@ sealed class InfiniteQueryCommands<T, S> : QueryCommand<QueryChunks<T, S>> {
*/
data class LoadMore<T, S>(
val key: InfiniteQueryKey<T, S>,
val param: S
val param: S,
val callback: QueryCallback<QueryChunks<T, S>>? = null
) : InfiniteQueryCommands<T, S>() {
override suspend fun handle(ctx: QueryCommand.Context<QueryChunks<T, S>>) {
val chunks = ctx.state.data
if (param != key.loadMoreParam(chunks.orEmpty())) {
ctx.options.vvv(key.id) { "skip fetch(param is changed)" }
callback?.invoke(Result.failure(CancellationException("skip fetch")))
return
}

ctx.dispatch(QueryAction.Fetching())
ctx.dispatchFetchChunksResult(key, param)
ctx.dispatchFetchChunksResult(key, param, callback)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@

package soil.query

import kotlinx.coroutines.CompletableDeferred
import soil.query.internal.toResultCallback
import kotlin.coroutines.cancellation.CancellationException

/**
* A reference to an [Query] for [InfiniteQueryKey].
*
Expand All @@ -28,6 +32,22 @@ class InfiniteQueryRef<T, S>(
event.collect(::handleEvent)
}

/**
* Prefetches the [Query].
*/
suspend fun prefetch(): Boolean {
val deferred = CompletableDeferred<QueryChunks<T, S>>()
command.send(InfiniteQueryCommands.Connect(key, state.value.revision, deferred.toResultCallback()))
return try {
deferred.await()
true
} catch (e: CancellationException) {
throw e
} catch (e: Throwable) {
false
}
}

/**
* Invalidates the [Query].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package soil.query

import kotlinx.coroutines.Job
import soil.query.internal.UniqueId

/**
Expand Down Expand Up @@ -32,7 +33,7 @@ interface QueryClient {
* Prefetch is executed within a [kotlinx.coroutines.CoroutineScope] associated with the instance of [QueryClient].
* After data retrieval, subscription is automatically unsubscribed, hence the caching period depends on [QueryOptions].
*/
fun <T> prefetchQuery(key: QueryKey<T>)
fun <T> prefetchQuery(key: QueryKey<T>): Job

/**
* Prefetches the infinite query by the specified [InfiniteQueryKey].
Expand All @@ -41,7 +42,7 @@ interface QueryClient {
* Prefetch is executed within a [kotlinx.coroutines.CoroutineScope] associated with the instance of [QueryClient].
* After data retrieval, subscription is automatically unsubscribed, hence the caching period depends on [QueryOptions].
*/
fun <T, S> prefetchInfiniteQuery(key: InfiniteQueryKey<T, S>)
fun <T, S> prefetchInfiniteQuery(key: InfiniteQueryKey<T, S>): Job
}

/**
Expand Down Expand Up @@ -127,3 +128,4 @@ typealias QueryEffect = QueryMutableClient.() -> Unit

typealias QueryRecoverData<T> = (error: Throwable) -> T
typealias QueryOptionsOverride = (QueryOptions) -> QueryOptions
typealias QueryCallback<T> = (Result<T>) -> Unit
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,16 @@ suspend fun <T> QueryCommand.Context<T>.fetch(
*
* @param key Instance of a class implementing [QueryKey].
*/
suspend inline fun <T> QueryCommand.Context<T>.dispatchFetchResult(key: QueryKey<T>) {
suspend inline fun <T> QueryCommand.Context<T>.dispatchFetchResult(
key: QueryKey<T>,
noinline callback: QueryCallback<T>? = null
) {
fetch(key)
.run { key.onRecoverData()?.let(::recoverCatching) ?: this }
.onSuccess(::dispatchFetchSuccess)
.onFailure(::dispatchFetchFailure)
.onFailure { options.onError?.invoke(it, state, key.id) }
.also { callback?.invoke(it) }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package soil.query

import soil.query.internal.vvv
import kotlin.coroutines.cancellation.CancellationException

/**
* Query command for [QueryKey].
Expand All @@ -22,16 +23,18 @@ sealed class QueryCommands<T> : QueryCommand<T> {
*/
data class Connect<T>(
val key: QueryKey<T>,
val revision: String? = null
val revision: String? = null,
val callback: QueryCallback<T>? = null
) : QueryCommands<T>() {

override suspend fun handle(ctx: QueryCommand.Context<T>) {
if (!ctx.shouldFetch(revision)) {
ctx.options.vvv(key.id) { "skip fetch(shouldFetch=false)" }
callback?.invoke(Result.failure(CancellationException("skip fetch")))
return
}
ctx.dispatch(QueryAction.Fetching())
ctx.dispatchFetchResult(key)
ctx.dispatchFetchResult(key, callback)
}
}

Expand All @@ -45,16 +48,18 @@ sealed class QueryCommands<T> : QueryCommand<T> {
*/
data class Invalidate<T>(
val key: QueryKey<T>,
val revision: String
val revision: String,
val callback: QueryCallback<T>? = null
) : QueryCommands<T>() {

override suspend fun handle(ctx: QueryCommand.Context<T>) {
if (ctx.state.revision != revision) {
ctx.options.vvv(key.id) { "skip fetch(revision is not matched)" }
callback?.invoke(Result.failure(CancellationException("skip fetch")))
return
}
ctx.dispatch(QueryAction.Fetching(isInvalidated = true))
ctx.dispatchFetchResult(key)
ctx.dispatchFetchResult(key, callback)
}
}
}
20 changes: 20 additions & 0 deletions soil-query-core/src/commonMain/kotlin/soil/query/QueryRef.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@

package soil.query

import kotlinx.coroutines.CompletableDeferred
import soil.query.internal.toResultCallback
import kotlin.coroutines.cancellation.CancellationException

/**
* A reference to an [Query] for [QueryKey].
*
Expand All @@ -27,6 +31,22 @@ class QueryRef<T>(
event.collect(::handleEvent)
}

/**
* Prefetches the [Query].
*/
suspend fun prefetch(): Boolean {
val deferred = CompletableDeferred<T>()
command.send(QueryCommands.Connect(key, state.value.revision, deferred.toResultCallback()))
return try {
deferred.await()
true
} catch (e: CancellationException) {
throw e
} catch (e: Throwable) {
false
}
}

/**
* Invalidates the [Query].
*
Expand Down
27 changes: 11 additions & 16 deletions soil-query-core/src/commonMain/kotlin/soil/query/SwrCache.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull
import soil.query.SwrCachePolicy.Companion.DEFAULT_GC_CHUNK_SIZE
import soil.query.SwrCachePolicy.Companion.DEFAULT_GC_INTERVAL
import soil.query.internal.ActorBlockRunner
Expand Down Expand Up @@ -401,34 +400,30 @@ class SwrCache(private val policy: SwrCachePolicy) : SwrClient, QueryMutableClie
)
}

override fun <T> prefetchQuery(key: QueryKey<T>) {
override fun <T> prefetchQuery(key: QueryKey<T>): Job {
val scope = CoroutineScope(policy.mainDispatcher)
val query = getQuery(key).also { it.launchIn(scope) }
coroutineScope.launch {
val revision = query.state.value.revision
val job = scope.launch { query.start() }
return coroutineScope.launch {
try {
withTimeout(query.options.prefetchWindowTime) {
query.state.first { it.revision != revision || !it.isStaled() }
withTimeoutOrNull(query.options.prefetchWindowTime) {
query.prefetch()
}
} finally {
job.cancel()
scope.cancel()
}
}
}

override fun <T, S> prefetchInfiniteQuery(key: InfiniteQueryKey<T, S>) {
override fun <T, S> prefetchInfiniteQuery(key: InfiniteQueryKey<T, S>): Job {
val scope = CoroutineScope(policy.mainDispatcher)
val query = getInfiniteQuery(key).also { it.launchIn(scope) }
coroutineScope.launch {
val revision = query.state.value.revision
val job = scope.launch { query.start() }
return coroutineScope.launch {
try {
withTimeout(query.options.prefetchWindowTime) {
query.state.first { it.revision != revision || !it.isStaled() }
withTimeoutOrNull(query.options.prefetchWindowTime) {
query.prefetch()
}
} finally {
job.cancel()
scope.cancel()
}
}
}
Expand Down

0 comments on commit a105983

Please sign in to comment.