diff --git a/docs/schema-generator/execution/subscriptions.md b/docs/schema-generator/execution/subscriptions.md index 998be17a0c..14a94ab427 100644 --- a/docs/schema-generator/execution/subscriptions.md +++ b/docs/schema-generator/execution/subscriptions.md @@ -26,6 +26,11 @@ toSchema( ) ``` +### Flow Support + +`graphql-kotlin` provides support for Kotlin `Flow` through `FlowSubscriptionExecutionStrategy` that automatically converts +`Flow` to a `Publisher`. + ### Subscription Hooks #### `didGenerateSubscriptionType` diff --git a/docs/spring-server/subscriptions.md b/docs/spring-server/subscriptions.md index 14024a1738..ef4e8bd27f 100644 --- a/docs/spring-server/subscriptions.md +++ b/docs/spring-server/subscriptions.md @@ -4,7 +4,12 @@ title: Subscriptions --- ## Schema -To see more details of how to implement subscriptions in your schema, see [executing subscriptions](../execution/subscriptions). +To see more details of how to implement subscriptions in your schema, see [executing subscriptions](../schema-generator/execution/subscriptions). + +## Flow Support + +`graphql-kotlin-spring-server` provides automatic support for Kotlin `Flow` through `FlowSubscriptionExecutionStrategy` +that supports existing `Publisher`s and relies on Kotlin reactive-streams interop to convert `Flow` to a `Publisher`. ## `graphql-ws` subprotocol ### Overview diff --git a/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/execution/FlowSubscriptionExecutionStrategy.kt b/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/execution/FlowSubscriptionExecutionStrategy.kt index 5c06be8329..6ae7d97c8e 100644 --- a/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/execution/FlowSubscriptionExecutionStrategy.kt +++ b/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/execution/FlowSubscriptionExecutionStrategy.kt @@ -31,24 +31,20 @@ import graphql.execution.instrumentation.parameters.InstrumentationFieldParamete import graphql.execution.reactive.CompletionStageMappingPublisher import graphql.schema.GraphQLObjectType import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.future.await -import kotlinx.coroutines.reactive.asFlow +import kotlinx.coroutines.reactive.asPublisher import org.reactivestreams.Publisher import java.util.Collections import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +import java.util.function.Function /** - * [SubscriptionExecutionStrategy] replacement that returns an [ExecutionResult] - * that is a [Flow] instead of a [Publisher], and allows schema subscription functions + * [SubscriptionExecutionStrategy] replacement that and allows schema subscription functions * to return either a [Flow] or a [Publisher]. * * Note this implementation is mostly a java->kotlin copy of [SubscriptionExecutionStrategy], - * with [CompletionStageMappingPublisher] replaced by a [Flow] mapping, and [Flow] allowed - * as an additional return type. Any [Publisher]s returned will be converted to [Flow]s, - * which may lose meaningful context information, so users are encouraged to create and - * consume [Flow]s directly (see https://github.com/Kotlin/kotlinx.coroutines/issues/1825 - * https://github.com/Kotlin/kotlinx.coroutines/issues/1860 for some examples of lost context) + * with updated [createSourceEventStream] that supports [Flow] and [Publisher]. Any returned + * [Flow]s will be automatically converted to corresponding [Publisher]. */ class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : ExecutionStrategy(dfe) { constructor() : this(SimpleDataFetcherExceptionHandler()) @@ -66,14 +62,20 @@ class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : Exec // // when the upstream source event stream completes, subscribe to it and wire in our adapter - val overallResult: CompletableFuture = sourceEventStream.thenApply { sourceFlow -> - if (sourceFlow == null) { + val overallResult: CompletableFuture = sourceEventStream.thenApply { publisher -> + if (publisher == null) { ExecutionResultImpl(null, executionContext.errors) } else { - val returnFlow = sourceFlow.map { - executeSubscriptionEvent(executionContext, parameters, it).await() + val mapperFunction = Function> { eventPayload: Any? -> + executeSubscriptionEvent( + executionContext, + parameters, + eventPayload + ) } - ExecutionResultImpl(returnFlow, executionContext.errors) + // we need explicit cast as Kotlin Flow is covariant (Flow vs Publisher) + val mapSourceToResponse = CompletionStageMappingPublisher(publisher as Publisher, mapperFunction) + ExecutionResultImpl(mapSourceToResponse, executionContext.errors) } } @@ -100,17 +102,18 @@ class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : Exec private fun createSourceEventStream( executionContext: ExecutionContext, parameters: ExecutionStrategyParameters - ): CompletableFuture> { + ): CompletableFuture?> { val newParameters = firstFieldOfSubscriptionSelection(parameters) val fieldFetched = fetchField(executionContext, newParameters) return fieldFetched.thenApply { fetchedValue -> - val flow = when (val publisherOrFlow = fetchedValue.fetchedValue) { - is Publisher<*> -> publisherOrFlow.asFlow() - is Flow<*> -> publisherOrFlow + val publisher = when (val publisherOrFlow: Any? = fetchedValue.fetchedValue) { + is Publisher<*> -> publisherOrFlow + // below explicit cast is required due to the type erasure and Kotlin declaration-site variance vs Java use-site variance + is Flow<*> -> (publisherOrFlow as? Flow)?.asPublisher() else -> null } - flow + publisher } } diff --git a/graphql-kotlin-schema-generator/src/test/kotlin/com/expediagroup/graphql/execution/FlowSubscriptionExecutionStrategyTest.kt b/graphql-kotlin-schema-generator/src/test/kotlin/com/expediagroup/graphql/execution/FlowSubscriptionExecutionStrategyTest.kt index f08e9deecd..334cc85d9a 100644 --- a/graphql-kotlin-schema-generator/src/test/kotlin/com/expediagroup/graphql/execution/FlowSubscriptionExecutionStrategyTest.kt +++ b/graphql-kotlin-schema-generator/src/test/kotlin/com/expediagroup/graphql/execution/FlowSubscriptionExecutionStrategyTest.kt @@ -34,11 +34,9 @@ import graphql.schema.GraphQLSchema import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.catch -import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.reactive.asPublisher +import kotlinx.coroutines.reactive.collect import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test import org.reactivestreams.Publisher @@ -68,9 +66,9 @@ class FlowSubscriptionExecutionStrategyTest { fun `verify subscription to flow`() = runBlocking { val request = ExecutionInput.newExecutionInput().query("subscription { ticker }").build() val response = testGraphQL.execute(request) - val flow = response.getData>() + val publisher = response.getData>() val list = mutableListOf() - flow.collect { + publisher.collect { list.add(it.getData>().getValue("ticker")) assertEquals(it.extensions["testKey"], "testValue") } @@ -84,9 +82,9 @@ class FlowSubscriptionExecutionStrategyTest { fun `verify subscription to datafetcher flow`() = runBlocking { val request = ExecutionInput.newExecutionInput().query("subscription { datafetcher }").build() val response = testGraphQL.execute(request) - val flow = response.getData>() + val publisher = response.getData>() val list = mutableListOf() - flow.collect { + publisher.collect { val intVal = it.getData>().getValue("datafetcher") list.add(intVal) assertEquals(it.extensions["testKey"], "testValue") @@ -101,9 +99,9 @@ class FlowSubscriptionExecutionStrategyTest { fun `verify subscription to publisher`() = runBlocking { val request = ExecutionInput.newExecutionInput().query("subscription { publisherTicker }").build() val response = testGraphQL.execute(request) - val flow = response.getData>() + val publisher = response.getData>() val list = mutableListOf() - flow.collect { + publisher.collect { list.add(it.getData>().getValue("publisherTicker")) } assertEquals(5, list.size) @@ -119,9 +117,9 @@ class FlowSubscriptionExecutionStrategyTest { .context(SubscriptionContext("junitHandler")) .build() val response = testGraphQL.execute(request) - val flow = response.getData>() + val publisher = response.getData>() val list = mutableListOf() - flow.collect { + publisher.collect { val contextValue = it.getData>().getValue("contextualTicker") assertTrue(contextValue.startsWith("junitHandler:")) list.add(contextValue.substringAfter("junitHandler:").toInt()) @@ -136,18 +134,21 @@ class FlowSubscriptionExecutionStrategyTest { fun `verify subscription to failing flow`() = runBlocking { val request = ExecutionInput.newExecutionInput().query("subscription { alwaysThrows }").build() val response = testGraphQL.execute(request) - val flow = response.getData>() + val publisher = response.getData>() val errors = mutableListOf() val results = mutableListOf() - flow.onEach { - val dataMap = it.getData>() - if (dataMap != null) { - results.add(dataMap.getValue("alwaysThrows")) + try { + publisher.collect { + val dataMap = it.getData>() + if (dataMap != null) { + results.add(dataMap.getValue("alwaysThrows")) + } + errors.addAll(it.errors) } - errors.addAll(it.errors) - }.catch { - errors.add(GraphqlErrorBuilder.newError().message(it.message).build()) - }.collect() + } catch (e: Exception) { + errors.add(GraphqlErrorBuilder.newError().message(e.message).build()) + } + assertEquals(2, results.size) for (i in results.indices) { assertEquals(i + 1, results[i]) @@ -160,9 +161,9 @@ class FlowSubscriptionExecutionStrategyTest { fun `verify subscription to exploding flow`() = runBlocking { val request = ExecutionInput.newExecutionInput().query("subscription { throwsFast }").build() val response = testGraphQL.execute(request) - val flow = response.getData>() + val publisher = response.getData>() val errors = response.errors - assertNull(flow) + assertNull(publisher) assertEquals(1, errors.size) assertEquals("JUNIT flow failure", errors[0].message.substringAfter(" : ")) } @@ -171,9 +172,9 @@ class FlowSubscriptionExecutionStrategyTest { fun `verify subscription alias`() = runBlocking { val request = ExecutionInput.newExecutionInput().query("subscription { t: ticker }").build() val response = testGraphQL.execute(request) - val flow = response.getData>() + val publisher = response.getData>() val list = mutableListOf() - flow.collect { + publisher.collect { list.add(it.getData>().getValue("t")) } assertEquals(5, list.size) @@ -217,7 +218,7 @@ class FlowSubscriptionExecutionStrategyTest { return flow { for (i in 1..5) { delay(100) - emit(DataFetcherResult(i, listOf())) + emit(DataFetcherResult.newResult().data(i).build()) } } } diff --git a/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/spring/GraphQLSchemaConfiguration.kt b/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/spring/GraphQLSchemaConfiguration.kt index e613046d40..cb0068c512 100644 --- a/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/spring/GraphQLSchemaConfiguration.kt +++ b/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/spring/GraphQLSchemaConfiguration.kt @@ -16,6 +16,7 @@ package com.expediagroup.graphql.spring +import com.expediagroup.graphql.execution.FlowSubscriptionExecutionStrategy import com.expediagroup.graphql.spring.execution.DataLoaderRegistryFactory import com.expediagroup.graphql.spring.execution.QueryHandler import com.expediagroup.graphql.spring.execution.SimpleQueryHandler @@ -24,7 +25,6 @@ import graphql.execution.AsyncExecutionStrategy import graphql.execution.AsyncSerialExecutionStrategy import graphql.execution.DataFetcherExceptionHandler import graphql.execution.ExecutionIdProvider -import graphql.execution.SubscriptionExecutionStrategy import graphql.execution.instrumentation.ChainedInstrumentation import graphql.execution.instrumentation.Instrumentation import graphql.execution.preparsed.PreparsedDocumentProvider @@ -69,7 +69,7 @@ class GraphQLSchemaConfiguration { val graphQL = GraphQL.newGraphQL(schema) .queryExecutionStrategy(AsyncExecutionStrategy(dataFetcherExceptionHandler)) .mutationExecutionStrategy(AsyncSerialExecutionStrategy(dataFetcherExceptionHandler)) - .subscriptionExecutionStrategy(SubscriptionExecutionStrategy(dataFetcherExceptionHandler)) + .subscriptionExecutionStrategy(FlowSubscriptionExecutionStrategy(dataFetcherExceptionHandler)) instrumentations.ifPresent { unordered -> if (unordered.size == 1) {