-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KTOR-6030 Migrate Ktor to kotlinx-io #4032
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A glorious amount of deleted code!
Seems that compilation is broken for common code in the CI. I also can't seem to build on my machine, I might need some instruction if there is any config changes or missing gradle configs.
@@ -138,6 +138,7 @@ internal actual class ConnectionPipeline actual constructor( | |||
if (shouldClose) break | |||
} | |||
} finally { | |||
@Suppress("DEPRECATION") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these close deprecation warnings something we should follow up on? Maybe create a task in YT? The recommendation seems to be using either flushAndClose or cancel.
ktor-client/ktor-client-cio/jvm/src/io/ktor/client/engine/cio/ConnectionPipeline.kt
Outdated
Show resolved
Hide resolved
...or-client-plugins/ktor-client-logging/jvm/test/io/ktor/client/plugins/logging/LoggingTest.kt
Outdated
Show resolved
Hide resolved
@@ -214,7 +216,7 @@ class HttpTimeoutTest : ClientLoader() { | |||
parameter("delay", 500) | |||
}.body<ByteReadChannel>() | |||
|
|||
assertFailsWith<HttpRequestTimeoutException> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should consider wrapping this once again with the HttpRequestTimeoutException
... Since we don't have checked exceptions, this could cause some tough-to-diagnose issues for consumers that rely on the current timeout exceptions.
ktor-client/ktor-client-tests/jvm/src/io/ktor/client/tests/utils/ClientLoaderJvm.kt
Show resolved
Hide resolved
@Suppress("UnusedReceiverParameter", "UNUSED_PARAMETER") | ||
public fun ByteChannel.attachJob(job: Job) { | ||
job.invokeOnCompletion { | ||
if (it != null) { | ||
cancel(it) | ||
} | ||
} | ||
} | ||
|
||
public fun ByteChannel.attachJob(job: ChannelJob) { | ||
attachJob(job.job) | ||
} | ||
|
||
public fun ByteChannel(@Suppress("UNUSED_PARAMETER") block: (Throwable?) -> Throwable?): ByteChannel = ByteChannel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like these functions need some documentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, will add
|
||
internal class SourceByteReadChannel(private val source: Source) : ByteReadChannel { | ||
|
||
val created = Exception() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another created exception property?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, will fix
this[index + 1] = (value shr 16).toByte() | ||
this[index + 2] = (value shr 8).toByte() | ||
this[index + 3] = value.toByte() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be cool if kotlinx.io
provided these kinds of random-access extensions for ByteArray
too since they're all implemented for Buffer
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as far as I see, it's used only in WebSocket implementation and can be rather easily replaced with operations on kotlinx-io primitives.
IMO, there is no need to have such functions exposed nor in ktor
, nor in kotlinx-io
, as more high level primitives should be used.
Additionally, there will be something like this for unsafe
access described in Kotlin/kotlinx-io#135 (comment)
} | ||
|
||
/** | ||
* Resume waiter. | ||
*/ | ||
fun resume() { | ||
suspension.getAndSet(null)?.complete() | ||
val continuation = suspension.getAndUpdate { | ||
if (it == CLOSED) CLOSED else null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be it as? ClosedSlot
so that we include the cause / new instance from line 44 val closeContinuation = if (cause != null) ClosedSlot(cause) else CLOSED
.
Hey @bjhham, thank you for the review. Indeed looks like compilation is broken since rebase. I'll check it and address the comments! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really a big step forward for the ecosystem!
Mostly skimmed though public declarations
/* | ||
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package io.ktor.utils.io.core | ||
|
||
public expect interface Closeable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should it be replaced with AutoCloseable
(with a deprecated typealias
, same for use
function) as it's stable in Kotlin 2.0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, will replace this with 2.0.0 bump
} | ||
@Deprecated( | ||
"Use transferTo instead", | ||
ReplaceWith("output.transferTo(this)", "kotlinx.io.transferTo"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replace with is not the same as implementation transferFrom
vs transferTo
this[index + 1] = (value shr 16).toByte() | ||
this[index + 2] = (value shr 8).toByte() | ||
this[index + 3] = value.toByte() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as far as I see, it's used only in WebSocket implementation and can be rather easily replaced with operations on kotlinx-io primitives.
IMO, there is no need to have such functions exposed nor in ktor
, nor in kotlinx-io
, as more high level primitives should be used.
Additionally, there will be something like this for unsafe
access described in Kotlin/kotlinx-io#135 (comment)
|
||
package io.ktor.utils.io.errors | ||
|
||
public typealias IOException = kotlinx.io.IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
may be those should be deprecated, so users will migrate to kotlinx.io.*
types?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, good catch! Will add
private const val DEFAULT_POOL_ARRAY_SIZE = 4096 | ||
private const val DEFAULT_POOL_CAPACITY = 128 | ||
|
||
public val ByteArrayPool: ObjectPool<ByteArray> = object : DefaultPool<ByteArray>(DEFAULT_POOL_CAPACITY) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can it be deprecated and replaced with kotlinx-io
APIs (I mean, creating Buffer
and using it)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, it can be. Let me check
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It needs a huge rewrite in the other ktor modules. I prefer to keep it in the separate PR for stability
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to create YT issue and link it here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will add
Looks like
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like she's good to go, pending the kotlinx-io release.
/** | ||
* Sequential (non-concurrent) byte channel implementation | ||
*/ | ||
public class ByteChannel(public val autoFlush: Boolean = false) : ByteReadChannel, BufferedByteWriteChannel { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we can drop this argument here.
public class ByteChannel(public val autoFlush: Boolean = false) : ByteReadChannel, BufferedByteWriteChannel { | ||
private val _closedCause = atomic<CloseToken?>(null) | ||
private val slot = AwaitingSlot() | ||
private val flushBuffer: Buffer = Buffer() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the intent with this class to eventually drop the three buffers and flush orchestration in favour of just using a single buffer with some synchronization? Seems like we could just have a single bounded buffer that blocks writes until there's space.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is that not all operations writing this buffer are suspend, so you need actually block the thread
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Anyway let's discuss this idea
Great work. Looking forward to this integration! |
bb406bf
to
93b6a55
Compare
val buffer = Buffer() | ||
try { | ||
DefaultDatagramChunkBufferPool.useInstance { buffer -> | ||
element.packet.copy().readAvailable(buffer) | ||
|
||
val bytes = element.packet.copy().readBytes() | ||
val bytesWritten = sento(element, bytes) | ||
|
||
result = when (bytesWritten) { | ||
0 -> throw IOException("Failed writing to closed socket") | ||
-1 -> { | ||
if (errno == EAGAIN) { | ||
false | ||
} else { | ||
throw PosixException.forErrno() | ||
} | ||
element.packet.copy().readAvailable(buffer) | ||
|
||
val bytes = element.packet.copy().readBytes() | ||
val bytesWritten = sento(element, bytes) | ||
|
||
result = when (bytesWritten) { | ||
0 -> throw kotlinx.io.IOException("Failed writing to closed socket") | ||
-1 -> { | ||
if (errno == EAGAIN) { | ||
false | ||
} else { | ||
throw PosixException.forErrno() | ||
} | ||
else -> true | ||
} | ||
|
||
else -> true | ||
} | ||
} finally { | ||
buffer.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While trying to fix another issue I noticed that buffer
may not actually be used. The packet is copied into buffer, but then never read. I guess the buffer can just be removed, right? Or use the buffer properly instead of readBytes
.
Fix KTOR-6030 Migrate to new kotlinx.io library