diff --git a/core/api/kotlinx-io-core.api b/core/api/kotlinx-io-core.api index 78260853c..2fb84fbd6 100644 --- a/core/api/kotlinx-io-core.api +++ b/core/api/kotlinx-io-core.api @@ -10,7 +10,10 @@ public final class kotlinx/io/Buffer : kotlinx/io/Sink, kotlinx/io/Source { public fun flush ()V public final fun get (J)B public fun getBuffer ()Lkotlinx/io/Buffer; + public final synthetic fun getHead ()Lkotlinx/io/Segment; public final fun getSize ()J + public final synthetic fun getSizeMut ()J + public final synthetic fun getTail ()Lkotlinx/io/Segment; public fun hintEmit ()V public fun peek ()Lkotlinx/io/Source; public fun readAtMostTo (Lkotlinx/io/Buffer;J)J @@ -20,12 +23,17 @@ public final class kotlinx/io/Buffer : kotlinx/io/Sink, kotlinx/io/Source { public fun readLong ()J public fun readShort ()S public fun readTo (Lkotlinx/io/RawSink;J)V + public final synthetic fun recycleTail ()V public fun request (J)Z public fun require (J)V + public final synthetic fun setHead (Lkotlinx/io/Segment;)V + public final synthetic fun setSizeMut (J)V + public final synthetic fun setTail (Lkotlinx/io/Segment;)V public fun skip (J)V public fun toString ()Ljava/lang/String; public fun transferFrom (Lkotlinx/io/RawSource;)J public fun transferTo (Lkotlinx/io/RawSink;)J + public final synthetic fun writableSegment (I)Lkotlinx/io/Segment; public fun write (Lkotlinx/io/Buffer;J)V public fun write (Lkotlinx/io/RawSource;J)V public fun write ([BII)V @@ -92,6 +100,24 @@ public abstract interface class kotlinx/io/RawSource : java/lang/AutoCloseable { public abstract fun readAtMostTo (Lkotlinx/io/Buffer;J)J } +public final class kotlinx/io/Segment { + public synthetic fun ([BIIZZLkotlin/jvm/internal/DefaultConstructorMarker;)V + public final synthetic fun dataAsByteArray (Z)[B + public final synthetic fun getLimit ()I + public final synthetic fun getNext ()Lkotlinx/io/Segment; + public final synthetic fun getPos ()I + public final synthetic fun getRemainingCapacity ()I + public final synthetic fun getSize ()I + public final synthetic fun setLimit (I)V + public final synthetic fun setNext (Lkotlinx/io/Segment;)V + public final synthetic fun setPos (I)V + public final synthetic fun writeBackData ([BI)V +} + +public final class kotlinx/io/SegmentKt { + public static final fun isEmpty (Lkotlinx/io/Segment;)Z +} + public abstract interface class kotlinx/io/Sink : kotlinx/io/RawSink { public abstract fun emit ()V public abstract fun flush ()V @@ -186,6 +212,9 @@ public final class kotlinx/io/SourcesKt { public static final fun startsWith (Lkotlinx/io/Source;B)Z } +public abstract interface annotation class kotlinx/io/UnsafeIoApi : java/lang/annotation/Annotation { +} + public final class kotlinx/io/Utf8Kt { public static final fun readCodePointValue (Lkotlinx/io/Source;)I public static final fun readLine (Lkotlinx/io/Source;)Ljava/lang/String; @@ -253,3 +282,18 @@ public final class kotlinx/io/files/PathsKt { public static final fun sourceDeprecated (Lkotlinx/io/files/Path;)Lkotlinx/io/Source; } +public final class kotlinx/io/unsafe/UnsafeBufferOperations { + public static final field INSTANCE Lkotlinx/io/unsafe/UnsafeBufferOperations; + public final fun getMaxSafeWriteCapacity ()I + public final fun moveToTail (Lkotlinx/io/Buffer;[BII)V + public static synthetic fun moveToTail$default (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;[BIIILjava/lang/Object;)V + public final fun readFromHead (Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function3;)V + public final fun writeToTail (Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function3;)V +} + +public final class kotlinx/io/unsafe/UnsafeBufferOperationsJvmKt { + public static final fun readBulk (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;[Ljava/nio/ByteBuffer;Lkotlin/jvm/functions/Function2;)V + public static final fun readFromHead (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;Lkotlin/jvm/functions/Function1;)V + public static final fun writeToTail (Lkotlinx/io/unsafe/UnsafeBufferOperations;Lkotlinx/io/Buffer;ILkotlin/jvm/functions/Function1;)V +} + diff --git a/core/api/kotlinx-io-core.klib.api b/core/api/kotlinx-io-core.klib.api index c02943284..78a89aa59 100644 --- a/core/api/kotlinx-io-core.klib.api +++ b/core/api/kotlinx-io-core.klib.api @@ -55,12 +55,14 @@ final class kotlinx.io/Buffer : kotlinx.io/Sink, kotlinx.io/Source { // kotlinx. final fun readLong(): kotlin/Long // kotlinx.io/Buffer.readLong|readLong(){}[0] final fun readShort(): kotlin/Short // kotlinx.io/Buffer.readShort|readShort(){}[0] final fun readTo(kotlinx.io/RawSink, kotlin/Long) // kotlinx.io/Buffer.readTo|readTo(kotlinx.io.RawSink;kotlin.Long){}[0] + final fun recycleTail() // kotlinx.io/Buffer.recycleTail|recycleTail(){}[0] final fun request(kotlin/Long): kotlin/Boolean // kotlinx.io/Buffer.request|request(kotlin.Long){}[0] final fun require(kotlin/Long) // kotlinx.io/Buffer.require|require(kotlin.Long){}[0] final fun skip(kotlin/Long) // kotlinx.io/Buffer.skip|skip(kotlin.Long){}[0] final fun toString(): kotlin/String // kotlinx.io/Buffer.toString|toString(){}[0] final fun transferFrom(kotlinx.io/RawSource): kotlin/Long // kotlinx.io/Buffer.transferFrom|transferFrom(kotlinx.io.RawSource){}[0] final fun transferTo(kotlinx.io/RawSink): kotlin/Long // kotlinx.io/Buffer.transferTo|transferTo(kotlinx.io.RawSink){}[0] + final fun writableSegment(kotlin/Int): kotlinx.io/Segment // kotlinx.io/Buffer.writableSegment|writableSegment(kotlin.Int){}[0] final fun write(kotlin/ByteArray, kotlin/Int, kotlin/Int) // kotlinx.io/Buffer.write|write(kotlin.ByteArray;kotlin.Int;kotlin.Int){}[0] final fun write(kotlinx.io/Buffer, kotlin/Long) // kotlinx.io/Buffer.write|write(kotlinx.io.Buffer;kotlin.Long){}[0] final fun write(kotlinx.io/RawSource, kotlin/Long) // kotlinx.io/Buffer.write|write(kotlinx.io.RawSource;kotlin.Long){}[0] @@ -70,8 +72,34 @@ final class kotlinx.io/Buffer : kotlinx.io/Sink, kotlinx.io/Source { // kotlinx. final fun writeShort(kotlin/Short) // kotlinx.io/Buffer.writeShort|writeShort(kotlin.Short){}[0] final val buffer // kotlinx.io/Buffer.buffer|{}buffer[0] final fun (): kotlinx.io/Buffer // kotlinx.io/Buffer.buffer.|(){}[0] - final var size // kotlinx.io/Buffer.size|{}size[0] + final val size // kotlinx.io/Buffer.size|{}size[0] final fun (): kotlin/Long // kotlinx.io/Buffer.size.|(){}[0] + final var head // kotlinx.io/Buffer.head|{}head[0] + final fun (): kotlinx.io/Segment? // kotlinx.io/Buffer.head.|(){}[0] + final fun (kotlinx.io/Segment?) // kotlinx.io/Buffer.head.|(kotlinx.io.Segment?){}[0] + final var sizeMut // kotlinx.io/Buffer.sizeMut|{}sizeMut[0] + final fun (): kotlin/Long // kotlinx.io/Buffer.sizeMut.|(){}[0] + final fun (kotlin/Long) // kotlinx.io/Buffer.sizeMut.|(kotlin.Long){}[0] + final var tail // kotlinx.io/Buffer.tail|{}tail[0] + final fun (): kotlinx.io/Segment? // kotlinx.io/Buffer.tail.|(){}[0] + final fun (kotlinx.io/Segment?) // kotlinx.io/Buffer.tail.|(kotlinx.io.Segment?){}[0] +} +final class kotlinx.io/Segment { // kotlinx.io/Segment|null[0] + final fun dataAsByteArray(kotlin/Boolean): kotlin/ByteArray // kotlinx.io/Segment.dataAsByteArray|dataAsByteArray(kotlin.Boolean){}[0] + final fun writeBackData(kotlin/ByteArray, kotlin/Int) // kotlinx.io/Segment.writeBackData|writeBackData(kotlin.ByteArray;kotlin.Int){}[0] + final val remainingCapacity // kotlinx.io/Segment.remainingCapacity|{}remainingCapacity[0] + final fun (): kotlin/Int // kotlinx.io/Segment.remainingCapacity.|(){}[0] + final val size // kotlinx.io/Segment.size|{}size[0] + final fun (): kotlin/Int // kotlinx.io/Segment.size.|(){}[0] + final var limit // kotlinx.io/Segment.limit|{}limit[0] + final fun (): kotlin/Int // kotlinx.io/Segment.limit.|(){}[0] + final fun (kotlin/Int) // kotlinx.io/Segment.limit.|(kotlin.Int){}[0] + final var next // kotlinx.io/Segment.next|{}next[0] + final fun (): kotlinx.io/Segment? // kotlinx.io/Segment.next.|(){}[0] + final fun (kotlinx.io/Segment?) // kotlinx.io/Segment.next.|(kotlinx.io.Segment?){}[0] + final var pos // kotlinx.io/Segment.pos|{}pos[0] + final fun (): kotlin/Int // kotlinx.io/Segment.pos.|(){}[0] + final fun (kotlin/Int) // kotlinx.io/Segment.pos.|(kotlin.Int){}[0] } final fun (kotlinx.io.files/Path).kotlinx.io.files/sink(): kotlinx.io/Sink // kotlinx.io.files/sink|sink@kotlinx.io.files.Path(){}[0] final fun (kotlinx.io.files/Path).kotlinx.io.files/source(): kotlinx.io/Source // kotlinx.io.files/source|source@kotlinx.io.files.Path(){}[0] @@ -81,6 +109,7 @@ final fun (kotlinx.io/Buffer).kotlinx.io/readString(): kotlin/String // kotlinx. final fun (kotlinx.io/Buffer).kotlinx.io/snapshot(): kotlinx.io.bytestring/ByteString // kotlinx.io/snapshot|snapshot@kotlinx.io.Buffer(){}[0] final fun (kotlinx.io/RawSink).kotlinx.io/buffered(): kotlinx.io/Sink // kotlinx.io/buffered|buffered@kotlinx.io.RawSink(){}[0] final fun (kotlinx.io/RawSource).kotlinx.io/buffered(): kotlinx.io/Source // kotlinx.io/buffered|buffered@kotlinx.io.RawSource(){}[0] +final fun (kotlinx.io/Segment).kotlinx.io/isEmpty(): kotlin/Boolean // kotlinx.io/isEmpty|isEmpty@kotlinx.io.Segment(){}[0] final fun (kotlinx.io/Sink).kotlinx.io/write(kotlinx.io.bytestring/ByteString, kotlin/Int = ..., kotlin/Int = ...) // kotlinx.io/write|write@kotlinx.io.Sink(kotlinx.io.bytestring.ByteString;kotlin.Int;kotlin.Int){}[0] final fun (kotlinx.io/Sink).kotlinx.io/writeCodePointValue(kotlin/Int) // kotlinx.io/writeCodePointValue|writeCodePointValue@kotlinx.io.Sink(kotlin.Int){}[0] final fun (kotlinx.io/Sink).kotlinx.io/writeDecimalLong(kotlin/Long) // kotlinx.io/writeDecimalLong|writeDecimalLong@kotlinx.io.Sink(kotlin.Long){}[0] @@ -135,6 +164,13 @@ final fun kotlinx.io.files/Path(kotlin/String, kotlin/Array.. final fun kotlinx.io.files/Path(kotlinx.io.files/Path, kotlin/Array...): kotlinx.io.files/Path // kotlinx.io.files/Path|Path(kotlinx.io.files.Path;kotlin.Array...){}[0] final fun kotlinx.io/discardingSink(): kotlinx.io/RawSink // kotlinx.io/discardingSink|discardingSink(){}[0] final inline fun (kotlinx.io/Sink).kotlinx.io/writeToInternalBuffer(kotlin/Function1) // kotlinx.io/writeToInternalBuffer|writeToInternalBuffer@kotlinx.io.Sink(kotlin.Function1){}[0] +final object kotlinx.io.unsafe/UnsafeBufferOperations { // kotlinx.io.unsafe/UnsafeBufferOperations|null[0] + final fun moveToTail(kotlinx.io/Buffer, kotlin/ByteArray, kotlin/Int = ..., kotlin/Int = ...) // kotlinx.io.unsafe/UnsafeBufferOperations.moveToTail|moveToTail(kotlinx.io.Buffer;kotlin.ByteArray;kotlin.Int;kotlin.Int){}[0] + final inline fun readFromHead(kotlinx.io/Buffer, kotlin/Function3) // kotlinx.io.unsafe/UnsafeBufferOperations.readFromHead|readFromHead(kotlinx.io.Buffer;kotlin.Function3){}[0] + final inline fun writeToTail(kotlinx.io/Buffer, kotlin/Int, kotlin/Function3) // kotlinx.io.unsafe/UnsafeBufferOperations.writeToTail|writeToTail(kotlinx.io.Buffer;kotlin.Int;kotlin.Function3){}[0] + final val maxSafeWriteCapacity // kotlinx.io.unsafe/UnsafeBufferOperations.maxSafeWriteCapacity|{}maxSafeWriteCapacity[0] + final fun (): kotlin/Int // kotlinx.io.unsafe/UnsafeBufferOperations.maxSafeWriteCapacity.|(){}[0] +} final val kotlinx.io.files/SystemFileSystem // kotlinx.io.files/SystemFileSystem|{}SystemFileSystem[0] final fun (): kotlinx.io.files/FileSystem // kotlinx.io.files/SystemFileSystem.|(){}[0] final val kotlinx.io.files/SystemPathSeparator // kotlinx.io.files/SystemPathSeparator|{}SystemPathSeparator[0] @@ -147,6 +183,9 @@ open annotation class kotlinx.io/DelicateIoApi : kotlin/Annotation { // kotlinx. open annotation class kotlinx.io/InternalIoApi : kotlin/Annotation { // kotlinx.io/InternalIoApi|null[0] constructor () // kotlinx.io/InternalIoApi.|(){}[0] } +open annotation class kotlinx.io/UnsafeIoApi : kotlin/Annotation { // kotlinx.io/UnsafeIoApi|null[0] + constructor () // kotlinx.io/UnsafeIoApi.|(){}[0] +} open class kotlinx.io.files/FileNotFoundException : kotlinx.io/IOException { // kotlinx.io.files/FileNotFoundException|null[0] constructor (kotlin/String?) // kotlinx.io.files/FileNotFoundException.|(kotlin.String?){}[0] } diff --git a/core/apple/src/AppleCore.kt b/core/apple/src/AppleCore.kt index 36d235078..2eaf59cc7 100644 --- a/core/apple/src/AppleCore.kt +++ b/core/apple/src/AppleCore.kt @@ -49,7 +49,7 @@ private open class OutputStreamSink( head.pos += bytesWritten.toInt() remaining -= bytesWritten - source.size -= bytesWritten + source.sizeMut -= bytesWritten if (head.pos == head.limit) { source.recycleHead() @@ -105,7 +105,7 @@ private open class NSInputStreamSource( return -1 } tail.limit += bytesRead.toInt() - sink.size += bytesRead + sink.sizeMut += bytesRead return bytesRead } diff --git a/core/apple/src/BuffersApple.kt b/core/apple/src/BuffersApple.kt index f2258e0f8..292b9e262 100644 --- a/core/apple/src/BuffersApple.kt +++ b/core/apple/src/BuffersApple.kt @@ -30,7 +30,7 @@ internal fun Buffer.write(source: CPointer, maxLength: Int) { currentOffset += toCopy tail.limit += toCopy } - size += maxLength + this.sizeMut += maxLength } internal fun Buffer.readAtMostTo(sink: CPointer, maxLength: Int): Int { @@ -43,7 +43,7 @@ internal fun Buffer.readAtMostTo(sink: CPointer, maxLength: Int): In } s.pos += toCopy - size -= toCopy.toLong() + this.sizeMut -= toCopy.toLong() if (s.pos == s.limit) { recycleHead() diff --git a/core/common/src/Annotations.kt b/core/common/src/Annotations.kt index c4fb69992..82754aa1c 100644 --- a/core/common/src/Annotations.kt +++ b/core/common/src/Annotations.kt @@ -1,5 +1,5 @@ /* - * Copyright 2010-2023 JetBrains s.r.o. and Kotlin Programming Language contributors. + * Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors. * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file. */ @@ -38,4 +38,18 @@ public annotation class DelicateIoApi "Make sure you fully read and understand documentation of the declaration that " + "is marked as an internal API." ) -public annotation class InternalIoApi \ No newline at end of file +public annotation class InternalIoApi + +/** + * Marks API that may cause data corruption or loss or behave unpredictable when used with invalid argument values. + * + * Consider using other APIs instead when possible. + * Otherwise, make sure to read documentation describing an unsafe API. + */ +@Retention(AnnotationRetention.BINARY) +@RequiresOptIn( + level = RequiresOptIn.Level.WARNING, + message = "This is an unsafe API and its use requires care. " + + "Make sure you fully understand documentation of the declaration marked as UnsafeIoApi" +) +public annotation class UnsafeIoApi diff --git a/core/common/src/Buffer.kt b/core/common/src/Buffer.kt index 7e73b4f85..40cef38e7 100644 --- a/core/common/src/Buffer.kt +++ b/core/common/src/Buffer.kt @@ -20,7 +20,7 @@ */ package kotlinx.io -import kotlin.jvm.JvmField +import kotlin.jvm.JvmSynthetic /** * A collection of bytes in memory. @@ -39,17 +39,26 @@ import kotlin.jvm.JvmField * does not affect buffer's state and [exhausted] only indicates that a buffer is empty. */ public class Buffer : Source, Sink { - @JvmField + @PublishedApi + @get:JvmSynthetic + @set:JvmSynthetic internal var head: Segment? = null - @JvmField + @PublishedApi + @get:JvmSynthetic + @set:JvmSynthetic internal var tail: Segment? = null /** * The number of bytes accessible for read from this buffer. */ - public var size: Long = 0L - internal set + public val size: Long + get() = sizeMut + + @PublishedApi + @get:JvmSynthetic + @set:JvmSynthetic + internal var sizeMut: Long = 0L /** * Returns the buffer itself. @@ -77,7 +86,7 @@ public class Buffer : Source, Sink { val limit = segment.limit val data = segment.data val b = data[pos++] - size -= 1L + sizeMut -= 1L if (pos == limit) { recycleHead() } else { @@ -101,7 +110,7 @@ public class Buffer : Source, Sink { val data = segment.data val s = data[pos++] and 0xff shl 8 or (data[pos++] and 0xff) - size -= 2L + sizeMut -= 2L if (pos == limit) { recycleHead() @@ -136,7 +145,7 @@ public class Buffer : Source, Sink { or (data[pos++] and 0xff shl 8) or (data[pos++] and 0xff) ) - size -= 4L + sizeMut -= 4L if (pos == limit) { recycleHead() @@ -173,7 +182,7 @@ public class Buffer : Source, Sink { or (data[pos++] and 0xffL shl 8) or (data[pos++] and 0xffL) ) - size -= 8L + this.sizeMut -= 8L if (pos == limit) { recycleHead() @@ -226,7 +235,7 @@ public class Buffer : Source, Sink { var currentOffset = startIndex var remainingByteCount = endIndex - startIndex - out.size += remainingByteCount + out.sizeMut += remainingByteCount // Skip segments that we aren't copying from. var s = head @@ -305,7 +314,7 @@ public class Buffer : Source, Sink { val head = head ?: throw EOFException("Buffer exhausted before skipping $byteCount bytes.") val toSkip = minOf(remainingByteCount, head.limit - head.pos).toInt() - size -= toSkip.toLong() + sizeMut -= toSkip.toLong() remainingByteCount -= toSkip.toLong() head.pos += toSkip @@ -325,7 +334,7 @@ public class Buffer : Source, Sink { ) s.pos += toCopy - size -= toCopy.toLong() + sizeMut -= toCopy.toLong() if (s.pos == s.limit) { recycleHead() @@ -365,6 +374,8 @@ public class Buffer : Source, Sink { * Returns a tail segment that we can write at least `minimumCapacity` * bytes to, creating it if necessary. */ + @PublishedApi + @JvmSynthetic internal fun writableSegment(minimumCapacity: Int): Segment { require(minimumCapacity >= 1 && minimumCapacity <= Segment.SIZE) { "unexpected capacity" } @@ -401,7 +412,7 @@ public class Buffer : Source, Sink { currentOffset += toCopy tail.limit += toCopy } - size += endIndex - startIndex + sizeMut += endIndex - startIndex } override fun write(source: RawSource, byteCount: Long) { @@ -471,7 +482,7 @@ public class Buffer : Source, Sink { // yielding sink [51%, 91%, 30%] and source [62%, 82%]. require(source !== this) { "source == this" } - checkOffsetAndCount(source.size, 0, byteCount) + checkOffsetAndCount(source.sizeMut, 0, byteCount) var remainingByteCount = byteCount @@ -484,8 +495,8 @@ public class Buffer : Source, Sink { ) { // Our existing segments are sufficient. Move bytes from source's head to our tail. source.head!!.writeTo(tail, remainingByteCount.toInt()) - source.size -= remainingByteCount - size += remainingByteCount + source.sizeMut -= remainingByteCount + sizeMut += remainingByteCount return } else { // We're going to need another segment. Split the source's head @@ -506,8 +517,8 @@ public class Buffer : Source, Sink { source.tail = null } pushSegment(segmentToMove, true) - source.size -= movedByteCount - size += movedByteCount + source.sizeMut -= movedByteCount + sizeMut += movedByteCount remainingByteCount -= movedByteCount } } @@ -525,7 +536,7 @@ public class Buffer : Source, Sink { override fun writeByte(byte: Byte) { val tail = writableSegment(1) tail.data[tail.limit++] = byte - size += 1L + sizeMut += 1L } override fun writeShort(short: Short) { @@ -535,7 +546,7 @@ public class Buffer : Source, Sink { data[limit++] = (short.toInt() ushr 8 and 0xff).toByte() data[limit++] = (short.toInt() and 0xff).toByte() tail.limit = limit - size += 2L + sizeMut += 2L } override fun writeInt(int: Int) { @@ -547,7 +558,7 @@ public class Buffer : Source, Sink { data[limit++] = (int ushr 8 and 0xff).toByte() data[limit++] = (int and 0xff).toByte() tail.limit = limit - size += 4L + sizeMut += 4L } override fun writeLong(long: Long) { @@ -563,7 +574,7 @@ public class Buffer : Source, Sink { data[limit++] = (long ushr 8 and 0xffL).toByte() data[limit++] = (long and 0xffL).toByte() tail.limit = limit - size += 8L + sizeMut += 8L } /** @@ -585,7 +596,7 @@ public class Buffer : Source, Sink { s = s.next } - result.size = size + result.sizeMut = size return result } @@ -662,6 +673,8 @@ public class Buffer : Source, Sink { * * It's up to a caller to ensure that the tail exists. */ + @PublishedApi + @JvmSynthetic internal fun recycleTail() { val oldTail = tail!! val newTail = oldTail.prev diff --git a/core/common/src/ByteStrings.kt b/core/common/src/ByteStrings.kt index efdc0d61e..4c8587ebe 100644 --- a/core/common/src/ByteStrings.kt +++ b/core/common/src/ByteStrings.kt @@ -39,14 +39,14 @@ public fun Sink.write(byteString: ByteString, startIndex: Int = 0, endIndex: Int byteString.copyInto(tail.data, tail.limit, offset, offset + bytesToWrite) offset += bytesToWrite tail.limit += bytesToWrite - buffer.size += bytesToWrite + buffer.sizeMut += bytesToWrite } while (offset < endIndex) { val bytesToWrite = min(endIndex - offset, Segment.SIZE) val seg = buffer.writableSegment(bytesToWrite) byteString.copyInto(seg.data, seg.limit, offset, offset + bytesToWrite) seg.limit += bytesToWrite - buffer.size += bytesToWrite + buffer.sizeMut += bytesToWrite offset += bytesToWrite } } diff --git a/core/common/src/Segment.kt b/core/common/src/Segment.kt index 9b6d6d991..f72e4deb1 100644 --- a/core/common/src/Segment.kt +++ b/core/common/src/Segment.kt @@ -1,5 +1,5 @@ /* - * Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. + * Copyright 2017-2024 JetBrains s.r.o. and respective authors and developers. * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. */ @@ -21,6 +21,7 @@ package kotlinx.io import kotlin.jvm.JvmField +import kotlin.jvm.JvmSynthetic /** * A segment of a buffer. @@ -36,13 +37,15 @@ import kotlin.jvm.JvmField * `limit` and beyond. There is a single owning segment for each byte array. Positions, * limits, prev, and next references are not shared. */ -internal class Segment { +public class Segment { @JvmField - val data: ByteArray + internal val data: ByteArray /** The next byte of application data byte to read in this segment. */ - @JvmField - var pos: Int = 0 + @PublishedApi + @get:JvmSynthetic + @set:JvmSynthetic + internal var pos: Int = 0 /** * The first byte of available data ready to be written to. @@ -50,32 +53,36 @@ internal class Segment { * If the segment is free and linked in the segment pool, the field contains total * byte count of this and next segments. */ - @JvmField - var limit: Int = 0 + @PublishedApi + @get:JvmSynthetic + @set:JvmSynthetic + internal var limit: Int = 0 /** True if other segments or byte strings use the same byte array. */ @JvmField - var shared: Boolean = false + internal var shared: Boolean = false /** True if this segment owns the byte array and can append to it, extending `limit`. */ @JvmField - var owner: Boolean = false + internal var owner: Boolean = false /** Next segment in a list, or null. */ - @JvmField - var next: Segment? = null + @PublishedApi + @get:JvmSynthetic + @set:JvmSynthetic + internal var next: Segment? = null /** Previous segment in the list, or null. */ @JvmField - var prev: Segment? = null + internal var prev: Segment? = null - constructor() { + private constructor() { this.data = ByteArray(SIZE) this.owner = true this.shared = false } - constructor(data: ByteArray, pos: Int, limit: Int, shared: Boolean, owner: Boolean) { + private constructor(data: ByteArray, pos: Int, limit: Int, shared: Boolean, owner: Boolean) { this.data = data this.pos = pos this.limit = limit @@ -88,7 +95,7 @@ internal class Segment { * are safe but writes are forbidden. This also marks the current segment as shared, which * prevents it from being pooled. */ - fun sharedCopy(): Segment { + internal fun sharedCopy(): Segment { shared = true return Segment(data, pos, limit, true, false) } @@ -97,7 +104,7 @@ internal class Segment { * Removes this segment of a list and returns its successor. * Returns null if the list is now empty. */ - fun pop(): Segment? { + internal fun pop(): Segment? { val result = this.next if (this.prev != null) { this.prev!!.next = this.next @@ -113,7 +120,7 @@ internal class Segment { /** * Appends `segment` after this segment in the list. Returns the pushed segment. */ - fun push(segment: Segment): Segment { + internal fun push(segment: Segment): Segment { segment.prev = this segment.next = this.next if (this.next != null) { @@ -131,7 +138,7 @@ internal class Segment { * * Returns the new head of the list. */ - fun split(byteCount: Int): Segment { + internal fun split(byteCount: Int): Segment { require(byteCount > 0 && byteCount <= limit - pos) { "byteCount out of range" } val prefix: Segment @@ -162,7 +169,7 @@ internal class Segment { * Call this when the tail and its predecessor may both be less than half full. This will copy * data so that segments can be recycled. */ - fun compact(): Segment { + internal fun compact(): Segment { check(this.prev != null) { "cannot compact" } if (!this.prev!!.owner) return this // Cannot compact: prev isn't writable. val byteCount = limit - pos @@ -177,7 +184,7 @@ internal class Segment { } /** Moves `byteCount` bytes from this segment to `sink`. */ - fun writeTo(sink: Segment, byteCount: Int) { + internal fun writeTo(sink: Segment, byteCount: Int) { check(sink.owner) { "only owner can write" } if (sink.limit + byteCount > SIZE) { // We can't fit byteCount bytes at the sink's current position. Shift sink first. @@ -196,15 +203,55 @@ internal class Segment { pos += byteCount } - val size: Int + @PublishedApi + @get:JvmSynthetic + internal val size: Int get() = limit - pos - companion object { + @PublishedApi + @get:JvmSynthetic + internal val remainingCapacity: Int + get() = data.size - limit + + /** + * Return a byte-array view over internal data. + * + * Returned array contains data layed out so that a readable slice starts at + * [Segment.pos] and ends at [Segment.limit], writable slice starts at [Segment.limit] + * and spans over [Segment.remainingCapacity] bytes. + * + * This method exists only to preserve binary compatibility if a segment's internal + * container eventually changes from ByteArray to something else. + */ + @PublishedApi + @JvmSynthetic + @Suppress("UNUSED_PARAMETER") + internal fun dataAsByteArray(readOnly: Boolean): ByteArray = data + + /** + * Write back all modifications that were made to a view returned from [dataAsByteArray]. + * + * This method exists only to preserve binary compatibility if a segment's internal + * container eventually changes from ByteArray to something else. + */ + @PublishedApi + @JvmSynthetic + @Suppress("UNUSED_PARAMETER") + internal fun writeBackData(data: ByteArray, bytesToCommit: Int): Unit = Unit + + internal companion object { /** The size of all segments in bytes. */ - const val SIZE = 8192 + internal const val SIZE = 8192 /** Segments will be shared when doing so avoids `arraycopy()` of this many bytes. */ - const val SHARE_MINIMUM = 1024 + internal const val SHARE_MINIMUM = 1024 + + @JvmSynthetic + internal fun new(): Segment = Segment() + + @JvmSynthetic + internal fun new(data: ByteArray, pos: Int, limit: Int, shared: Boolean, owner: Boolean): Segment + = Segment(data, pos, limit, shared, owner) } } @@ -229,7 +276,6 @@ internal fun Segment.indexOf(byte: Byte, startOffset: Int, endOffset: Int): Int * `startOffset` is relative and should be within `[0, size)`. */ internal fun Segment.indexOfBytesInbound(bytes: ByteArray, startOffset: Int): Int { - // require(startOffset in 0 until size) var offset = startOffset val limit = size - bytes.size + 1 val firstByte = bytes[0] @@ -294,3 +340,6 @@ internal fun Segment.indexOfBytesOutbound(bytes: ByteArray, startOffset: Int): I } return -1 } + +@PublishedApi +internal fun Segment.isEmpty(): Boolean = size == 0 diff --git a/core/common/src/Sinks.kt b/core/common/src/Sinks.kt index 3bb82ff5d..43661fac0 100644 --- a/core/common/src/Sinks.kt +++ b/core/common/src/Sinks.kt @@ -125,7 +125,7 @@ public fun Sink.writeDecimalLong(long: Long) { } tail.limit += width - buffer.size += width.toLong() + buffer.sizeMut += width.toLong() } } @@ -162,7 +162,7 @@ public fun Sink.writeHexadecimalUnsignedLong(long: Long) { pos-- } tail.limit += width - buffer.size += width.toLong() + buffer.sizeMut += width.toLong() } } diff --git a/core/common/src/Utf8.kt b/core/common/src/Utf8.kt index 24d71adb1..31da1471e 100644 --- a/core/common/src/Utf8.kt +++ b/core/common/src/Utf8.kt @@ -483,7 +483,7 @@ private inline fun Buffer.commonWriteUtf8(beginIndex: Int, endIndex: Int, charAt val runSize = i + segmentOffset - tail.limit // Equivalent to i - (previous i). tail.limit += runSize - size += runSize.toLong() + sizeMut += runSize.toLong() } c < 0x800 -> { @@ -492,7 +492,7 @@ private inline fun Buffer.commonWriteUtf8(beginIndex: Int, endIndex: Int, charAt tail.data[tail.limit] = (c shr 6 or 0xc0).toByte() // 110xxxxx tail.data[tail.limit + 1] = (c and 0x3f or 0x80).toByte() // 10xxxxxx tail.limit += 2 - size += 2L + sizeMut += 2L i++ } @@ -503,7 +503,7 @@ private inline fun Buffer.commonWriteUtf8(beginIndex: Int, endIndex: Int, charAt tail.data[tail.limit + 1] = (c shr 6 and 0x3f or 0x80).toByte() // 10xxxxxx tail.data[tail.limit + 2] = (c and 0x3f or 0x80).toByte() // 10xxxxxx tail.limit += 3 - size += 3L + sizeMut += 3L i++ } @@ -528,7 +528,7 @@ private inline fun Buffer.commonWriteUtf8(beginIndex: Int, endIndex: Int, charAt tail.data[tail.limit + 2] = (codePoint shr 6 and 0x3f or 0x80).toByte() // 10xxyyyy tail.data[tail.limit + 3] = (codePoint and 0x3f or 0x80).toByte() // 10yyyyyy tail.limit += 4 - size += 4L + sizeMut += 4L i += 2 } } @@ -555,7 +555,7 @@ private fun Buffer.commonWriteUtf8CodePoint(codePoint: Int) { tail.data[tail.limit] = (codePoint shr 6 or 0xc0).toByte() // 110xxxxx tail.data[tail.limit + 1] = (codePoint and 0x3f or 0x80).toByte() // 10xxxxxx tail.limit += 2 - size += 2L + sizeMut += 2L } codePoint in 0xd800..0xdfff -> { @@ -570,7 +570,7 @@ private fun Buffer.commonWriteUtf8CodePoint(codePoint: Int) { tail.data[tail.limit + 1] = (codePoint shr 6 and 0x3f or 0x80).toByte() // 10xxxxxx tail.data[tail.limit + 2] = (codePoint and 0x3f or 0x80).toByte() // 10xxxxxx tail.limit += 3 - size += 3L + sizeMut += 3L } else -> { // [0x10000, 0x10ffff] @@ -581,7 +581,7 @@ private fun Buffer.commonWriteUtf8CodePoint(codePoint: Int) { tail.data[tail.limit + 2] = (codePoint shr 6 and 0x3f or 0x80).toByte() // 10xxyyyy tail.data[tail.limit + 3] = (codePoint and 0x3f or 0x80).toByte() // 10yyyyyy tail.limit += 4 - size += 4L + sizeMut += 4L } } } @@ -602,7 +602,7 @@ private fun Buffer.commonReadUtf8(byteCount: Long): String { val result = s.data.commonToUtf8String(s.pos, s.pos + byteCount.toInt()) s.pos += byteCount.toInt() - size -= byteCount + sizeMut -= byteCount if (s.pos == s.limit) { recycleHead() diff --git a/core/common/src/unsafe/UnsafeBufferOperations.kt b/core/common/src/unsafe/UnsafeBufferOperations.kt new file mode 100644 index 000000000..545e62ade --- /dev/null +++ b/core/common/src/unsafe/UnsafeBufferOperations.kt @@ -0,0 +1,157 @@ +/* + * Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file. + */ + +package kotlinx.io.unsafe + +import kotlinx.io.* + +@UnsafeIoApi +public object UnsafeBufferOperations { + /** + * Maximum value that is safe to pass to [writeToTail]. + */ + public val maxSafeWriteCapacity: Int = Segment.SIZE + + /** + * Moves [bytes] to the end of the [buffer]. + * + * Only the region of the [bytes] array spanning from [startIndex] until [endIndex] is considered readable. + * + * The array is wrapped into the buffer without any copies, if possible. + * + * Attempts to write data into [bytes] array once it was moved may lead to data corruption + * and should be considered as an error. + * + * @param buffer a buffer to which data will be added + * @param bytes an array that needs to be added to the buffer + * @param startIndex an index of the first byte readable from the array, `0` by default. + * @param endIndex an index of the byte past the last readable array byte, `bytes.size` byte default. + * + * @throws IndexOutOfBoundsException when [startIndex] or [endIndex] are not within [bytes] bounds + * @throws IllegalArgumentException when `startIndex > endIndex` + * + * @sample kotlinx.io.samples.unsafe.UnsafeBufferOperationsSamples.moveToTail + */ + public fun moveToTail(buffer: Buffer, bytes: ByteArray, startIndex: Int = 0, endIndex: Int = bytes.size) { + checkBounds(bytes.size, startIndex, endIndex) + val segment = Segment.new( + bytes, startIndex, endIndex, shared = true /* to prevent recycling */, + owner = false /* can't append to it */ + ) + val tail = buffer.tail + if (tail == null) { + buffer.head = segment + buffer.tail = segment + } else { + buffer.tail = tail.push(segment) + } + buffer.sizeMut += endIndex - startIndex + } + + /** + * Provides read-only access to the data from the head of a [buffer] by calling the [readAction] on head's data and + * optionally consumes the data at the end of the action. + * + * The [readAction] receives the byte array with buffer head's data and a pair of indices, startIndex and endIndex, + * denoting the subarray containing meaningful data. + * It's considered an error to read data outside of that range. + * The data array is provided for read-only purposes, updating it may affect buffer's data + * and may lead to undefined behavior when performed outside the provided range. + * + * The [readAction] should return the number of bytes consumed, the buffer's size will be decreased by that value, + * and data from the consumed prefix will be no longer available for read. + * If the operation does not consume anything, the action should return `0`. + * It's considered an error to return a negative value or a value exceeding the size of a readable range. + * + * If [readAction] ends execution by throwing an exception, no data will be consumed from the buffer. + * + * If the buffer is empty, [IllegalArgumentException] will be thrown. + * + * The data is passed to the [readAction] directly from the buffer's internal storage without copying on + * the best effort basis, meaning that there are no strong zero-copy guarantees + * and the copy will be created if it could not be omitted. + * + * @throws IllegalStateException when [readAction] returns negative value or a values exceeding + * the `endIndexExclusive - startIndexInclusive` value. + * @throws IllegalArgumentException when the [buffer] is empty. + * + * @sample kotlinx.io.samples.unsafe.UnsafeBufferOperationsSamples.readByteArrayFromHead + */ + public inline fun readFromHead( + buffer: Buffer, + readAction: (bytes: ByteArray, startIndexInclusive: Int, endIndexExclusive: Int) -> Int + ) { + require(!buffer.exhausted()) { "Buffer is empty" } + val head = buffer.head!! + val bytesRead = readAction(head.dataAsByteArray(true), head.pos, head.limit) + if (bytesRead < 0) throw IllegalStateException("Returned negative read bytes count") + if (bytesRead == 0) return + if (bytesRead > head.size) throw IllegalStateException("Returned too many bytes") + buffer.skip(bytesRead.toLong()) + } + + /** + * Provides write access to the buffer, allowing to write data + * into a not yet committed portion of the buffer's tail using a [writeAction]. + * + * The [writeAction] receives the byte array and the pair of indices, startIndex and endIndex, + * denoting the range of indices available for writing. It's considered an error to write data outside that range. + * Writing outside the range may corrupt buffer's data. + * + * It's guaranteed that the size of the range is at least [minimumCapacity], + * but if the [minimumCapacity] bytes could not be provided for writing, + * the method will throw [IllegalStateException]. + * It is safe to use any [minimumCapacity] value below [maxSafeWriteCapacity], but unless exact minimum number of + * available bytes is required, it's recommended to use `1` as [minimumCapacity] value. + * + * The value returned by the [writeAction] denotes the number of bytes successfully written to the buffer. + * If no data was written, `0` should be returned. + * It's an error to return a negative value or a value exceeding the size of the provided writeable range. + * + * If [writeAction] ends execution by throwing an exception, no data will be written to the buffer. + * + * The data array is passed to the [writeAction] directly from the buffer's internal storage without copying + * on the best-effort basis, meaning that there are no strong zero-copy guarantees + * and the copy will be created if it could not be omitted. + * + * @throws IllegalStateException when [minimumCapacity] is too large and could not be fulfilled. + * @throws IllegalStateException when [writeAction] returns a negative value or a value exceeding + * the `endIndexExclusive - startIndexInclusive` value. + * + * @sample kotlinx.io.samples.unsafe.UnsafeBufferOperationsSamples.writeByteArrayToTail + */ + public inline fun writeToTail( + buffer: Buffer, minimumCapacity: Int, + writeAction: (bytes: ByteArray, startIndexInclusive: Int, endIndexExclusive: Int) -> Int + ) { + val tail = buffer.writableSegment(minimumCapacity) + + val data = tail.dataAsByteArray(false) + // If writeAction throws an exception, we may end up with an empty segment in tail. + // That's fine as long as we don't treat the presence of a segment as a sing of a buffer being non-empty. + val bytesWritten = writeAction(data, tail.limit, data.size) + + // fast path + if (bytesWritten == minimumCapacity) { + tail.writeBackData(data, bytesWritten) + tail.limit += bytesWritten + buffer.sizeMut += bytesWritten + return + } + + check(bytesWritten in 0..tail.remainingCapacity) { + "Invalid number of bytes written: $bytesWritten. Should be in 0..${tail.remainingCapacity}" + } + if (bytesWritten != 0) { + tail.writeBackData(data, bytesWritten) + tail.limit += bytesWritten + buffer.sizeMut += bytesWritten + return + } + if (tail.isEmpty()) { + buffer.recycleTail() + } + } +} diff --git a/core/common/test/samples/unsafe/unsafeSamples.kt b/core/common/test/samples/unsafe/unsafeSamples.kt new file mode 100644 index 000000000..72b6e4968 --- /dev/null +++ b/core/common/test/samples/unsafe/unsafeSamples.kt @@ -0,0 +1,101 @@ +/* + * Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file. + */ + +package kotlinx.io.samples.unsafe + +import kotlinx.io.Buffer +import kotlinx.io.UnsafeIoApi +import kotlinx.io.readString +import kotlinx.io.unsafe.UnsafeBufferOperations +import kotlin.io.encoding.Base64 +import kotlin.io.encoding.ExperimentalEncodingApi +import kotlin.math.min +import kotlin.random.Random +import kotlin.test.Test +import kotlin.test.assertEquals + +class UnsafeBufferOperationsSamples { + @OptIn(UnsafeIoApi::class) + @Test + fun writeByteArrayToTail() { + fun Buffer.writeRandomBytes(byteCount: Int) { + require(byteCount > 0) { "byteCount should be positive. Was: $byteCount" } + var remaining = byteCount + while (remaining > 0) { + UnsafeBufferOperations.writeToTail(this, 1) { data, startIndex, endIndex -> + // data's slice from startIndex to endIndex is available for writing, + // but that slice could be much larger than what remained to write. + val correctedEndIndex = min(endIndex, startIndex + remaining) + // write random bytes + Random.Default.nextBytes(data, startIndex, correctedEndIndex) + // number of bytes written + val written = correctedEndIndex - startIndex + remaining -= written + // that many bytes will be committed to the buffer + written + } + } + } + + val buffer = Buffer() + + buffer.writeRandomBytes(42) + assertEquals(42L, buffer.size) + + buffer.writeRandomBytes(10000) + assertEquals(10042L, buffer.size) + } + + @OptIn(ExperimentalEncodingApi::class, UnsafeIoApi::class) + @Test + fun moveToTail() { + fun Buffer.writeBase64(data: ByteArray, encoder: Base64 = Base64.Default) { + UnsafeBufferOperations.moveToTail(this, encoder.encodeToByteArray(data)) + } + + val buffer = Buffer() + buffer.writeBase64(byteArrayOf(-1, 0, -2, 0)) + + assertEquals("/wD+AA==", buffer.readString()) + } + + @OptIn(UnsafeIoApi::class) + @Test + fun readByteArrayFromHead() { + // see https://en.wikipedia.org/wiki/LEB128#Unsigned_LEB128 + fun Buffer.readULeb128(): ULong { + var shift = 0 + var result = 0L + var complete = false + + while (!complete) { + require(1) // check if we still have something to read + + UnsafeBufferOperations.readFromHead(this) { data, startOffset, endOffset -> + var offset = startOffset + do { + val b = data[offset++] + result = result.or(0x7fL.and(b.toLong()).shl(shift)) + shift += 7 + complete = b >= 0 // we're done if the most significant bit was not set + } while (!complete && offset < endOffset) + // return the number of consumed bytes + offset - startOffset + } + } + + return result.toULong() + } + + val buffer = Buffer().apply { + write(byteArrayOf(0)) // 0 + write(byteArrayOf(0xed.toByte(), 0x9b.toByte(), 0xb0.toByte(), 0x6f)) // dec0ded + write(byteArrayOf(-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1)) // ffffffffffffffff + } + assertEquals(0U, buffer.readULeb128()) + assertEquals(0xDEC0DEDu, buffer.readULeb128()) + assertEquals((-1).toULong(), buffer.readULeb128()) + } +} diff --git a/core/common/test/unsafe/UnsafeBufferOperationsMoveTest.kt b/core/common/test/unsafe/UnsafeBufferOperationsMoveTest.kt new file mode 100644 index 000000000..b251c5592 --- /dev/null +++ b/core/common/test/unsafe/UnsafeBufferOperationsMoveTest.kt @@ -0,0 +1,91 @@ +/* + * Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file. + */ + +package kotlinx.io.unsafe + +import kotlinx.io.* +import kotlin.test.* + +@OptIn(UnsafeIoApi::class) +class UnsafeBufferOperationsMoveTest { + @Test + fun moveArrayFully() { + val buffer = Buffer() + val data = "hello unsafe new world!".encodeToByteArray() + + UnsafeBufferOperations.moveToTail(buffer, data) + assertFalse(buffer.exhausted()) + assertEquals(data.size.toLong(), buffer.size) + + assertEquals("hello unsafe new world!", buffer.readString()) + } + + @Test + fun moveArraySlice() { + val buffer = Buffer() + val data = "hello unsafe new world!".encodeToByteArray() + + UnsafeBufferOperations.moveToTail(buffer, data, 6, 12) + assertEquals(6L, buffer.size) + assertEquals("unsafe", buffer.readString()) + } + + @Test + fun movedArrayIsReadOnly() { + val firstBuffer = Buffer().also { it.writeString("this is ") } + val secondBuffer = Buffer().also { it.writeString("this is ") } + + val data = "first second third".encodeToByteArray() + + UnsafeBufferOperations.moveToTail(firstBuffer, data, 0, 5) + firstBuffer.writeString(" buffer") + + UnsafeBufferOperations.moveToTail(secondBuffer, data, 6, 12) + secondBuffer.writeString(" buffer") + + assertArrayEquals("first second third".encodeToByteArray(), data) + + assertEquals("this is first buffer", firstBuffer.readString()) + assertEquals("this is second buffer", secondBuffer.readString()) + } + + @Test + fun moveEmptySlice() { + val buffer = Buffer() + + UnsafeBufferOperations.moveToTail(buffer, ByteArray(0)) + assertTrue(buffer.exhausted()) + + UnsafeBufferOperations.moveToTail(buffer, ByteArray(10), 5, 5) + assertTrue(buffer.exhausted()) + } + + @Test + fun illegalArgumentsHandling() { + assertFailsWith { + UnsafeBufferOperations.moveToTail(Buffer(), ByteArray(1), -1) + } + + assertFailsWith { + UnsafeBufferOperations.moveToTail(Buffer(), ByteArray(10), 2, 0) + } + + assertFailsWith { + UnsafeBufferOperations.moveToTail(Buffer(), ByteArray(10), 11, 12) + } + } + + @Test + fun moveMultipleSegments() { + val buffer = Buffer() + val segmentsCount = 10 + for (i in 0 ..< segmentsCount) { + UnsafeBufferOperations.moveToTail(buffer, byteArrayOf(i.toByte())) + } + assertEquals(10, buffer.size) + assertEquals(listOf(1, 1, 1, 1, 1, 1, 1, 1, 1, 1), segmentSizes(buffer)) + assertContentEquals(ByteArray(segmentsCount) { it.toByte() }, buffer.readByteArray()) + } +} diff --git a/core/common/test/unsafe/UnsafeBufferOperationsReadTest.kt b/core/common/test/unsafe/UnsafeBufferOperationsReadTest.kt new file mode 100644 index 000000000..fb3a938e1 --- /dev/null +++ b/core/common/test/unsafe/UnsafeBufferOperationsReadTest.kt @@ -0,0 +1,122 @@ +/* + * Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file. + */ + +package kotlinx.io.unsafe + +import kotlinx.io.Buffer +import kotlinx.io.UnsafeIoApi +import kotlinx.io.assertArrayEquals +import kotlinx.io.writeString +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue + +@OptIn(UnsafeIoApi::class) +class UnsafeBufferOperationsReadTest { + private class TestException : RuntimeException() + + @Test + fun bufferCapacity() { + val buffer = Buffer().apply { writeString("hello world") } + + val head = buffer.head!! + UnsafeBufferOperations.readFromHead(buffer) { data, startIndex, endIndex -> + assertTrue(endIndex <= data.size) + assertEquals(0, startIndex) + assertEquals(head.size, endIndex) + 0 + } + } + + @Test + fun consumeByteByByte() { + val expectedData = "hello world".encodeToByteArray() + val actualData = ByteArray(expectedData.size) + + val buffer = Buffer().apply { write(expectedData) } + for (idx in actualData.indices) { + UnsafeBufferOperations.readFromHead(buffer) { data, startIndex, _ -> + actualData[idx] = data[startIndex] + 1 + } + assertEquals(actualData.size - idx - 1, buffer.size.toInt()) + } + assertTrue(buffer.exhausted()) + assertArrayEquals(expectedData, actualData) + } + + @Test + fun readNothing() { + val buffer = Buffer().apply { writeInt(42) } + UnsafeBufferOperations.readFromHead(buffer) { _, _, _ -> 0 } + assertEquals(42, buffer.readInt()) + } + + @Test + fun readEverything() { + val buffer = Buffer().apply { writeString("hello world") } + UnsafeBufferOperations.readFromHead(buffer) { _, startIndex, endIndex -> + endIndex - startIndex + } + assertTrue(buffer.exhausted()) + } + + @Test + fun readFromEmptyBuffer() { + val buffer = Buffer() + assertFailsWith { + UnsafeBufferOperations.readFromHead(buffer) { _, _, _ -> 0 } + } + } + + @Test + fun readFromTheSegmentEnd() { + val segmentSize = UnsafeBufferOperations.maxSafeWriteCapacity + val extraBytesCount = 128 + val bytesToSkip = segmentSize - 2 + + val buffer = Buffer().apply { write(ByteArray(segmentSize + extraBytesCount) { 0xff.toByte() }) } + buffer.skip(bytesToSkip.toLong()) + val head = buffer.head!! + assertEquals(bytesToSkip, head.pos) + + UnsafeBufferOperations.readFromHead(buffer) { _, startIndex, endIndex -> + assertEquals(2, endIndex - startIndex) + 2 + } + + assertEquals(extraBytesCount, buffer.size.toInt()) + } + + + @Test + fun returnIllegalReadCount() { + val buffer = Buffer().apply { writeInt(0) } + + assertFailsWith { + UnsafeBufferOperations.readFromHead(buffer) { _, _, _ -> -1 } + } + assertEquals(4L, buffer.size) + + assertFailsWith { + UnsafeBufferOperations.readFromHead(buffer) { _, f, t -> (t - f + 1) } + } + assertEquals(4L, buffer.size) + } + + @Test + fun resetReadOnException() { + val buffer = Buffer().apply { writeString("hello world") } + + val sizeBeforeRead = buffer.size + assertFailsWith { + UnsafeBufferOperations.readFromHead(buffer) { _, _, _ -> + throw TestException() + } + } + assertEquals(buffer.size, sizeBeforeRead) + } +} diff --git a/core/common/test/unsafe/UnsafeBufferOperationsWriteTest.kt b/core/common/test/unsafe/UnsafeBufferOperationsWriteTest.kt new file mode 100644 index 000000000..a5c1b6a3d --- /dev/null +++ b/core/common/test/unsafe/UnsafeBufferOperationsWriteTest.kt @@ -0,0 +1,143 @@ +/* + * Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file. + */ + +package kotlinx.io.unsafe + +import kotlinx.io.* +import kotlin.test.* + +@OptIn(UnsafeIoApi::class) +class UnsafeBufferOperationsWriteTest { + private class TestException : RuntimeException() + + @Test + fun bufferCapacity() { + val buffer = Buffer() + + UnsafeBufferOperations.writeToTail(buffer, 1) { data, startIndex, endIndex -> + // Unsafe check, head is not committed yet + assertSame(buffer.head!!.data, data) + + assertEquals(0, startIndex) + assertEquals(buffer.head!!.data.size, endIndex) + 0 + } + } + + @Test + fun writeByteByByte() { + val buffer = Buffer() + val data = "hello world".encodeToByteArray() + + for (idx in data.indices) { + UnsafeBufferOperations.writeToTail(buffer, 1) { writeable, pos, _ -> + writeable[pos] = data[idx] + 1 + } + assertEquals(idx + 1, buffer.size.toInt()) + } + assertEquals("hello world", buffer.readString()) + } + + @Test + fun writeNothing() { + val buffer = Buffer() + + UnsafeBufferOperations.writeToTail(buffer, 1) { _, _, _ -> 0 } + assertTrue(buffer.exhausted()) + + buffer.writeInt(42) + UnsafeBufferOperations.writeToTail(buffer, 1) { _, _, _ -> 0 } + assertEquals(4, buffer.size) + + buffer.write(ByteArray(Segment.SIZE - 4)) + UnsafeBufferOperations.writeToTail(buffer, 1) { _, _, _ -> 0 } + assertEquals(Segment.SIZE.toLong(), buffer.size) + } + + @Test + fun writeWholeBuffer() { + val buffer = Buffer() + UnsafeBufferOperations.writeToTail(buffer, 1) { data, from, to -> + for (idx in from ..< to) { + data[idx] = 42 + } + to - from + } + assertEquals(Segment.SIZE, buffer.size.toInt()) + assertArrayEquals(ByteArray(Segment.SIZE) { 42 }, buffer.readByteArray()) + } + + @Test + fun requireToManyBytes() { + val buffer = Buffer() + assertFailsWith { + UnsafeBufferOperations.writeToTail(buffer, 100500) { _, _, _ -> 0 } + } + assertTrue(buffer.exhausted()) + } + + @Test + fun writeToTheEndOfABuffer() { + val buffer = Buffer().apply { write(ByteArray(Segment.SIZE - 1)) } + UnsafeBufferOperations.writeToTail(buffer, 1) { data, pos, limit -> + assertEquals(1, limit - pos) + data[pos] = 42 + 1 + } + assertEquals(Segment.SIZE, buffer.size.toInt()) + UnsafeBufferOperations.writeToTail(buffer, 1) { data, pos, _ -> + data[pos] = 43 + 1 + } + assertEquals(Segment.SIZE + 1, buffer.size.toInt()) + + buffer.skip(Segment.SIZE - 1L) + assertArrayEquals(byteArrayOf(42, 43), buffer.readByteArray()) + } + + @Test + fun returnIllegalWriteCount() { + val buffer = Buffer() + assertFailsWith { + UnsafeBufferOperations.writeToTail(buffer, 1) { _, _, _ -> + -1 + } + } + assertTrue(buffer.exhausted()) + + assertFailsWith { + UnsafeBufferOperations.writeToTail(buffer, 1) { _, _, _ -> + 100500 + } + } + assertTrue(buffer.exhausted()) + } + + @Test + fun resetWriteOnException() { + val buffer = Buffer() + + assertFailsWith { + UnsafeBufferOperations.writeToTail(buffer, 2) { _, _, _ -> + throw TestException() + } + } + + assertTrue(buffer.exhausted()) + } + + @Test + fun returnLessBytesThanItWasActuallyWritten() { + val buffer = Buffer() + + UnsafeBufferOperations.writeToTail(buffer, 42) { data, pos, limit -> + data.fill(0xab.toByte(), pos, limit) + 4 + } + assertEquals(4, buffer.size) + assertEquals(0xababababu, buffer.readUInt()) + } +} diff --git a/core/js/src/SegmentPool.kt b/core/js/src/SegmentPool.kt index c77d92ef9..615a14977 100644 --- a/core/js/src/SegmentPool.kt +++ b/core/js/src/SegmentPool.kt @@ -10,7 +10,7 @@ internal actual object SegmentPool { actual val byteCount: Int = 0 - actual fun take(): Segment = Segment() + actual fun take(): Segment = Segment.new() actual fun recycle(segment: Segment) { } diff --git a/core/jvm/src/BuffersJvm.kt b/core/jvm/src/BuffersJvm.kt index 1754e865b..cb5625f31 100644 --- a/core/jvm/src/BuffersJvm.kt +++ b/core/jvm/src/BuffersJvm.kt @@ -72,7 +72,7 @@ private fun Buffer.write(input: InputStream, byteCount: Long, forever: Boolean) throw EOFException("Stream exhausted before $byteCount bytes were read.") } tail.limit += bytesRead - size += bytesRead.toLong() + sizeMut += bytesRead.toLong() remainingByteCount -= bytesRead.toLong() } } @@ -97,7 +97,7 @@ public fun Buffer.readTo(out: OutputStream, byteCount: Long = size) { out.write(s.data, s.pos, toCopy) s.pos += toCopy - size -= toCopy.toLong() + sizeMut -= toCopy.toLong() remainingByteCount -= toCopy.toLong() if (s.pos == s.limit) { @@ -164,7 +164,7 @@ public fun Buffer.readAtMostTo(sink: ByteBuffer): Int { sink.put(s.data, s.pos, toCopy) s.pos += toCopy - size -= toCopy.toLong() + sizeMut -= toCopy.toLong() if (s.pos == s.limit) { recycleHead() @@ -191,7 +191,7 @@ public fun Buffer.transferFrom(source: ByteBuffer): Buffer { tail.limit += toCopy } - size += byteCount.toLong() + sizeMut += byteCount.toLong() return this } diff --git a/core/jvm/src/JvmCore.kt b/core/jvm/src/JvmCore.kt index 036138660..67acedf73 100644 --- a/core/jvm/src/JvmCore.kt +++ b/core/jvm/src/JvmCore.kt @@ -49,7 +49,7 @@ private open class OutputStreamSink( head.pos += toCopy remaining -= toCopy - source.size -= toCopy + source.sizeMut -= toCopy if (head.pos == head.limit) { source.recycleHead() @@ -92,7 +92,7 @@ private open class InputStreamSource( return -1 } tail.limit += bytesRead - sink.size += bytesRead + sink.sizeMut += bytesRead return bytesRead.toLong() } catch (e: AssertionError) { if (e.isAndroidGetsocknameError) throw IOException(e) diff --git a/core/jvm/src/SegmentPool.kt b/core/jvm/src/SegmentPool.kt index a2846a742..f70d22dae 100644 --- a/core/jvm/src/SegmentPool.kt +++ b/core/jvm/src/SegmentPool.kt @@ -20,7 +20,9 @@ */ package kotlinx.io +import kotlinx.io.SegmentPool.HASH_BUCKET_COUNT import kotlinx.io.SegmentPool.LOCK +import kotlinx.io.SegmentPool.MAX_SIZE import kotlinx.io.SegmentPool.recycle import kotlinx.io.SegmentPool.take import java.util.concurrent.atomic.AtomicReference @@ -50,7 +52,7 @@ internal actual object SegmentPool { actual val MAX_SIZE = 64 * 1024 // 64 KiB. /** A sentinel segment to indicate that the linked list is currently being modified. */ - private val LOCK = Segment(ByteArray(0), pos = 0, limit = 0, shared = false, owner = false) + private val LOCK = Segment.new(ByteArray(0), pos = 0, limit = 0, shared = false, owner = false) /** * The number of hash buckets. This number needs to balance keeping the pool small and contention @@ -85,13 +87,13 @@ internal actual object SegmentPool { when { first === LOCK -> { // We didn't acquire the lock. Don't take a pooled segment. - return Segment() + return Segment.new() } first == null -> { // We acquired the lock but the pool was empty. Unlock and return a new segment. firstRef.set(null) - return Segment() + return Segment.new() } else -> { diff --git a/core/jvm/src/SourcesJvm.kt b/core/jvm/src/SourcesJvm.kt index c4d808a57..e34946058 100644 --- a/core/jvm/src/SourcesJvm.kt +++ b/core/jvm/src/SourcesJvm.kt @@ -43,7 +43,7 @@ private fun Buffer.readStringImpl(byteCount: Long, charset: Charset): String { val result = String(s.data, s.pos, byteCount.toInt(), charset) s.pos += byteCount.toInt() - size -= byteCount + sizeMut -= byteCount if (s.pos == s.limit) { recycleHead() diff --git a/core/jvm/src/unsafe/UnsafeBufferOperationsJvm.kt b/core/jvm/src/unsafe/UnsafeBufferOperationsJvm.kt new file mode 100644 index 000000000..b0fb6f550 --- /dev/null +++ b/core/jvm/src/unsafe/UnsafeBufferOperationsJvm.kt @@ -0,0 +1,158 @@ +/* + * Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file. + */ + +package kotlinx.io.unsafe + +import kotlinx.io.Buffer +import kotlinx.io.Segment +import kotlinx.io.UnsafeIoApi +import kotlinx.io.unsafe.UnsafeBufferOperations.maxSafeWriteCapacity +import java.nio.ByteBuffer + +/** + * Provides read-only access to the data from the head of a [buffer] by calling the [readAction] on head's data and + * optionally consumes the data at the end of the action. + * + * The [readAction] receives a read-only [ByteBuffer] with buffer head's data. + * + * After exiting the [readAction], all data consumed from the [ByteBuffer] will be also consumed from the [buffer]. + * Consumed bytes determined as a difference between [ByteBuffer.capacity] and [ByteBuffer.remaining]. + * + * If [readAction] ends execution by throwing an exception, no data will be consumed from the buffer. + * + * If the [buffer] is empty, [IllegalArgumentException] will be thrown. + * + * The data is passed to the [readAction] directly from the buffer's internal storage without copying on + * the best effort basis, meaning that there are no strong zero-copy guarantees + * and the copy will be created if it could not be omitted. + * + * @param buffer a buffer to read from + * @param readAction an action that will be invoked on a [ByteBuffer] containing data from [buffer]'s head + * + * @throws IllegalArgumentException when the [buffer] is empty. + * + * @sample kotlinx.io.samples.unsafe.UnsafeReadWriteSamplesJvm.writeToByteChannel + */ +@UnsafeIoApi +public inline fun UnsafeBufferOperations.readFromHead(buffer: Buffer, readAction: (ByteBuffer) -> Unit) { + readFromHead(buffer) { rawData, pos, limit -> + val bb = ByteBuffer.wrap(rawData, pos, limit - pos).slice().asReadOnlyBuffer() + readAction(bb) + bb.position() + } +} + +/** + * Provides write access to the buffer, allowing to write data + * into a not yet committed portion of the buffer's tail using a [writeAction]. + * + * The [writeAction] receives a [ByteBuffer] representing uncommitted portion of [buffer]'s tail + * + * It's guaranteed that the size of the [ByteBuffer] is at least [minimumCapacity], + * but if the [minimumCapacity] bytes could not be provided for writing, + * the method will throw [IllegalStateException]. + * It is safe to use any [minimumCapacity] value below [maxSafeWriteCapacity], but unless exact minimum number of + * available bytes is required, it's recommended to use `1` as [minimumCapacity] value. + * + * After exiting [writeAction], bytes written to the [ByteBuffer] will be committed to the buffer. + * The number of bytes written is determined as a difference between [ByteBuffer.capacity] and [ByteBuffer.remaining]. + * + * If [writeAction] ends execution by throwing an exception, no data will be written to the buffer. + * + * The data array is passed to the [writeAction] directly from the buffer's internal storage without copying + * on the best-effort basis, meaning that there are no strong zero-copy guarantees + * and the copy will be created if it could not be omitted. + * + * @param buffer a buffer to read from + * @param minimumCapacity the minimum amount of writable space + * @param writeAction an action that will be invoked on a [ByteBuffer] that will be added to a [buffer] by the end of + * the call + * + * @throws IllegalStateException when [minimumCapacity] is too large and could not be fulfilled. + * + * @sample kotlinx.io.samples.unsafe.UnsafeReadWriteSamplesJvm.readFromByteChannel + */ +@UnsafeIoApi +public inline fun UnsafeBufferOperations.writeToTail( + buffer: Buffer, + minimumCapacity: Int, + writeAction: (ByteBuffer) -> Unit +) { + writeToTail(buffer, minimumCapacity) { rawData, pos, limit -> + val bb = ByteBuffer.wrap(rawData, pos, limit - pos).slice() + writeAction(bb) + bb.position() + } +} + +/** + * Provides read-only access to [buffer]'s data by filling provided [iovec] array with [ByteBuffer]'s representing + * [buffer]'s data, supplying it to [readAction] and consuming number of bytes returned by the [readAction]. + * + * If there's not enough space in [iovec] to fit all byte buffers, only a prefix of [buffer]'s data will be supplied to + * [readAction]. + * If the number of byte buffers available for read is less than [iovec]'s size, + * only a prefix of [iovec] will be filled. + * + * The second [readAction]'s parameter denotes the number of buffers supplied. + * + * The value returned by the [readAction] is interpreted as the number of consumed bytes. + * The size of the buffer will be reduced by that value, + * and the corresponding number of bytes from buffer's prefix will be no longer available for read. + * If data was not consumed, the [readAction] should return `0`. + * + * If [readAction] ends execution by throwing an exception, no data will be consumed from the buffer. + * + * If the [iovec] contains any references, it will be overridden during the call. + * + * If the [buffer] is empty, [IllegalArgumentException] will be thrown. + * + * The data is passed to the [readAction] directly from the buffer's internal storage without copying on + * the best effort basis, meaning that there are no strong zero-copy guarantees + * and the copy will be created if it could not be omitted. + * + * @param buffer a buffer to read from + * @param iovec a temporary array to store [ByteBuffer]s with data from [buffer]'s prefix + * @param readAction an action that will be invoked on an array filled with [ByteBuffer]s holding data from [buffer]'s + * prefix + * + * @throws IllegalArgumentException when the [buffer] is empty. + * @throws IllegalArgumentException when the [iovec] is empty. + * + * @sample kotlinx.io.samples.unsafe.UnsafeReadWriteSamplesJvm.gatheringWrite + * + */ +@UnsafeIoApi +public inline fun UnsafeBufferOperations.readBulk( + buffer: Buffer, + iovec: Array, + readAction: (iovec: Array, iovecSize: Int) -> Long +) { + val head = buffer.head ?: throw IllegalArgumentException("buffer is empty.") + if (iovec.isEmpty()) throw IllegalArgumentException("iovec is empty.") + + var currentSegment: Segment = head + var idx = 0 + var capacity = 0L + do { + val pos = currentSegment.pos + val limit = currentSegment.limit + val len = limit - pos + iovec[idx++] = ByteBuffer.wrap(currentSegment.dataAsByteArray(true), pos, len) + .slice() + .asReadOnlyBuffer() + capacity += len + currentSegment = currentSegment.next ?: break + } while (idx < iovec.size) + + val bytesRead = readAction(iovec, idx) + if (bytesRead == 0L) return + if (bytesRead < 0 || bytesRead > capacity) { + throw IllegalStateException( + "readAction should return a value in range [0, $capacity], but returned: $bytesRead" + ) + } + buffer.skip(bytesRead) +} diff --git a/core/jvm/test/samples/unsafeAccessSamplesJvm.kt b/core/jvm/test/samples/unsafeAccessSamplesJvm.kt new file mode 100644 index 000000000..053801f7e --- /dev/null +++ b/core/jvm/test/samples/unsafeAccessSamplesJvm.kt @@ -0,0 +1,102 @@ +/* + * Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file. + */ + +package kotlinx.io.samples.unsafe + +import kotlinx.io.Buffer +import kotlinx.io.UnsafeIoApi +import kotlinx.io.readString +import kotlinx.io.unsafe.UnsafeBufferOperations +import kotlinx.io.unsafe.readBulk +import kotlinx.io.unsafe.readFromHead +import kotlinx.io.unsafe.writeToTail +import kotlinx.io.writeString +import java.nio.ByteBuffer +import java.nio.channels.FileChannel +import java.nio.file.Files +import java.nio.file.StandardOpenOption +import kotlin.random.Random +import kotlin.test.Test +import kotlin.test.assertEquals + +@OptIn(UnsafeIoApi::class) +class UnsafeReadWriteSamplesJvm { + + @Test + fun writeToByteChannel() { + val source = Buffer().apply { writeString("hello world") } + // Open a file channel to write into. + FileChannel.open( + Files.createTempFile(null, null), + StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE + ).use { channel -> + // Write data into the channel until source buffer exhausted + while (!source.exhausted()) { + // Take a byte buffer holding source's data prefix and send it to the channel + UnsafeBufferOperations.readFromHead(source) { headByteBuffer: ByteBuffer -> + channel.write(headByteBuffer) + } + } + assertEquals(11, channel.size()) + } + } + + @Test + fun readFromByteChannel() { + val destination = Buffer() + + // Open a file channel + FileChannel.open( + Files.createTempFile(null, null), + StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE + ).use { channel -> + // Write some data into it + channel.write(ByteBuffer.wrap("hello world".encodeToByteArray())) + // And reset a read position to the beginning of a file + channel.position(0) + + // Read data until a channel exhausted + var finished = false + do { + // Require a byte buffer to read data into. + // By the end of the call, + // all data written into that byte buffer will be appended to the destination buffer. + UnsafeBufferOperations.writeToTail(destination, 1) { tailByteBuffer: ByteBuffer -> + val bytesRead = channel.read(tailByteBuffer) + // If we read nothing, it's time to wrap up. + finished = bytesRead <= 0 + } + } while (!finished) + } + assertEquals("hello world", destination.readString()) + } + + @Test + fun gatheringWrite() { + // Pre allocate an array to hold byte buffers during readBulk call. + // Such an array should be reused across multiple readBulk calls to reduce the number of allocations. + val buffers = Array(16) { null } + + // A buffer to read from + val source = Buffer().apply { write(Random.nextBytes(64 * 1024)) } + // Write the source buffer's content into a file using file channel's gathering write + FileChannel.open( + Files.createTempFile(null, null), + StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE + ).use { channel -> + // Continue writing until the source is exhausted + while (!source.exhausted()) { + // Take as many byte buffers as possible (it depends on the source's size and the length + // buffers array) and send it all to the channel. + UnsafeBufferOperations.readBulk(source, buffers) { bbs: Array, byteBuffersCount: Int -> + val bytesWritten = channel.write(bbs, 0, byteBuffersCount) + // Corresponding number of bytes will be consumed from the buffer by the end of readBulk call + bytesWritten + } + } + assertEquals(64 * 1024, channel.size()) + } + } +} diff --git a/core/jvm/test/unsafe/UnsafeBufferOperationsJvmReadBulkTest.kt b/core/jvm/test/unsafe/UnsafeBufferOperationsJvmReadBulkTest.kt new file mode 100644 index 000000000..22084f36c --- /dev/null +++ b/core/jvm/test/unsafe/UnsafeBufferOperationsJvmReadBulkTest.kt @@ -0,0 +1,207 @@ +/* + * Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file. + */ + +package kotlinx.io.unsafe + +import kotlinx.io.* +import java.nio.ByteBuffer +import kotlin.test.* + +@OptIn(UnsafeIoApi::class) +class UnsafeBufferOperationsJvmReadBulkTest { + private class TestException : RuntimeException() + + @Test + fun readAllFromEmptyBuffer() { + assertFailsWith { + UnsafeBufferOperations.readBulk(Buffer(), Array(1) { null }) { _, _ -> 0L } + } + } + + @Test + fun readUsingEmptyArray() { + assertFailsWith { + UnsafeBufferOperations.readBulk( + Buffer().apply { writeByte(0) }, + Array(0) { null }) { _, _ -> 0L } + } + } + + @Test + fun readSingleSegment() { + val buffer = Buffer().apply { writeString("hello world") } + val array = Array(16) { null } + + UnsafeBufferOperations.readBulk(buffer, array) { arrayArg, iovecLen -> + assertSame(array, arrayArg) + assertEquals(1, iovecLen) + + val buf = arrayArg[0] + assertNotNull(buf) + assertEquals(11, buf.capacity()) + + val str = ByteArray(11).let { + buf.get(it) + it.decodeToString() + } + assertEquals("hello world", str) + + 11 + } + assertTrue(buffer.exhausted()) + } + + @Test + fun readSingleSegmentWithoutConsumingIt() { + val buffer = Buffer().apply { writeString("hello world") } + val array = Array(16) { null } + + UnsafeBufferOperations.readBulk(buffer, array) { arrayArg, iovecLen -> + assertSame(array, arrayArg) + assertEquals(1, iovecLen) + + val buf = arrayArg[0] + assertNotNull(buf) + assertEquals(11, buf.capacity()) + + val str = ByteArray(11).let { + buf.get(it) + it.decodeToString() + } + assertEquals("hello world", str) + + 0 + } + assertEquals("hello world", buffer.readString()) + } + + @Test + fun readMultipleSegments() { + val buffer = Buffer().apply { + write(ByteArray(Segment.SIZE) { 1 }) + write(ByteArray(Segment.SIZE) { 2 }) + write(ByteArray(Segment.SIZE + 1) { 3 }) + } + val buffers = Array(16) { null } + UnsafeBufferOperations.readBulk(buffer, buffers) { array, iovecLen -> + assertSame(buffers, array) + assertEquals(4, iovecLen) + + assertEquals(Segment.SIZE, array[0]!!.remaining()) + val tmpBuffer = ByteArray(Segment.SIZE) + array[0]!!.get(tmpBuffer) + assertContentEquals(ByteArray(Segment.SIZE) { 1 }, tmpBuffer) + + assertEquals(Segment.SIZE, array[1]!!.remaining()) + array[1]!!.get(tmpBuffer) + assertContentEquals(ByteArray(Segment.SIZE) { 2 }, tmpBuffer) + + assertEquals(Segment.SIZE, array[2]!!.remaining()) + array[2]!!.get(tmpBuffer) + assertContentEquals(ByteArray(Segment.SIZE) { 3 }, tmpBuffer) + + assertEquals(1, array[3]!!.remaining()) + assertEquals(3, array[3]!!.get()) + + buffer.size + } + assertTrue(buffer.exhausted()) + } + + @Test + fun readMultipleSegmentsWithoutConsumingIt() { + val buffer = Buffer().apply { + write(ByteArray(Segment.SIZE) { 1 }) + write(ByteArray(Segment.SIZE) { 2 }) + write(ByteArray(Segment.SIZE + 1) { 3 }) + } + val buffers = Array(16) { null } + UnsafeBufferOperations.readBulk(buffer, buffers) { array, iovecLen -> + assertSame(buffers, array) + assertEquals(4, iovecLen) + + assertEquals(Segment.SIZE, array[0]!!.remaining()) + val tmpBuffer = ByteArray(Segment.SIZE) + array[0]!!.get(tmpBuffer) + assertContentEquals(ByteArray(Segment.SIZE) { 1 }, tmpBuffer) + + assertEquals(Segment.SIZE, array[1]!!.remaining()) + array[1]!!.get(tmpBuffer) + assertContentEquals(ByteArray(Segment.SIZE) { 2 }, tmpBuffer) + + assertEquals(Segment.SIZE, array[2]!!.remaining()) + array[2]!!.get(tmpBuffer) + assertContentEquals(ByteArray(Segment.SIZE) { 3 }, tmpBuffer) + + assertEquals(1, array[3]!!.remaining()) + assertEquals(3, array[3]!!.get()) + + 0 + } + assertEquals(Segment.SIZE * 3 + 1L, buffer.size) + } + + @Test + fun consumeBufferPartially() { + val buffer = Buffer().apply { + writeString("hello world") + } + UnsafeBufferOperations.readBulk(buffer, Array(1) { null }) { _, _ -> + 6 + } + assertEquals("world", buffer.readString()) + } + + @Test + fun consumeMultiSegmentBufferPartially() { + val buffer = Buffer().apply { + write(ByteArray(Segment.SIZE * 3)) + } + UnsafeBufferOperations.readBulk(buffer, Array(3) { null }) { _, _ -> + Segment.SIZE * 3 - 1111L + } + assertEquals(1111, buffer.size) + } + + @Test + fun passShortArray() { + val buffer = Buffer().apply { + write(ByteArray(Segment.SIZE * 2)) + } + UnsafeBufferOperations.readBulk(buffer, Array(1) { null }) { array, _ -> + array[0]!!.remaining().toLong() + } + assertEquals(Segment.SIZE.toLong(), buffer.size) + } + + @Test + fun returnIncorrectReadValue() { + val buffer = Buffer().apply { write(ByteArray(Segment.SIZE + 1)) } + val size = buffer.size + + assertFailsWith { + UnsafeBufferOperations.readBulk(buffer, Array(2) { null }) { _, _ -> -1L} + } + assertFailsWith { + UnsafeBufferOperations.readBulk(buffer, Array(2) { null }) { _, _ -> size + 1L } + } + assertFailsWith { + UnsafeBufferOperations.readBulk(buffer, Array(1) { null }) { _, _ -> size } + } + } + + @Test + fun resetReadOnException() { + val buffer = Buffer().apply { writeString("hello world") } + + val sizeBeforeRead = buffer.size + assertFailsWith { + UnsafeBufferOperations.readBulk(buffer, Array(1) { null }) { _, _ -> + throw TestException() + } + } + assertEquals(buffer.size, sizeBeforeRead) + } +} diff --git a/core/jvm/test/unsafe/UnsafeBufferOperationsJvmReadFromHeadTest.kt b/core/jvm/test/unsafe/UnsafeBufferOperationsJvmReadFromHeadTest.kt new file mode 100644 index 000000000..a232a0378 --- /dev/null +++ b/core/jvm/test/unsafe/UnsafeBufferOperationsJvmReadFromHeadTest.kt @@ -0,0 +1,128 @@ +/* + * Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file. + */ + +package kotlinx.io.unsafe + +import kotlinx.io.Buffer +import kotlinx.io.UnsafeIoApi +import kotlinx.io.assertArrayEquals +import kotlinx.io.writeString +import java.nio.ByteBuffer +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue + +@OptIn(UnsafeIoApi::class) +class UnsafeBufferOperationsJvmReadFromHeadTest { + private class TestException : RuntimeException() + + @Test + fun bufferCapacity() { + val buffer = Buffer().apply { writeString("hello world") } + + val head = buffer.head!! + UnsafeBufferOperations.readFromHead(buffer) { bb: ByteBuffer -> + assertEquals(head.size, bb.remaining()) + assertEquals(0, bb.position()) + assertEquals(head.size, bb.limit()) + } + } + + @Test + fun consumeByteByByte() { + val expectedData = "hello world".encodeToByteArray() + val actualData = ByteArray(expectedData.size) + + val buffer = Buffer().apply { write(expectedData) } + for (idx in actualData.indices) { + UnsafeBufferOperations.readFromHead(buffer) { bb -> + actualData[idx] = bb.get() + } + assertEquals(actualData.size - idx - 1, buffer.size.toInt()) + } + assertTrue(buffer.exhausted()) + assertArrayEquals(expectedData, actualData) + } + + @Test + fun readNothing() { + val buffer = Buffer().apply { writeInt(42) } + UnsafeBufferOperations.readFromHead(buffer) { _ -> /* do nothing */ } + assertEquals(42, buffer.readInt()) + } + + @Test + fun readEverything() { + val buffer = Buffer().apply { writeString("hello world") } + UnsafeBufferOperations.readFromHead(buffer) { bb -> + bb.position(bb.limit()) + } + assertTrue(buffer.exhausted()) + } + + @Test + fun writeIntoReadOnlyBuffer() { + val buffer = Buffer().apply { writeInt(42) } + UnsafeBufferOperations.readFromHead(buffer) { bb -> + assertFailsWith { + bb.put(42) + } + } + assertEquals(42, buffer.readInt()) + } + + @Test + fun readFromEmptyBuffer() { + val buffer = Buffer() + assertFailsWith { + UnsafeBufferOperations.readFromHead(buffer) { _ -> } + } + } + + @Test + fun readFromTheSegmentEnd() { + val segmentSize = UnsafeBufferOperations.maxSafeWriteCapacity + val extraBytesCount = 128 + val bytesToSkip = segmentSize - 2 + + val buffer = Buffer().apply { write(ByteArray(segmentSize + extraBytesCount) { 0xff.toByte() }) } + buffer.skip(bytesToSkip.toLong()) + val head = buffer.head!! + assertEquals(bytesToSkip, head.pos) + + UnsafeBufferOperations.readFromHead(buffer) { bb -> + assertEquals(segmentSize - bytesToSkip, bb.remaining()) + bb.getShort() + } + + assertEquals(extraBytesCount, buffer.size.toInt()) + } + + @Test + fun changeLimit() { + val buffer = Buffer().apply { writeString("hello world") } + UnsafeBufferOperations.readFromHead(buffer) { bb -> + // read a single byte only + bb.position(1) + bb.limit(2) + } + assertEquals(10, buffer.size) + } + + @Test + fun resetReadOnException() { + val buffer = Buffer().apply { writeString("hello world") } + + val sizeBeforeRead = buffer.size + assertFailsWith { + UnsafeBufferOperations.readFromHead(buffer) { bb -> + bb.get() + throw TestException() + } + } + assertEquals(buffer.size, sizeBeforeRead) + } +} diff --git a/core/jvm/test/unsafe/UnsafeBufferOperationsJvmWriteToTailTest.kt b/core/jvm/test/unsafe/UnsafeBufferOperationsJvmWriteToTailTest.kt new file mode 100644 index 000000000..4b881b9a4 --- /dev/null +++ b/core/jvm/test/unsafe/UnsafeBufferOperationsJvmWriteToTailTest.kt @@ -0,0 +1,112 @@ +/* + * Copyright 2010-2024 JetBrains s.r.o. and Kotlin Programming Language contributors. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENSE.txt file. + */ + +package kotlinx.io.unsafe + +import kotlinx.io.* +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue + +@OptIn(UnsafeIoApi::class) +class UnsafeBufferOperationsJvmWriteToTailTest { + private class TestException : RuntimeException() + + @Test + fun bufferCapacity() { + val buffer = Buffer() + + UnsafeBufferOperations.writeToTail(buffer, 1) { bb -> + // Unsafe check, head is not committed yet + assertEquals(buffer.head!!.data.size, bb.remaining()) + assertEquals(0, bb.position()) + assertEquals(buffer.head!!.data.size, bb.limit()) + } + } + + @Test + fun writeByteByByte() { + val buffer = Buffer() + val data = "hello world".encodeToByteArray() + + for (idx in data.indices) { + UnsafeBufferOperations.writeToTail(buffer, 1) { bb -> + bb.put(data[idx]) + } + assertEquals(idx + 1, buffer.size.toInt()) + } + assertEquals("hello world", buffer.readString()) + } + + @Test + fun writeNothing() { + val buffer = Buffer() + UnsafeBufferOperations.writeToTail(buffer, 1) { _ -> } + assertTrue(buffer.exhausted()) + } + + @Test + fun writeWholeBuffer() { + val buffer = Buffer() + UnsafeBufferOperations.writeToTail(buffer, 1) { bb -> + bb.position(bb.limit()) + } + assertEquals(Segment.SIZE, buffer.size.toInt()) + } + + @Test + fun requireToManyBytes() { + val buffer = Buffer() + assertFailsWith { + UnsafeBufferOperations.writeToTail(buffer, 100500) { _ -> } + } + assertTrue(buffer.exhausted()) + } + + @Test + fun writeToTheEndOfABuffer() { + val buffer = Buffer().apply { write(ByteArray(Segment.SIZE - 1)) } + UnsafeBufferOperations.writeToTail(buffer, 1) { bb -> + assertEquals(1, bb.remaining()) + bb.put(42) + } + assertEquals(Segment.SIZE, buffer.size.toInt()) + UnsafeBufferOperations.writeToTail(buffer, 1) { bb -> + bb.put(43) + } + assertEquals(Segment.SIZE + 1, buffer.size.toInt()) + + buffer.skip(Segment.SIZE - 1L) + assertArrayEquals(byteArrayOf(42, 43), buffer.readByteArray()) + } + + @Test + fun changeLimit() { + val buffer = Buffer() + + UnsafeBufferOperations.writeToTail(buffer, 8) { bb -> + // only two bytes written + bb.position(2) + bb.limit(4) + } + + assertEquals(2, buffer.size) + } + + @Test + fun resetWriteOnException() { + val buffer = Buffer() + + assertFailsWith { + UnsafeBufferOperations.writeToTail(buffer, 2) { bb -> + bb.put(42) + throw TestException() + } + } + + assertTrue(buffer.exhausted()) + } +} diff --git a/core/native/src/SegmentPool.kt b/core/native/src/SegmentPool.kt index feb6e4acd..709e6f894 100644 --- a/core/native/src/SegmentPool.kt +++ b/core/native/src/SegmentPool.kt @@ -25,7 +25,7 @@ internal actual object SegmentPool { actual val byteCount: Int = 0 - actual fun take(): Segment = Segment() + actual fun take(): Segment = Segment.new() actual fun recycle(segment: Segment) { } diff --git a/core/wasm/src/SegmentPool.kt b/core/wasm/src/SegmentPool.kt index c77d92ef9..615a14977 100644 --- a/core/wasm/src/SegmentPool.kt +++ b/core/wasm/src/SegmentPool.kt @@ -10,7 +10,7 @@ internal actual object SegmentPool { actual val byteCount: Int = 0 - actual fun take(): Segment = Segment() + actual fun take(): Segment = Segment.new() actual fun recycle(segment: Segment) { } diff --git a/core/wasmWasi/src/-WasmUtils.kt b/core/wasmWasi/src/-WasmUtils.kt index 6ed622d09..3aa455151 100644 --- a/core/wasmWasi/src/-WasmUtils.kt +++ b/core/wasmWasi/src/-WasmUtils.kt @@ -74,7 +74,7 @@ internal fun Buffer.writeFromLinearMemory(pointer: Pointer, bytes: Int) { currentPtr += toWrite remaining -= toWrite segment.limit += toWrite - size += toWrite + sizeMut += toWrite } }