Skip to content

Commit

Permalink
Issue 8974: Peeking at compressed messages throws an exception (Reado…
Browse files Browse the repository at this point in the history
…nly buffers not supported by Airlift) (#8990)

Fixes #8974 

### Motivation
In certain cases peeking messages on compresses topics return an error, see #8974 because Airlift does not support readonly ByteBuffers, because they do not give access to the underlying array)

### Modifications

Copy the ByteByffer in case of unsupported buffer type

### Verifying this change

This change adds new tests that reproduce the error and demonstrate that the problem is fixed.

(cherry picked from commit cbc606b)
  • Loading branch information
eolivelli authored and codelipenghui committed Dec 23, 2020
1 parent 5febead commit fbbc251
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.compression;

import java.nio.ByteBuffer;

/**
* Utilities.
*/
public abstract class AirliftUtils {

static ByteBuffer ensureAirliftSupported(ByteBuffer encodedNio, int uncompressedLength) {
if (!encodedNio.isDirect() && !encodedNio.hasArray()) {
// airlift needs a raw ByteArray
ByteBuffer copy = ByteBuffer.allocate(uncompressedLength);
copy.put(encodedNio);
copy.flip();
encodedNio = copy;
}
return encodedNio;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,12 @@ public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOExceptio
} else {
ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());

encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength);
LZ4_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
}

uncompressed.writerIndex(uncompressedLength);
return uncompressed;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOExceptio
ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());

encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength);
SNAPPY_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOExceptio
} else {
ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());

encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength);
ZSTD_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,37 @@ void testCompressDecompress(CompressionType type, String compressedText) throws
assertEquals(compressed.refCnt(), 0);
}

@Test(dataProvider = "codec")
void testDecompressReadonlyByteBuf(CompressionType type, String compressedText) throws IOException {
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type);
byte[] data = text.getBytes();
ByteBuf raw = PulsarByteBufAllocator.DEFAULT.directBuffer();
raw.writeBytes(data);

ByteBuf compressed = codec.encode(raw);
assertEquals(raw.readableBytes(), data.length);

int compressedSize = compressed.readableBytes();
// Readonly ByteBuffers are not supported by AirLift
// https://github.com/apache/pulsar/issues/8974
ByteBuf compressedComplexByteBuf = compressed.asReadOnly();
ByteBuf uncompressed = codec.decode(compressedComplexByteBuf, data.length);

assertEquals(compressed.readableBytes(), compressedSize);

assertEquals(uncompressed.readableBytes(), data.length);
assertEquals(uncompressed, raw);

raw.release();
compressed.release();
uncompressed.release();

// Verify compression codecs have the same behavior with buffers ref counting
assertEquals(raw.refCnt(), 0);
assertEquals(compressed.refCnt(), 0);
assertEquals(compressed.refCnt(), 0);
}

@Test(dataProvider = "codec")
void testEmptyInput(CompressionType type, String compressedText) throws IOException {
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type);
Expand Down

0 comments on commit fbbc251

Please sign in to comment.