-
Notifications
You must be signed in to change notification settings - Fork 371
Generate context with suspend and nullable in subscriptions #1053
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6d766a7
dc8dc70
70b6463
ee60f1b
604c610
d8c033a
86b032f
8cdef6e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,6 @@ package com.expediagroup.graphql.server.spring.subscriptions | |
|
|
||
| import com.expediagroup.graphql.generator.execution.GraphQLContext | ||
| import org.springframework.web.reactive.socket.WebSocketSession | ||
| import reactor.core.publisher.Mono | ||
|
|
||
| /** | ||
| * Implementation of Apollo Subscription Server Lifecycle Events | ||
|
|
@@ -33,8 +32,8 @@ interface ApolloSubscriptionHooks { | |
| fun onConnect( | ||
| connectionParams: Map<String, String>, | ||
| session: WebSocketSession, | ||
| graphQLContext: GraphQLContext | ||
| ): Mono<GraphQLContext> = Mono.just(graphQLContext) | ||
| graphQLContext: GraphQLContext? | ||
| ): GraphQLContext? = graphQLContext | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no need for a mono wrapper anymore. We should just cache the context to be used later |
||
|
|
||
| /** | ||
| * Called when the client executes a GraphQL operation. | ||
|
|
@@ -43,7 +42,7 @@ interface ApolloSubscriptionHooks { | |
| fun onOperation( | ||
| operationMessage: SubscriptionOperationMessage, | ||
| session: WebSocketSession, | ||
| graphQLContext: GraphQLContext | ||
| graphQLContext: GraphQLContext? | ||
| ): Unit = Unit | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,8 +16,6 @@ | |
|
|
||
| package com.expediagroup.graphql.server.spring.subscriptions | ||
|
|
||
| import com.expediagroup.graphql.generator.execution.DefaultGraphQLContext | ||
| import com.expediagroup.graphql.generator.execution.GraphQLContext | ||
| import com.expediagroup.graphql.server.spring.GraphQLConfigurationProperties | ||
| import com.expediagroup.graphql.server.spring.subscriptions.SubscriptionOperationMessage.ClientMessages.GQL_CONNECTION_INIT | ||
| import com.expediagroup.graphql.server.spring.subscriptions.SubscriptionOperationMessage.ClientMessages.GQL_CONNECTION_TERMINATE | ||
|
|
@@ -33,6 +31,7 @@ import com.fasterxml.jackson.databind.ObjectMapper | |
| import com.fasterxml.jackson.module.kotlin.convertValue | ||
| import com.fasterxml.jackson.module.kotlin.readValue | ||
| import kotlinx.coroutines.ExperimentalCoroutinesApi | ||
| import kotlinx.coroutines.runBlocking | ||
| import org.slf4j.LoggerFactory | ||
| import org.springframework.web.reactive.socket.WebSocketSession | ||
| import reactor.core.publisher.Flux | ||
|
|
@@ -66,7 +65,7 @@ class ApolloSubscriptionProtocolHandler( | |
| return try { | ||
| when (operationMessage.type) { | ||
| GQL_CONNECTION_INIT.type -> onInit(operationMessage, session) | ||
| GQL_START.type -> onStart(operationMessage, session) | ||
| GQL_START.type -> startSubscription(operationMessage, session) | ||
| GQL_STOP.type -> onStop(operationMessage, session) | ||
| GQL_CONNECTION_TERMINATE.type -> onDisconnect(session) | ||
| else -> onUnknownOperation(operationMessage, session) | ||
|
|
@@ -104,17 +103,18 @@ class ApolloSubscriptionProtocolHandler( | |
| @Suppress("Detekt.TooGenericExceptionCaught") | ||
| private fun startSubscription( | ||
| operationMessage: SubscriptionOperationMessage, | ||
| session: WebSocketSession, | ||
| context: GraphQLContext | ||
| session: WebSocketSession | ||
| ): Flux<SubscriptionOperationMessage> { | ||
| val context = sessionState.getContext(session) | ||
|
|
||
| subscriptionHooks.onOperation(operationMessage, session, context) | ||
|
|
||
| if (operationMessage.id == null) { | ||
| logger.error("GraphQL subscription operation id is required") | ||
| return Flux.just(basicConnectionErrorMessage) | ||
| } | ||
|
|
||
| if (sessionState.operationExists(session, operationMessage)) { | ||
| if (sessionState.doesOperationExist(session, operationMessage)) { | ||
| logger.info("Already subscribed to operation ${operationMessage.id} for session ${session.id}") | ||
| return Flux.empty() | ||
| } | ||
|
|
@@ -149,13 +149,23 @@ class ApolloSubscriptionProtocolHandler( | |
| } | ||
|
|
||
| private fun onInit(operationMessage: SubscriptionOperationMessage, session: WebSocketSession): Flux<SubscriptionOperationMessage> { | ||
| val connectionParams = getConnectionParams(operationMessage.payload) | ||
| val graphQLContext = contextFactory.generateContext(session) ?: DefaultGraphQLContext() | ||
| val onConnect = subscriptionHooks.onConnect(connectionParams, session, graphQLContext) | ||
| sessionState.saveContext(session, onConnect) | ||
| val acknowledgeMessage = Flux.just(acknowledgeMessage) | ||
| saveContext(operationMessage, session) | ||
| val acknowledgeMessage = Mono.just(acknowledgeMessage) | ||
| val keepAliveFlux = getKeepAliveFlux(session) | ||
| return acknowledgeMessage.concatWith(keepAliveFlux) | ||
| .onErrorReturn(getConnectionErrorMessage(operationMessage)) | ||
| } | ||
|
|
||
| /** | ||
| * Generate the context and save it for all future messages. | ||
| */ | ||
| private fun saveContext(operationMessage: SubscriptionOperationMessage, session: WebSocketSession) { | ||
| runBlocking { | ||
| val connectionParams = getConnectionParams(operationMessage.payload) | ||
| val context = contextFactory.generateContext(session) | ||
| val onConnect = subscriptionHooks.onConnect(connectionParams, session, context) | ||
| sessionState.saveContext(session, onConnect) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -172,25 +182,6 @@ class ApolloSubscriptionProtocolHandler( | |
| return emptyMap() | ||
| } | ||
|
|
||
| /** | ||
| * Called when the client sends the start message. | ||
| * It triggers the specific hooks first, runs the operation, and appends it with a complete message. | ||
| */ | ||
| private fun onStart( | ||
| operationMessage: SubscriptionOperationMessage, | ||
| session: WebSocketSession | ||
| ): Flux<SubscriptionOperationMessage> { | ||
| val context = sessionState.getContext(session) | ||
|
|
||
| // If we do not have a context, that means the init message was never sent | ||
| return if (context != null) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The context could be null. Since we have to handle this case this is what sparked all the other changes in subscriptions |
||
| context.flatMapMany { startSubscription(operationMessage, session, it) } | ||
| } else { | ||
| val message = getConnectionErrorMessage(operationMessage) | ||
| Flux.just(message) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Called with the publisher has completed on its own. | ||
| */ | ||
|
|
@@ -216,7 +207,7 @@ class ApolloSubscriptionProtocolHandler( | |
| private fun onDisconnect(session: WebSocketSession): Flux<SubscriptionOperationMessage> { | ||
| subscriptionHooks.onDisconnect(session) | ||
| sessionState.terminateSession(session) | ||
| return Flux.empty<SubscriptionOperationMessage>() | ||
| return Flux.empty() | ||
| } | ||
|
|
||
| private fun onUnknownOperation(operationMessage: SubscriptionOperationMessage, session: WebSocketSession): Flux<SubscriptionOperationMessage> { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With removal of the default context factory we don't have code we are testing much anymore. We could separate some of the routers to internal functions but there is not much going on anymore in the spring module other than defining the beans and subscriptions which are covered.