From 057b33ea9c3f0f8bc70d27a069ea0e82ecbe1f70 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 22 Dec 2020 18:53:27 +0100 Subject: [PATCH] Issue 8974: Peeking at compressed messages throws an exception (Readonly buffers not supported by Airlift) (#8990) Fixes #8974 ### Motivation In certain cases peeking messages on compresses topics return an error, see #8974 because Airlift does not support readonly ByteBuffers, because they do not give access to the underlying array) ### Modifications Copy the ByteByffer in case of unsupported buffer type ### Verifying this change This change adds new tests that reproduce the error and demonstrate that the problem is fixed. (cherry picked from commit cbc606b0b0e836c1238ea1ba92400b3f14e5b349) --- .../common/compression/AirliftUtils.java | 38 +++++++++++++++++++ .../compression/CompressionCodecLZ4.java | 3 +- .../compression/CompressionCodecSnappy.java | 1 + .../compression/CompressionCodecZstd.java | 2 +- .../compression/CompressorCodecTest.java | 31 +++++++++++++++ 5 files changed, 73 insertions(+), 2 deletions(-) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java new file mode 100644 index 0000000000000..3bfc609f1bdd6 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.compression; + +import java.nio.ByteBuffer; + +/** + * Utilities. + */ +public abstract class AirliftUtils { + + static ByteBuffer ensureAirliftSupported(ByteBuffer encodedNio, int uncompressedLength) { + if (!encodedNio.isDirect() && !encodedNio.hasArray()) { + // airlift needs a raw ByteArray + ByteBuffer copy = ByteBuffer.allocate(uncompressedLength); + copy.put(encodedNio); + copy.flip(); + encodedNio = copy; + } + return encodedNio; + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java index 2493af4ca39f8..12a03d1f25286 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java @@ -96,11 +96,12 @@ public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOExceptio } else { ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength); ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes()); - + encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength); LZ4_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio); } uncompressed.writerIndex(uncompressedLength); return uncompressed; } + } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java index 517f1ca7ccede..1e31edc9e12a0 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java @@ -103,6 +103,7 @@ public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOExceptio ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength); ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes()); + encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength); SNAPPY_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java index 944e1e5a07b03..18caee666f009 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java @@ -97,7 +97,7 @@ public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOExceptio } else { ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength); ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes()); - + encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength); ZSTD_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio); } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java index 46e57188bb1ce..ec84741b25f06 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java @@ -83,6 +83,37 @@ void testCompressDecompress(CompressionType type, String compressedText) throws assertEquals(compressed.refCnt(), 0); } + @Test(dataProvider = "codec") + void testDecompressReadonlyByteBuf(CompressionType type, String compressedText) throws IOException { + CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type); + byte[] data = text.getBytes(); + ByteBuf raw = PulsarByteBufAllocator.DEFAULT.directBuffer(); + raw.writeBytes(data); + + ByteBuf compressed = codec.encode(raw); + assertEquals(raw.readableBytes(), data.length); + + int compressedSize = compressed.readableBytes(); + // Readonly ByteBuffers are not supported by AirLift + // https://github.com/apache/pulsar/issues/8974 + ByteBuf compressedComplexByteBuf = compressed.asReadOnly(); + ByteBuf uncompressed = codec.decode(compressedComplexByteBuf, data.length); + + assertEquals(compressed.readableBytes(), compressedSize); + + assertEquals(uncompressed.readableBytes(), data.length); + assertEquals(uncompressed, raw); + + raw.release(); + compressed.release(); + uncompressed.release(); + + // Verify compression codecs have the same behavior with buffers ref counting + assertEquals(raw.refCnt(), 0); + assertEquals(compressed.refCnt(), 0); + assertEquals(compressed.refCnt(), 0); + } + @Test(dataProvider = "codec") void testEmptyInput(CompressionType type, String compressedText) throws IOException { CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type);