Skip to content

Commit

Permalink
Propagate byte read/write counts from unsafe buffer ops (#364)
Browse files Browse the repository at this point in the history
* Propagate byte read/write counts from unsafe buffer ops
* Include JVM-specific extensions as well

This allows the caller to more easily perform their own bookkeeping based on the amount of bytes moved.

Closes #360
  • Loading branch information
JakeWharton authored Aug 14, 2024
1 parent ecb1d38 commit d7b0533
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 67 deletions.
14 changes: 7 additions & 7 deletions core/api/kotlinx-io-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -310,16 +310,16 @@ public final class kotlinx/io/unsafe/UnsafeBufferOperations {
public final fun iterate (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function2;)V
public final fun moveToTail (Lkotlinx/io/Buffer;[BII)V
public static synthetic fun moveToTail$default (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;[BIIILjava/lang/Object;)V
public final fun readFromHead (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function2;)V
public final fun readFromHead (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function3;)V
public final fun writeToTail (Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function2;)V
public final fun writeToTail (Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function3;)V
public final fun readFromHead (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function2;)I
public final fun readFromHead (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function3;)I
public final fun writeToTail (Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function2;)I
public final fun writeToTail (Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function3;)I
}

public final class kotlinx/io/unsafe/UnsafeBufferOperationsJvmKt {
public static final fun readBulk (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;[Ljava/nio/ByteBuffer;Lkotlin/jvm/functions/Function2;)V
public static final fun readFromHead (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function1;)V
public static final fun writeToTail (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function1;)V
public static final fun readBulk (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;[Ljava/nio/ByteBuffer;Lkotlin/jvm/functions/Function2;)J
public static final fun readFromHead (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function1;)I
public static final fun writeToTail (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function1;)I
}

public final class kotlinx/io/unsafe/UnsafeBufferOperationsKt {
Expand Down
8 changes: 4 additions & 4 deletions core/api/kotlinx-io-core.klib.api
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,10 @@ final object kotlinx.io.unsafe/UnsafeBufferOperations { // kotlinx.io.unsafe/Uns
final fun moveToTail(kotlinx.io/Buffer, kotlin/ByteArray, kotlin/Int = ..., kotlin/Int = ...) // kotlinx.io.unsafe/UnsafeBufferOperations.moveToTail|moveToTail(kotlinx.io.Buffer;kotlin.ByteArray;kotlin.Int;kotlin.Int){}[0]
final inline fun iterate(kotlinx.io/Buffer, kotlin/Function2<kotlinx.io.unsafe/BufferIterationContext, kotlinx.io/Segment?, kotlin/Unit>) // kotlinx.io.unsafe/UnsafeBufferOperations.iterate|iterate(kotlinx.io.Buffer;kotlin.Function2<kotlinx.io.unsafe.BufferIterationContext,kotlinx.io.Segment?,kotlin.Unit>){}[0]
final inline fun iterate(kotlinx.io/Buffer, kotlin/Long, kotlin/Function3<kotlinx.io.unsafe/BufferIterationContext, kotlinx.io/Segment?, kotlin/Long, kotlin/Unit>) // kotlinx.io.unsafe/UnsafeBufferOperations.iterate|iterate(kotlinx.io.Buffer;kotlin.Long;kotlin.Function3<kotlinx.io.unsafe.BufferIterationContext,kotlinx.io.Segment?,kotlin.Long,kotlin.Unit>){}[0]
final inline fun readFromHead(kotlinx.io/Buffer, kotlin/Function2<kotlinx.io.unsafe/SegmentReadContext, kotlinx.io/Segment, kotlin/Int>) // kotlinx.io.unsafe/UnsafeBufferOperations.readFromHead|readFromHead(kotlinx.io.Buffer;kotlin.Function2<kotlinx.io.unsafe.SegmentReadContext,kotlinx.io.Segment,kotlin.Int>){}[0]
final inline fun readFromHead(kotlinx.io/Buffer, kotlin/Function3<kotlin/ByteArray, kotlin/Int, kotlin/Int, kotlin/Int>) // kotlinx.io.unsafe/UnsafeBufferOperations.readFromHead|readFromHead(kotlinx.io.Buffer;kotlin.Function3<kotlin.ByteArray,kotlin.Int,kotlin.Int,kotlin.Int>){}[0]
final inline fun writeToTail(kotlinx.io/Buffer, kotlin/Int, kotlin/Function2<kotlinx.io.unsafe/SegmentWriteContext, kotlinx.io/Segment, kotlin/Int>) // kotlinx.io.unsafe/UnsafeBufferOperations.writeToTail|writeToTail(kotlinx.io.Buffer;kotlin.Int;kotlin.Function2<kotlinx.io.unsafe.SegmentWriteContext,kotlinx.io.Segment,kotlin.Int>){}[0]
final inline fun writeToTail(kotlinx.io/Buffer, kotlin/Int, kotlin/Function3<kotlin/ByteArray, kotlin/Int, kotlin/Int, kotlin/Int>) // kotlinx.io.unsafe/UnsafeBufferOperations.writeToTail|writeToTail(kotlinx.io.Buffer;kotlin.Int;kotlin.Function3<kotlin.ByteArray,kotlin.Int,kotlin.Int,kotlin.Int>){}[0]
final inline fun readFromHead(kotlinx.io/Buffer, kotlin/Function2<kotlinx.io.unsafe/SegmentReadContext, kotlinx.io/Segment, kotlin/Int>): kotlin/Int // kotlinx.io.unsafe/UnsafeBufferOperations.readFromHead|readFromHead(kotlinx.io.Buffer;kotlin.Function2<kotlinx.io.unsafe.SegmentReadContext,kotlinx.io.Segment,kotlin.Int>){}[0]
final inline fun readFromHead(kotlinx.io/Buffer, kotlin/Function3<kotlin/ByteArray, kotlin/Int, kotlin/Int, kotlin/Int>): kotlin/Int // kotlinx.io.unsafe/UnsafeBufferOperations.readFromHead|readFromHead(kotlinx.io.Buffer;kotlin.Function3<kotlin.ByteArray,kotlin.Int,kotlin.Int,kotlin.Int>){}[0]
final inline fun writeToTail(kotlinx.io/Buffer, kotlin/Int, kotlin/Function2<kotlinx.io.unsafe/SegmentWriteContext, kotlinx.io/Segment, kotlin/Int>): kotlin/Int // kotlinx.io.unsafe/UnsafeBufferOperations.writeToTail|writeToTail(kotlinx.io.Buffer;kotlin.Int;kotlin.Function2<kotlinx.io.unsafe.SegmentWriteContext,kotlinx.io.Segment,kotlin.Int>){}[0]
final inline fun writeToTail(kotlinx.io/Buffer, kotlin/Int, kotlin/Function3<kotlin/ByteArray, kotlin/Int, kotlin/Int, kotlin/Int>): kotlin/Int // kotlinx.io.unsafe/UnsafeBufferOperations.writeToTail|writeToTail(kotlinx.io.Buffer;kotlin.Int;kotlin.Function3<kotlin.ByteArray,kotlin.Int,kotlin.Int,kotlin.Int>){}[0]
}

final val kotlinx.io.files/SystemFileSystem // kotlinx.io.files/SystemFileSystem|{}SystemFileSystem[0]
Expand Down
50 changes: 34 additions & 16 deletions core/common/src/unsafe/UnsafeBufferOperations.kt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public object UnsafeBufferOperations {
* and data from the consumed prefix will be no longer available for read.
* If the operation does not consume anything, the action should return `0`.
* It's considered an error to return a negative value or a value exceeding the size of a readable range.
* This value will also be propagated as the function return value.
*
* If [readAction] ends execution by throwing an exception, no data will be consumed from the buffer.
*
Expand All @@ -75,6 +76,8 @@ public object UnsafeBufferOperations {
* the best effort basis, meaning that there are no strong zero-copy guarantees
* and the copy will be created if it could not be omitted.
*
* @return Number of bytes consumed as returned by [readAction].
*
* @throws IllegalStateException when [readAction] returns negative value or a values exceeding
* the `endIndexExclusive - startIndexInclusive` value.
* @throws IllegalArgumentException when the [buffer] is empty.
Expand All @@ -84,14 +87,16 @@ public object UnsafeBufferOperations {
public inline fun readFromHead(
buffer: Buffer,
readAction: (bytes: ByteArray, startIndexInclusive: Int, endIndexExclusive: Int) -> Int
) {
): Int {
require(!buffer.exhausted()) { "Buffer is empty" }
val head = buffer.head!!
val bytesRead = readAction(head.dataAsByteArray(true), head.pos, head.limit)
if (bytesRead < 0) throw IllegalStateException("Returned negative read bytes count")
if (bytesRead == 0) return
if (bytesRead > head.size) throw IllegalStateException("Returned too many bytes")
buffer.skip(bytesRead.toLong())
if (bytesRead != 0) {
if (bytesRead < 0) throw IllegalStateException("Returned negative read bytes count")
if (bytesRead > head.size) throw IllegalStateException("Returned too many bytes")
buffer.skip(bytesRead.toLong())
}
return bytesRead
}

/**
Expand All @@ -107,26 +112,31 @@ public object UnsafeBufferOperations {
* and data from the consumed prefix will be no longer available for read.
* If the operation does not consume anything, the action should return `0`.
* It's considered an error to return a negative value or a value exceeding the [Segment.size].
* This value will also be propagated as the function return value.
*
* Both [readAction] arguments are valid only within [readAction] scope,
* it's an error to store and reuse it later.
*
* If the buffer is empty, [IllegalArgumentException] will be thrown.
*
* @return Number of bytes consumed as returned by [readAction].
*
* @throws IllegalStateException when [readAction] returns negative value or a values exceeding
* the [Segment.size] value.
* @throws IllegalArgumentException when the [buffer] is empty.
*
* @sample kotlinx.io.samples.unsafe.UnsafeBufferOperationsSamples.readUleb128
*/
public inline fun readFromHead(buffer: Buffer, readAction: (SegmentReadContext, Segment) -> Int) {
public inline fun readFromHead(buffer: Buffer, readAction: (SegmentReadContext, Segment) -> Int): Int {
require(!buffer.exhausted()) { "Buffer is empty" }
val head = buffer.head!!
val bytesRead = readAction(SegmentReadContextImpl, head)
if (bytesRead < 0) throw IllegalStateException("Returned negative read bytes count")
if (bytesRead == 0) return
if (bytesRead > head.size) throw IllegalStateException("Returned too many bytes")
buffer.skip(bytesRead.toLong())
if (bytesRead != 0) {
if (bytesRead < 0) throw IllegalStateException("Returned negative read bytes count")
if (bytesRead > head.size) throw IllegalStateException("Returned too many bytes")
buffer.skip(bytesRead.toLong())
}
return bytesRead
}

/**
Expand All @@ -146,13 +156,16 @@ public object UnsafeBufferOperations {
* The value returned by the [writeAction] denotes the number of bytes successfully written to the buffer.
* If no data was written, `0` should be returned.
* It's an error to return a negative value or a value exceeding the size of the provided writeable range.
* This value will also be propagated as the function return value.
*
* If [writeAction] ends execution by throwing an exception, no data will be written to the buffer.
*
* The data array is passed to the [writeAction] directly from the buffer's internal storage without copying
* on the best-effort basis, meaning that there are no strong zero-copy guarantees
* and the copy will be created if it could not be omitted.
*
* @return Number of bytes written as returned by [writeAction].
*
* @throws IllegalStateException when [minimumCapacity] is too large and could not be fulfilled.
* @throws IllegalStateException when [writeAction] returns a negative value or a value exceeding
* the `endIndexExclusive - startIndexInclusive` value.
Expand All @@ -162,7 +175,7 @@ public object UnsafeBufferOperations {
public inline fun writeToTail(
buffer: Buffer, minimumCapacity: Int,
writeAction: (bytes: ByteArray, startIndexInclusive: Int, endIndexExclusive: Int) -> Int
) {
): Int {
val tail = buffer.writableSegment(minimumCapacity)

val data = tail.dataAsByteArray(false)
Expand All @@ -175,7 +188,7 @@ public object UnsafeBufferOperations {
tail.writeBackData(data, bytesWritten)
tail.limit += bytesWritten
buffer.sizeMut += bytesWritten
return
return bytesWritten
}

check(bytesWritten in 0..tail.remainingCapacity) {
Expand All @@ -185,11 +198,12 @@ public object UnsafeBufferOperations {
tail.writeBackData(data, bytesWritten)
tail.limit += bytesWritten
buffer.sizeMut += bytesWritten
return
return bytesWritten
}
if (tail.isEmpty()) {
buffer.recycleTail()
}
return bytesWritten
}

/**
Expand All @@ -208,10 +222,13 @@ public object UnsafeBufferOperations {
* The value returned by the [writeAction] denotes the number of bytes successfully written to the buffer.
* If no data was written, `0` should be returned.
* It's an error to return a negative value or a value exceeding the [Segment.remainingCapacity].
* This value will also be propagated as the function return value.
*
* Both [writeAction] arguments are valid only within [writeAction] scope,
* it's an error to store and reuse it later.
*
* @return Number of bytes written as returned by [writeAction].
*
* @throws IllegalStateException when [minimumCapacity] is too large and could not be fulfilled.
* @throws IllegalStateException when [writeAction] returns a negative value or a value exceeding
* the [Segment.remainingCapacity] value for the provided segment.
Expand All @@ -222,15 +239,15 @@ public object UnsafeBufferOperations {
buffer: Buffer,
minimumCapacity: Int,
writeAction: (SegmentWriteContext, Segment) -> Int
) {
): Int {
val tail = buffer.writableSegment(minimumCapacity)
val bytesWritten = writeAction(SegmentWriteContextImpl, tail)

// fast path
if (bytesWritten == minimumCapacity) {
tail.limit += bytesWritten
buffer.sizeMut += bytesWritten
return
return bytesWritten
}

check(bytesWritten in 0..tail.remainingCapacity) {
Expand All @@ -239,12 +256,13 @@ public object UnsafeBufferOperations {
if (bytesWritten != 0) {
tail.limit += bytesWritten
buffer.sizeMut += bytesWritten
return
return bytesWritten
}

if (tail.isEmpty()) {
buffer.recycleTail()
}
return bytesWritten
}

/**
Expand Down
20 changes: 13 additions & 7 deletions core/common/test/unsafe/UnsafeBufferOperationsReadTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
import kotlin.test.fail

@OptIn(UnsafeIoApi::class)
class UnsafeBufferOperationsReadTest {
Expand All @@ -38,10 +39,11 @@ class UnsafeBufferOperationsReadTest {

val buffer = Buffer().apply { write(expectedData) }
for (idx in actualData.indices) {
UnsafeBufferOperations.readFromHead(buffer) { data, startIndex, _ ->
val read = UnsafeBufferOperations.readFromHead(buffer) { data, startIndex, _ ->
actualData[idx] = data[startIndex]
1
}
assertEquals(1, read)
assertEquals(actualData.size - idx - 1, buffer.size.toInt())
}
assertTrue(buffer.exhausted())
Expand All @@ -51,36 +53,40 @@ class UnsafeBufferOperationsReadTest {
@Test
fun readNothing() {
val buffer = Buffer().apply { writeInt(42) }
UnsafeBufferOperations.readFromHead(buffer) { _, _, _ -> 0 }
val read1 = UnsafeBufferOperations.readFromHead(buffer) { _, _, _ -> 0 }
assertEquals(0, read1)
assertEquals(42, buffer.readInt())

buffer.writeInt(42)
UnsafeBufferOperations.readFromHead(buffer) { _, _ -> 0 }
val read2 = UnsafeBufferOperations.readFromHead(buffer) { _, _ -> 0 }
assertEquals(0, read2)
assertEquals(42, buffer.readInt())
}

@Test
fun readEverything() {
val buffer = Buffer().apply { writeString("hello world") }
UnsafeBufferOperations.readFromHead(buffer) { _, startIndex, endIndex ->
val read1 = UnsafeBufferOperations.readFromHead(buffer) { _, startIndex, endIndex ->
endIndex - startIndex
}
assertEquals(11, read1)
assertTrue(buffer.exhausted())

buffer.writeString("hello world")
UnsafeBufferOperations.readFromHead(buffer) { _, seg -> seg.size }
val read2 = UnsafeBufferOperations.readFromHead(buffer) { _, seg -> seg.size }
assertEquals(11, read2)
assertTrue(buffer.exhausted())
}

@Test
fun readFromEmptyBuffer() {
val buffer = Buffer()
assertFailsWith<IllegalArgumentException> {
UnsafeBufferOperations.readFromHead(buffer) { _, _, _ -> 0 }
UnsafeBufferOperations.readFromHead(buffer) { _, _, _ -> fail() }
}

assertFailsWith<IllegalArgumentException> {
UnsafeBufferOperations.readFromHead(buffer) { _, _ -> 0 }
UnsafeBufferOperations.readFromHead(buffer) { _, _ -> fail() }
}
}

Expand Down
19 changes: 12 additions & 7 deletions core/common/test/unsafe/UnsafeBufferOperationsWriteTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ class UnsafeBufferOperationsWriteTest {
val data = "hello world".encodeToByteArray()

for (idx in data.indices) {
UnsafeBufferOperations.writeToTail(buffer, 1) { writeable, pos, _ ->
val written = UnsafeBufferOperations.writeToTail(buffer, 1) { writeable, pos, _ ->
writeable[pos] = data[idx]
1
}
assertEquals(1, written)
assertEquals(idx + 1, buffer.size.toInt())
}
assertEquals("hello world", buffer.readString())
Expand All @@ -51,10 +52,12 @@ class UnsafeBufferOperationsWriteTest {
fun writeNothing() {
val buffer = Buffer()

UnsafeBufferOperations.writeToTail(buffer, 1) { _, _, _ -> 0 }
val write1 = UnsafeBufferOperations.writeToTail(buffer, 1) { _, _, _ -> 0 }
assertEquals(0, write1)
assertTrue(buffer.exhausted())

UnsafeBufferOperations.writeToTail(buffer, 1) { _, _ -> 0 }
val write2 = UnsafeBufferOperations.writeToTail(buffer, 1) { _, _ -> 0 }
assertEquals(0, write2)
assertTrue(buffer.exhausted())

buffer.writeInt(42)
Expand All @@ -75,12 +78,13 @@ class UnsafeBufferOperationsWriteTest {
@Test
fun writeWholeBuffer() {
val buffer = Buffer()
UnsafeBufferOperations.writeToTail(buffer, 1) { data, from, to ->
val written = UnsafeBufferOperations.writeToTail(buffer, 1) { data, from, to ->
for (idx in from..<to) {
data[idx] = 42
}
to - from
}
assertEquals(Segment.SIZE, written)
assertEquals(Segment.SIZE, buffer.size.toInt())
assertArrayEquals(ByteArray(Segment.SIZE) { 42 }, buffer.readByteArray())
}
Expand All @@ -89,25 +93,26 @@ class UnsafeBufferOperationsWriteTest {
fun writeWithCtx() {
val buffer = Buffer()

UnsafeBufferOperations.writeToTail(buffer, 1) { ctx, segment ->
val written = UnsafeBufferOperations.writeToTail(buffer, 1) { ctx, segment ->
ctx.setUnchecked(segment, 0, 1)
ctx.setUnchecked(segment, 1, 2)
2
}

assertEquals(2, written)
assertArrayEquals(byteArrayOf(1, 2), buffer.readByteArray())
}

@Test
fun requireToManyBytes() {
val buffer = Buffer()
assertFailsWith<IllegalArgumentException> {
UnsafeBufferOperations.writeToTail(buffer, 100500) { _, _, _ -> 0 }
UnsafeBufferOperations.writeToTail(buffer, 100500) { _, _, _ -> fail() }
}
assertTrue(buffer.exhausted())

assertFailsWith<IllegalArgumentException> {
UnsafeBufferOperations.writeToTail(buffer, 100500) { _, _ -> 0 }
UnsafeBufferOperations.writeToTail(buffer, 100500) { _, _ -> fail() }
}
assertTrue(buffer.exhausted())
}
Expand Down
Loading

0 comments on commit d7b0533

Please sign in to comment.