pekko: helper to decompress while streaming ByteString (#1717)
Update ByteStringInputStream to add a helper method for creating the
stream that automatically wraps it in a GZIPInputStream if the data is
gzip compressed.
brharrington authored Nov 4, 2024
1 parent 2dac822 commit e1dddd9
Showing 3 changed files with 67 additions and 33 deletions.
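At existing call sites the change amounts to swapping the constructor for the new factory method. A schematic before/after (assuming a ByteString value bs is already in hand):

// before: the caller received the raw bytes, compressed or not
val in = new ByteStringInputStream(bs)

// after: the helper checks for the gzip magic header and wraps as needed
val in = ByteStringInputStream.create(bs)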
ByteStringInputStream.scala
@@ -16,9 +16,10 @@
package com.netflix.atlas.pekko

import java.io.InputStream

import org.apache.pekko.util.ByteString

+import java.util.zip.GZIPInputStream

/**
  * Wraps a `ByteString` to allow it to be read from code expecting an `InputStream`. This
  * can be used to avoid allocating a temporary array and using `ByteArrayInputStream`.
Expand Down Expand Up @@ -54,3 +55,24 @@ class ByteStringInputStream(data: ByteString) extends InputStream {
    current.remaining()
  }
}

+object ByteStringInputStream {
+
+  // Magic header to recognize GZIP compressed data
+  // http://www.zlib.org/rfc-gzip.html#file-format
+  private val gzipMagicHeader = ByteString(Array(0x1F.toByte, 0x8B.toByte))
+
+  /**
+    * Create an InputStream for reading the content of the ByteString. If the data is
+    * gzip compressed, it will be wrapped in a GZIPInputStream that handles the
+    * decompression. Decompression could be done at the server layer, but it may be
+    * preferable to decompress while parsing into the final object model to avoid
+    * allocating an intermediate ByteString of the uncompressed data.
+    */
+  def create(data: ByteString): InputStream = {
+    if (data.startsWith(gzipMagicHeader))
+      new GZIPInputStream(new ByteStringInputStream(data))
+    else
+      new ByteStringInputStream(data)
+  }
+}
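As a usage sketch of the new helper (the readAll function below is hypothetical, not part of this commit), the same call path now serves both plain and gzip compressed payloads:

import java.io.InputStream

import org.apache.pekko.util.ByteString

// Read the full content of a payload that may or may not be gzip
// compressed. create() looks at the first two bytes for the gzip magic
// header (0x1F, 0x8B) and only wraps in a GZIPInputStream when present.
def readAll(bs: ByteString): Array[Byte] = {
  val in: InputStream = ByteStringInputStream.create(bs)
  try in.readAllBytes()
  finally in.close()
}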
CustomDirectives.scala
@@ -54,24 +54,6 @@ import scala.util.Success

object CustomDirectives {

-  // Magic header to recognize GZIP compressed data
-  // http://www.zlib.org/rfc-gzip.html#file-format
-  private val gzipMagicHeader = ByteString(Array(0x1F.toByte, 0x8B.toByte))
-
-  /**
-    * Create an InputStream for reading the content of the ByteString. If the data is
-    * gzip compressed, then it will be wrapped in a GZIPInputStream to handle the
-    * decompression of the data. This can be handled at the server layer, but it may
-    * be preferable to decompress while parsing into the final object model to reduce
-    * the need to allocate an intermediate ByteString of the uncompressed data.
-    */
-  private def inputStream(bytes: ByteString): InputStream = {
-    if (bytes.startsWith(gzipMagicHeader))
-      new GZIPInputStream(new ByteStringInputStream(bytes))
-    else
-      new ByteStringInputStream(bytes)
-  }
-
  private def isSmile(mediaType: MediaType): Boolean = {
    mediaType == CustomMediaTypes.`application/x-jackson-smile`
  }
@@ -90,9 +72,9 @@
  def json[T: JavaTypeable]: MediaType => ByteString => T = { mediaType => bs =>
    {
      if (isSmile(mediaType))
-        Json.smileDecode[T](inputStream(bs))
+        Json.smileDecode[T](ByteStringInputStream.create(bs))
      else
-        Json.decode[T](inputStream(bs))
+        Json.decode[T](ByteStringInputStream.create(bs))
    }
  }

@@ -107,9 +89,9 @@
    {
      val p =
        if (isSmile(mediaType))
-          Json.newSmileParser(inputStream(bs))
+          Json.newSmileParser(ByteStringInputStream.create(bs))
        else
-          Json.newJsonParser(inputStream(bs))
+          Json.newJsonParser(ByteStringInputStream.create(bs))
      try decoder(p)
      finally p.close()
    }
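The try/finally above guarantees the parser is closed even when the decoder throws. A self-contained sketch of the same lifecycle, using plain Jackson as a stand-in for the Json.newJsonParser helper (JsonFactory and decodeWith are assumptions for illustration):

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}

import scala.util.Using

// Same close-on-exit guarantee expressed with Using.resource: the parser
// is released whether the decoder succeeds or throws.
def decodeWith[T](json: String)(decoder: JsonParser => T): T = {
  Using.resource(new JsonFactory().createParser(json)) { p =>
    decoder(p)
  }
}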
ByteStringInputStreamSuite.scala
@@ -18,16 +18,28 @@ package com.netflix.atlas.pekko
import java.io.ByteArrayInputStream
import java.security.MessageDigest
import java.util.Random

import org.apache.pekko.util.ByteString
import munit.FunSuite

+import java.io.ByteArrayOutputStream
+import java.util.zip.GZIPOutputStream
+import scala.util.Using

class ByteStringInputStreamSuite extends FunSuite {

+  private def compress(data: ByteString): ByteString = {
+    val baos = new ByteArrayOutputStream()
+    Using.resource(new GZIPOutputStream(baos)) { out =>
+      out.write(data.toArray)
+    }
+    ByteString(baos.toByteArray)
+  }
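A detail worth noting in the compress helper above: GZIPOutputStream writes the trailing CRC-32 and length only when the stream is finished, so baos must be read after Using.resource has closed the stream. The same helper with an explicit finish call (compressExplicit is a hypothetical equivalent, not in this commit):

import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

def compressExplicit(data: ByteString): ByteString = {
  val baos = new ByteArrayOutputStream()
  val out = new GZIPOutputStream(baos)
  try {
    out.write(data.toArray)
    out.finish() // flushes remaining deflated data and writes the gzip trailer
  } finally out.close()
  ByteString(baos.toByteArray)
}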

  private def singleRead(name: String, data: ByteString): Unit = {
    test(s"$name: read() and available()") {
      val bais = new ByteArrayInputStream(data.toArray)
-      val bsis = new ByteStringInputStream(data)
+      val bsis = ByteStringInputStream.create(data)
+      val gzis = ByteStringInputStream.create(compress(data))

      data.indices.foreach { i =>
        if (data.isCompact) {
@@ -36,38 +48,56 @@ class ByteStringInputStreamSuite extends FunSuite {
        } else {
          assert(bsis.available() > 0)
        }
-        assertEquals(bais.read(), bsis.read())
+        assert(gzis.available() > 0)
+
+        val expected = bais.read()
+        assertEquals(expected, bsis.read())
+        assertEquals(expected, gzis.read())
      }

-      assertEquals(bais.read(), bsis.read())
+      val expected = bais.read()
+      assertEquals(expected, bsis.read())
+      assertEquals(expected, gzis.read())
+
+      bsis.close()
+      gzis.close()
    }
  }

  private def bulkRead(name: String, data: ByteString): Unit = {
    test(s"$name: read(buffer, offset, length)") {
      val bais = new ByteArrayInputStream(data.toArray)
-      val bsis = new ByteStringInputStream(data)
+      val bsis = ByteStringInputStream.create(data)
+      val gzis = ByteStringInputStream.create(compress(data))

      val h1 = MessageDigest.getInstance("SHA-256")
      val h2 = MessageDigest.getInstance("SHA-256")
+      val h3 = MessageDigest.getInstance("SHA-256")

      val b1 = new Array[Byte](13)
      val b2 = new Array[Byte](13)
-      var i = 0
-      while (i < data.length) {
+      val b3 = new Array[Byte](13)
+      var continue = true
+      while (continue) {
        val len1 = bais.read(b1)
        val len2 = bsis.read(b2)
+        val len3 = gzis.read(b3)
        if (data.isCompact) {
          assertEquals(len1, len2)
          assertEquals(b1.toSeq, b2.toSeq)
        }
        if (len1 > 0) h1.update(b1, 0, len1)
        if (len2 > 0) h2.update(b2, 0, len2)
-        i += len2
+        if (len3 > 0) h3.update(b3, 0, len3)
+        continue = len1 > 0 || len2 > 0 || len3 > 0
      }

-      assertEquals(bais.read(b1), bsis.read(b2))
-      assertEquals(h1.digest().toSeq, h2.digest().toSeq)
+      val digest = h1.digest().toSeq
+      assertEquals(digest, h2.digest().toSeq)
+      assertEquals(digest, h3.digest().toSeq)
+
+      bsis.close()
+      gzis.close()
    }
  }

