-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding zstandard compression library #12408
Adding zstandard compression library #12408
Conversation
thanks for looking into this. For the most part we care a lot more about decoding speed and total size for query performance, so it would be worth running the 'select rows' version of the benchmarks too. Additionally, some of the benchmarks with a 'main' method also collect a 2nd "encoded size" measurement, so running something like:
should collect both time and size values instead of just the time value like when run with |
Sure thing, thank you for the tip. I am running those tests you requested right now and will post the results once they finish. |
@clintropolis so I am running the benchmark with the args you provided and the ETA is 114 days for the benchmark to finish. Is there anything that can be done to speed this up? |
Ah, yeah I would definitely reduce the number of parameter combinations for the run, since they are all focused around testing changes in different scenarios so no need to run everything. I would recommend like 1 or 2 |
Hi @clintropolis I ran the tests with the parameters, but had to run it with |
Hi @churromorales, very sorry for the delay!
That is unfortunate, I haven't actually run these in a while so maybe I made a mistake in my instructions, I'll try to have a look at it at some point. I was hoping it would work so we could easily get the size difference so we could compare with LZ4 and consider the differences for guidance documentation for cluster operators.
Yes, these benchmarks are testing the "hot" historical use case, where it is provisioned with a lot of extra "free" memory to be occupied with page cache of the memory mapped segment files. It is meant to find the best case scenario of performance. It would be interesting to have measurements where every read was from disk as well, so we could compare the opposite case of very dense historicals, but such benchmarks do not exist yet afaik.
Totally, I don't think this needs to be faster than LZ4 to be added, mostly just curious where it falls so we know how to advise cluster operators on when it might be a good idea to consider using it.
Do you still have these results? I'd be interested in seeing the difference, as well as the size difference if that isn't too hard to find out. In the mean time, I'll try to get this PR reviewed so we can add the option. |
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out) | ||
{ | ||
out.clear(); | ||
// some tests don't use dbb's and zstd jni doesn't allow for non-dbb byte buffers. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, looking around it seems like it should the places we use compression should only be using direct buffers, what do you think about making this an error condition instead and just skipping the tests that use heap buffers?
If we find out that it is in fact needed, then the cloneBuffer
code needs to be modified to call ByteBufferUtils.free
after it is done with the direct buffer so that we do not leak memory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, everyone should use DBB, but I believe when running the following test: It failed with:
java.lang.IllegalArgumentException: srcBuff must be a direct buffer
at com.github.luben.zstd.ZstdDecompressCtx.decompressDirectByteBuffer(ZstdDecompressCtx.java:110)
at com.github.luben.zstd.Zstd.decompressDirectByteBuffer(Zstd.java:434)
at org.apache.druid.segment.data.CompressionStrategy$ZstdDecompressor.decompress(CompressionStrategy.java:407)
at org.apache.druid.segment.data.DecompressingByteBufferObjectStrategy.fromByteBuffer(DecompressingByteBufferObjectStrategy.java:53)
at org.apache.druid.segment.data.DecompressingByteBufferObjectStrategy.fromByteBuffer(DecompressingByteBufferObjectStrategy.java:28)
at org.apache.druid.segment.data.GenericIndexed$BufferIndexed.bufferedIndexedGet(GenericIndexed.java:490)
at org.apache.druid.segment.data.GenericIndexed$3.get(GenericIndexed.java:645)
at org.apache.druid.segment.data.CompressedColumnarIntsSupplier$CompressedColumnarInts.loadBuffer(CompressedColumnarIntsSupplier.java:321)
at org.apache.druid.segment.data.CompressedColumnarIntsSupplier$CompressedColumnarInts.get(CompressedColumnarIntsSupplier.java:312)
at org.apache.druid.segment.data.CompressedColumnarIntsSupplierTest.assertIndexMatchesVals(CompressedColumnarIntsSupplierTest.java:289)
at org.apache.druid.segment.data.CompressedColumnarIntsSupplierTest.testSanityWithSerde(CompressedColumnarIntsSupplierTest.java:175)
I don't know what is the right solution here, I agree what I wrote feels a bit dirty. I'll defer to you here on how you think we should sort this out..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the right way is to probably make it be an exception to use this compression strategy without direct buffers, and modify the test to use direct buffers (or to skip this compression strategy).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so its not just one test, it is many these are all of the following classes that fail because they use HeapByteBuffers:
CompressedColumnarIntsSerializerTest
CompressedColumnarIntsSupplierTest
CompressedDoublesSerdeTest
CompressedFloatsSerdeTest
CompressedLongsAutoEncodingSerdeTest
CompressedLongsSerdeTest
CompressedVSizeColumnarIntsSerializerTest
CompressedVSizeColumnarIntsSupplierTest
V3CompressedVSizeColumnarMultiIntsSerializerTest
And within those test classes, multiple tests fail for not using dbb's. I recall now that when i looked into fixing all of these tests, it was going to end up to be quite a bigger patch than this actual feature. Do you have any suggestions here on how fix these? I can't see a non-invasive way to do this. Thank you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like it mostly would be replacing ByteBuffer.wrap
with a direct buffer, and adding an @After
to release them, unless I'm missing something?
If you'd rather not modify the tests (though it seems like they would probably be better tests using direct buffers since it more closely reflects what would be happening in practice), I would be ok with instead modifying the code to call ByteBufferUtils.free
on the cloned direct buffers, so that in case I missed something and we ever do use heap buffers in production code they are at least not leaked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the delay, but I have a better idea I think on what to do... how about instead of cloning into a direct buffer, we just use byte arrays and Zstd.decompressByteArray
and copy the output byte array to the output buffer.
@Override
public void decompress(ByteBuffer in, int numBytes, ByteBuffer out)
{
out.clear();
if (!in.isDirect() || !out.isDirect()) {
// fall back to heap byte arrays if both buffers are not direct
final byte[] inputBytes = new byte[numBytes];
in.get(inputBytes);
try (final ResourceHolder<byte[]> outputBytesHolder = CompressedPools.getOutputBytes()) {
final byte[] outputBytes = outputBytesHolder.get();
int decompressedBytes = (int) Zstd.decompressByteArray(outputBytes, 0, outputBytes.length, inputBytes, 0, numBytes);
out.put(outputBytes, 0, decompressedBytes);
out.flip();
}
} else {
int decompressedBytes = (int) Zstd.decompressDirectByteBuffer(
out,
out.position(),
out.remaining(),
in,
in.position(),
numBytes
);
out.limit(out.position() + decompressedBytes);
}
}
I tested it locally and it seems to work. Some of the lzf methods do this too, so it is also not without precedent.
Also make sure any modifications done to tests in this PR that might allocate direct buffers are releasing them, see #12521, which also looks like has caused a conflict with this PR. I'll have a scan and see if I can spot anything too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@clintropolis Thanks for the review. I agree, just having a byte array is the easiest thing. I fixed the checkstyle issues and as for the PR modifications from #12521 , you want me to rebase and force push, or merge main into my branch? How do you like to do this such that I can fix the CompressionStrategyTest
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
apologies for the delay; thinking a bit more about it, do we actually need to modify CompressionStrategyTest
now that we can handle heap buffers? It might be slightly better coverage to leave the test added by #12521 instead of just testing direct. That said, the changes overall lgtm now 👍
looks like there are a couple of CI failures about pom dependency stuffs https://app.travis-ci.com/github/apache/druid/jobs/571153862#L3183
and also maybe our license checking scripts aren't handling an entry correctly, though it isn't obvious why yet:
https://app.travis-ci.com/github/apache/druid/jobs/571153859#L4257
since the script has that key... https://github.com/apache/druid/blob/master/distribution/bin/check-licenses.py#L236
maybe its a bad error message though, due to https://github.com/apache/druid/blob/master/licenses.yaml#L3967 not being updated to the new version in this PR? https://github.com/apache/druid/blob/master/licenses.yaml#L3979 should also be updated to whatever version of zstd that the new version of the jni wrapper points to. Sorry I missed this stuff on earlier review
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a reason for modifying the tests. The only thing that was calling compress with a non-dbb was that CompressionStrategyTest
. We don't support a non-dbb compression
in zstandard right now. The only thing that changed about the tests was not to test the non-dbb path. Which I thought was acceptable, because in the end we should never be calling this code with any other buffer than a direct.
I can fix the rest of the open items. I will wait to hear from you about what you want to do with the CompressionStrategyTest. If it is worthwhile, I'll have to do buffer copies for the compress like what was done in the Decompressor.
Again thank you for the review and making my life easier debugging the CICD issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see now, yeah that makes sense 👍 I think I'm ok with leaving the test like this.
I guess we could add the byte[]
version fallback of compress
just in case anything does try to call it that we didn't find, though I think all compression uses the 'allocate' methods on CompressionStrategy
so its probably not necessary.
… byte array when the buffers are not direct. 2. Cleaned up checkstyle issues.
hmm, license check still failing with an exception, wonder if there is something wrong with our script, will have a closer look in a bit (looks like an intellij inspection issue as well) |
Yeah I'm sorry, I don't know much about the CICD system. Thank you for helping me out with this, if there are any issues that I need to sort out please let me know. Appreciate the review |
Shouldnt the License name here: https://github.com/apache/druid/blob/master/distribution/bin/check-licenses.py#L236 |
I think the correct thing to do is change https://github.com/apache/druid/blob/master/distribution/bin/check-licenses.py#L236 to
and update https://github.com/apache/druid/blob/master/licenses.yaml#L5031 to use Sorry for the troubles with the license checker |
This makes sense, but since the license checker is throwing a KeyError:
Or we could modify the existing key from |
@clintropolis @a2l007 looks like things pass now, let me know if there is anything else required to get this patch merged. Thank you again for your help. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, Thanks @churromorales
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤘
oops, I think there is maybe an issue with something running locally Im seeing
Looking into it. |
Is this issue with the closer not being thread safe? And everyone sharing the same one?
… On May 28, 2022, at 5:59 PM, Clint Wylie ***@***.***> wrote:
oops, I think there is maybe an issue with something
running locally Im seeing
[WARNING] org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplierTest.testConcurrency[5: compression=zstd, byteOrder=LITTLE_ENDIAN]
[ERROR] Run 1: CompressedVSizeColumnarIntsSupplierTest>CompressionStrategyTest.testConcurrency:150 » Execution java.lang.ArrayIndexOutOfBoundsException: Index 54 out of bounds for length 54
[ERROR] Run 2: CompressedVSizeColumnarIntsSupplierTest>CompressionStrategyTest.closeCloser:88 » ArrayIndexOutOfBounds Index 70 out of bounds for length 54
[INFO] Run 3: PASS
[ERROR] Tests run: 65, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 5.832 s <<< FAILURE! - in org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplierTest
[ERROR] org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplierTest.testConcurrency[5: compression=zstd, byteOrder=LITTLE_ENDIAN] Time elapsed: 0.007 s <<< ERROR!
java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: Index 54 out of bounds for length 54
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at org.apache.druid.segment.data.CompressionStrategyTest.testConcurrency(CompressionStrategyTest.java:150)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 54 out of bounds for length 54
at java.base/java.util.ArrayDeque.grow(ArrayDeque.java:159)
at java.base/java.util.ArrayDeque.addFirst(ArrayDeque.java:291)
at org.apache.druid.java.util.common.io.Closer.register(Closer.java:127)
at org.apache.druid.segment.data.CompressionStrategy$ZstdCompressor.allocateOutBuffer(CompressionStrategy.java:379)
at org.apache.druid.segment.data.CompressionStrategyTest.lambda$testConcurrency$1(CompressionStrategyTest.java:133)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
[ERROR] org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplierTest.testConcurrency[5: compression=zstd, byteOrder=LITTLE_ENDIAN] Time elapsed: 0.007 s <<< ERROR!
java.lang.ArrayIndexOutOfBoundsException: Index 70 out of bounds for length 54
at java.base/java.util.ArrayDeque.elementAt(ArrayDeque.java:260)
at java.base/java.util.ArrayDeque.pollFirst(ArrayDeque.java:380)
at java.base/java.util.ArrayDeque.removeFirst(ArrayDeque.java:361)
at org.apache.druid.java.util.common.io.Closer.close(Closer.java:180)
at org.apache.druid.segment.data.CompressionStrategyTest.closeCloser(CompressionStrategyTest.java:88)
...
Looking into it.
—
Reply to this email directly, view it on GitHub, or unsubscribe.
You are receiving this because you were mentioned.
|
I'm not entirely sure if it is from this PR or not yet, since I also see a failure with lz4 (though this PR did make some changes
I also have only seen it so far when i run all of the tests in |
fixes an issue caused by a test modification in #12408 that was closing buffers allocated by the compression strategy instead of allowing the closer to do it
fixes an issue caused by a test modification in #12408 that was closing buffers allocated by the compression strategy instead of allowing the closer to do it
Description
Adds Zstandard compression library to
CompressionStrategy
Ran Druid benchmark vs lz4: https://gist.github.com/churromorales/0a580e03e69f6ca6c13001c2a9aab0e0
One thing that I did not like about this patch is some tests use a HeapByteBuffer and call the compression codec, and the zstd requires a DBB as it calls out to the c-library and it is always good to avoid doing the copy. For those tests I checked in the
ZstdDecompressor
if the buffer was direct and if it wasn't i copied the contents into a dbb.If there are any other solutions here on how to approach this please let me know.