Skip to content

Commit

Permalink
KTOR-7682 Fix TestApplication.stop() doesn't stop the application any…
Browse files Browse the repository at this point in the history
…more (#4534)

* Fix docs deferred -> completable job
* Return the same instance of TestApplication as used in client
* Remove builder from TestApplication constructor
  • Loading branch information
osipxd authored Dec 10, 2024
1 parent 80645d6 commit d90ef06
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import io.ktor.utils.io.*
import kotlinx.coroutines.*

/**
* Stop server on job cancellation. The returned deferred need to be completed or cancelled.
* Stop server on job cancellation. The returned [CompletableJob] needs to be completed or cancelled.
*/
@OptIn(InternalAPI::class)
public fun ApplicationEngine.stopServerOnCancellation(
Expand All @@ -23,27 +23,27 @@ public fun ApplicationEngine.stopServerOnCancellation(
} ?: Job()

/**
* Launch a coroutine with [block] body when the parent job is cancelled or a returned deferred is cancelled.
* It is important to complete or cancel returned deferred
* Launch a coroutine with [block] body when either the parent job or the returned job is cancelled.
* It is important to complete or cancel the returned [CompletableJob]
* otherwise the parent job will be unable to complete successfully.
*/
@OptIn(DelicateCoroutinesApi::class)
@InternalAPI
public fun Job.launchOnCancellation(block: suspend () -> Unit): CompletableJob {
val deferred: CompletableJob = Job(parent = this)
val completableJob: CompletableJob = Job(parent = this)

GlobalScope.launch(this + Dispatchers.IOBridge) {
GlobalScope.launch(this + Dispatchers.IOBridge + CoroutineName("cancellation-watcher")) {
var cancelled = false
try {
deferred.join()
completableJob.join()
} catch (_: Throwable) {
cancelled = true
}

if (cancelled || deferred.isCancelled) {
if (cancelled || completableJob.isCancelled) {
block()
}
}

return deferred
return completableJob
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ package io.ktor.server.jetty.jakarta
import io.ktor.events.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import kotlinx.coroutines.*
import org.eclipse.jetty.server.*
import kotlin.time.*
import kotlinx.coroutines.CompletableJob
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.ServerConnector
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

/**
Expand Down Expand Up @@ -42,7 +43,7 @@ public open class JettyApplicationEngineBase(
public var idleTimeout: Duration = 30.seconds
}

private var cancellationDeferred: CompletableJob? = null
private var cancellationJob: CompletableJob? = null

/**
* Jetty server instance being configuring and starting
Expand All @@ -54,7 +55,7 @@ public open class JettyApplicationEngineBase(

override fun start(wait: Boolean): JettyApplicationEngineBase {
server.start()
cancellationDeferred = stopServerOnCancellation(
cancellationJob = stopServerOnCancellation(
applicationProvider(),
configuration.shutdownGracePeriod,
configuration.shutdownTimeout
Expand All @@ -74,7 +75,7 @@ public open class JettyApplicationEngineBase(
}

override fun stop(gracePeriodMillis: Long, timeoutMillis: Long) {
cancellationDeferred?.complete()
cancellationJob?.complete()
monitor.raise(ApplicationStopPreparing, environment)
server.stopTimeout = timeoutMillis
server.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ package io.ktor.server.jetty
import io.ktor.events.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import kotlinx.coroutines.*
import org.eclipse.jetty.server.*
import kotlin.time.*
import kotlinx.coroutines.CompletableJob
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.ServerConnector
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

/**
Expand Down Expand Up @@ -42,7 +43,7 @@ public open class JettyApplicationEngineBase(
public var idleTimeout: Duration = 30.seconds
}

private var cancellationDeferred: CompletableJob? = null
private var cancellationJob: CompletableJob? = null

/**
* Jetty server instance being configuring and starting
Expand All @@ -54,7 +55,7 @@ public open class JettyApplicationEngineBase(

override fun start(wait: Boolean): JettyApplicationEngineBase {
server.start()
cancellationDeferred = stopServerOnCancellation(
cancellationJob = stopServerOnCancellation(
applicationProvider(),
configuration.shutdownGracePeriod,
configuration.shutdownTimeout
Expand All @@ -74,7 +75,7 @@ public open class JettyApplicationEngineBase(
}

override fun stop(gracePeriodMillis: Long, timeoutMillis: Long) {
cancellationDeferred?.complete()
cancellationJob?.complete()
monitor.raise(ApplicationStopPreparing, environment)
server.stopTimeout = timeoutMillis
server.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,25 @@ import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.util.network.*
import io.ktor.util.pipeline.*
import io.netty.bootstrap.*
import io.netty.channel.*
import io.netty.channel.epoll.*
import io.netty.channel.kqueue.*
import io.netty.channel.socket.*
import io.netty.channel.socket.nio.*
import io.netty.handler.codec.http.*
import kotlinx.coroutines.*
import java.net.*
import java.util.concurrent.*
import kotlin.reflect.*
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.Channel
import io.netty.channel.ChannelOption
import io.netty.channel.ChannelPipeline
import io.netty.channel.EventLoopGroup
import io.netty.channel.epoll.Epoll
import io.netty.channel.epoll.EpollServerSocketChannel
import io.netty.channel.kqueue.KQueue
import io.netty.channel.kqueue.KQueueServerSocketChannel
import io.netty.channel.socket.ServerSocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.http.HttpObjectDecoder
import io.netty.handler.codec.http.HttpServerCodec
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.asCoroutineDispatcher
import java.net.BindException
import java.util.concurrent.TimeUnit
import kotlin.reflect.KClass

private val AFTER_CALL_PHASE = PipelinePhase("After")

Expand Down Expand Up @@ -157,7 +165,7 @@ public class NettyApplicationEngine(
workerEventGroup.asCoroutineDispatcher()
}

private var cancellationDeferred: CompletableJob? = null
private var cancellationJob: CompletableJob? = null

private var channels: List<Channel>? = null
internal val bootstraps: List<ServerBootstrap> by lazy {
Expand Down Expand Up @@ -224,7 +232,7 @@ public class NettyApplicationEngine(

monitor.raiseCatching(ServerReady, environment, environment.log)

cancellationDeferred = stopServerOnCancellation(
cancellationJob = stopServerOnCancellation(
applicationProvider(),
configuration.shutdownGracePeriod,
configuration.shutdownTimeout
Expand All @@ -243,7 +251,7 @@ public class NettyApplicationEngine(
}

override fun stop(gracePeriodMillis: Long, timeoutMillis: Long) {
cancellationDeferred?.complete()
cancellationJob?.complete()
monitor.raise(ApplicationStopPreparing, environment)
val channelFutures = channels?.mapNotNull { if (it.isOpen) it.close() else null }.orEmpty()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import io.ktor.server.testing.client.*
import io.ktor.test.dispatcher.*
import io.ktor.util.pipeline.*
import io.ktor.utils.io.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.test.*
import kotlin.coroutines.*
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.Job
import kotlinx.coroutines.test.TestResult
import kotlinx.coroutines.withContext
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* A client attached to [TestApplication].
Expand Down Expand Up @@ -54,17 +56,19 @@ public interface ClientProvider {
* @see [testApplication]
*/
public class TestApplication internal constructor(
private val builder: ApplicationTestBuilder
) : ClientProvider by builder {
createServer: () -> EmbeddedServer<TestApplicationEngine, TestApplicationEngine.Configuration>,
clientProvider: ClientProvider,
private val externalServices: ExternalServicesBuilder,
) : ClientProvider by clientProvider {

internal enum class State {
Created, Starting, Started, Stopped
}

private val state = atomic(State.Created)

internal val externalApplications by lazy { builder.externalServices.externalApplications }
internal val server by lazy { builder.embeddedServer }
internal val externalApplications by lazy { externalServices.externalApplications }
internal val server by lazy { createServer() }
private val applicationStarting by lazy { Job(server.engine.coroutineContext[Job]) }

/**
Expand All @@ -73,8 +77,8 @@ public class TestApplication internal constructor(
public suspend fun start() {
if (state.compareAndSet(State.Created, State.Starting)) {
try {
builder.embeddedServer.start()
builder.externalServices.externalApplications.values.forEach { it.start() }
server.start()
externalServices.externalApplications.values.forEach { it.start() }
} finally {
state.value = State.Started
applicationStarting.complete()
Expand All @@ -90,8 +94,8 @@ public class TestApplication internal constructor(
*/
public fun stop() {
state.value = State.Stopped
builder.embeddedServer.stop()
builder.externalServices.externalApplications.values.forEach { it.stop() }
server.stop()
externalServices.externalApplications.values.forEach { it.stop() }
client.close()
}
}
Expand All @@ -105,10 +109,7 @@ public class TestApplication internal constructor(
public fun TestApplication(
block: TestApplicationBuilder.() -> Unit
): TestApplication {
val builder = ApplicationTestBuilder()
val testApplication = TestApplication(builder)
builder.block()
return testApplication
return ApplicationTestBuilder().apply(block).application
}

/**
Expand Down Expand Up @@ -274,7 +275,13 @@ public class ApplicationTestBuilder : TestApplicationBuilder(), ClientProvider {

override val client: HttpClient by lazy { createClient { } }

internal val application: TestApplication by lazy { TestApplication(this) }
internal val application: TestApplication by lazy {
TestApplication(
createServer = { embeddedServer },
clientProvider = this,
externalServices = externalServices,
)
}

/**
* Starts instance of [TestApplication].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import io.ktor.util.*
import io.ktor.util.pipeline.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import kotlinx.atomicfu.*
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import kotlin.coroutines.*
import kotlin.coroutines.CoroutineContext

@OptIn(InternalAPI::class)
@PublicAPICandidate("2.2.0")
Expand All @@ -39,7 +40,7 @@ public class TestApplicationEngine(
) : BaseApplicationEngine(environment, monitor, developmentMode, EnginePipeline(developmentMode)), CoroutineScope {

private val testEngineJob = Job(applicationProvider().parentCoroutineContext[Job])
private var cancellationDeferred: CompletableJob? = null
private var cancellationJob: CompletableJob? = null

override val coroutineContext: CoroutineContext =
applicationProvider().parentCoroutineContext + testEngineJob + configuration.dispatcher
Expand Down Expand Up @@ -144,7 +145,7 @@ public class TestApplicationEngine(

override fun start(wait: Boolean): ApplicationEngine {
check(testEngineJob.isActive) { "Test engine is already completed" }
cancellationDeferred = stopServerOnCancellation(
cancellationJob = stopServerOnCancellation(
applicationProvider(),
configuration.shutdownGracePeriod,
configuration.shutdownTimeout
Expand All @@ -157,7 +158,7 @@ public class TestApplicationEngine(

override fun stop(gracePeriodMillis: Long, timeoutMillis: Long) {
try {
cancellationDeferred?.complete()
cancellationJob?.complete()
client.close()
engine.close()
monitor.raise(ApplicationStopPreparing, environment)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import io.ktor.util.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import kotlinx.coroutines.*
import kotlinx.coroutines.test.*
import kotlin.coroutines.*
import kotlinx.coroutines.test.runTest
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.test.*
import kotlin.time.Duration.Companion.seconds

class TestApplicationTest {

Expand Down Expand Up @@ -87,6 +89,19 @@ class TestApplicationTest {
assertEquals("value_1", response.headers["response_header"])
}

@Test // KTOR-7682
fun testApplicationBlockStartBeforeClientCall() = runTest(timeout = 5.seconds) {
val application = TestApplication {
serverConfig {
parentCoroutineContext = coroutineContext
}
}

application.start()
application.client.get("/")
application.stop()
}

@Test
fun testBridge() = testApplication {
install(TestPlugin)
Expand Down
Loading

0 comments on commit d90ef06

Please sign in to comment.