Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/schema-generator/execution/subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ toSchema(
)
```

### Flow Support

`graphql-kotlin` provides support for Kotlin `Flow` through `FlowSubscriptionExecutionStrategy` that automatically converts
`Flow` to a `Publisher`.

### Subscription Hooks

#### `didGenerateSubscriptionType`
Expand Down
7 changes: 6 additions & 1 deletion docs/spring-server/subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ title: Subscriptions
---

## Schema
To see more details of how to implement subscriptions in your schema, see [executing subscriptions](../execution/subscriptions).
To see more details of how to implement subscriptions in your schema, see [executing subscriptions](../schema-generator/execution/subscriptions).

## Flow Support

`graphql-kotlin-spring-server` provides automatic support for Kotlin `Flow` through `FlowSubscriptionExecutionStrategy`
that supports existing `Publisher`s and relies on Kotlin reactive-streams interop to convert `Flow` to a `Publisher`.

## `graphql-ws` subprotocol
### Overview
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,20 @@ import graphql.execution.instrumentation.parameters.InstrumentationFieldParamete
import graphql.execution.reactive.CompletionStageMappingPublisher
import graphql.schema.GraphQLObjectType
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.future.await
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import org.reactivestreams.Publisher
import java.util.Collections
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.Function

/**
* [SubscriptionExecutionStrategy] replacement that returns an [ExecutionResult]
* that is a [Flow] instead of a [Publisher], and allows schema subscription functions
* [SubscriptionExecutionStrategy] replacement that and allows schema subscription functions
* to return either a [Flow] or a [Publisher].
*
* Note this implementation is mostly a java->kotlin copy of [SubscriptionExecutionStrategy],
* with [CompletionStageMappingPublisher] replaced by a [Flow] mapping, and [Flow] allowed
* as an additional return type. Any [Publisher]s returned will be converted to [Flow]s,
* which may lose meaningful context information, so users are encouraged to create and
* consume [Flow]s directly (see https://github.com/Kotlin/kotlinx.coroutines/issues/1825
* https://github.com/Kotlin/kotlinx.coroutines/issues/1860 for some examples of lost context)
* with updated [createSourceEventStream] that supports [Flow] and [Publisher]. Any returned
* [Flow]s will be automatically converted to corresponding [Publisher].
*/
class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : ExecutionStrategy(dfe) {
constructor() : this(SimpleDataFetcherExceptionHandler())
Expand All @@ -66,14 +62,20 @@ class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : Exec

//
// when the upstream source event stream completes, subscribe to it and wire in our adapter
val overallResult: CompletableFuture<ExecutionResult> = sourceEventStream.thenApply { sourceFlow ->
if (sourceFlow == null) {
val overallResult: CompletableFuture<ExecutionResult> = sourceEventStream.thenApply { publisher ->
if (publisher == null) {
ExecutionResultImpl(null, executionContext.errors)
} else {
val returnFlow = sourceFlow.map {
executeSubscriptionEvent(executionContext, parameters, it).await()
val mapperFunction = Function<Any, CompletionStage<ExecutionResult>> { eventPayload: Any? ->
executeSubscriptionEvent(
executionContext,
parameters,
eventPayload
)
}
ExecutionResultImpl(returnFlow, executionContext.errors)
// we need explicit cast as Kotlin Flow is covariant (Flow<out T> vs Publisher<T>)
val mapSourceToResponse = CompletionStageMappingPublisher<ExecutionResult, Any>(publisher as Publisher<Any>, mapperFunction)
ExecutionResultImpl(mapSourceToResponse, executionContext.errors)
}
}

Expand All @@ -100,17 +102,18 @@ class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : Exec
private fun createSourceEventStream(
executionContext: ExecutionContext,
parameters: ExecutionStrategyParameters
): CompletableFuture<Flow<*>> {
): CompletableFuture<Publisher<out Any>?> {
val newParameters = firstFieldOfSubscriptionSelection(parameters)

val fieldFetched = fetchField(executionContext, newParameters)
return fieldFetched.thenApply { fetchedValue ->
val flow = when (val publisherOrFlow = fetchedValue.fetchedValue) {
is Publisher<*> -> publisherOrFlow.asFlow()
is Flow<*> -> publisherOrFlow
val publisher = when (val publisherOrFlow: Any? = fetchedValue.fetchedValue) {
is Publisher<*> -> publisherOrFlow
// below explicit cast is required due to the type erasure and Kotlin declaration-site variance vs Java use-site variance
is Flow<*> -> (publisherOrFlow as? Flow<Any>)?.asPublisher()
else -> null
}
flow
publisher
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ import graphql.schema.GraphQLSchema
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.reactive.collect
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import org.reactivestreams.Publisher
Expand Down Expand Up @@ -68,9 +66,9 @@ class FlowSubscriptionExecutionStrategyTest {
fun `verify subscription to flow`() = runBlocking {
val request = ExecutionInput.newExecutionInput().query("subscription { ticker }").build()
val response = testGraphQL.execute(request)
val flow = response.getData<Flow<ExecutionResult>>()
val publisher = response.getData<Publisher<ExecutionResult>>()
val list = mutableListOf<Int>()
flow.collect {
publisher.collect {
list.add(it.getData<Map<String, Int>>().getValue("ticker"))
assertEquals(it.extensions["testKey"], "testValue")
}
Expand All @@ -84,9 +82,9 @@ class FlowSubscriptionExecutionStrategyTest {
fun `verify subscription to datafetcher flow`() = runBlocking {
val request = ExecutionInput.newExecutionInput().query("subscription { datafetcher }").build()
val response = testGraphQL.execute(request)
val flow = response.getData<Flow<ExecutionResult>>()
val publisher = response.getData<Publisher<ExecutionResult>>()
val list = mutableListOf<Int>()
flow.collect {
publisher.collect {
val intVal = it.getData<Map<String, Int>>().getValue("datafetcher")
list.add(intVal)
assertEquals(it.extensions["testKey"], "testValue")
Expand All @@ -101,9 +99,9 @@ class FlowSubscriptionExecutionStrategyTest {
fun `verify subscription to publisher`() = runBlocking {
val request = ExecutionInput.newExecutionInput().query("subscription { publisherTicker }").build()
val response = testGraphQL.execute(request)
val flow = response.getData<Flow<ExecutionResult>>()
val publisher = response.getData<Publisher<ExecutionResult>>()
val list = mutableListOf<Int>()
flow.collect {
publisher.collect {
list.add(it.getData<Map<String, Int>>().getValue("publisherTicker"))
}
assertEquals(5, list.size)
Expand All @@ -119,9 +117,9 @@ class FlowSubscriptionExecutionStrategyTest {
.context(SubscriptionContext("junitHandler"))
.build()
val response = testGraphQL.execute(request)
val flow = response.getData<Flow<ExecutionResult>>()
val publisher = response.getData<Publisher<ExecutionResult>>()
val list = mutableListOf<Int>()
flow.collect {
publisher.collect {
val contextValue = it.getData<Map<String, String>>().getValue("contextualTicker")
assertTrue(contextValue.startsWith("junitHandler:"))
list.add(contextValue.substringAfter("junitHandler:").toInt())
Expand All @@ -136,18 +134,21 @@ class FlowSubscriptionExecutionStrategyTest {
fun `verify subscription to failing flow`() = runBlocking {
val request = ExecutionInput.newExecutionInput().query("subscription { alwaysThrows }").build()
val response = testGraphQL.execute(request)
val flow = response.getData<Flow<ExecutionResult>>()
val publisher = response.getData<Publisher<ExecutionResult>>()
val errors = mutableListOf<GraphQLError>()
val results = mutableListOf<Int>()
flow.onEach {
val dataMap = it.getData<Map<String, Int>>()
if (dataMap != null) {
results.add(dataMap.getValue("alwaysThrows"))
try {
publisher.collect {
val dataMap = it.getData<Map<String, Int>>()
if (dataMap != null) {
results.add(dataMap.getValue("alwaysThrows"))
}
errors.addAll(it.errors)
}
errors.addAll(it.errors)
}.catch {
errors.add(GraphqlErrorBuilder.newError().message(it.message).build())
}.collect()
} catch (e: Exception) {
errors.add(GraphqlErrorBuilder.newError().message(e.message).build())
}

assertEquals(2, results.size)
for (i in results.indices) {
assertEquals(i + 1, results[i])
Expand All @@ -160,9 +161,9 @@ class FlowSubscriptionExecutionStrategyTest {
fun `verify subscription to exploding flow`() = runBlocking {
val request = ExecutionInput.newExecutionInput().query("subscription { throwsFast }").build()
val response = testGraphQL.execute(request)
val flow = response.getData<Flow<ExecutionResult>>()
val publisher = response.getData<Publisher<ExecutionResult>>()
val errors = response.errors
assertNull(flow)
assertNull(publisher)
assertEquals(1, errors.size)
assertEquals("JUNIT flow failure", errors[0].message.substringAfter(" : "))
}
Expand All @@ -171,9 +172,9 @@ class FlowSubscriptionExecutionStrategyTest {
fun `verify subscription alias`() = runBlocking {
val request = ExecutionInput.newExecutionInput().query("subscription { t: ticker }").build()
val response = testGraphQL.execute(request)
val flow = response.getData<Flow<ExecutionResult>>()
val publisher = response.getData<Publisher<ExecutionResult>>()
val list = mutableListOf<Int>()
flow.collect {
publisher.collect {
list.add(it.getData<Map<String, Int>>().getValue("t"))
}
assertEquals(5, list.size)
Expand Down Expand Up @@ -217,7 +218,7 @@ class FlowSubscriptionExecutionStrategyTest {
return flow {
for (i in 1..5) {
delay(100)
emit(DataFetcherResult(i, listOf()))
emit(DataFetcherResult.newResult<Int>().data(i).build())
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.expediagroup.graphql.spring

import com.expediagroup.graphql.execution.FlowSubscriptionExecutionStrategy
import com.expediagroup.graphql.spring.execution.DataLoaderRegistryFactory
import com.expediagroup.graphql.spring.execution.QueryHandler
import com.expediagroup.graphql.spring.execution.SimpleQueryHandler
Expand All @@ -24,7 +25,6 @@ import graphql.execution.AsyncExecutionStrategy
import graphql.execution.AsyncSerialExecutionStrategy
import graphql.execution.DataFetcherExceptionHandler
import graphql.execution.ExecutionIdProvider
import graphql.execution.SubscriptionExecutionStrategy
import graphql.execution.instrumentation.ChainedInstrumentation
import graphql.execution.instrumentation.Instrumentation
import graphql.execution.preparsed.PreparsedDocumentProvider
Expand Down Expand Up @@ -69,7 +69,7 @@ class GraphQLSchemaConfiguration {
val graphQL = GraphQL.newGraphQL(schema)
.queryExecutionStrategy(AsyncExecutionStrategy(dataFetcherExceptionHandler))
.mutationExecutionStrategy(AsyncSerialExecutionStrategy(dataFetcherExceptionHandler))
.subscriptionExecutionStrategy(SubscriptionExecutionStrategy(dataFetcherExceptionHandler))
.subscriptionExecutionStrategy(FlowSubscriptionExecutionStrategy(dataFetcherExceptionHandler))

instrumentations.ifPresent { unordered ->
if (unordered.size == 1) {
Expand Down