From ef7c54c830d19daeaa2a3e828617d5f42ff4fa4f Mon Sep 17 00:00:00 2001
From: Bruce Hamilton
Date: Mon, 9 Dec 2024 14:02:54 +0100
Subject: [PATCH] KTOR-7941 Fix performance on readUtfLineTo

---
Review notes (commentary placed here is not part of the commit message):
 * readUTF8LineTo now copies bytes from the channel's read buffer into a
   temporary line buffer until CR, LF or CRLF is seen, instead of rescanning
   the whole read buffer with indexOf on every pass.
 * The byte after CR is only inspected when the read buffer is non-empty, so
   input that ends right after CR no longer performs an out-of-range read.
 * Dropped the accidental import of the JDK-internal ExsltDatetime.time from
   the JVM test; the test only uses its own local `time` value.
 * The `max` limit is compared against the number of buffered bytes, and only
   once the read buffer has been drained without finding a line break.
 * Line breaks and indentation were reconstructed; the post-image blob ids on
   the "index" lines were not recomputed after the two edits above.

 .../utils/io/ByteReadChannelOperations.kt     | 81 +++++++------------
 .../src/io/ktor/utils/io/core/Buffers.kt      |  2 +-
 .../test/ByteReadChannelOperationsJvmTest.kt  | 23 ++++++
 3 files changed, 55 insertions(+), 51 deletions(-)

diff --git a/ktor-io/common/src/io/ktor/utils/io/ByteReadChannelOperations.kt b/ktor-io/common/src/io/ktor/utils/io/ByteReadChannelOperations.kt
index eee738d5b1b..b446655828f 100644
--- a/ktor-io/common/src/io/ktor/utils/io/ByteReadChannelOperations.kt
+++ b/ktor-io/common/src/io/ktor/utils/io/ByteReadChannelOperations.kt
@@ -14,7 +14,6 @@ import kotlinx.io.Buffer
 import kotlinx.io.bytestring.*
 import kotlinx.io.unsafe.*
 import kotlin.coroutines.*
-import kotlin.jvm.*
 import kotlin.math.*
 
 @OptIn(InternalAPI::class)
@@ -357,6 +356,9 @@ public suspend fun ByteReadChannel.discard(max: Long = Long.MAX_VALUE): Long {
     return max - remaining
 }
 
+private const val CR: Byte = '\r'.code.toByte()
+private const val LF: Byte = '\n'.code.toByte()
+
 /**
  * Reads a line of UTF-8 characters to the specified [out] buffer.
  * It recognizes CR, LF and CRLF as a line delimiter.
@@ -372,62 +374,41 @@ public suspend fun ByteReadChannel.readUTF8LineTo(out: Appendable, max: Int = In
     if (readBuffer.exhausted()) awaitContent()
     if (isClosedForRead) return false
 
-    var consumed = 0
-    while (!isClosedForRead) {
-        awaitContent()
-
-        val cr = readBuffer.indexOf('\r'.code.toByte())
-        val lf = readBuffer.indexOf('\n'.code.toByte())
-
-        // No new line separator
-        if (cr == -1L && lf == -1L) {
-            if (max == Int.MAX_VALUE) {
-                val value = readBuffer.readString()
-                out.append(value)
-            } else {
-                val count = minOf(max - consumed, readBuffer.remaining.toInt())
-                consumed += count
-                out.append(readBuffer.readString(count.toLong()))
-
-                if (consumed == max) throw TooLongLineException("Line exceeds limit of $max characters")
+    Buffer().use { lineBuffer ->
+        while (!isClosedForRead) {
+            while (!readBuffer.exhausted()) {
+                when (val b = readBuffer.readByte()) {
+                    CR -> {
+                        // CR may start a CRLF pair; wait for more input, which may never arrive at end of stream
+                        if (readBuffer.exhausted()) awaitContent()
+                        if (!readBuffer.exhausted() && readBuffer.buffer[0] == LF) {
+                            readBuffer.discard(1)
+                        }
+                        out.append(lineBuffer.readString())
+                        return true
+                    }
+
+                    LF -> {
+                        out.append(lineBuffer.readString())
+                        return true
+                    }
+
+                    else -> lineBuffer.writeByte(b)
+                }
             }
-
-            continue
-        }
-
-        // CRLF fully in buffer
-        if (cr >= 0 && lf == cr + 1) {
-            val count = if (max != Int.MAX_VALUE) cr else minOf(max - consumed, cr.toInt()).toLong()
-            out.append(readBuffer.readString(count))
-            if (count == cr) readBuffer.discard(2)
-            return true
-        }
-
-        // CR in buffer before LF
-        if (cr >= 0 && (lf == -1L || cr < lf)) {
-            val count = if (max != Int.MAX_VALUE) cr else minOf(max - consumed, cr.toInt()).toLong()
-            out.append(readBuffer.readString(count))
-            if (count == cr) readBuffer.discard(1)
-
-            // Check if LF follows CR after awaiting
-            if (readBuffer.exhausted()) awaitContent()
-            if (readBuffer.buffer[0] == '\n'.code.toByte()) {
-                readBuffer.discard(1)
+            if (lineBuffer.size >= max) {
+                throw TooLongLineException("Line exceeds limit of $max characters")
             }
 
-            return true
+            awaitContent()
         }
 
-        // LF in buffer before CR
-        if (lf >= 0) {
-            val count = if (max != Int.MAX_VALUE) lf else minOf(max - consumed, lf.toInt()).toLong()
-            out.append(readBuffer.readString(count))
-            if (count == lf) readBuffer.discard(1)
-            return true
+        return (lineBuffer.size > 0).also { remaining ->
+            if (remaining) {
+                out.append(lineBuffer.readString())
+            }
         }
     }
-
-    return true
 }
 
 @OptIn(InternalAPI::class, UnsafeIoApi::class, InternalIoApi::class)
diff --git a/ktor-io/common/src/io/ktor/utils/io/core/Buffers.kt b/ktor-io/common/src/io/ktor/utils/io/core/Buffers.kt
index 0d13f2af7db..73600400a9a 100644
--- a/ktor-io/common/src/io/ktor/utils/io/core/Buffers.kt
+++ b/ktor-io/common/src/io/ktor/utils/io/core/Buffers.kt
@@ -4,8 +4,8 @@
 
 package io.ktor.utils.io.core
 
-import kotlinx.io.*
 import kotlinx.io.Buffer
+import kotlinx.io.readByteArray
 
 /**
  * Read the specified number of bytes specified (optional, read all remaining by default)
diff --git a/ktor-io/jvm/test/ByteReadChannelOperationsJvmTest.kt b/ktor-io/jvm/test/ByteReadChannelOperationsJvmTest.kt
index 7b6514c37c3..80688722e93 100644
--- a/ktor-io/jvm/test/ByteReadChannelOperationsJvmTest.kt
+++ b/ktor-io/jvm/test/ByteReadChannelOperationsJvmTest.kt
@@ -2,9 +2,11 @@
  * Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
  */
 
 import io.ktor.utils.io.*
 import kotlinx.coroutines.*
 import kotlin.test.*
+import kotlin.time.Duration.Companion.seconds
+import kotlin.time.measureTime
 
 class ByteReadChannelOperationsJvmTest {
 
@@ -75,4 +77,25 @@ class ByteReadChannelOperationsJvmTest {
         }
         assertEquals(42, channel.readLong())
     }
+
+    @OptIn(InternalAPI::class)
+    @Test
+    fun readUTF8LineTo() = runBlocking {
+        var lineNumber = 0
+        var count = 0
+        val numberOfLines = 200_000
+        val channel = writer(Dispatchers.IO) {
+            for (line in generateSequence { "line ${lineNumber++}\n" }.take(numberOfLines))
+                channel.writeStringUtf8(line)
+        }.channel
+        val out = StringBuilder()
+        val time = measureTime {
+            while (channel.readUTF8LineTo(out) && count < numberOfLines)
+                count++
+        }
+
+        assertEquals(numberOfLines, count)
+        assertTrue(time < 5.seconds, "Expected I/O to be complete in a reasonable time, but it took $time")
+        assertEquals(2_088_890, out.length)
+    }
 }