Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add compress level #875

Merged
merged 4 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.alipay.sofa.jraft.rhea.options.configured.RpcOptionsConfigured;
import com.alipay.sofa.jraft.util.Utils;

import java.util.zip.Deflater;

/**
*
* @author jiachun.fjc
Expand Down Expand Up @@ -53,6 +55,8 @@ public class RheaKVStoreOptions {
private int compressThreads = Utils.cpus();
private int deCompressThreads = Utils.cpus() + 1;

private int compressLevel = Deflater.DEFAULT_COMPRESSION;

public long getClusterId() {
return clusterId;
}
Expand Down Expand Up @@ -179,6 +183,14 @@ public void setDeCompressThreads(int deCompressThreads) {
this.deCompressThreads = deCompressThreads;
}

public void setCompressLevel(int compressLevel) {
this.compressLevel = compressLevel;
}

public int getCompressLevel() {
return this.compressLevel;
}

@Override
public String toString() {
return "RheaKVStoreOptions{" + "clusterId=" + clusterId + ", clusterName='" + clusterName + '\''
Expand All @@ -187,7 +199,8 @@ public String toString() {
+ ", rpcOptions=" + rpcOptions + ", failoverRetries=" + failoverRetries + ", futureTimeoutMillis="
+ futureTimeoutMillis + ", useParallelKVExecutor=" + useParallelKVExecutor + ", batchingOptions="
+ batchingOptions + ", useParallelCompress=" + useParallelCompress + ", compressThreads="
+ compressThreads + ", deCompressThreads=" + deCompressThreads + '}';
+ compressThreads + ", deCompressThreads=" + deCompressThreads + ", compressLevel=" + compressLevel
+ '}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,27 @@

import com.alipay.sofa.jraft.rhea.util.ZipUtil;
import java.util.zip.Checksum;
import java.util.zip.Deflater;

/**
* @author hzh
*/
public class JDKZipStrategy implements ZipStrategy {

private final int level;

public JDKZipStrategy() {
this(Deflater.DEFAULT_COMPRESSION);
}

public JDKZipStrategy(int level) {
this.level = level;
}

@Override
public void compress(final String rootDir, final String sourceDir, final String outputZipFile,
final Checksum checksum) throws Throwable {
ZipUtil.compress(rootDir, sourceDir, outputZipFile, checksum);
ZipUtil.compress(rootDir, sourceDir, outputZipFile, checksum, level);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions;

import java.util.zip.Deflater;

/**
* @author hzh
*/
Expand Down Expand Up @@ -58,6 +60,11 @@ public static void init(final RheaKVStoreOptions opts) {
DEFAULT_STRATEGY = PARALLEL_STRATEGY;
}
}

if (DEFAULT_STRATEGY == JDK_STRATEGY && opts.getCompressLevel() > Deflater.DEFAULT_COMPRESSION
&& opts.getCompressLevel() <= Deflater.BEST_COMPRESSION) {
addZipStrategy(JDK_STRATEGY, new JDKZipStrategy(opts.getCompressLevel()));
}
}

private ZipStrategyManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Paths;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
Expand All @@ -38,36 +41,44 @@
import com.alipay.sofa.jraft.util.Utils;

/**
*
* @author jiachun.fjc
*/
public final class ZipUtil {

private static final int BUFFER_SIZE = 2097152;

public static void compress(final String rootDir, final String sourceDir, final String outputFile,
final Checksum checksum) throws IOException {
final Checksum checksum, int level) throws IOException {
try (final FileOutputStream fos = new FileOutputStream(outputFile);
final CheckedOutputStream cos = new CheckedOutputStream(fos, checksum);
final ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(cos))) {
ZipUtil.compressDirectoryToZipFile(rootDir, sourceDir, zos);
zos.flush();
ZipOutputStream zipOutputStream = new ZipOutputStream(new BufferedOutputStream(cos, BUFFER_SIZE));) {
WritableByteChannel writableByteChannel = Channels.newChannel(zipOutputStream);
zipOutputStream.setLevel(level);
compressDirectoryToZipFile(rootDir, sourceDir, zipOutputStream, writableByteChannel);
zipOutputStream.flush();
fos.getFD().sync();
}
Utils.fsync(new File(outputFile));
}

private static void compressDirectoryToZipFile(final String rootDir, final String sourceDir,
final ZipOutputStream zos) throws IOException {
final ZipOutputStream zos, WritableByteChannel writableByteChannel)
throws IOException {
final String dir = Paths.get(rootDir, sourceDir).toString();
final File[] files = Requires.requireNonNull(new File(dir).listFiles(), "files");
for (final File file : files) {
final String child = Paths.get(sourceDir, file.getName()).toString();
if (file.isDirectory()) {
compressDirectoryToZipFile(rootDir, child, zos);
compressDirectoryToZipFile(rootDir, child, zos, writableByteChannel);
} else {
zos.putNextEntry(new ZipEntry(child));
try (final FileInputStream fis = new FileInputStream(file);
final BufferedInputStream bis = new BufferedInputStream(fis)) {
IOUtils.copy(bis, zos);
ZipEntry entry = new ZipEntry(child);
zos.putNextEntry(entry);
long length = file.length();
if (length == 0) {
continue;
}
try (FileChannel fileChannel = new FileInputStream(file).getChannel()) {
fileChannel.transferTo(0, length, writableByteChannel);
}
}
}
Expand All @@ -77,15 +88,25 @@ public static void decompress(final String sourceFile, final String outputDir, f
throws IOException {
try (final FileInputStream fis = new FileInputStream(sourceFile);
final CheckedInputStream cis = new CheckedInputStream(fis, checksum);
final ZipInputStream zis = new ZipInputStream(new BufferedInputStream(cis))) {
final ZipInputStream zis = new ZipInputStream(new BufferedInputStream(cis, BUFFER_SIZE))) {
ZipEntry entry;
while ((entry = zis.getNextEntry()) != null) {
if (entry.isDirectory()) {
continue;
}
final String fileName = entry.getName();
final File entryFile = new File(Paths.get(outputDir, fileName).toString());
FileUtils.forceMkdir(entryFile.getParentFile());
long length = entryFile.length();
int bufferSize = (int) length;
if (length > BUFFER_SIZE) {
bufferSize = BUFFER_SIZE;
} else if (length <= 0) {
bufferSize = 1;
}
try (final FileOutputStream fos = new FileOutputStream(entryFile);
final BufferedOutputStream bos = new BufferedOutputStream(fos)) {
IOUtils.copy(zis, bos);
final BufferedOutputStream bos = new BufferedOutputStream(fos, bufferSize)) {
IOUtils.copy(zis, bos, bufferSize);
bos.flush();
fos.getFD().sync();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.Checksum;
import java.util.zip.Deflater;

import org.apache.commons.io.FileUtils;

Expand Down Expand Up @@ -305,7 +306,7 @@ private void doCompressSnapshot(final String path, final LocalFileMeta.Builder m
final String outputFile = Paths.get(path, SNAPSHOT_ARCHIVE).toString();
try {
final Checksum checksum = new CRC64();
ZipUtil.compress(path, SNAPSHOT_DIR, outputFile, checksum);
ZipUtil.compress(path, SNAPSHOT_DIR, outputFile, checksum, Deflater.DEFAULT_COMPRESSION);
metaBuilder.setChecksum(Long.toHexString(checksum.getValue()));
} catch (final Throwable t) {
t.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions;
import org.junit.Test;

import java.util.zip.Deflater;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

Expand All @@ -30,9 +32,15 @@ public class ZipStrategyManagerTest {
@Test
public void testInit() {
RheaKVStoreOptions opts = new RheaKVStoreOptions();
opts.setCompressLevel(Deflater.NO_COMPRESSION);
ZipStrategyManager.init(opts);
ZipStrategy zipStrategy = ZipStrategyManager.getZipStrategy(ZipStrategyManager.JDK_STRATEGY);
assertNotNull(zipStrategy);
assertTrue(zipStrategy instanceof JDKZipStrategy);

opts.setUseParallelCompress(true);
ZipStrategyManager.init(opts);
ZipStrategy zipStrategy = ZipStrategyManager.getZipStrategy(ZipStrategyManager.PARALLEL_STRATEGY);
zipStrategy = ZipStrategyManager.getZipStrategy(ZipStrategyManager.PARALLEL_STRATEGY);
assertNotNull(zipStrategy);
assertTrue(zipStrategy instanceof ParallelZipStrategy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.zip.Checksum;
import java.util.zip.Deflater;

import org.apache.commons.io.FileUtils;
import org.junit.After;
Expand Down Expand Up @@ -72,7 +73,7 @@ public void zipTest() throws IOException {
final String rootPath = this.sourceDir.toPath().toAbsolutePath().getParent().toString();
final Path outPath = Paths.get(rootPath, "kv.zip");
final Checksum c1 = new CRC64();
ZipUtil.compress(rootPath, "zip_test", outPath.toString(), c1);
ZipUtil.compress(rootPath, "zip_test", outPath.toString(), c1, Deflater.DEFAULT_COMPRESSION);

System.out.println(Long.toHexString(c1.getValue()));

Expand Down