diff --git a/generator/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/generator/execution/BaseFlowSubscriptionExecutionStrategy.kt b/generator/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/generator/execution/BaseFlowSubscriptionExecutionStrategy.kt new file mode 100644 index 0000000000..bca74e5fbd --- /dev/null +++ b/generator/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/generator/execution/BaseFlowSubscriptionExecutionStrategy.kt @@ -0,0 +1,192 @@ +/* + * Copyright 2020 Expedia, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expediagroup.graphql.generator.execution + +import graphql.ExecutionResult +import graphql.ExecutionResultImpl +import graphql.execution.DataFetcherExceptionHandler +import graphql.execution.ExecutionContext +import graphql.execution.ExecutionStepInfo +import graphql.execution.ExecutionStrategy +import graphql.execution.ExecutionStrategyParameters +import graphql.execution.SimpleDataFetcherExceptionHandler +import graphql.execution.SubscriptionExecutionStrategy +import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters +import graphql.execution.instrumentation.parameters.InstrumentationExecutionStrategyParameters +import graphql.execution.instrumentation.parameters.InstrumentationFieldParameters +import graphql.schema.GraphQLObjectType +import kotlinx.coroutines.flow.Flow +import org.reactivestreams.Publisher +import java.util.Collections +import java.util.concurrent.CompletableFuture + +/** + * Abstract [SubscriptionExecutionStrategy] replacement that and allows schema subscription + * functions to return either a [Flow] or a [Publisher]. + * + * Note this implementation is mostly a java->kotlin copy of [SubscriptionExecutionStrategy], + * with updated [createSourceEventStream] that supports [Flow] and [Publisher]. Any returned + * [Flow]s/[Publisher]s will be automatically converted to corresponding [T] by the + * implementing subclass. + */ +abstract class BaseFlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : ExecutionStrategy(dfe) { + constructor() : this(SimpleDataFetcherExceptionHandler()) + + /** + * Convert a schema returned [Flow] or [Publisher] to the supported [T] variant for this implementation. + */ + abstract fun convertToSupportedFlow(publisherOrFlow: Any?): T? + + /** + * Returns a function that takes a [Flow] or [Publisher], subscribes to it, and returns a + * corresponding [ExecutionResult] + */ + abstract fun getSubscriberAdapter(executionContext: ExecutionContext, parameters: ExecutionStrategyParameters): (T?) -> ExecutionResult + + override fun execute( + executionContext: ExecutionContext, + parameters: ExecutionStrategyParameters + ): CompletableFuture { + + val instrumentation = executionContext.instrumentation + val instrumentationParameters = InstrumentationExecutionStrategyParameters(executionContext, parameters) + val executionStrategyCtx = instrumentation.beginExecutionStrategy(instrumentationParameters) + + val sourceEventStream = createSourceEventStream(executionContext, parameters) + + // + // when the upstream source event stream completes, subscribe to it and wire in our adapter + val overallResult: CompletableFuture = sourceEventStream.thenApply(getSubscriberAdapter(executionContext, parameters)) + + // dispatched the subscription query + executionStrategyCtx.onDispatched(overallResult) + overallResult.whenComplete(executionStrategyCtx::onCompleted) + + return overallResult + } + /* + https://github.com/facebook/graphql/blob/master/spec/Section%206%20--%20Execution.md + + CreateSourceEventStream(subscription, schema, variableValues, initialValue): + + Let {subscriptionType} be the root Subscription type in {schema}. + Assert: {subscriptionType} is an Object type. + Let {selectionSet} be the top level Selection Set in {subscription}. + Let {rootField} be the first top level field in {selectionSet}. + Let {argumentValues} be the result of {CoerceArgumentValues(subscriptionType, rootField, variableValues)}. + Let {fieldStream} be the result of running {ResolveFieldEventStream(subscriptionType, initialValue, rootField, argumentValues)}. + Return {fieldStream}. + */ + private fun createSourceEventStream( + executionContext: ExecutionContext, + parameters: ExecutionStrategyParameters + ): CompletableFuture { + val newParameters = firstFieldOfSubscriptionSelection(parameters) + + val fieldFetched = fetchField(executionContext, newParameters) + return fieldFetched.thenApply { fetchedValue -> + convertToSupportedFlow(fetchedValue.fetchedValue) + } + } + + /* + ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue): + + Let {subscriptionType} be the root Subscription type in {schema}. + Assert: {subscriptionType} is an Object type. + Let {selectionSet} be the top level Selection Set in {subscription}. + Let {data} be the result of running {ExecuteSelectionSet(selectionSet, subscriptionType, initialValue, variableValues)} normally (allowing parallelization). + Let {errors} be any field errors produced while executing the selection set. + Return an unordered map containing {data} and {errors}. + + Note: The {ExecuteSubscriptionEvent()} algorithm is intentionally similar to {ExecuteQuery()} since this is how each event result is produced. + */ + protected fun executeSubscriptionEvent( + executionContext: ExecutionContext, + parameters: ExecutionStrategyParameters, + eventPayload: Any? + ): CompletableFuture { + val instrumentation = executionContext.instrumentation + + val newExecutionContext = executionContext.transform { builder -> + builder + .root(eventPayload) + .resetErrors() + } + val newParameters = firstFieldOfSubscriptionSelection(parameters) + val subscribedFieldStepInfo = createSubscribedFieldStepInfo(executionContext, newParameters) + + val i13nFieldParameters = InstrumentationFieldParameters(executionContext) { subscribedFieldStepInfo } + val subscribedFieldCtx = instrumentation.beginSubscribedFieldEvent(i13nFieldParameters) + + val fetchedValue = unboxPossibleDataFetcherResult(newExecutionContext, parameters, eventPayload) + + val fieldValueInfo = completeField(newExecutionContext, newParameters, fetchedValue) + val overallResult = fieldValueInfo + .fieldValue + .thenApply { executionResult -> wrapWithRootFieldName(newParameters, executionResult) } + + // dispatch instrumentation so they can know about each subscription event + subscribedFieldCtx.onDispatched(overallResult) + overallResult.whenComplete(subscribedFieldCtx::onCompleted) + + // allow them to instrument each ER should they want to + val i13ExecutionParameters = InstrumentationExecutionParameters( + executionContext.executionInput, executionContext.graphQLSchema, executionContext.instrumentationState + ) + + return overallResult.thenCompose { executionResult -> + instrumentation.instrumentExecutionResult(executionResult, i13ExecutionParameters) + } + } + + private fun wrapWithRootFieldName( + parameters: ExecutionStrategyParameters, + executionResult: ExecutionResult + ): ExecutionResult { + val rootFieldName = getRootFieldName(parameters) + return ExecutionResultImpl( + Collections.singletonMap(rootFieldName, executionResult.getData()), + executionResult.errors + ) + } + + private fun getRootFieldName(parameters: ExecutionStrategyParameters): String { + val rootField = parameters.field.singleField + return if (rootField.alias != null) rootField.alias else rootField.name + } + + private fun firstFieldOfSubscriptionSelection( + parameters: ExecutionStrategyParameters + ): ExecutionStrategyParameters { + val fields = parameters.fields + val firstField = fields.getSubField(fields.keys[0]) + + val fieldPath = parameters.path.segment(mkNameForPath(firstField.singleField)) + return parameters.transform { builder -> builder.field(firstField).path(fieldPath) } + } + + private fun createSubscribedFieldStepInfo( + executionContext: ExecutionContext, + parameters: ExecutionStrategyParameters + ): ExecutionStepInfo { + val field = parameters.field.singleField + val parentType = parameters.executionStepInfo.unwrappedNonNullType as GraphQLObjectType + val fieldDef = getFieldDef(executionContext.graphQLSchema, parentType, field) + return createExecutionStepInfo(executionContext, parameters, fieldDef, parentType) + } +} diff --git a/generator/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/generator/execution/FlowSubscriptionExecutionStrategy.kt b/generator/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/generator/execution/FlowSubscriptionExecutionStrategy.kt index e360157379..a8fac87136 100644 --- a/generator/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/generator/execution/FlowSubscriptionExecutionStrategy.kt +++ b/generator/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/generator/execution/FlowSubscriptionExecutionStrategy.kt @@ -20,49 +20,34 @@ import graphql.ExecutionResult import graphql.ExecutionResultImpl import graphql.execution.DataFetcherExceptionHandler import graphql.execution.ExecutionContext -import graphql.execution.ExecutionStepInfo -import graphql.execution.ExecutionStrategy import graphql.execution.ExecutionStrategyParameters import graphql.execution.SimpleDataFetcherExceptionHandler import graphql.execution.SubscriptionExecutionStrategy -import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters -import graphql.execution.instrumentation.parameters.InstrumentationExecutionStrategyParameters -import graphql.execution.instrumentation.parameters.InstrumentationFieldParameters import graphql.execution.reactive.CompletionStageMappingPublisher -import graphql.schema.GraphQLObjectType import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.reactive.asPublisher import org.reactivestreams.Publisher -import java.util.Collections -import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletionStage import java.util.function.Function /** * [SubscriptionExecutionStrategy] replacement that and allows schema subscription functions - * to return either a [Flow] or a [Publisher]. - * - * Note this implementation is mostly a java->kotlin copy of [SubscriptionExecutionStrategy], - * with updated [createSourceEventStream] that supports [Flow] and [Publisher]. Any returned - * [Flow]s will be automatically converted to corresponding [Publisher]. + * to return either a [Flow] or a [Publisher], and converts [Flow]s to [Publisher]s. */ -class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : ExecutionStrategy(dfe) { +class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : BaseFlowSubscriptionExecutionStrategy>(dfe) { constructor() : this(SimpleDataFetcherExceptionHandler()) - override fun execute( - executionContext: ExecutionContext, - parameters: ExecutionStrategyParameters - ): CompletableFuture { - - val instrumentation = executionContext.instrumentation - val instrumentationParameters = InstrumentationExecutionStrategyParameters(executionContext, parameters) - val executionStrategyCtx = instrumentation.beginExecutionStrategy(instrumentationParameters) - - val sourceEventStream = createSourceEventStream(executionContext, parameters) + override fun convertToSupportedFlow(publisherOrFlow: Any?): Publisher? { + return when (publisherOrFlow) { + is Publisher<*> -> publisherOrFlow + // below explicit cast is required due to the type erasure and Kotlin declaration-site variance vs Java use-site variance + is Flow<*> -> (publisherOrFlow as? Flow)?.asPublisher() + else -> null + } + } - // - // when the upstream source event stream completes, subscribe to it and wire in our adapter - val overallResult: CompletableFuture = sourceEventStream.thenApply { publisher -> + override fun getSubscriberAdapter(executionContext: ExecutionContext, parameters: ExecutionStrategyParameters): (Publisher?) -> ExecutionResult { + return { publisher -> if (publisher == null) { ExecutionResultImpl(null, executionContext.errors) } else { @@ -78,129 +63,5 @@ class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : Exec ExecutionResultImpl(mapSourceToResponse, executionContext.errors) } } - - // dispatched the subscription query - executionStrategyCtx.onDispatched(overallResult) - overallResult.whenComplete(executionStrategyCtx::onCompleted) - - return overallResult - } - - /* - https://github.com/facebook/graphql/blob/master/spec/Section%206%20--%20Execution.md - - CreateSourceEventStream(subscription, schema, variableValues, initialValue): - - Let {subscriptionType} be the root Subscription type in {schema}. - Assert: {subscriptionType} is an Object type. - Let {selectionSet} be the top level Selection Set in {subscription}. - Let {rootField} be the first top level field in {selectionSet}. - Let {argumentValues} be the result of {CoerceArgumentValues(subscriptionType, rootField, variableValues)}. - Let {fieldStream} be the result of running {ResolveFieldEventStream(subscriptionType, initialValue, rootField, argumentValues)}. - Return {fieldStream}. - */ - private fun createSourceEventStream( - executionContext: ExecutionContext, - parameters: ExecutionStrategyParameters - ): CompletableFuture?> { - val newParameters = firstFieldOfSubscriptionSelection(parameters) - - val fieldFetched = fetchField(executionContext, newParameters) - return fieldFetched.thenApply { fetchedValue -> - val publisher = when (val publisherOrFlow: Any? = fetchedValue.fetchedValue) { - is Publisher<*> -> publisherOrFlow - // below explicit cast is required due to the type erasure and Kotlin declaration-site variance vs Java use-site variance - is Flow<*> -> (publisherOrFlow as? Flow)?.asPublisher() - else -> null - } - publisher - } - } - - /* - ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue): - - Let {subscriptionType} be the root Subscription type in {schema}. - Assert: {subscriptionType} is an Object type. - Let {selectionSet} be the top level Selection Set in {subscription}. - Let {data} be the result of running {ExecuteSelectionSet(selectionSet, subscriptionType, initialValue, variableValues)} normally (allowing parallelization). - Let {errors} be any field errors produced while executing the selection set. - Return an unordered map containing {data} and {errors}. - - Note: The {ExecuteSubscriptionEvent()} algorithm is intentionally similar to {ExecuteQuery()} since this is how each event result is produced. - */ - private fun executeSubscriptionEvent( - executionContext: ExecutionContext, - parameters: ExecutionStrategyParameters, - eventPayload: Any? - ): CompletableFuture { - val instrumentation = executionContext.instrumentation - - val newExecutionContext = executionContext.transform { builder -> - builder - .root(eventPayload) - .resetErrors() - } - val newParameters = firstFieldOfSubscriptionSelection(parameters) - val subscribedFieldStepInfo = createSubscribedFieldStepInfo(executionContext, newParameters) - - val i13nFieldParameters = InstrumentationFieldParameters(executionContext) { subscribedFieldStepInfo } - val subscribedFieldCtx = instrumentation.beginSubscribedFieldEvent(i13nFieldParameters) - - val fetchedValue = unboxPossibleDataFetcherResult(newExecutionContext, parameters, eventPayload) - - val fieldValueInfo = completeField(newExecutionContext, newParameters, fetchedValue) - val overallResult = fieldValueInfo - .fieldValue - .thenApply { executionResult -> wrapWithRootFieldName(newParameters, executionResult) } - - // dispatch instrumentation so they can know about each subscription event - subscribedFieldCtx.onDispatched(overallResult) - overallResult.whenComplete(subscribedFieldCtx::onCompleted) - - // allow them to instrument each ER should they want to - val i13ExecutionParameters = InstrumentationExecutionParameters( - executionContext.executionInput, executionContext.graphQLSchema, executionContext.instrumentationState - ) - - return overallResult.thenCompose { executionResult -> - instrumentation.instrumentExecutionResult(executionResult, i13ExecutionParameters) - } - } - - private fun wrapWithRootFieldName( - parameters: ExecutionStrategyParameters, - executionResult: ExecutionResult - ): ExecutionResult { - val rootFieldName = getRootFieldName(parameters) - return ExecutionResultImpl( - Collections.singletonMap(rootFieldName, executionResult.getData()), - executionResult.errors - ) - } - - private fun getRootFieldName(parameters: ExecutionStrategyParameters): String { - val rootField = parameters.field.singleField - return if (rootField.alias != null) rootField.alias else rootField.name - } - - private fun firstFieldOfSubscriptionSelection( - parameters: ExecutionStrategyParameters - ): ExecutionStrategyParameters { - val fields = parameters.fields - val firstField = fields.getSubField(fields.keys[0]) - - val fieldPath = parameters.path.segment(mkNameForPath(firstField.singleField)) - return parameters.transform { builder -> builder.field(firstField).path(fieldPath) } - } - - private fun createSubscribedFieldStepInfo( - executionContext: ExecutionContext, - parameters: ExecutionStrategyParameters - ): ExecutionStepInfo { - val field = parameters.field.singleField - val parentType = parameters.executionStepInfo.unwrappedNonNullType as GraphQLObjectType - val fieldDef = getFieldDef(executionContext.graphQLSchema, parentType, field) - return createExecutionStepInfo(executionContext, parameters, fieldDef, parentType) } } diff --git a/generator/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/generator/execution/NativeFlowSubscriptionExecutionStrategy.kt b/generator/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/generator/execution/NativeFlowSubscriptionExecutionStrategy.kt new file mode 100644 index 0000000000..6a8a0e5f4b --- /dev/null +++ b/generator/graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/generator/execution/NativeFlowSubscriptionExecutionStrategy.kt @@ -0,0 +1,60 @@ +/* + * Copyright 2020 Expedia, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expediagroup.graphql.generator.execution + +import graphql.ExecutionResult +import graphql.ExecutionResultImpl +import graphql.execution.DataFetcherExceptionHandler +import graphql.execution.ExecutionContext +import graphql.execution.ExecutionStrategyParameters +import graphql.execution.SimpleDataFetcherExceptionHandler +import graphql.execution.SubscriptionExecutionStrategy +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.future.await +import kotlinx.coroutines.reactive.asFlow +import org.reactivestreams.Publisher + +/** + * [SubscriptionExecutionStrategy] replacement that and allows schema subscription functions + * to return either a [Flow] or a [Publisher], and converts [Publisher]s to [Flow]s. + */ +class NativeFlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : BaseFlowSubscriptionExecutionStrategy>(dfe) { + constructor() : this(SimpleDataFetcherExceptionHandler()) + + override fun convertToSupportedFlow(publisherOrFlow: Any?): Flow<*>? { + return when (publisherOrFlow) { + is Publisher<*> -> publisherOrFlow.asFlow() + // below explicit cast is required due to the type erasure and Kotlin declaration-site variance vs Java use-site variance + is Flow<*> -> publisherOrFlow + else -> null + } + } + + override fun getSubscriberAdapter(executionContext: ExecutionContext, parameters: ExecutionStrategyParameters): (Flow<*>?) -> ExecutionResult { + return { sourceFlow -> + if (sourceFlow == null) { + ExecutionResultImpl(null, executionContext.errors) + } else { + val returnFlow = sourceFlow.map { + executeSubscriptionEvent(executionContext, parameters, it).await() + } + ExecutionResultImpl(returnFlow, executionContext.errors) + } + } + } +} diff --git a/generator/graphql-kotlin-schema-generator/src/test/kotlin/com/expediagroup/graphql/generator/execution/NativeFlowSubscriptionExecutionStrategyTest.kt b/generator/graphql-kotlin-schema-generator/src/test/kotlin/com/expediagroup/graphql/generator/execution/NativeFlowSubscriptionExecutionStrategyTest.kt new file mode 100644 index 0000000000..cce709f22a --- /dev/null +++ b/generator/graphql-kotlin-schema-generator/src/test/kotlin/com/expediagroup/graphql/generator/execution/NativeFlowSubscriptionExecutionStrategyTest.kt @@ -0,0 +1,262 @@ +/* + * Copyright 2020 Expedia, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expediagroup.graphql.generator.execution + +import com.expediagroup.graphql.generator.SchemaGeneratorConfig +import com.expediagroup.graphql.generator.TopLevelObject +import com.expediagroup.graphql.generator.exceptions.GraphQLKotlinException +import com.expediagroup.graphql.generator.hooks.FlowSubscriptionSchemaGeneratorHooks +import com.expediagroup.graphql.generator.toSchema +import graphql.ExecutionInput +import graphql.ExecutionResult +import graphql.ExecutionResultImpl +import graphql.GraphQL +import graphql.GraphQLError +import graphql.GraphqlErrorBuilder +import graphql.execution.DataFetcherResult +import graphql.execution.instrumentation.SimpleInstrumentation +import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters +import graphql.schema.GraphQLSchema +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.reactive.asPublisher +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test +import org.reactivestreams.Publisher +import java.util.concurrent.CompletableFuture +import kotlin.test.assertEquals +import kotlin.test.assertNull +import kotlin.test.assertTrue + +@InternalCoroutinesApi +class NativeFlowSubscriptionExecutionStrategyTest { + + private val testSchema: GraphQLSchema = toSchema( + config = SchemaGeneratorConfig( + supportedPackages = listOf("com.expediagroup.graphql.generator.execution"), + hooks = FlowSubscriptionSchemaGeneratorHooks() + ), + queries = listOf(TopLevelObject(BasicQuery())), + mutations = listOf(TopLevelObject(BasicQuery())), + subscriptions = listOf(TopLevelObject(FlowSubscription())) + ) + private val testGraphQL: GraphQL = GraphQL.newGraphQL(testSchema) + .subscriptionExecutionStrategy(NativeFlowSubscriptionExecutionStrategy()) + .instrumentation(TestInstrumentation()) + .build() + + @Test + fun `verify subscription to flow`() = runBlocking { + val request = ExecutionInput.newExecutionInput().query("subscription { ticker }").build() + val response = testGraphQL.execute(request) + val flow = response.getData>() + val list = mutableListOf() + flow.collect { + list.add(it.getData>().getValue("ticker")) + assertEquals(it.extensions["testKey"], "testValue") + } + assertEquals(5, list.size) + for (i in list.indices) { + assertEquals(i + 1, list[i]) + } + } + + @Test + fun `verify subscription to datafetcher flow`() = runBlocking { + val request = ExecutionInput.newExecutionInput().query("subscription { datafetcher }").build() + val response = testGraphQL.execute(request) + val flow = response.getData>() + val list = mutableListOf() + flow.collect { + val intVal = it.getData>().getValue("datafetcher") + list.add(intVal) + assertEquals(it.extensions["testKey"], "testValue") + } + assertEquals(5, list.size) + for (i in list.indices) { + assertEquals(i + 1, list[i]) + } + } + + @Test + fun `verify subscription to publisher`() = runBlocking { + val request = ExecutionInput.newExecutionInput().query("subscription { publisherTicker }").build() + val response = testGraphQL.execute(request) + val flow = response.getData>() + val list = mutableListOf() + flow.collect { + list.add(it.getData>().getValue("publisherTicker")) + } + assertEquals(5, list.size) + for (i in list.indices) { + assertEquals(i + 1, list[i]) + } + } + + @Test + fun `verify subscription to flow with context`() = runBlocking { + val request = ExecutionInput.newExecutionInput() + .query("subscription { contextualTicker }") + .context(SubscriptionContext("junitHandler")) + .build() + val response = testGraphQL.execute(request) + val flow = response.getData>() + val list = mutableListOf() + flow.collect { + val contextValue = it.getData>().getValue("contextualTicker") + assertTrue(contextValue.startsWith("junitHandler:")) + list.add(contextValue.substringAfter("junitHandler:").toInt()) + } + assertEquals(5, list.size) + for (i in list.indices) { + assertEquals(i + 1, list[i]) + } + } + + @Test + fun `verify subscription to failing flow`() = runBlocking { + val request = ExecutionInput.newExecutionInput().query("subscription { alwaysThrows }").build() + val response = testGraphQL.execute(request) + val flow = response.getData>() + val errors = mutableListOf() + val results = mutableListOf() + try { + flow.collect { + val dataMap = it.getData>() + if (dataMap != null) { + results.add(dataMap.getValue("alwaysThrows")) + } + errors.addAll(it.errors) + } + } catch (e: Exception) { + errors.add(GraphqlErrorBuilder.newError().message(e.message).build()) + } + + assertEquals(2, results.size) + for (i in results.indices) { + assertEquals(i + 1, results[i]) + } + assertEquals(1, errors.size) + assertEquals("JUNIT subscription failure", errors[0].message) + } + + @Test + fun `verify subscription to exploding flow`() = runBlocking { + val request = ExecutionInput.newExecutionInput().query("subscription { throwsFast }").build() + val response = testGraphQL.execute(request) + val flow = response.getData>() + val errors = response.errors + assertNull(flow) + assertEquals(1, errors.size) + assertEquals("JUNIT flow failure", errors[0].message.substringAfter(" : ")) + } + + @Test + fun `verify subscription alias`() = runBlocking { + val request = ExecutionInput.newExecutionInput().query("subscription { t: ticker }").build() + val response = testGraphQL.execute(request) + val flow = response.getData>() + val list = mutableListOf() + flow.collect { + list.add(it.getData>().getValue("t")) + } + assertEquals(5, list.size) + for (i in list.indices) { + assertEquals(i + 1, list[i]) + } + } + + // GraphQL spec requires at least single query to be present as Query type is needed to run introspection queries + // see: https://github.com/graphql/graphql-spec/issues/490 and https://github.com/graphql/graphql-spec/issues/568 + class BasicQuery { + @Suppress("Detekt.FunctionOnlyReturningConstant") + fun query(): String = "hello" + } + + class TestInstrumentation : SimpleInstrumentation() { + override fun instrumentExecutionResult( + executionResult: ExecutionResult, + parameters: InstrumentationExecutionParameters? + ): CompletableFuture { + return CompletableFuture.completedFuture( + ExecutionResultImpl.newExecutionResult() + .from(executionResult) + .addExtension("testKey", "testValue") + .build() + ) + } + } + + class FlowSubscription { + fun ticker(): Flow { + return flow { + for (i in 1..5) { + delay(100) + emit(i) + } + } + } + + fun datafetcher(): Flow> { + return flow { + for (i in 1..5) { + delay(100) + emit(DataFetcherResult.newResult().data(i).build()) + } + } + } + + fun publisherTicker(): Publisher { + return flow { + for (i in 1..5) { + delay(100) + emit(i) + } + }.asPublisher() + } + + fun throwsFast(): Flow { + throw GraphQLKotlinException("JUNIT flow failure") + } + + fun alwaysThrows(): Flow { + return flow { + for (i in 1..5) { + if (i > 2) { + throw GraphQLKotlinException("JUNIT subscription failure") + } + delay(100) + emit(i) + } + } + } + + fun contextualTicker(context: SubscriptionContext): Flow { + return flow { + for (i in 1..5) { + delay(100) + emit("${context.value}:$i") + } + } + } + } + + data class SubscriptionContext(val value: String) : GraphQLContext +}