Skip to content

Commit

Permalink
[Unsafe API 4/5] Rewrite existing extensions using unsafe API (#337)
Browse files Browse the repository at this point in the history
* Reimplement some methods to use Unsafe API instead of internal one
* Reimplement UTF-8-related functions using Unsafe API
* Remove last direct Segment.data use
* Reimplement Buffer.snapshot to use bulk append
  • Loading branch information
fzhinkin authored Aug 23, 2024
1 parent 046523f commit 821b1bc
Show file tree
Hide file tree
Showing 14 changed files with 366 additions and 354 deletions.
47 changes: 22 additions & 25 deletions core/apple/src/AppleCore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package kotlinx.io

import kotlinx.cinterop.*
import kotlinx.io.unsafe.UnsafeBufferOperations
import platform.Foundation.NSInputStream
import platform.Foundation.NSOutputStream
import platform.Foundation.NSStreamStatusClosed
Expand All @@ -31,29 +32,28 @@ private open class OutputStreamSink(
if (out.streamStatus == NSStreamStatusNotOpen) out.open()
}

@OptIn(UnsafeIoApi::class)
override fun write(source: Buffer, byteCount: Long) {
if (out.streamStatus == NSStreamStatusClosed) throw IOException("Stream Closed")

checkOffsetAndCount(source.size, 0, byteCount)
var remaining = byteCount
var bytesWritten = 0L
while (remaining > 0) {
val head = source.head!!
val toCopy = minOf(remaining, head.limit - head.pos).toInt()
val bytesWritten = head.data.usePinned {
val bytes = it.addressOf(head.pos).reinterpret<uint8_tVar>()
out.write(bytes, toCopy.convert()).toLong()
UnsafeBufferOperations.readFromHead(source) { data, pos, limit ->
val toCopy = minOf(remaining, limit - pos).toInt()
bytesWritten = data.usePinned {
val bytes = it.addressOf(pos).reinterpret<uint8_tVar>()
out.write(bytes, toCopy.convert()).toLong()
}
0
}

if (bytesWritten < 0L) throw IOException(out.streamError?.localizedDescription ?: "Unknown error")
if (bytesWritten == 0L) throw IOException("NSOutputStream reached capacity")

head.pos += bytesWritten.toInt()
source.skip(bytesWritten)
remaining -= bytesWritten
source.sizeMut -= bytesWritten

if (head.pos == head.limit) {
source.recycleHead()
}
}
}

Expand Down Expand Up @@ -83,29 +83,26 @@ private open class NSInputStreamSource(
if (input.streamStatus == NSStreamStatusNotOpen) input.open()
}

@OptIn(UnsafeIoApi::class)
override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
if (input.streamStatus == NSStreamStatusClosed) throw IOException("Stream Closed")

if (byteCount == 0L) return 0L
checkByteCount(byteCount)

val tail = sink.writableSegment(1)
val maxToCopy = minOf(byteCount, Segment.SIZE - tail.limit)
val bytesRead = tail.data.usePinned {
val bytes = it.addressOf(tail.limit).reinterpret<uint8_tVar>()
input.read(bytes, maxToCopy.convert()).toLong()
var bytesRead = 0L
UnsafeBufferOperations.writeToTail(sink, 1) { data, pos, limit ->
val maxToCopy = minOf(byteCount, limit - pos)
val read = data.usePinned { ba ->
val bytes = ba.addressOf(pos).reinterpret<uint8_tVar>()
input.read(bytes, maxToCopy.convert()).toLong()
}
bytesRead = read
maxOf(read.toInt(), 0)
}

if (bytesRead < 0L) throw IOException(input.streamError?.localizedDescription ?: "Unknown error")
if (bytesRead == 0L) {
if (tail.pos == tail.limit) {
// We allocated a tail segment, but didn't end up needing it. Recycle!
sink.recycleTail()
}
return -1
}
tail.limit += bytesRead.toInt()
sink.sizeMut += bytesRead
if (bytesRead == 0L) return -1
return bytesRead
}

Expand Down
67 changes: 34 additions & 33 deletions core/apple/src/BuffersApple.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,69 +8,70 @@
package kotlinx.io

import kotlinx.cinterop.*
import kotlinx.io.unsafe.UnsafeBufferOperations
import kotlinx.io.unsafe.withData
import platform.Foundation.NSData
import platform.Foundation.create
import platform.Foundation.data
import platform.darwin.NSUIntegerMax
import platform.posix.*

@OptIn(ExperimentalForeignApi::class)
@OptIn(ExperimentalForeignApi::class, UnsafeIoApi::class)
internal fun Buffer.write(source: CPointer<uint8_tVar>, maxLength: Int) {
require(maxLength >= 0) { "maxLength ($maxLength) must not be negative" }

var currentOffset = 0
while (currentOffset < maxLength) {
val tail = writableSegment(1)

val toCopy = minOf(maxLength - currentOffset, Segment.SIZE - tail.limit)
tail.data.usePinned {
memcpy(it.addressOf(tail.limit), source + currentOffset, toCopy.convert())
UnsafeBufferOperations.writeToTail(this, 1) { data, pos, limit ->
val toCopy = minOf(maxLength - currentOffset, limit - pos)
data.usePinned {
memcpy(it.addressOf(pos), source + currentOffset, toCopy.convert())
}
currentOffset += toCopy
toCopy
}

currentOffset += toCopy
tail.limit += toCopy
}
this.sizeMut += maxLength
}

@OptIn(UnsafeIoApi::class)
internal fun Buffer.readAtMostTo(sink: CPointer<uint8_tVar>, maxLength: Int): Int {
require(maxLength >= 0) { "maxLength ($maxLength) must not be negative" }

val s = head ?: return 0
val toCopy = minOf(maxLength, s.limit - s.pos)
s.data.usePinned {
memcpy(sink, it.addressOf(s.pos), toCopy.convert())
}

s.pos += toCopy
this.sizeMut -= toCopy.toLong()

if (s.pos == s.limit) {
recycleHead()
var toCopy = 0
UnsafeBufferOperations.readFromHead(this) { data, pos, limit ->
toCopy = minOf(maxLength, limit - pos)
data.usePinned {
memcpy(sink, it.addressOf(pos), toCopy.convert())
}
toCopy
}

return toCopy
}

@OptIn(BetaInteropApi::class)
@OptIn(BetaInteropApi::class, UnsafeIoApi::class)
internal fun Buffer.snapshotAsNSData(): NSData {
if (size == 0L) return NSData.data()

check(size.toULong() <= NSUIntegerMax) { "Buffer is too long ($size) to be converted into NSData." }

val bytes = malloc(size.convert())?.reinterpret<uint8_tVar>()
?: throw Error("malloc failed: ${strerror(errno)?.toKString()}")
var curr = head
var index = 0
do {
check(curr != null) { "Current segment is null" }
val pos = curr.pos
val length = curr.limit - pos
curr.data.usePinned {
memcpy(bytes + index, it.addressOf(pos), length.convert())

UnsafeBufferOperations.iterate(this) { ctx, head ->
var curr: Segment? = head
var index = 0
while (curr != null) {
val segment: Segment = curr
ctx.withData(segment) { data, pos, limit ->
val length = limit - pos
data.usePinned {
memcpy(bytes + index, it.addressOf(pos), length.convert())
}
index += length
}
curr = ctx.next(segment)
}
curr = curr.next
index += length
} while (curr != null)
}
return NSData.create(bytesNoCopy = bytes, length = size.convert())
}
3 changes: 3 additions & 0 deletions core/common/src/Buffer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,9 @@ public class Buffer : Source, Sink {
if (position < 0 || position >= size) {
throw IndexOutOfBoundsException("position ($position) is not within the range [0..size($size))")
}
if (position == 0L) {
return head!!.getUnchecked(0)
}
seek(position) { s, offset ->
return s!!.data[(s.pos + position - offset).toInt()]
}
Expand Down
16 changes: 10 additions & 6 deletions core/common/src/Buffers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,28 @@ package kotlinx.io

import kotlinx.io.bytestring.ByteString
import kotlinx.io.bytestring.buildByteString
import kotlinx.io.unsafe.UnsafeBufferOperations
import kotlinx.io.unsafe.withData

/**
* Creates a byte string containing a copy of all the data from this buffer.
*
* This call doesn't consume data from the buffer, but instead copies it.
*/
@OptIn(UnsafeIoApi::class)
public fun Buffer.snapshot(): ByteString {
if (size == 0L) return ByteString()

check(size <= Int.MAX_VALUE) { "Buffer is too long ($size) to be converted into a byte string." }

return buildByteString(size.toInt()) {
var curr = head
do {
check(curr != null) { "Current segment is null" }
append(curr.data, curr.pos, curr.limit)
curr = curr.next
} while (curr != null)
UnsafeBufferOperations.iterate(this@snapshot) { ctx, head ->
var curr = head
while (curr != null) {
ctx.withData(curr, this::append)
curr = ctx.next(curr)
}
}
}
}

Expand Down
29 changes: 13 additions & 16 deletions core/common/src/ByteStrings.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import kotlinx.io.bytestring.ByteString
import kotlinx.io.bytestring.isEmpty
import kotlinx.io.bytestring.unsafe.UnsafeByteStringApi
import kotlinx.io.bytestring.unsafe.UnsafeByteStringOperations
import kotlinx.io.unsafe.UnsafeBufferOperations
import kotlin.math.min

/**
Expand All @@ -25,7 +26,7 @@ import kotlin.math.min
*
* @sample kotlinx.io.samples.ByteStringSamples.writeByteString
*/
@OptIn(DelicateIoApi::class)
@OptIn(DelicateIoApi::class, UnsafeByteStringApi::class, UnsafeIoApi::class)
public fun Sink.write(byteString: ByteString, startIndex: Int = 0, endIndex: Int = byteString.size) {
checkBounds(byteString.size, startIndex, endIndex)
if (endIndex == startIndex) {
Expand All @@ -34,21 +35,17 @@ public fun Sink.write(byteString: ByteString, startIndex: Int = 0, endIndex: Int

writeToInternalBuffer { buffer ->
var offset = startIndex
val tail = buffer.head?.prev
if (tail != null) {
val bytesToWrite = min(tail.data.size - tail.limit, endIndex - offset)
byteString.copyInto(tail.data, tail.limit, offset, offset + bytesToWrite)
offset += bytesToWrite
tail.limit += bytesToWrite
buffer.sizeMut += bytesToWrite
}
while (offset < endIndex) {
val bytesToWrite = min(endIndex - offset, Segment.SIZE)
val seg = buffer.writableSegment(bytesToWrite)
byteString.copyInto(seg.data, seg.limit, offset, offset + bytesToWrite)
seg.limit += bytesToWrite
buffer.sizeMut += bytesToWrite
offset += bytesToWrite

UnsafeByteStringOperations.withByteArrayUnsafe(byteString) { data ->
while (offset < endIndex) {
var written = 0
UnsafeBufferOperations.writeToTail(buffer, 1) { segData, pos, limit ->
written = min(endIndex - offset, limit - pos)
data.copyInto(segData, pos, offset, offset + written)
written
}
offset += written
}
}
}
}
Expand Down
44 changes: 19 additions & 25 deletions core/common/src/Sinks.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package kotlinx.io
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind.EXACTLY_ONCE
import kotlin.contracts.contract
import kotlinx.io.unsafe.UnsafeBufferOperations

private val HEX_DIGIT_BYTES = ByteArray(16) {
((if (it < 10) '0'.code else ('a'.code - 10)) + it).toByte()
Expand Down Expand Up @@ -67,7 +68,7 @@ public fun Sink.writeLongLe(long: Long) {
*
* @sample kotlinx.io.samples.KotlinxIoCoreCommonSamples.writeDecimalLong
*/
@OptIn(DelicateIoApi::class)
@OptIn(DelicateIoApi::class, UnsafeIoApi::class)
public fun Sink.writeDecimalLong(long: Long) {
var v = long
if (v == 0L) {
Expand Down Expand Up @@ -120,20 +121,17 @@ public fun Sink.writeDecimalLong(long: Long) {
}

writeToInternalBuffer { buffer ->
val tail = buffer.writableSegment(width)
val data = tail.data
var pos = tail.limit + width // We write backwards from right to left.
while (v != 0L) {
val digit = (v % 10).toInt()
data[--pos] = HEX_DIGIT_BYTES[digit]
v /= 10
UnsafeBufferOperations.writeToTail(buffer, width) { ctx, segment ->
for (pos in width - 1 downTo if (negative) 1 else 0) {
val digit = (v % 10).toByte()
ctx.setUnchecked(segment, pos, HEX_DIGIT_BYTES[digit.toInt()])
v /= 10
}
if (negative) {
ctx.setUnchecked(segment, 0, '-'.code.toByte())
}
width
}
if (negative) {
data[--pos] = '-'.code.toByte()
}

tail.limit += width
buffer.sizeMut += width.toLong()
}
}

Expand All @@ -149,7 +147,7 @@ public fun Sink.writeDecimalLong(long: Long) {
*
* @sample kotlinx.io.samples.KotlinxIoCoreCommonSamples.writeHexLong
*/
@OptIn(DelicateIoApi::class)
@OptIn(DelicateIoApi::class, UnsafeIoApi::class)
public fun Sink.writeHexadecimalUnsignedLong(long: Long) {
var v = long
if (v == 0L) {
Expand All @@ -161,17 +159,13 @@ public fun Sink.writeHexadecimalUnsignedLong(long: Long) {
val width = hexNumberLength(v)

writeToInternalBuffer { buffer ->
val tail = buffer.writableSegment(width)
val data = tail.data
var pos = tail.limit + width - 1
val start = tail.limit
while (pos >= start) {
data[pos] = HEX_DIGIT_BYTES[(v and 0xF).toInt()]
v = v ushr 4
pos--
UnsafeBufferOperations.writeToTail(buffer, width) { ctx, segment ->
for (pos in width - 1 downTo 0) {
ctx.setUnchecked(segment, pos, HEX_DIGIT_BYTES[v.toInt().and(0xF)])
v = v ushr 4
}
width
}
tail.limit += width
buffer.sizeMut += width.toLong()
}
}

Expand Down
Loading

0 comments on commit 821b1bc

Please sign in to comment.