Skip to content

Commit

Permalink
feat(batching): v7 check if execution was exhausted when there are er…
Browse files Browse the repository at this point in the history
…rors (#2010)

### 📝 Description
#2009

---------

Co-authored-by: Samuel Vazquez <samvazquez@expediagroup.com>
  • Loading branch information
samuelAndalon and Samuel Vazquez authored Jul 11, 2024
1 parent 3cf86d9 commit b3795bb
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentation : AbstractSyncExecutionExh
parameters: SyncExecutionExhaustedInstrumentationParameters
): OnSyncExecutionExhaustedCallback = { _: List<ExecutionId> ->
parameters
.executionContext.executionInput
.executionInput
.dataLoaderRegistry
.dispatchAll()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ abstract class AbstractSyncExecutionExhaustedInstrumentation : SimplePerformantI
): InstrumentationContext<ExecutionResult>? =
parameters.graphQLContext
?.get<SyncExecutionExhaustedState>(SyncExecutionExhaustedState::class)
?.beginExecution(parameters)
?.beginExecution(
parameters,
this.getOnSyncExecutionExhaustedCallback(
SyncExecutionExhaustedInstrumentationParameters(parameters.executionInput)
)
)

override fun beginExecutionStrategy(
parameters: InstrumentationExecutionStrategyParameters,
Expand All @@ -78,7 +83,7 @@ abstract class AbstractSyncExecutionExhaustedInstrumentation : SimplePerformantI
?.beginFieldFetch(
parameters,
this.getOnSyncExecutionExhaustedCallback(
SyncExecutionExhaustedInstrumentationParameters(parameters.executionContext)
SyncExecutionExhaustedInstrumentationParameters(parameters.executionContext.executionInput)
)
)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Expedia, Inc
* Copyright 2024 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,11 +17,11 @@
package com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion.execution

import com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion.DataLoaderSyncExecutionExhaustedInstrumentation
import graphql.execution.ExecutionContext
import graphql.ExecutionInput

/**
* Hold information that will be provided to an instance of [DataLoaderSyncExecutionExhaustedInstrumentation]
*/
data class SyncExecutionExhaustedInstrumentationParameters(
val executionContext: ExecutionContext
val executionInput: ExecutionInput
)
Original file line number Diff line number Diff line change
Expand Up @@ -46,36 +46,30 @@ class SyncExecutionExhaustedState(
private val totalExecutions: AtomicReference<Int> = AtomicReference(totalOperations)
val executions = ConcurrentHashMap<ExecutionId, ExecutionBatchState>()

/**
* Remove an [ExecutionBatchState] from the state in case operation does not qualify for starting an execution,
* for example:
* - parsing, validation errors
* - persisted query errors
* - an exception during execution was thrown
*/
private fun removeExecution(executionId: ExecutionId) {
if (executions.containsKey(executionId)) {
executions.remove(executionId)
totalExecutions.set(totalExecutions.get() - 1)
}
}

/**
* Create the [ExecutionBatchState] When a specific [ExecutionInput] starts his execution
*
* @param parameters contains information of which [ExecutionInput] will start his execution
* @return a non null [InstrumentationContext] object
*/
fun beginExecution(
parameters: InstrumentationExecutionParameters
parameters: InstrumentationExecutionParameters,
onSyncExecutionExhausted: OnSyncExecutionExhaustedCallback
): InstrumentationContext<ExecutionResult> {
executions.computeIfAbsent(parameters.executionInput.executionId) {
ExecutionBatchState()
}
return object : SimpleInstrumentationContext<ExecutionResult>() {
override fun onCompleted(result: ExecutionResult?, t: Throwable?) {
if ((result != null && result.errors.size > 0) || t != null) {
removeExecution(parameters.executionInput.executionId)
if (executions.containsKey(parameters.executionInput.executionId)) {
executions.remove(parameters.executionInput.executionId)
totalExecutions.set(totalExecutions.get() - 1)
val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
if (allSyncExecutionsExhausted) {
onSyncExecutionExhausted(executions.keys().toList())
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Expedia, Inc
* Copyright 2024 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -94,18 +94,16 @@ object AstronautGraphQL {

private val astronautService = AstronautService()
private val astronautDataFetcher = DataFetcher { environment ->
val astronautId = environment.getArgument<String>("id")?.toInt() ?: throw IllegalArgumentException("Astronaut ID is null")
astronautService.getAstronaut(
AstronautServiceRequest(
environment.getArgument<String>("id").toInt()
),
AstronautServiceRequest(astronautId),
environment
)
}
private val createAstronautDataFetcher = DataFetcher { environment ->
val astronautName = environment.getArgument<String>("name") ?: throw IllegalArgumentException("Astronaut name is null")
astronautService.createAstronaut(
CreateAstronautServiceRequest(
environment.getArgument("name")
)
CreateAstronautServiceRequest(astronautName)
)
}
private val astronautsDataFetcher = DataFetcher { environment ->
Expand All @@ -119,10 +117,9 @@ object AstronautGraphQL {

private val missionService = MissionService()
private val missionDataFetcher = DataFetcher { environment ->
val missionId = environment.getArgument<String>("id")?.toInt() ?: throw IllegalArgumentException("Mission ID is null")
missionService.getMission(
MissionServiceRequest(
environment.getArgument<String>("id").toInt()
),
MissionServiceRequest(missionId),
environment
)
}
Expand All @@ -135,26 +132,26 @@ object AstronautGraphQL {
)
}
private val missionsByAstronautDataFetcher = DataFetcher { environment ->
val astronaut = environment.getSource<Astronaut>()
val astronautId = environment.getSource<Astronaut>()?.id ?: throw IllegalArgumentException("Astronaut ID is null")
missionService
.getMissionsByAstronaut(
MissionServiceRequest(0, astronaut.id),
MissionServiceRequest(0, astronautId),
environment
)
}

private val planetService = PlanetService()
private val planetsByMissionDataFetcher = DataFetcher { environment ->
val mission = environment.getSource<Mission>()
val missionId = environment.getSource<Mission>()?.id ?: throw IllegalArgumentException("Mission ID is null")
planetService.getPlanets(
PlanetServiceRequest(0, mission.id),
PlanetServiceRequest(0, missionId),
environment
)
}
private val planetsByAstronautDataFetcher = DataFetcher { environment ->
val astronaut = environment.getSource<Astronaut>()
val astronautId = environment.getSource<Astronaut>()?.id ?: throw IllegalArgumentException("Astronaut ID is null")
astronautService.getPlanets(
AstronautServiceRequest(astronaut.id),
AstronautServiceRequest(astronautId),
environment
)
}
Expand Down Expand Up @@ -197,11 +194,22 @@ object AstronautGraphQL {
)
)

fun execute(
fun executeOperations(
graphQL: GraphQL,
queries: List<String>,
dataLoaderInstrumentationStrategy: DataLoaderInstrumentationStrategy
): Pair<List<ExecutionResult>, KotlinDataLoaderRegistry> {
): Pair<List<Result<ExecutionResult>>, KotlinDataLoaderRegistry> =
execute(
graphQL,
queries.map { query -> ExecutionInput.newExecutionInput(query).build() },
dataLoaderInstrumentationStrategy
)

fun execute(
graphQL: GraphQL,
executionInputs: List<ExecutionInput>,
dataLoaderInstrumentationStrategy: DataLoaderInstrumentationStrategy
): Pair<List<Result<ExecutionResult>>, KotlinDataLoaderRegistry> {
val kotlinDataLoaderRegistry = spyk(
KotlinDataLoaderRegistryFactory(
AstronautDataLoader(),
Expand All @@ -214,30 +222,36 @@ object AstronautGraphQL {
when (dataLoaderInstrumentationStrategy) {
DataLoaderInstrumentationStrategy.SYNC_EXHAUSTION ->
SyncExecutionExhaustedState::class to SyncExecutionExhaustedState(
queries.size,
executionInputs.size,
kotlinDataLoaderRegistry
)
DataLoaderInstrumentationStrategy.LEVEL_DISPATCHED ->
ExecutionLevelDispatchedState::class to ExecutionLevelDispatchedState(
queries.size
executionInputs.size
)
}
)

val results = runBlocking {
queries.map { query ->
executionInputs.map { executionInput ->
async {
graphQL.executeAsync(
ExecutionInput
.newExecutionInput(query)
.dataLoaderRegistry(kotlinDataLoaderRegistry)
.graphQLContext(graphQLContext)
.build()
).await()
try {
Result.success(
graphQL.executeAsync(
executionInput.transform { builder ->
builder
.dataLoaderRegistry(kotlinDataLoaderRegistry)
.graphQLContext(graphQLContext)
.build()
}
).await()
)
} catch (e: Exception) {
Result.failure(e)
}
}
}.awaitAll()
}

return Pair(results, kotlinDataLoaderRegistry)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class DataLoaderLevelDispatchedInstrumentationTest {
"{ mission(id: 4) { designation } }"
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.LEVEL_DISPATCHED
Expand Down Expand Up @@ -77,7 +77,7 @@ class DataLoaderLevelDispatchedInstrumentationTest {
"{ nasa { mission(id: 4) { id designation } } }"
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.LEVEL_DISPATCHED
Expand Down Expand Up @@ -112,7 +112,7 @@ class DataLoaderLevelDispatchedInstrumentationTest {
"{ mission(id: 4) { designation } }"
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.LEVEL_DISPATCHED
Expand Down Expand Up @@ -147,7 +147,7 @@ class DataLoaderLevelDispatchedInstrumentationTest {
"""mutation { createAstronaut(name: "spaceMan") { id name } }"""
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.LEVEL_DISPATCHED
Expand All @@ -168,7 +168,7 @@ class DataLoaderLevelDispatchedInstrumentationTest {
"{ mission(id: 4) { designation } }"
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.executeOperations(
graphQL,
queries,
DataLoaderInstrumentationStrategy.LEVEL_DISPATCHED
Expand Down
Loading

0 comments on commit b3795bb

Please sign in to comment.