Skip to content

Commit

Permalink
Use pure-java Air-Compressor instead of JNI based libraries (apache#5390
Browse files Browse the repository at this point in the history
)

* Use pure-java Air-Compressor instead of JNI based libraries

* Fixed license files

* Fixed non-needed exclusion

* Added compat tests with JNI implementations

* Ensure direct buffer is used in the test

* Ensure direct bytebuf for both compression and decompression test

Co-authored-by: penghui <penghui@apache.org>
  • Loading branch information
2 people authored and cdbartholomew committed Jul 24, 2020
1 parent 0733dc8 commit ce5e889
Show file tree
Hide file tree
Showing 17 changed files with 614 additions and 131 deletions.
26 changes: 0 additions & 26 deletions distribution/server/licenses/LICENSE-zstd-jni.txt

This file was deleted.

6 changes: 2 additions & 4 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,8 @@ The Apache Software License, Version 2.0
- org.apache.distributedlog-distributedlog-core-4.10.0.jar
- org.apache.distributedlog-distributedlog-protocol-4.10.0.jar
- org.apache.bookkeeper.stats-codahale-metrics-provider-4.10.0.jar
* LZ4 -- org.lz4-lz4-java-1.5.0.jar
* AirCompressor
- io.airlift-aircompressor-0.16.jar
* AsyncHttpClient
- org.asynchttpclient-async-http-client-2.12.1.jar
- org.asynchttpclient-async-http-client-netty-utils-2.12.1.jar
Expand Down Expand Up @@ -467,8 +468,6 @@ The Apache Software License, Version 2.0
- org.apache.yetus-audience-annotations-0.5.0.jar
* @FreeBuilder
- org.inferred-freebuilder-1.14.9.jar
* Snappy Java
- org.xerial.snappy-snappy-java-1.1.1.3.jar
* Squareup
- com.squareup.okhttp-logging-interceptor-2.7.5.jar
- com.squareup.okhttp-okhttp-ws-2.7.5.jar
Expand Down Expand Up @@ -509,7 +508,6 @@ BSD 3-clause "New" or "Revised" License

BSD 2-Clause License
* HdrHistogram -- org.hdrhistogram-HdrHistogram-2.1.9.jar -- licenses/LICENSE-HdrHistogram.txt
* Zstandard JNI -- com.github.luben-zstd-jni-1.3.7-3.jar -- licenses/LICENSE-zstd-jni.txt

MIT License
* Java SemVer -- com.github.zafarkhaja-java-semver-0.9.0.jar -- licenses/LICENSE-SemVer.txt
Expand Down
4 changes: 0 additions & 4 deletions distribution/server/src/assemble/NOTICE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,6 @@ for non-commercial or commercial purposes and without fee is
granted provided that the copyright notice appears in all copies.

------------------------------------------------------------------------------------
- org.xerial.snappy-snappy-java-1.1.1.3.jar

This product includes software developed by Google
Snappy: http://code.google.com/p/snappy/ (New BSD License)

This product includes software developed by Apache
PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/
Expand Down
32 changes: 12 additions & 20 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,13 @@ flexible messaging model and an intuitive client API.</description>
<debezium.version>1.0.0.Final</debezium.version>
<jsonwebtoken.version>0.11.1</jsonwebtoken.version>
<opencensus.version>0.18.0</opencensus.version>
<zstd.version>1.3.7-3</zstd.version>
<snappy.version>1.1.1.3</snappy.version>
<hbase.version>1.4.9</hbase.version>
<guava.version>25.1-jre</guava.version>
<jcip.version>1.0</jcip.version>
<prometheus-jmx.version>0.12.0</prometheus-jmx.version>
<confluent.version>5.3.2</confluent.version>
<kafka-avro-convert-jackson.version>1.9.13</kafka-avro-convert-jackson.version>
<aircompressor.version>0.16</aircompressor.version>
<httpcomponents.version>4.5.5</httpcomponents.version>
<asynchttpclient.version>2.12.1</asynchttpclient.version>

Expand Down Expand Up @@ -603,24 +602,6 @@ flexible messaging model and an intuitive client API.</description>
<version>2.5</version>
</dependency>

<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.5.0</version>
</dependency>

<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>${zstd.version}</version>
</dependency>

<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>${snappy.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down Expand Up @@ -1137,6 +1118,17 @@ flexible messaging model and an intuitive client API.</description>
<artifactId>jcip-annotations</artifactId>
<version>${jcip.version}</version>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
<version>${aircompressor.version}</version>
<exclusions>
<exclusion>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
36 changes: 24 additions & 12 deletions pulsar-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,8 @@
</dependency>

<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
</dependency>

<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>

<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</dependency>

<dependency>
Expand Down Expand Up @@ -155,6 +145,28 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>

<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.5.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.3.7-3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.1.3</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.airlift.compress.zstd;

/**
* Expose ZstdFrameCompressor which is a package protected class.
*/
public class ZStdRawCompressor {
public static int compress(long inputAddress, long inputLimit,
long outputAddress, long outputLimit, int compressionLevel) {
return ZstdFrameCompressor.compress(null, inputAddress, inputLimit, null, outputAddress, outputLimit,
compressionLevel);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.airlift.compress.zstd;

/**
* Exposes ZstdFrameDecompressor which is package protected.
*/
public class ZStdRawDecompressor extends ZstdFrameDecompressor {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/**
* Helper class to access AirCompressor package private classes.
*/
package io.airlift.compress.zstd;
Original file line number Diff line number Diff line change
Expand Up @@ -18,57 +18,87 @@
*/
package org.apache.pulsar.common.compression;

import io.airlift.compress.lz4.Lz4Compressor;
import io.airlift.compress.lz4.Lz4Decompressor;
import io.airlift.compress.lz4.Lz4RawCompressor;
import io.airlift.compress.lz4.Lz4RawDecompressor;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;

import java.io.IOException;
import java.nio.ByteBuffer;
import lombok.extern.slf4j.Slf4j;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;

import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;

/**
* LZ4 Compression.
*/
@Slf4j
public class CompressionCodecLZ4 implements CompressionCodec {

static {
try {
// Force the attempt to load LZ4 JNI
net.jpountz.util.Native.load();
} catch (Throwable th) {
log.warn("Failed to load native LZ4 implementation: {}", th.getMessage());
private static final FastThreadLocal<Lz4Compressor> LZ4_COMPRESSOR = new FastThreadLocal<Lz4Compressor>() {
@Override
protected Lz4Compressor initialValue() throws Exception {
return new Lz4Compressor();
}
}
};

private static final LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
private static final LZ4Compressor compressor = lz4Factory.fastCompressor();
private static final LZ4FastDecompressor decompressor = lz4Factory.fastDecompressor();
private static final FastThreadLocal<Lz4Decompressor> LZ4_DECOMPRESSOR = new FastThreadLocal<Lz4Decompressor>() {
@Override
protected Lz4Decompressor initialValue() throws Exception {
return new Lz4Decompressor();
}
};

private static final FastThreadLocal<int[]> LZ4_TABLE = new FastThreadLocal<int[]>() {
@Override
protected int[] initialValue() throws Exception {
return new int[Lz4RawCompressor.MAX_TABLE_SIZE];
}
};

@Override
public ByteBuf encode(ByteBuf source) {
int uncompressedLength = source.readableBytes();
int maxLength = compressor.maxCompressedLength(uncompressedLength);

ByteBuffer sourceNio = source.nioBuffer(source.readerIndex(), source.readableBytes());
int maxLength = Lz4RawCompressor.maxCompressedLength(uncompressedLength);

ByteBuf target = PulsarByteBufAllocator.DEFAULT.buffer(maxLength, maxLength);
ByteBuffer targetNio = target.nioBuffer(0, maxLength);

int compressedLength = compressor.compress(sourceNio, 0, uncompressedLength, targetNio, 0, maxLength);
int compressedLength;
if (source.hasMemoryAddress() && target.hasMemoryAddress()) {
compressedLength = Lz4RawCompressor.compress(
null,
source.memoryAddress() + source.readerIndex(),
source.readableBytes(),
null,
target.memoryAddress(),
maxLength,
LZ4_TABLE.get());
} else {
ByteBuffer sourceNio = source.nioBuffer(source.readerIndex(), source.readableBytes());
ByteBuffer targetNio = target.nioBuffer(0, maxLength);

LZ4_COMPRESSOR.get().compress(sourceNio, targetNio);
compressedLength = targetNio.position();
}

target.writerIndex(compressedLength);
return target;
}

@Override
public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOException {
ByteBuf uncompressed = PulsarByteBufAllocator.DEFAULT.buffer(uncompressedLength, uncompressedLength);
ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);

ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
decompressor.decompress(encodedNio, encodedNio.position(), uncompressedNio, uncompressedNio.position(),
uncompressedNio.remaining());
if (encoded.hasMemoryAddress() && uncompressed.hasMemoryAddress()) {
Lz4RawDecompressor.decompress(null, encoded.memoryAddress() + encoded.readerIndex(),
encoded.memoryAddress() + encoded.writerIndex(), null, uncompressed.memoryAddress(),
uncompressed.memoryAddress() + uncompressedLength);
} else {
ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());

LZ4_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
}

uncompressed.writerIndex(uncompressedLength);
return uncompressed;
Expand Down
Loading

0 comments on commit ce5e889

Please sign in to comment.