feat: Allow writing on-demand object-container values instead of only a Sequence
Chuckame committed Sep 12, 2024
1 parent 6e1d82a commit 5389bd0
Showing 5 changed files with 57 additions and 40 deletions.
7 changes: 5 additions & 2 deletions Migrating-from-v1.md
@@ -156,13 +156,16 @@ val dataSequence = sequenceOf(
TheDataClass(...),
)
Files.newOutputStream(Path("/your/file.avro")).use { outputStream ->
AvroObjectContainer { fieldNamingStrategy = FieldNamingStrategy.SnakeCase }
.encodeToStream(dataSequence, outputStream) {
val writer = AvroObjectContainer { fieldNamingStrategy = FieldNamingStrategy.SnakeCase }
.openWriter(outputStream) {
codec(CodecFactory.snappyCodec())
// you can also add your metadata !
metadata("myProp", 1234L)
metadata("a string metadata", "hello")
}
writer.use {
dataSequence.forEach { writer.writeValue(it) }
}
}
```
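
The new writer is useful when records are produced one at a time rather than being available up front as a `Sequence`. A minimal sketch, reusing `TheDataClass` from the example above; `produceRecords()` is a hypothetical source of incoming records:

```kotlin
// Hypothetical source of records arriving one at a time (e.g. polled from a queue).
fun produceRecords(): Iterator<TheDataClass> = TODO("replace with your own source")

Files.newOutputStream(Path("/your/file.avro")).use { outputStream ->
    AvroObjectContainer { fieldNamingStrategy = FieldNamingStrategy.SnakeCase }
        .openWriter<TheDataClass>(outputStream)
        .use { writer ->
            // Each record is encoded and appended as soon as it is available,
            // without collecting everything into a Sequence first.
            produceRecords().forEach { writer.writeValue(it) }
        }
}
```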

11 changes: 5 additions & 6 deletions README.md
@@ -97,10 +97,7 @@ fun main() {
## Object container

Avro4k provides a way to encode and decode object container — also known as data file — with `AvroObjectContainer` class. This encoding will prefix the binary data with the
full schema to
allow knowing the writer schema when reading the data. This format is perfect for storing many long-term objects in a single file.

The main difference with the `AvroObjectContainer` is that you will encode and decode `Sequence` of objects instead of single objects.
full schema to allow knowing the writer schema when reading the data. This format is perfect for storing many long-term objects in a single file.

Be aware that consuming the decoded `Sequence` needs to be done **before** closing the stream, or you will get an exception as a sequence is a "hot" source,
which means that if there are millions of objects in the file, all the objects are extracted one-by-one when requested. If you take only the first 10 objects and close the stream,
@@ -126,12 +123,14 @@ fun main() {
Project("avro4k", "Kotlin"),
)
Files.newOutputStream(Path("your-file.bin")).use { fileStream ->
AvroObjectContainer.encodeToStream(valuesToEncode, fileStream, builder)
AvroObjectContainer.openWriter(fileStream).use { writer ->
valuesToEncode.forEach { writer.writeValue(it) }
}
}

// Deserializing objects
Files.newInputStream(Path("your-file.bin")).use { fileStream ->
AvroObjectContainer.decodeFromStream<Project>(valuesToEncode, fileStream, builder).forEach {
AvroObjectContainer.decodeFromStream<Project>(fileStream).forEach {
println(it) // Project(name=kotlinx.serialization, language=Kotlin) ...
}
}
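
As the warning above explains, the decoded `Sequence` must be consumed while the input stream is still open. A minimal sketch of the right and wrong patterns, reusing the `Project` class from the README example and assuming the inline `decodeFromStream<T>(inputStream)` overload shown in this commit:

```kotlin
// Right: materialize the sequence while the stream is still open.
val projects: List<Project> =
    Files.newInputStream(Path("your-file.bin")).use { fileStream ->
        AvroObjectContainer.decodeFromStream<Project>(fileStream).toList()
    }

// Wrong: the sequence is evaluated lazily, so iterating it after `use`
// has closed the stream will fail at the first element.
val lazyProjects: Sequence<Project> =
    Files.newInputStream(Path("your-file.bin")).use { fileStream ->
        AvroObjectContainer.decodeFromStream<Project>(fileStream)
    }
// lazyProjects.toList() // would throw: the underlying stream is already closed
```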
12 changes: 8 additions & 4 deletions api/avro4k-core.api
@@ -169,20 +169,20 @@ public abstract class com/github/avrokotlin/avro4k/AvroObjectContainer {
public synthetic fun <init> (Lcom/github/avrokotlin/avro4k/Avro;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun decodeFromStream (Lkotlinx/serialization/DeserializationStrategy;Ljava/io/InputStream;Lkotlin/jvm/functions/Function1;)Lkotlin/sequences/Sequence;
public static synthetic fun decodeFromStream$default (Lcom/github/avrokotlin/avro4k/AvroObjectContainer;Lkotlinx/serialization/DeserializationStrategy;Ljava/io/InputStream;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlin/sequences/Sequence;
public final fun encodeToStream (Lorg/apache/avro/Schema;Lkotlinx/serialization/SerializationStrategy;Lkotlin/sequences/Sequence;Ljava/io/OutputStream;Lkotlin/jvm/functions/Function1;)V
public static synthetic fun encodeToStream$default (Lcom/github/avrokotlin/avro4k/AvroObjectContainer;Lorg/apache/avro/Schema;Lkotlinx/serialization/SerializationStrategy;Lkotlin/sequences/Sequence;Ljava/io/OutputStream;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
public final fun getAvro ()Lcom/github/avrokotlin/avro4k/Avro;
public final fun openWriter (Lorg/apache/avro/Schema;Lkotlinx/serialization/SerializationStrategy;Ljava/io/OutputStream;Lkotlin/jvm/functions/Function1;)Lcom/github/avrokotlin/avro4k/AvroObjectContainerWriter;
public static synthetic fun openWriter$default (Lcom/github/avrokotlin/avro4k/AvroObjectContainer;Lorg/apache/avro/Schema;Lkotlinx/serialization/SerializationStrategy;Ljava/io/OutputStream;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lcom/github/avrokotlin/avro4k/AvroObjectContainerWriter;
}

public final class com/github/avrokotlin/avro4k/AvroObjectContainer$Default : com/github/avrokotlin/avro4k/AvroObjectContainer {
}

public final class com/github/avrokotlin/avro4k/AvroObjectContainerBuilder {
public fun <init> (Lorg/apache/avro/file/DataFileWriter;)V
public final fun codec (Lorg/apache/avro/file/CodecFactory;)V
public final fun metadata (Ljava/lang/String;J)V
public final fun metadata (Ljava/lang/String;Ljava/lang/String;)V
public final fun metadata (Ljava/lang/String;[B)V
public final fun syncInterval (I)V
}

public final class com/github/avrokotlin/avro4k/AvroObjectContainerKt {
@@ -191,7 +191,6 @@ public final class com/github/avrokotlin/avro4k/AvroObjectContainerKt {
}

public final class com/github/avrokotlin/avro4k/AvroObjectContainerMetadataDumper {
public fun <init> (Lorg/apache/avro/file/DataFileStream;)V
public final fun metadata (Ljava/lang/String;)Lcom/github/avrokotlin/avro4k/AvroObjectContainerMetadataDumper$MetadataAccessor;
}

@@ -202,6 +201,11 @@ public final class com/github/avrokotlin/avro4k/AvroObjectContainerMetadataDumper
public final fun asString ()Ljava/lang/String;
}

public final class com/github/avrokotlin/avro4k/AvroObjectContainerWriter : java/io/Closeable {
public fun close ()V
public final fun writeValue (Ljava/lang/Object;)V
}

public final class com/github/avrokotlin/avro4k/AvroOkioExtensionsKt {
public static final fun decodeFromSource (Lcom/github/avrokotlin/avro4k/Avro;Lorg/apache/avro/Schema;Lkotlinx/serialization/DeserializationStrategy;Lokio/BufferedSource;)Ljava/lang/Object;
public static final fun encodeToSink (Lcom/github/avrokotlin/avro4k/Avro;Lorg/apache/avro/Schema;Lkotlinx/serialization/SerializationStrategy;Ljava/lang/Object;Lokio/BufferedSink;)V
47 changes: 28 additions & 19 deletions src/main/kotlin/com/github/avrokotlin/avro4k/AvroObjectContainer.kt
@@ -12,6 +12,7 @@ import org.apache.avro.file.DataFileStream
import org.apache.avro.file.DataFileWriter
import org.apache.avro.io.DatumReader
import org.apache.avro.io.DatumWriter
import java.io.Closeable
import java.io.InputStream
import java.io.OutputStream

@@ -28,28 +29,21 @@ public sealed class AvroObjectContainer(
public companion object Default : AvroObjectContainer(Avro)

/**
* Encodes the given sequence to the given output stream.
* Opens a writer to encode values to Avro and write them on demand to the output stream.
*
* Note that the output stream is not closed after the operation, which means you need to handle it to avoid resource leaks.
*/
public fun <T> encodeToStream(
public fun <T> openWriter(
schema: Schema,
serializer: SerializationStrategy<T>,
values: Sequence<T>,
outputStream: OutputStream,
builder: AvroObjectContainerBuilder.() -> Unit = {},
) {
): AvroObjectContainerWriter<T> {
val datumWriter: DatumWriter<T> = KotlinxSerializationDatumWriter(serializer, avro)
val dataFileWriter = DataFileWriter(datumWriter)
try {
builder(AvroObjectContainerBuilder(dataFileWriter))
dataFileWriter.create(schema, outputStream)
values.forEach {
dataFileWriter.append(it)
}
} finally {
dataFileWriter.flush()
}
builder(AvroObjectContainerBuilder(dataFileWriter))
dataFileWriter.create(schema, outputStream)
return AvroObjectContainerWriter(dataFileWriter)
}

public fun <T> decodeFromStream(
@@ -66,6 +60,18 @@
}
}

public class AvroObjectContainerWriter<T> internal constructor(
private val writer: DataFileWriter<T>,
) : Closeable {
public fun writeValue(value: T) {
writer.append(value)
}

override fun close() {
writer.flush()
}
}

private class AvroObjectContainerImpl(avro: Avro) : AvroObjectContainer(avro)

public fun AvroObjectContainer(
@@ -76,13 +82,12 @@ public fun AvroObjectContainer(
}

@ExperimentalSerializationApi
public inline fun <reified T> AvroObjectContainer.encodeToStream(
values: Sequence<T>,
public inline fun <reified T> AvroObjectContainer.openWriter(
outputStream: OutputStream,
noinline builder: AvroObjectContainerBuilder.() -> Unit = {},
) {
): AvroObjectContainerWriter<T> {
val serializer = avro.serializersModule.serializer<T>()
encodeToStream(avro.schema(serializer), serializer, values, outputStream, builder)
return openWriter(avro.schema(serializer), serializer, outputStream, builder)
}

@ExperimentalSerializationApi
Expand All @@ -94,7 +99,7 @@ public inline fun <reified T> AvroObjectContainer.decodeFromStream(
return decodeFromStream(serializer, inputStream, metadataDumper)
}

public class AvroObjectContainerBuilder(private val fileWriter: DataFileWriter<*>) {
public class AvroObjectContainerBuilder internal constructor(private val fileWriter: DataFileWriter<*>) {
public fun metadata(
key: String,
value: ByteArray,
@@ -116,12 +121,16 @@ public class AvroObjectContainerBuilder(private val fileWriter: DataFileWriter<*
fileWriter.setMeta(key, value)
}

public fun syncInterval(value: Int) {
fileWriter.setSyncInterval(value)
}

public fun codec(codec: CodecFactory) {
fileWriter.setCodec(codec)
}
}

public class AvroObjectContainerMetadataDumper(private val fileStream: DataFileStream<*>) {
public class AvroObjectContainerMetadataDumper internal constructor(private val fileStream: DataFileStream<*>) {
public fun metadata(key: String): MetadataAccessor? {
return fileStream.getMeta(key)?.let { MetadataAccessor(it) }
}
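
A minimal, self-contained sketch of the builder callback (`codec`, `syncInterval`, `metadata`) combined with on-demand writes; the `Measurement` class and its values are illustrative assumptions, only the avro4k calls come from the API above:

```kotlin
import com.github.avrokotlin.avro4k.AvroObjectContainer
import com.github.avrokotlin.avro4k.openWriter
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.Serializable
import org.apache.avro.file.CodecFactory
import java.io.ByteArrayOutputStream

@Serializable
data class Measurement(val sensorId: String, val value: Double) // hypothetical record type

@OptIn(ExperimentalSerializationApi::class)
fun main() {
    val output = ByteArrayOutputStream()
    AvroObjectContainer.openWriter<Measurement>(output) {
        // These options configure the underlying DataFileWriter before the header is written.
        codec(CodecFactory.deflateCodec(6))
        syncInterval(64 * 1024)
        metadata("writer", "avro4k")
    }.use { writer ->
        writer.writeValue(Measurement("sensor-1", 21.5))
        writer.writeValue(Measurement("sensor-2", 19.0))
    }
    println("wrote ${output.size()} bytes")
}
```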
20 changes: 11 additions & 9 deletions src/test/kotlin/com/github/avrokotlin/avro4k/AvroObjectContainerTest.kt
@@ -57,11 +57,15 @@ internal class AvroObjectContainerTest : StringSpec({
// write with avro4k
val bytes =
ByteArrayOutputStream().use {
AvroObjectContainer.encodeToStream(sequenceOf(firstProfile, secondProfile), it) {
metadata("meta-string", "awesome string")
metadata("meta-long", 42)
metadata("bytes", byteArrayOf(1, 3, 2, 42))
}
val writer =
AvroObjectContainer.openWriter<UserProfile>(it) {
metadata("meta-string", "awesome string")
metadata("meta-long", 42)
metadata("bytes", byteArrayOf(1, 3, 2, 42))
}
writer.writeValue(firstProfile)
writer.writeValue(secondProfile)
writer.close()
it.toByteArray()
}
// read with apache avro lib
@@ -105,7 +109,6 @@ internal class AvroObjectContainerTest : StringSpec({
var closed = false

override fun write(b: Int) {
throw UnsupportedOperationException()
}

override fun close() {
Expand All @@ -114,9 +117,8 @@ internal class AvroObjectContainerTest : StringSpec({
}

val os = SimpleOutputStream()
shouldThrow<UnsupportedOperationException> {
AvroObjectContainer.encodeToStream<UserId>(sequence {}, os)
}
val writer = AvroObjectContainer.openWriter<UserId>(os)
writer.close()
os.closed shouldBe false
}
"decoding error is not closing the stream" {
