Skip to content

Commit

Permalink
KTOR-5199 re-run multi perform loop to schedule requests
Browse files Browse the repository at this point in the history
  • Loading branch information
dtretyakov committed Jan 30, 2024
1 parent 8f82e3d commit 7b95cc5
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ internal class RequestContainer(
val completionHandler: CompletableDeferred<CurlSuccess>
)

/**
* A class responsible for processing requests asynchronously.
*
* It holds a dispatcher interacting with curl multi interface API,
* which requires API calls from single thread.
*/
internal class CurlProcessor(coroutineContext: CoroutineContext) {
@OptIn(InternalAPI::class)
private val curlDispatcher: CloseableCoroutineDispatcher =
Expand All @@ -28,6 +34,7 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) {

private val curlScope = CoroutineScope(coroutineContext + curlDispatcher)
private val requestQueue: Channel<RequestContainer> = Channel(Channel.UNLIMITED)
private val requestCounter = atomic(0L)
private val curlProtocols by lazy { getCurlProtocols() }

init {
Expand All @@ -48,8 +55,9 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) {
}

val result = CompletableDeferred<CurlSuccess>()
requestQueue.send(RequestContainer(request, result))
curlApi!!.wakeup()
nextRequest {
requestQueue.send(RequestContainer(request, result))
}
return result.await()
}

Expand All @@ -59,7 +67,7 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) {
val api = curlApi!!
while (!requestQueue.isClosedForReceive) {
drainRequestQueue(api)
api.perform()
api.perform(requestCounter)
}
}
}
Expand Down Expand Up @@ -91,6 +99,8 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) {
if (!closed.compareAndSet(false, true)) return

requestQueue.close()
nextRequest()

GlobalScope.launch(curlDispatcher) {
curlScope.coroutineContext[Job]!!.join()
curlApi!!.close()
Expand All @@ -105,4 +115,10 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) {
curlApi!!.cancelRequest(easyHandle, cause)
}
}

private inline fun nextRequest(body: (Long) -> Unit = {}) = try {
body(requestCounter.incrementAndGet())
} finally {
curlApi!!.wakeup()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.ktor.client.plugins.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import io.ktor.utils.io.locks.*
import kotlinx.atomicfu.*
import kotlinx.cinterop.*
import kotlinx.coroutines.*
import libcurl.*
Expand All @@ -25,6 +26,11 @@ private class RequestHolder @OptIn(ExperimentalForeignApi::class) constructor(
}
}

/**
* Handles requests using libcurl with multi interface.
*
* @see <a href="https://curl.se/libcurl/c/libcurl-multi.html">Multi interface overview</a>
*/
@OptIn(InternalAPI::class)
internal class CurlMultiApiHandler : Closeable {
@OptIn(ExperimentalForeignApi::class)
Expand Down Expand Up @@ -132,11 +138,12 @@ internal class CurlMultiApiHandler : Closeable {
}

@OptIn(ExperimentalForeignApi::class)
internal fun perform() {
internal fun perform(counter: AtomicLong) {
if (activeHandles.isEmpty()) return

memScoped {
val transfersRunning = alloc<IntVar>()
val requestId = counter.value
do {
synchronized(easyHandlesToUnpauseLock) {
var handle = easyHandlesToUnpause.removeFirstOrNull()
Expand All @@ -152,7 +159,7 @@ internal class CurlMultiApiHandler : Closeable {
if (transfersRunning.value < activeHandles.size) {
handleCompleted()
}
} while (transfersRunning.value != 0)
} while (transfersRunning.value != 0 && requestId == counter.value)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ package io.ktor.client.engine.curl.test
import io.ktor.client.*
import io.ktor.client.engine.curl.*
import io.ktor.client.plugins.websocket.*
import io.ktor.client.request.*
import io.ktor.websocket.*
import kotlinx.coroutines.*
import kotlinx.serialization.json.*
import kotlin.test.*

class CurlWebSocketTests {

private val TEST_SERVER: String = "http://127.0.0.1:8080"
private val TEST_WEBSOCKET_SERVER: String = "ws://127.0.0.1:8080"

@Test
Expand Down Expand Up @@ -75,4 +77,27 @@ class CurlWebSocketTests {
}
}
}

@Test
fun testParallelSessions() {
val client = HttpClient(Curl) {
install(WebSockets)
}

runBlocking {
val websocketInitialized = CompletableDeferred<Boolean>()

launch {
client.webSocket("$TEST_WEBSOCKET_SERVER/websockets/echo") {
websocketInitialized.complete(true)
delay(20)
}
}

websocketInitialized.await()

val response = client.get(TEST_SERVER)
assertEquals(200, response.status.value)
}
}
}

0 comments on commit 7b95cc5

Please sign in to comment.