Skip to content

Commit

Permalink
add compress level (#875)
Browse files Browse the repository at this point in the history
* add compress level
  • Loading branch information
qiujiayu authored Aug 16, 2022
1 parent 1abbc5e commit 489e5b7
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.alipay.sofa.jraft.rhea.options.configured.RpcOptionsConfigured;
import com.alipay.sofa.jraft.util.Utils;

import java.util.zip.Deflater;

/**
*
* @author jiachun.fjc
Expand Down Expand Up @@ -53,6 +55,8 @@ public class RheaKVStoreOptions {
private int compressThreads = Utils.cpus();
private int deCompressThreads = Utils.cpus() + 1;

private int compressLevel = Deflater.DEFAULT_COMPRESSION;

public long getClusterId() {
return clusterId;
}
Expand Down Expand Up @@ -179,6 +183,14 @@ public void setDeCompressThreads(int deCompressThreads) {
this.deCompressThreads = deCompressThreads;
}

public void setCompressLevel(int compressLevel) {
this.compressLevel = compressLevel;
}

public int getCompressLevel() {
return this.compressLevel;
}

@Override
public String toString() {
return "RheaKVStoreOptions{" + "clusterId=" + clusterId + ", clusterName='" + clusterName + '\''
Expand All @@ -187,7 +199,8 @@ public String toString() {
+ ", rpcOptions=" + rpcOptions + ", failoverRetries=" + failoverRetries + ", futureTimeoutMillis="
+ futureTimeoutMillis + ", useParallelKVExecutor=" + useParallelKVExecutor + ", batchingOptions="
+ batchingOptions + ", useParallelCompress=" + useParallelCompress + ", compressThreads="
+ compressThreads + ", deCompressThreads=" + deCompressThreads + '}';
+ compressThreads + ", deCompressThreads=" + deCompressThreads + ", compressLevel=" + compressLevel
+ '}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,27 @@

import com.alipay.sofa.jraft.rhea.util.ZipUtil;
import java.util.zip.Checksum;
import java.util.zip.Deflater;

/**
* @author hzh
*/
public class JDKZipStrategy implements ZipStrategy {

private final int level;

public JDKZipStrategy() {
this(Deflater.DEFAULT_COMPRESSION);
}

public JDKZipStrategy(int level) {
this.level = level;
}

@Override
public void compress(final String rootDir, final String sourceDir, final String outputZipFile,
final Checksum checksum) throws Throwable {
ZipUtil.compress(rootDir, sourceDir, outputZipFile, checksum);
ZipUtil.compress(rootDir, sourceDir, outputZipFile, checksum, level);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions;

import java.util.zip.Deflater;

/**
* @author hzh
*/
Expand Down Expand Up @@ -58,6 +60,11 @@ public static void init(final RheaKVStoreOptions opts) {
DEFAULT_STRATEGY = PARALLEL_STRATEGY;
}
}

if (DEFAULT_STRATEGY == JDK_STRATEGY && opts.getCompressLevel() > Deflater.DEFAULT_COMPRESSION
&& opts.getCompressLevel() <= Deflater.BEST_COMPRESSION) {
addZipStrategy(JDK_STRATEGY, new JDKZipStrategy(opts.getCompressLevel()));
}
}

private ZipStrategyManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Paths;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
Expand All @@ -38,36 +41,44 @@
import com.alipay.sofa.jraft.util.Utils;

/**
*
* @author jiachun.fjc
*/
public final class ZipUtil {

private static final int BUFFER_SIZE = 2097152;

public static void compress(final String rootDir, final String sourceDir, final String outputFile,
final Checksum checksum) throws IOException {
final Checksum checksum, int level) throws IOException {
try (final FileOutputStream fos = new FileOutputStream(outputFile);
final CheckedOutputStream cos = new CheckedOutputStream(fos, checksum);
final ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(cos))) {
ZipUtil.compressDirectoryToZipFile(rootDir, sourceDir, zos);
zos.flush();
ZipOutputStream zipOutputStream = new ZipOutputStream(new BufferedOutputStream(cos, BUFFER_SIZE));) {
WritableByteChannel writableByteChannel = Channels.newChannel(zipOutputStream);
zipOutputStream.setLevel(level);
compressDirectoryToZipFile(rootDir, sourceDir, zipOutputStream, writableByteChannel);
zipOutputStream.flush();
fos.getFD().sync();
}
Utils.fsync(new File(outputFile));
}

private static void compressDirectoryToZipFile(final String rootDir, final String sourceDir,
final ZipOutputStream zos) throws IOException {
final ZipOutputStream zos, WritableByteChannel writableByteChannel)
throws IOException {
final String dir = Paths.get(rootDir, sourceDir).toString();
final File[] files = Requires.requireNonNull(new File(dir).listFiles(), "files");
for (final File file : files) {
final String child = Paths.get(sourceDir, file.getName()).toString();
if (file.isDirectory()) {
compressDirectoryToZipFile(rootDir, child, zos);
compressDirectoryToZipFile(rootDir, child, zos, writableByteChannel);
} else {
zos.putNextEntry(new ZipEntry(child));
try (final FileInputStream fis = new FileInputStream(file);
final BufferedInputStream bis = new BufferedInputStream(fis)) {
IOUtils.copy(bis, zos);
ZipEntry entry = new ZipEntry(child);
zos.putNextEntry(entry);
long length = file.length();
if (length == 0) {
continue;
}
try (FileChannel fileChannel = new FileInputStream(file).getChannel()) {
fileChannel.transferTo(0, length, writableByteChannel);
}
}
}
Expand All @@ -77,15 +88,25 @@ public static void decompress(final String sourceFile, final String outputDir, f
throws IOException {
try (final FileInputStream fis = new FileInputStream(sourceFile);
final CheckedInputStream cis = new CheckedInputStream(fis, checksum);
final ZipInputStream zis = new ZipInputStream(new BufferedInputStream(cis))) {
final ZipInputStream zis = new ZipInputStream(new BufferedInputStream(cis, BUFFER_SIZE))) {
ZipEntry entry;
while ((entry = zis.getNextEntry()) != null) {
if (entry.isDirectory()) {
continue;
}
final String fileName = entry.getName();
final File entryFile = new File(Paths.get(outputDir, fileName).toString());
FileUtils.forceMkdir(entryFile.getParentFile());
long length = entryFile.length();
int bufferSize = (int) length;
if (length > BUFFER_SIZE) {
bufferSize = BUFFER_SIZE;
} else if (length <= 0) {
bufferSize = 1;
}
try (final FileOutputStream fos = new FileOutputStream(entryFile);
final BufferedOutputStream bos = new BufferedOutputStream(fos)) {
IOUtils.copy(zis, bos);
final BufferedOutputStream bos = new BufferedOutputStream(fos, bufferSize)) {
IOUtils.copy(zis, bos, bufferSize);
bos.flush();
fos.getFD().sync();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.zip.Checksum;
import java.util.zip.Deflater;

import org.apache.commons.io.FileUtils;

Expand Down Expand Up @@ -305,7 +306,7 @@ private void doCompressSnapshot(final String path, final LocalFileMeta.Builder m
final String outputFile = Paths.get(path, SNAPSHOT_ARCHIVE).toString();
try {
final Checksum checksum = new CRC64();
ZipUtil.compress(path, SNAPSHOT_DIR, outputFile, checksum);
ZipUtil.compress(path, SNAPSHOT_DIR, outputFile, checksum, Deflater.DEFAULT_COMPRESSION);
metaBuilder.setChecksum(Long.toHexString(checksum.getValue()));
} catch (final Throwable t) {
t.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions;
import org.junit.Test;

import java.util.zip.Deflater;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

Expand All @@ -30,9 +32,15 @@ public class ZipStrategyManagerTest {
@Test
public void testInit() {
RheaKVStoreOptions opts = new RheaKVStoreOptions();
opts.setCompressLevel(Deflater.NO_COMPRESSION);
ZipStrategyManager.init(opts);
ZipStrategy zipStrategy = ZipStrategyManager.getZipStrategy(ZipStrategyManager.JDK_STRATEGY);
assertNotNull(zipStrategy);
assertTrue(zipStrategy instanceof JDKZipStrategy);

opts.setUseParallelCompress(true);
ZipStrategyManager.init(opts);
ZipStrategy zipStrategy = ZipStrategyManager.getZipStrategy(ZipStrategyManager.PARALLEL_STRATEGY);
zipStrategy = ZipStrategyManager.getZipStrategy(ZipStrategyManager.PARALLEL_STRATEGY);
assertNotNull(zipStrategy);
assertTrue(zipStrategy instanceof ParallelZipStrategy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.zip.Checksum;
import java.util.zip.Deflater;

import org.apache.commons.io.FileUtils;
import org.junit.After;
Expand Down Expand Up @@ -72,7 +73,7 @@ public void zipTest() throws IOException {
final String rootPath = this.sourceDir.toPath().toAbsolutePath().getParent().toString();
final Path outPath = Paths.get(rootPath, "kv.zip");
final Checksum c1 = new CRC64();
ZipUtil.compress(rootPath, "zip_test", outPath.toString(), c1);
ZipUtil.compress(rootPath, "zip_test", outPath.toString(), c1, Deflater.DEFAULT_COMPRESSION);

System.out.println(Long.toHexString(c1.getValue()));

Expand Down

0 comments on commit 489e5b7

Please sign in to comment.