Added ability to read buffered huge strings in custom KSerializers (#2012)

Added a stream-friendly version of decodeString for the new ChunkedDecoder interface.

Fixes #1987 

Co-authored-by: Leonid Startsev <sandwwraith@users.noreply.github.com>
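
For orientation, here is a minimal sketch of how a custom `KSerializer` can consume the new API. It is not part of this commit; the serializer name and the length-counting logic are purely illustrative.

```kotlin
import kotlinx.serialization.*
import kotlinx.serialization.descriptors.*
import kotlinx.serialization.encoding.*

// Hypothetical example: measures the length of a huge JSON string field
// without ever materializing the whole value in memory.
@OptIn(ExperimentalSerializationApi::class)
object StringLengthSerializer : KSerializer<Long> {
    override val descriptor: SerialDescriptor =
        PrimitiveSerialDescriptor("StringLength", PrimitiveKind.STRING)

    override fun deserialize(decoder: Decoder): Long {
        // Only the streaming JSON decoder implements ChunkedDecoder (TREE mode does not).
        require(decoder is ChunkedDecoder) { "Only chunked decoder supported" }
        var length = 0L
        // The lambda is invoked once per chunk; each chunk is at most 16384 chars.
        decoder.decodeStringChunked { chunk -> length += chunk.length }
        return length
    }

    override fun serialize(encoder: Encoder, value: Long) {
        throw SerializationException("Serialization is not needed for this sketch")
    }
}
```

Unlike a plain `decodeString()`, the full string is never allocated; only chunks of at most 16384 chars are held in memory at a time.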
fred01 and sandwwraith authored Feb 16, 2023
1 parent 623dcad commit 90113a9
Showing 7 changed files with 275 additions and 2 deletions.
4 changes: 4 additions & 0 deletions core/api/kotlinx-serialization-core.api
@@ -414,6 +414,10 @@ public abstract class kotlinx/serialization/encoding/AbstractEncoder : kotlinx/s
public fun shouldEncodeElementDefault (Lkotlinx/serialization/descriptors/SerialDescriptor;I)Z
}

public abstract interface class kotlinx/serialization/encoding/ChunkedDecoder {
public abstract fun decodeStringChunked (Lkotlin/jvm/functions/Function1;)V
}

public abstract interface class kotlinx/serialization/encoding/CompositeDecoder {
public static final field Companion Lkotlinx/serialization/encoding/CompositeDecoder$Companion;
public static final field DECODE_DONE I
@@ -0,0 +1,51 @@
package kotlinx.serialization.encoding

import kotlinx.serialization.ExperimentalSerializationApi

/**
* This interface indicates that the decoder supports consuming large strings in chunks via the [decodeStringChunked] method.
* Currently, only the streaming JSON decoder implements this interface.
* Note that this interface is applicable only to streaming decoders; it cannot be combined with
* features that go through JsonTreeDecoder, such as polymorphism.
*/
@ExperimentalSerializationApi
public interface ChunkedDecoder {
/**
* Decodes a string value in fixed-size chunks, which makes it possible to handle very large strings that may not fit in memory.
* Each chunk is fed to the provided consumer; the chunk size is guaranteed not to exceed 16384 chars (it may be smaller).
*
* @param consumeChunk lambda invoked with each string chunk
*
* Example usage:
* ```
* @Serializable(with = LargeStringSerializer::class)
* data class LargeStringData(val largeString: String)
*
* @Serializable
* data class ClassWithLargeStringDataField(val largeStringField: LargeStringData)
*
* object LargeStringSerializer : KSerializer<LargeStringData> {
* override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("LargeStringContent", PrimitiveKind.STRING)
*
* override fun deserialize(decoder: Decoder): LargeStringData {
* require(decoder is ChunkedDecoder) { "Only chunked decoder supported" }
*
* val tmpFile = createTempFile()
* FileWriter(tmpFile.toFile()).use { writer ->
* decoder.decodeStringChunked { chunk ->
* writer.append(chunk)
* }
* }
* return LargeStringData("file://${tmpFile.absolutePathString()}")
* }
* }
* ```
*
* In this sample, we need to handle a huge string coming from JSON. Instead of keeping it in memory,
* we offload it into a file and return the file name.
*/
@ExperimentalSerializationApi
public fun decodeStringChunked(consumeChunk: (chunk: String) -> Unit)
}
@@ -0,0 +1,74 @@
package kotlinx.serialization.json

import kotlinx.serialization.*
import kotlinx.serialization.Serializable
import kotlinx.serialization.descriptors.*
import kotlinx.serialization.encoding.*
import kotlinx.serialization.test.assertFailsWithMessage
import kotlin.test.*


@Serializable(with = LargeStringSerializer::class)
data class LargeStringData(val largeString: String)

@Serializable
data class ClassWithLargeStringDataField(val largeStringField: LargeStringData)


object LargeStringSerializer : KSerializer<LargeStringData> {
override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("LargeStringContent", PrimitiveKind.STRING)

override fun deserialize(decoder: Decoder): LargeStringData {
require(decoder is ChunkedDecoder) { "Only chunked decoder supported" }

val outStringBuilder = StringBuilder()

decoder.decodeStringChunked { chunk ->
outStringBuilder.append(chunk)
}
return LargeStringData(outStringBuilder.toString())
}

override fun serialize(encoder: Encoder, value: LargeStringData) {
encoder.encodeString(value.largeString)
}
}

open class JsonChunkedStringDecoderTest : JsonTestBase() {

@Test
fun decodePlainLenientString() {
val longString = "abcd".repeat(8192) // Make string more than 16k
val sourceObject = ClassWithLargeStringDataField(LargeStringData(longString))
val serializedObject = "{\"largeStringField\": $longString }"
val jsonWithLenientMode = Json { isLenient = true }
testDecodeInAllModes(jsonWithLenientMode, serializedObject, sourceObject)
}

@Test
fun decodePlainString() {
val longStringWithEscape = "${"abcd".repeat(4096)}\"${"abcd".repeat(4096)}" // Make string more than 16k
val sourceObject = ClassWithLargeStringDataField(LargeStringData(longStringWithEscape))
val serializedObject = Json.encodeToString(sourceObject)
testDecodeInAllModes(Json, serializedObject, sourceObject)
}

private fun testDecodeInAllModes(
serializer: Json, serializedObject: String, sourceObject: ClassWithLargeStringDataField
) {
/* Java Streams mode is filtered out of common tests; it is tested separately in JVM tests. */
JsonTestingMode.values().filterNot { it == JsonTestingMode.JAVA_STREAMS }.forEach { mode ->
if (mode == JsonTestingMode.TREE) {
assertFailsWithMessage<IllegalArgumentException>(
"Only chunked decoder supported", "Shouldn't decode JSON in TREE mode"
) {
serializer.decodeFromString<ClassWithLargeStringDataField>(serializedObject, mode)
}
} else {
val deserializedObject =
serializer.decodeFromString<ClassWithLargeStringDataField>(serializedObject, mode)
assertEquals(sourceObject.largeStringField, deserializedObject.largeStringField)
}
}
}
}
@@ -0,0 +1,85 @@
package kotlinx.serialization.json

import kotlinx.serialization.*
import kotlinx.serialization.Serializable
import kotlinx.serialization.descriptors.*
import kotlinx.serialization.encoding.*
import kotlinx.serialization.test.assertFailsWithMessage
import org.junit.Test
import java.io.*
import java.util.*
import kotlin.random.Random
import kotlin.test.*


@Serializable(with = LargeBase64StringSerializer::class)
data class LargeBinaryData(val binaryData: ByteArray) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as LargeBinaryData

if (!binaryData.contentEquals(other.binaryData)) return false

return true
}

override fun hashCode(): Int {
return binaryData.contentHashCode()
}
}

@Serializable
data class ClassWithBinaryDataField(val binaryField: LargeBinaryData)

object LargeBase64StringSerializer : KSerializer<LargeBinaryData> {
private val b64Decoder: Base64.Decoder = Base64.getDecoder()
override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("LargeStringContent", PrimitiveKind.STRING)

override fun deserialize(decoder: Decoder): LargeBinaryData {
require(decoder is ChunkedDecoder) { "Only chunked decoder supported" }

var remainder = ""
val decodedBytes = ByteArrayOutputStream().use { bos ->
decoder.decodeStringChunked {
// Base64 is decoded in groups of 4 chars, but chunk boundaries are arbitrary:
// decode only the aligned prefix and carry the leftover characters into the next chunk.
val actualChunk = remainder + it
val remainderLength = actualChunk.length % 4
val alignedLength = actualChunk.length - remainderLength
val alignedChunk = actualChunk.take(alignedLength)
remainder = actualChunk.takeLast(remainderLength)
bos.write(b64Decoder.decode(alignedChunk))
}
bos.toByteArray()
}

return LargeBinaryData(decodedBytes)
}

override fun serialize(encoder: Encoder, value: LargeBinaryData) {
encoder.encodeString(Base64.getEncoder().encodeToString(value.binaryData))
}
}

class JsonChunkedBase64DecoderTest : JsonTestBase() {

@Test
fun decodeBase64String() {
val sourceObject =
ClassWithBinaryDataField(LargeBinaryData(Random.nextBytes(16 * 1024))) // After Base64 encoding, this is larger than 16k (JsonLexer#BATCH_SIZE)
val serializedObject = Json.encodeToString(sourceObject)

JsonTestingMode.values().forEach { mode ->
if (mode == JsonTestingMode.TREE) {
assertFailsWithMessage<IllegalArgumentException>(
"Only chunked decoder supported", "Shouldn't decode JSON in TREE mode"
) {
Json.decodeFromString<ClassWithBinaryDataField>(serializedObject, mode)
}
} else {
val deserializedObject = Json.decodeFromString<ClassWithBinaryDataField>(serializedObject, mode)
assertEquals(sourceObject.binaryField, deserializedObject.binaryField)
}
}
}
}
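
As a side note, the remainder-carrying trick used by `LargeBase64StringSerializer` above can be shown in isolation. The helper below is a standalone illustration written for this page (the function name and the sample split point are assumptions, not part of the commit): Base64 decodes in 4-char groups, so each chunk is decoded only up to the last complete group and the leftover characters are prepended to the next chunk.

```kotlin
import java.io.ByteArrayOutputStream
import java.util.Base64

// Illustrative helper (hypothetical, not from the commit): decodes Base64 that arrives
// in arbitrarily split chunks by realigning every chunk to a multiple of 4 chars.
fun decodeBase64Chunks(chunks: List<String>): ByteArray {
    val decoder = Base64.getDecoder()
    val out = ByteArrayOutputStream()
    var remainder = ""
    for (chunk in chunks) {
        val buffered = remainder + chunk
        val alignedLength = buffered.length - buffered.length % 4 // keep whole 4-char groups only
        out.write(decoder.decode(buffered.take(alignedLength)))
        remainder = buffered.substring(alignedLength)             // carry the partial group forward
    }
    require(remainder.isEmpty()) { "Dangling Base64 characters: $remainder" }
    return out.toByteArray()
}

fun main() {
    val encoded = Base64.getEncoder().encodeToString("chunked decoding".toByteArray())
    // Split at an index that is not a multiple of 4 to simulate an arbitrary chunk boundary.
    println(String(decodeBase64Chunks(listOf(encoded.take(5), encoded.drop(5))))) // prints "chunked decoding"
}
```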
@@ -24,7 +24,7 @@ internal open class StreamingJsonDecoder(
@JvmField internal val lexer: AbstractJsonLexer,
descriptor: SerialDescriptor,
discriminatorHolder: DiscriminatorHolder?
) : JsonDecoder, AbstractDecoder() {
) : JsonDecoder, ChunkedDecoder, AbstractDecoder() {

// A mutable reference to the discriminator that has to be skipped during the optimistic phase
// of polymorphic serialization, see `decodeSerializableValue`
@@ -343,6 +343,10 @@
}
}

override fun decodeStringChunked(consumeChunk: (chunk: String) -> Unit) {
lexer.consumeStringChunked(configuration.isLenient, consumeChunk)
}

override fun decodeInline(descriptor: SerialDescriptor): Decoder =
if (descriptor.isUnsignedNumber) JsonDecoderForUnsignedTypes(lexer, json)
else super.decodeInline(descriptor)
@@ -4,7 +4,6 @@

package kotlinx.serialization.json.internal

import kotlinx.serialization.json.internal.*
import kotlinx.serialization.json.internal.CharMappings.CHAR_TO_TOKEN
import kotlinx.serialization.json.internal.CharMappings.ESCAPE_2_CHAR
import kotlin.js.*
@@ -310,6 +309,58 @@ internal abstract class AbstractJsonLexer {
*/
abstract fun consumeKeyString(): String

private fun insideString(isLenient: Boolean, char: Char): Boolean = if (isLenient) {
charToTokenClass(char) == TC_OTHER
} else {
char != STRING
}

open fun consumeStringChunked(isLenient: Boolean, consumeChunk: (stringChunk: String) -> Unit) { // open to allow simpler implementations (e.g. StringJsonLexer)
val nextToken = peekNextToken()
if (isLenient && nextToken != TC_OTHER) return // nothing to consume

if (!isLenient) {
consumeNextToken(STRING)
}
var currentPosition = this.currentPosition
var lastPosition = currentPosition
var char = source[currentPosition] // Avoid two range checks visible in the profiler
var usedAppend = false
while (insideString(isLenient, char)) {
if (!isLenient && char == STRING_ESC) { // handle escaping only in non-lenient mode
usedAppend = true
currentPosition = prefetchOrEof(appendEscape(lastPosition, currentPosition))
lastPosition = currentPosition
} else {
currentPosition++
}
if (currentPosition >= source.length) {
// end of chunk
writeRange(lastPosition, currentPosition, usedAppend, consumeChunk)
usedAppend = false
currentPosition = prefetchOrEof(currentPosition)
if (currentPosition == -1)
fail("EOF", currentPosition)
lastPosition = currentPosition
}
char = source[currentPosition]
}
writeRange(lastPosition, currentPosition, usedAppend, consumeChunk)
this.currentPosition = currentPosition
if (!isLenient) {
consumeNextToken(STRING)
}
}

private fun writeRange(fromIndex: Int, toIndex: Int, currentChunkHasEscape: Boolean, consumeChunk: (stringChunk: String) -> Unit) {
if (currentChunkHasEscape) {
consumeChunk(decodedString(fromIndex, toIndex))
} else {
consumeChunk(substring(fromIndex, toIndex))
}
}


fun consumeString(): String {
if (peekedString != null) {
return takePeeked()
@@ -97,6 +97,10 @@ internal class StringJsonLexer(override val source: String) : AbstractJsonLexer(
return source.substring(current, closingQuote)
}

override fun consumeStringChunked(isLenient: Boolean, consumeChunk: (stringChunk: String) -> Unit) {
(if (isLenient) consumeStringLenient() else consumeString()).chunked(BATCH_SIZE).forEach(consumeChunk)
}

override fun consumeLeadingMatchingValue(keyToMatch: String, isLenient: Boolean): String? {
val positionSnapshot = currentPosition
try {
