kn: gzip compression #1228
runtime/runtime-core/native/src/aws/smithy/kotlin/runtime/compression/GzipCompressor.kt
    var finished = false

    while (!finished) {
        val outputPin = buffer.pin()
        stream.next_out = outputPin.addressOf(0).reinterpret()
        stream.avail_out = BUFFER_SIZE.toUInt()

        val deflateResult = deflate(stream.ptr, Z_FINISH)
        if (deflateResult != Z_STREAM_END && deflateResult != Z_OK) {
            throw RuntimeException("Deflate failed during finish: $deflateResult")
        }

        val bytesWritten = BUFFER_SIZE - stream.avail_out.toInt()
        outputBuffer.addAll(buffer.take(bytesWritten))

        finished = deflateResult == Z_STREAM_END
        outputPin.unpin()
    }
Question: Can this be replaced by a call to `consume(availableForRead)`?
Not directly, because the data is still inside the `stream`, not in the internal buffer we manage. It could possibly be replaced with `update(byteArrayOf())` followed by `consume(availableForRead)`.
Actually, an empty `update` won't work, because we need to specify `Z_FINISH` rather than `Z_NO_FLUSH` to ensure the entire contents are flushed out.
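The difference between the two flush modes can be illustrated on the JVM, where `java.util.zip.Deflater` wraps zlib. This is only a sketch under stated assumptions, not the PR's native implementation: `drainWithFinish`, `compressInTwoPhases`, and `inflateAll` are hypothetical helpers, `Deflater.finish()` stands in for passing `Z_FINISH`, and the output is zlib-wrapped rather than gzip-wrapped.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.util.zip.Deflater
import java.util.zip.InflaterInputStream

// Hypothetical helper: JVM analog of the Z_FINISH loop in the snippet under review.
fun drainWithFinish(deflater: Deflater, bufferSize: Int = 16384): ByteArray {
    val out = ByteArrayOutputStream()
    val buffer = ByteArray(bufferSize)
    deflater.finish() // assumption: plays the role of Z_FINISH
    while (!deflater.finished()) { // analog of looping until Z_STREAM_END
        val written = deflater.deflate(buffer)
        out.write(buffer, 0, written)
    }
    return out.toByteArray()
}

// Returns the bytes produced by a plain (NO_FLUSH) deflate call, then the
// bytes that only appear once the stream is finished.
fun compressInTwoPhases(input: ByteArray): Pair<ByteArray, ByteArray> {
    val deflater = Deflater()
    try {
        deflater.setInput(input)
        val buffer = ByteArray(16384)
        // Default flush mode is NO_FLUSH: zlib may hold input in its internal state.
        val early = deflater.deflate(buffer)
        val beforeFinish = buffer.copyOf(early)
        val afterFinish = drainWithFinish(deflater)
        return beforeFinish to afterFinish
    } finally {
        deflater.end() // release native zlib resources
    }
}

// Hypothetical helper used to check that the two phases form one valid stream.
fun inflateAll(compressed: ByteArray): ByteArray =
    InflaterInputStream(ByteArrayInputStream(compressed)).readBytes()
```

Concatenating the two returned arrays yields the complete stream. For small inputs, most or all of the compressed payload typically shows up only in the second array, which is why an empty update without a finishing flush would leave data behind.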
runtime/runtime-core/native/src/aws/smithy/kotlin/runtime/compression/GzipCompressor.kt
    companion object {
        internal const val BUFFER_SIZE = 16384
    }
Question: Why is this the right buffer size? Would we ever want to use different sizes?
There's no particular reason; I just chose this number. I don't think it matters too much. It only controls how often we call `deflate` (that is, whether the input passed to `update` can fit in a single `deflate` call or not).
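The effect of the buffer size on the number of `deflate` calls can be sketched with the JVM `java.util.zip.Deflater`. This is an assumption-laden illustration rather than the native code from the PR: `deflateCounting` is a hypothetical helper, and the output is zlib-wrapped rather than gzip-wrapped.

```kotlin
import java.io.ByteArrayOutputStream
import java.util.zip.Deflater

// Hypothetical helper: compresses `input` using an output buffer of
// `bufferSize` bytes and reports the compressed bytes together with the
// number of deflate calls that were needed to drain the stream.
fun deflateCounting(input: ByteArray, bufferSize: Int): Pair<ByteArray, Int> {
    val deflater = Deflater()
    try {
        deflater.setInput(input)
        deflater.finish() // all input is supplied up front
        val out = ByteArrayOutputStream()
        val buffer = ByteArray(bufferSize)
        var calls = 0
        while (!deflater.finished()) {
            val written = deflater.deflate(buffer)
            out.write(buffer, 0, written)
            calls++
        }
        return out.toByteArray() to calls
    } finally {
        deflater.end() // release native zlib resources
    }
}
```

Calling `deflateCounting(data, 256)` and `deflateCounting(data, 16384)` on the same data should both produce streams that decompress to the original bytes. The smaller buffer simply needs more calls, which matches the point that the size mostly affects call frequency.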
runtime/runtime-core/native/src/aws/smithy/kotlin/runtime/compression/GzipNative.kt
    // If still no data is available and the channel is closed, we've hit EOF. Close the compressor and write the remaining bytes
    if (compressor.availableForRead == 0 && channel.isClosedForRead) {
        val terminationBytes = compressor.close()
        sink.write(terminationBytes)
        return terminationBytes.size.toLong()
    }
Correctness: Is it always true that `availableForRead == 0` after `compressor.update(...)` means EOF? Zlib's manual seems to imply that calls to `deflate` might result in data being internally buffered but not necessarily producing output.
No, it doesn't. But if the underlying channel is also closed (the second half of the condition), then it's safe to flush the compressor (which prepares all the internally buffered data) and write the remaining bytes.
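The reasoning in this thread can be sketched with plain JVM streams. This is a hypothetical stand-in for the source's read loop, not the PR's code: `compressStream` and its parameters are invented for illustration, `java.util.zip.Deflater` replaces the native compressor, and `InputStream.read` returning -1 plays the role of `channel.isClosedForRead`.

```kotlin
import java.io.InputStream
import java.io.OutputStream
import java.util.zip.Deflater

// Hypothetical read loop: feeds small chunks to the compressor and returns how
// many chunks produced no compressed output while the input was still open.
fun compressStream(input: InputStream, sink: OutputStream, chunkSize: Int = 8): Int {
    val deflater = Deflater()
    val inBuf = ByteArray(chunkSize)
    val outBuf = ByteArray(16384)
    var emptyReads = 0
    try {
        while (true) {
            val read = input.read(inBuf)
            if (read == -1) break // only exhausted input signals EOF
            deflater.setInput(inBuf, 0, read)
            var produced = 0
            while (!deflater.needsInput()) {
                val n = deflater.deflate(outBuf) // NO_FLUSH: output may be empty
                sink.write(outBuf, 0, n)
                produced += n
            }
            if (produced == 0) emptyReads++ // nothing available, yet not EOF
        }
        // Input is closed: now it is safe to finish and write the remainder.
        deflater.finish()
        while (!deflater.finished()) {
            val n = deflater.deflate(outBuf)
            sink.write(outBuf, 0, n)
        }
    } finally {
        deflater.end() // release native zlib resources
    }
    return emptyReads
}
```

A nonzero return value shows that "no output available" happens routinely in the middle of a stream, so the closed-input check is what actually decides when to finish.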
* add telemetry provider configuration * lint * address pr reviews * add changelog
    internal val isClosed
        get() = _isClosed.value
    private val outputBuffer = SdkByteChannel()
Question: Do we have to use a channel instead of a source/sink here? This makes all of the data methods `suspend`, but I cannot see why they need to be, conceptually.
We can't use an `SdkSource` or an `SdkSink` because those only support either `read` or `write` but not both. I'll try `SdkBuffer`.
Refactored to use `SdkBuffer`.
    runBlocking {
        GzipSdkSource(bytes.source()).readToByteArray().asByteStream()
    }
Question: Why do we need to invoke a `suspend` method here? We're pulling this all into memory and blocking until it's done, so it seems like we can avoid `runBlocking`/`suspend` altogether...
Related to #1228 (comment)
GzipCompressor
)decompressGzipBytes
@InternalApi public
, which I think is a fine tradeoff
Issue #
Description of changes
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.