Skip to content

Commit a945bdc

Browse files
author
Shane Myrick
committed
Update subsctiption javadocs
1 parent 2773b49 commit a945bdc

File tree

7 files changed

+41
-61
lines changed

7 files changed

+41
-61
lines changed

examples/server/spring-server/src/main/kotlin/com/expediagroup/graphql/examples/server/spring/context/MyGraphQLContext.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@ import org.springframework.web.reactive.socket.WebSocketSession
2424
* Simple [GraphQLContext] that holds extra value and the [ServerRequest]
2525
*/
2626
class MyGraphQLContext(
27-
val myCustomValue: String,
28-
val request: ServerRequest
27+
val request: ServerRequest,
28+
val myCustomValue: String
2929
) : GraphQLContext
3030

3131
/**
3232
* Simple [GraphQLContext] that holds extra value and the [WebSocketSession]
3333
*/
3434
class MySubscriptionGraphQLContext(
3535
val request: WebSocketSession,
36-
var subscriptionValue: String? = null
36+
var auth: String? = null
3737
) : GraphQLContext

examples/server/spring-server/src/main/kotlin/com/expediagroup/graphql/examples/server/spring/context/MyGraphQLContextFactory.kt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,19 @@ import org.springframework.web.reactive.socket.WebSocketSession
3030
class MyGraphQLContextFactory : SpringGraphQLContextFactory<MyGraphQLContext>() {
3131

3232
override suspend fun generateContext(request: ServerRequest): MyGraphQLContext = MyGraphQLContext(
33-
myCustomValue = request.headers().firstHeader("MyHeader") ?: "defaultContext",
34-
request = request
33+
request = request,
34+
myCustomValue = request.headers().firstHeader("MyHeader") ?: "defaultContext"
3535
)
3636
}
3737

38+
/**
39+
* [GraphQLContextFactory] that generates [MySubscriptionGraphQLContext] that will be available when processing subscription operations.
40+
*/
3841
@Component
3942
class MySubscriptionGraphQLContextFactory : SpringSubscriptionGraphQLContextFactory<MySubscriptionGraphQLContext>() {
4043

4144
override suspend fun generateContext(request: WebSocketSession): MySubscriptionGraphQLContext = MySubscriptionGraphQLContext(
4245
request = request,
43-
subscriptionValue = null
46+
auth = null
4447
)
4548
}

examples/server/spring-server/src/main/kotlin/com/expediagroup/graphql/examples/server/spring/execution/MySubscriptionHooks.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ class MySubscriptionHooks : ApolloSubscriptionHooks {
3232
graphQLContext: GraphQLContext?
3333
): GraphQLContext? {
3434
if (graphQLContext != null && graphQLContext is MySubscriptionGraphQLContext) {
35-
val bearer = connectionParams["Authorization"] ?: "none"
36-
graphQLContext.subscriptionValue = bearer
35+
graphQLContext.auth = connectionParams["Authorization"]
3736
}
3837
return graphQLContext
3938
}

examples/server/spring-server/src/main/kotlin/com/expediagroup/graphql/examples/server/spring/subscriptions/SimpleSubscription.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,5 +82,5 @@ class SimpleSubscription : Subscription {
8282

8383
@GraphQLDescription("Returns a value from the subscription context")
8484
fun subscriptionContext(myGraphQLContext: MySubscriptionGraphQLContext): Flux<String> =
85-
Flux.just(myGraphQLContext.subscriptionValue ?: "", "value 2", "value3")
85+
Flux.just(myGraphQLContext.auth ?: "no-auth")
8686
}

servers/graphql-kotlin-spring-server/src/main/kotlin/com/expediagroup/graphql/server/spring/subscriptions/ApolloSubscriptionProtocolHandler.kt

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.expediagroup.graphql.server.spring.subscriptions
1818

19-
import com.expediagroup.graphql.generator.execution.GraphQLContext
2019
import com.expediagroup.graphql.server.spring.GraphQLConfigurationProperties
2120
import com.expediagroup.graphql.server.spring.subscriptions.SubscriptionOperationMessage.ClientMessages.GQL_CONNECTION_INIT
2221
import com.expediagroup.graphql.server.spring.subscriptions.SubscriptionOperationMessage.ClientMessages.GQL_CONNECTION_TERMINATE
@@ -32,7 +31,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
3231
import com.fasterxml.jackson.module.kotlin.convertValue
3332
import com.fasterxml.jackson.module.kotlin.readValue
3433
import kotlinx.coroutines.ExperimentalCoroutinesApi
35-
import kotlinx.coroutines.reactor.mono
34+
import kotlinx.coroutines.runBlocking
3635
import org.slf4j.LoggerFactory
3736
import org.springframework.web.reactive.socket.WebSocketSession
3837
import reactor.core.publisher.Flux
@@ -66,7 +65,7 @@ class ApolloSubscriptionProtocolHandler(
6665
return try {
6766
when (operationMessage.type) {
6867
GQL_CONNECTION_INIT.type -> onInit(operationMessage, session)
69-
GQL_START.type -> onStart(operationMessage, session)
68+
GQL_START.type -> startSubscription(operationMessage, session)
7069
GQL_STOP.type -> onStop(operationMessage, session)
7170
GQL_CONNECTION_TERMINATE.type -> onDisconnect(session)
7271
else -> onUnknownOperation(operationMessage, session)
@@ -104,9 +103,10 @@ class ApolloSubscriptionProtocolHandler(
104103
@Suppress("Detekt.TooGenericExceptionCaught")
105104
private fun startSubscription(
106105
operationMessage: SubscriptionOperationMessage,
107-
session: WebSocketSession,
108-
context: GraphQLContext?
106+
session: WebSocketSession
109107
): Flux<SubscriptionOperationMessage> {
108+
val context = sessionState.getContext(session)
109+
110110
subscriptionHooks.onOperation(operationMessage, session, context)
111111

112112
if (operationMessage.id == null) {
@@ -149,16 +149,18 @@ class ApolloSubscriptionProtocolHandler(
149149
}
150150

151151
private fun onInit(operationMessage: SubscriptionOperationMessage, session: WebSocketSession): Flux<SubscriptionOperationMessage> {
152-
val updateContext = saveContext(operationMessage, session)
152+
saveContext(operationMessage, session)
153153
val acknowledgeMessage = Mono.just(acknowledgeMessage)
154154
val keepAliveFlux = getKeepAliveFlux(session)
155-
return updateContext.then(acknowledgeMessage)
156-
.thenMany(keepAliveFlux)
155+
return acknowledgeMessage.concatWith(keepAliveFlux)
157156
.onErrorReturn(getConnectionErrorMessage(operationMessage))
158157
}
159158

160-
private fun saveContext(operationMessage: SubscriptionOperationMessage, session: WebSocketSession): Mono<Unit> {
161-
return mono {
159+
/**
160+
* Generate the context and save it for all future messages.
161+
*/
162+
private fun saveContext(operationMessage: SubscriptionOperationMessage, session: WebSocketSession) {
163+
runBlocking {
162164
val connectionParams = getConnectionParams(operationMessage.payload)
163165
val context = contextFactory.generateContext(session)
164166
val onConnect = subscriptionHooks.onConnect(connectionParams, session, context)
@@ -180,18 +182,6 @@ class ApolloSubscriptionProtocolHandler(
180182
return emptyMap()
181183
}
182184

183-
/**
184-
* Called when the client sends the start message.
185-
* It triggers the specific hooks first, runs the operation, and appends it with a complete message.
186-
*/
187-
private fun onStart(
188-
operationMessage: SubscriptionOperationMessage,
189-
session: WebSocketSession
190-
): Flux<SubscriptionOperationMessage> {
191-
val context = sessionState.getContext(session)
192-
return startSubscription(operationMessage, session, context)
193-
}
194-
195185
/**
196186
* Called with the publisher has completed on its own.
197187
*/

servers/graphql-kotlin-spring-server/src/test/kotlin/com/expediagroup/graphql/server/spring/subscriptions/ApolloSubscriptionProtocolHandlerTest.kt

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,9 @@ class ApolloSubscriptionProtocolHandlerTest {
103103
val handler = ApolloSubscriptionProtocolHandler(config, nullContextFactory, subscriptionHandler, objectMapper, subscriptionHooks)
104104
val flux = handler.handle(simpleInitMessage.toJson(), session)
105105

106-
val message = flux.blockFirst(Duration.ofSeconds(2))
107-
assertNotNull(message)
108-
assertEquals(expected = GQL_CONNECTION_ACK.type, actual = message.type)
106+
StepVerifier.create(flux)
107+
.expectNextMatches { it.type == GQL_CONNECTION_ACK.type }
108+
.verifyComplete()
109109
}
110110

111111
@Test
@@ -127,6 +127,8 @@ class ApolloSubscriptionProtocolHandlerTest {
127127
StepVerifier.create(initFlux)
128128
.expectNextMatches { it.type == GQL_CONNECTION_ACK.type }
129129
.expectNextMatches { it.type == GQL_CONNECTION_KEEP_ALIVE.type }
130+
.thenCancel()
131+
.verify()
130132
}
131133

132134
@Test
@@ -447,9 +449,8 @@ class ApolloSubscriptionProtocolHandlerTest {
447449
}
448450
val handler = ApolloSubscriptionProtocolHandler(config, nullContextFactory, subscriptionHandler, objectMapper, subscriptionHooks)
449451
val flux = handler.handle(simpleInitMessage.toJson(), session)
450-
val disposable = flux.subscribe()
452+
flux.subscribe().dispose()
451453
verify(exactly = 1) { subscriptionHooks.onConnect(any(), any(), any()) }
452-
disposable.dispose()
453454
}
454455

455456
@Test
@@ -470,9 +471,8 @@ class ApolloSubscriptionProtocolHandlerTest {
470471
}
471472
val handler = ApolloSubscriptionProtocolHandler(config, nullContextFactory, subscriptionHandler, objectMapper, subscriptionHooks)
472473
val flux = handler.handle(operationMessage, session)
473-
val disposable = flux.subscribe()
474+
flux.subscribe().dispose()
474475
verify(exactly = 1) { subscriptionHooks.onConnect(payload, session, any()) }
475-
disposable.dispose()
476476
}
477477

478478
@Test
@@ -568,9 +568,8 @@ class ApolloSubscriptionProtocolHandlerTest {
568568
}
569569
val handler = ApolloSubscriptionProtocolHandler(config, nullContextFactory, subscriptionHandler, objectMapper, subscriptionHooks)
570570
val flux = handler.handle(operationMessage, session)
571-
val disposable = flux.subscribe()
571+
flux.subscribe().dispose()
572572
verify(exactly = 1) { subscriptionHooks.onOperationComplete(session) }
573-
disposable.dispose()
574573
}
575574

576575
@Test

servers/graphql-kotlin-spring-server/src/test/kotlin/com/expediagroup/graphql/server/spring/subscriptions/SubscriptionWebSocketHandlerIT.kt

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -131,34 +131,23 @@ class SubscriptionWebSocketHandlerIT(
131131
startMessage: String,
132132
dataOutput: TestPublisher<String>
133133
): Mono<Void> {
134-
return initConnection(session).thenMany(
135-
session.send(session.textMessage(startMessage).toMono())
136-
.thenMany(
137-
session.receive()
138-
.map { objectMapper.readValue<SubscriptionOperationMessage>(it.payloadAsText) }
139-
.doOnNext {
140-
if (it.type == ServerMessages.GQL_DATA.type) {
141-
val data = objectMapper.writeValueAsString(it.payload)
142-
dataOutput.next(data)
143-
} else if (it.type == ServerMessages.GQL_COMPLETE.type) {
144-
dataOutput.complete()
145-
}
146-
}
147-
)
148-
).then()
149-
}
134+
val firstMessage = session.textMessage(basicInitMessage).toMono()
135+
.concatWith(session.textMessage(startMessage).toMono())
150136

151-
private fun initConnection(session: WebSocketSession): Mono<Void> {
152-
return session.send(session.textMessage(basicInitMessage).toMono())
137+
return session.send(firstMessage)
153138
.thenMany(
154139
session.receive()
155140
.map { objectMapper.readValue<SubscriptionOperationMessage>(it.payloadAsText) }
156141
.doOnNext {
157-
if (it.type != ServerMessages.GQL_CONNECTION_ERROR.type) {
158-
throw Exception("Error connecting to the server")
142+
if (it.type == ServerMessages.GQL_DATA.type) {
143+
val data = objectMapper.writeValueAsString(it.payload)
144+
dataOutput.next(data)
145+
} else if (it.type == ServerMessages.GQL_COMPLETE.type) {
146+
dataOutput.complete()
159147
}
160148
}
161-
).then()
149+
)
150+
.then()
162151
}
163152

164153
@Configuration

0 commit comments

Comments
 (0)