Skip to content

Commit

Permalink
feat: Allow writing on-demand object-container values instead of only…
Browse files Browse the repository at this point in the history
… a Sequence
  • Loading branch information
Chuckame committed Sep 11, 2024
1 parent 6e1d82a commit 184ce7d
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 29 deletions.
7 changes: 5 additions & 2 deletions Migrating-from-v1.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,16 @@ val dataSequence = sequenceOf(
TheDataClass(...),
)
Files.newOutputStream(Path("/your/file.avro")).use { outputStream ->
AvroObjectContainer { fieldNamingStrategy = FieldNamingStrategy.SnakeCase }
.encodeToStream(dataSequence, outputStream) {
val writer = AvroObjectContainer { fieldNamingStrategy = FieldNamingStrategy.SnakeCase }
.openWriter(outputStream) {
codec(CodecFactory.snappyCodec())
// you can also add your metadata !
metadata("myProp", 1234L)
metadata("a string metadata", "hello")
}
writer.use {
dataSequence.forEach { writer.writeValue(it) }
}
}
```

Expand Down
11 changes: 5 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,7 @@ fun main() {
## Object container

Avro4k provides a way to encode and decode object container — also known as data file — with `AvroObjectContainer` class. This encoding will prefix the binary data with the
full schema to
allow knowing the writer schema when reading the data. This format is perfect for storing many long-term objects in a single file.

The main difference with the `AvroObjectContainer` is that you will encode and decode `Sequence` of objects instead of single objects.
full schema to allow knowing the writer schema when reading the data. This format is perfect for storing many long-term objects in a single file.

Be aware that consuming the decoded `Sequence` needs to be done **before** closing the stream, or you will get an exception as a sequence is a "hot" source,
which means that if there are millions of objects in the file, the objects are extracted one-by-one when requested. If you take only the first 10 objects and close the stream,
Expand All @@ -126,12 +123,14 @@ fun main() {
Project("avro4k", "Kotlin"),
)
Files.newOutputStream(Path("your-file.bin")).use { fileStream ->
AvroObjectContainer.encodeToStream(valuesToEncode, fileStream, builder)
AvroObjectContainer.openWriter(fileStream).use { writer ->
valuesToEncode.forEach { writer.writeValue(it) }
}
}

// Deserializing objects
Files.newInputStream(Path("your-file.bin")).use { fileStream ->
AvroObjectContainer.decodeFromStream<Project>(valuesToEncode, fileStream, builder).forEach {
AvroObjectContainer.decodeFromStream<Project>(fileStream).forEach {
println(it) // Project(name=kotlinx.serialization, language=Kotlin) ...
}
}
Expand Down
47 changes: 28 additions & 19 deletions src/main/kotlin/com/github/avrokotlin/avro4k/AvroObjectContainer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.apache.avro.file.DataFileStream
import org.apache.avro.file.DataFileWriter
import org.apache.avro.io.DatumReader
import org.apache.avro.io.DatumWriter
import java.io.Closeable
import java.io.InputStream
import java.io.OutputStream

Expand All @@ -28,28 +29,21 @@ public sealed class AvroObjectContainer(
public companion object Default : AvroObjectContainer(Avro)

/**
* Encodes the given sequence to the given output stream.
* Opens a writer to allow encoding values to avro and writing them to the output stream.
*
* Note that the output stream is not closed after the operation, which means you need to handle it to avoid resource leaks.
*/
public fun <T> encodeToStream(
public fun <T> openWriter(
schema: Schema,
serializer: SerializationStrategy<T>,
values: Sequence<T>,
outputStream: OutputStream,
builder: AvroObjectContainerBuilder.() -> Unit = {},
) {
): AvroObjectContainerWriter<T> {
val datumWriter: DatumWriter<T> = KotlinxSerializationDatumWriter(serializer, avro)
val dataFileWriter = DataFileWriter(datumWriter)
try {
builder(AvroObjectContainerBuilder(dataFileWriter))
dataFileWriter.create(schema, outputStream)
values.forEach {
dataFileWriter.append(it)
}
} finally {
dataFileWriter.flush()
}
builder(AvroObjectContainerBuilder(dataFileWriter))
dataFileWriter.create(schema, outputStream)
return AvroObjectContainerWriter(dataFileWriter)
}

public fun <T> decodeFromStream(
Expand All @@ -66,6 +60,18 @@ public sealed class AvroObjectContainer(
}
}

/**
 * Writes values one-by-one into an avro object container (data file) previously opened
 * with [AvroObjectContainer.openWriter].
 *
 * Closing this writer only flushes buffered data to the output stream; the stream itself
 * is intentionally left open (per the openWriter contract, the caller owns the stream and
 * must close it to avoid resource leaks).
 */
public class AvroObjectContainerWriter<T> internal constructor(
    private val writer: DataFileWriter<T>,
) : Closeable {
    /** Serializes [value] and appends it to the container. */
    public fun writeValue(value: T) {
        writer.append(value)
    }

    /** Flushes pending data; does NOT close the underlying output stream. */
    override fun close() {
        writer.flush()
    }
}

// Concrete subclass of the sealed AvroObjectContainer, needed to create instances that
// carry a custom Avro configuration (the companion object `Default` uses the global Avro).
private class AvroObjectContainerImpl(avro: Avro) : AvroObjectContainer(avro)

public fun AvroObjectContainer(
Expand All @@ -76,13 +82,12 @@ public fun AvroObjectContainer(
}

/**
 * Opens a writer that encodes values of [T] in avro object-container format to [outputStream],
 * inferring the schema and serializer for [T] from this instance's serializers module.
 *
 * The output stream is not closed by the returned writer; the caller must close it.
 *
 * @param builder optional container configuration (codec, metadata, ...)
 * @return a writer to append values with; close it to flush buffered data
 */
@ExperimentalSerializationApi
public inline fun <reified T> AvroObjectContainer.openWriter(
    outputStream: OutputStream,
    noinline builder: AvroObjectContainerBuilder.() -> Unit = {},
): AvroObjectContainerWriter<T> {
    val serializer = avro.serializersModule.serializer<T>()
    return openWriter(avro.schema(serializer), serializer, outputStream, builder)
}

@ExperimentalSerializationApi
Expand All @@ -94,7 +99,7 @@ public inline fun <reified T> AvroObjectContainer.decodeFromStream(
return decodeFromStream(serializer, inputStream, metadataDumper)
}

public class AvroObjectContainerBuilder(private val fileWriter: DataFileWriter<*>) {
public class AvroObjectContainerBuilder internal constructor(private val fileWriter: DataFileWriter<*>) {
public fun metadata(
key: String,
value: ByteArray,
Expand All @@ -116,12 +121,16 @@ public class AvroObjectContainerBuilder(private val fileWriter: DataFileWriter<*
fileWriter.setMeta(key, value)
}

public fun syncInterval(value: Int) {
fileWriter.setSyncInterval(value)
}

public fun codec(codec: CodecFactory) {
fileWriter.setCodec(codec)
}
}

public class AvroObjectContainerMetadataDumper(private val fileStream: DataFileStream<*>) {
public class AvroObjectContainerMetadataDumper internal constructor(private val fileStream: DataFileStream<*>) {
public fun metadata(key: String): MetadataAccessor? {
return fileStream.getMeta(key)?.let { MetadataAccessor(it) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ internal class AvroObjectContainerTest : StringSpec({
// write with avro4k
val bytes =
ByteArrayOutputStream().use {
AvroObjectContainer.encodeToStream(sequenceOf(firstProfile, secondProfile), it) {
val writer = AvroObjectContainer.openWriter<UserProfile>(it) {
metadata("meta-string", "awesome string")
metadata("meta-long", 42)
metadata("bytes", byteArrayOf(1, 3, 2, 42))
}
writer.writeValue(firstProfile)
writer.writeValue(secondProfile)
it.toByteArray()
}
// read with apache avro lib
Expand Down Expand Up @@ -115,7 +117,7 @@ internal class AvroObjectContainerTest : StringSpec({

val os = SimpleOutputStream()
shouldThrow<UnsupportedOperationException> {
AvroObjectContainer.encodeToStream<UserId>(sequence {}, os)
AvroObjectContainer.openWriter<UserId>(os)
}
os.closed shouldBe false
}
Expand Down

0 comments on commit 184ce7d

Please sign in to comment.