Skip to content

Commit

Permalink
KTOR-7941 Fix performance on readUtfLineTo
Browse files Browse the repository at this point in the history
  • Loading branch information
bjhham committed Dec 11, 2024
1 parent 2844ae2 commit ef7c54c
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 51 deletions.
81 changes: 31 additions & 50 deletions ktor-io/common/src/io/ktor/utils/io/ByteReadChannelOperations.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import kotlinx.io.Buffer
import kotlinx.io.bytestring.*
import kotlinx.io.unsafe.*
import kotlin.coroutines.*
import kotlin.jvm.*
import kotlin.math.*

@OptIn(InternalAPI::class)
Expand Down Expand Up @@ -357,6 +356,9 @@ public suspend fun ByteReadChannel.discard(max: Long = Long.MAX_VALUE): Long {
return max - remaining
}

private const val CR: Byte = '\r'.code.toByte()
private const val LF: Byte = '\n'.code.toByte()

/**
* Reads a line of UTF-8 characters to the specified [out] buffer.
* It recognizes CR, LF and CRLF as a line delimiter.
Expand All @@ -372,62 +374,41 @@ public suspend fun ByteReadChannel.readUTF8LineTo(out: Appendable, max: Int = In
if (readBuffer.exhausted()) awaitContent()
if (isClosedForRead) return false

var consumed = 0
while (!isClosedForRead) {
awaitContent()

val cr = readBuffer.indexOf('\r'.code.toByte())
val lf = readBuffer.indexOf('\n'.code.toByte())

// No new line separator
if (cr == -1L && lf == -1L) {
if (max == Int.MAX_VALUE) {
val value = readBuffer.readString()
out.append(value)
} else {
val count = minOf(max - consumed, readBuffer.remaining.toInt())
consumed += count
out.append(readBuffer.readString(count.toLong()))

if (consumed == max) throw TooLongLineException("Line exceeds limit of $max characters")
Buffer().use { lineBuffer ->
while (!isClosedForRead) {
while (!readBuffer.exhausted()) {
when (val b = readBuffer.readByte()) {
CR -> {
// Check if LF follows CR after awaiting
if (readBuffer.exhausted()) awaitContent()
if (readBuffer.buffer[0] == LF) {
readBuffer.discard(1)
}
out.append(lineBuffer.readString())
return true
}

LF -> {
out.append(lineBuffer.readString())
return true
}

else -> lineBuffer.writeByte(b)
}
}

continue
}

// CRLF fully in buffer
if (cr >= 0 && lf == cr + 1) {
val count = if (max != Int.MAX_VALUE) cr else minOf(max - consumed, cr.toInt()).toLong()
out.append(readBuffer.readString(count))
if (count == cr) readBuffer.discard(2)
return true
}

// CR in buffer before LF
if (cr >= 0 && (lf == -1L || cr < lf)) {
val count = if (max != Int.MAX_VALUE) cr else minOf(max - consumed, cr.toInt()).toLong()
out.append(readBuffer.readString(count))
if (count == cr) readBuffer.discard(1)

// Check if LF follows CR after awaiting
if (readBuffer.exhausted()) awaitContent()
if (readBuffer.buffer[0] == '\n'.code.toByte()) {
readBuffer.discard(1)
if (lineBuffer.size >= max) {
throw TooLongLineException("Line exceeds limit of $max characters")
}

return true
awaitContent()
}

// LF in buffer before CR
if (lf >= 0) {
val count = if (max != Int.MAX_VALUE) lf else minOf(max - consumed, lf.toInt()).toLong()
out.append(readBuffer.readString(count))
if (count == lf) readBuffer.discard(1)
return true
return (lineBuffer.size > 0).also { remaining ->
if (remaining) {
out.append(lineBuffer.readString())
}
}
}

return true
}

@OptIn(InternalAPI::class, UnsafeIoApi::class, InternalIoApi::class)
Expand Down
2 changes: 1 addition & 1 deletion ktor-io/common/src/io/ktor/utils/io/core/Buffers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

package io.ktor.utils.io.core

import kotlinx.io.*
import kotlinx.io.Buffer
import kotlinx.io.readByteArray

/**
* Read the specified number of bytes specified (optional, read all remaining by default)
Expand Down
24 changes: 24 additions & 0 deletions ktor-io/jvm/test/ByteReadChannelOperationsJvmTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
* Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

import com.sun.org.apache.xalan.internal.lib.ExsltDatetime.time
import io.ktor.utils.io.*
import kotlinx.coroutines.*
import kotlin.test.*
import kotlin.time.Duration.Companion.seconds
import kotlin.time.measureTime

class ByteReadChannelOperationsJvmTest {

Expand Down Expand Up @@ -75,4 +78,25 @@ class ByteReadChannelOperationsJvmTest {
}
assertEquals(42, channel.readLong())
}

@OptIn(InternalAPI::class)
@Test
fun readUTF8LineTo() = runBlocking {
var lineNumber = 0
var count = 0
val numberOfLines = 200_000
val channel = writer(Dispatchers.IO) {
for (line in generateSequence { "line ${lineNumber++}\n" }.take(numberOfLines))
channel.writeStringUtf8(line)
}.channel
val out = StringBuilder()
val time = measureTime {
while (channel.readUTF8LineTo(out) && count < numberOfLines)
count++
}

assertEquals(numberOfLines, count)
assertTrue(time < 5.seconds, "Expected I/O to be complete in a reasonable time, but it took $time")
assertEquals(2_088_890, out.length)
}
}

0 comments on commit ef7c54c

Please sign in to comment.