diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/options/RheaKVStoreOptions.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/options/RheaKVStoreOptions.java index 66263b375..e5368c0d3 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/options/RheaKVStoreOptions.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/options/RheaKVStoreOptions.java @@ -20,6 +20,8 @@ import com.alipay.sofa.jraft.rhea.options.configured.RpcOptionsConfigured; import com.alipay.sofa.jraft.util.Utils; +import java.util.zip.Deflater; + /** * * @author jiachun.fjc @@ -53,6 +55,8 @@ public class RheaKVStoreOptions { private int compressThreads = Utils.cpus(); private int deCompressThreads = Utils.cpus() + 1; + private int compressLevel = Deflater.DEFAULT_COMPRESSION; + public long getClusterId() { return clusterId; } @@ -179,6 +183,14 @@ public void setDeCompressThreads(int deCompressThreads) { this.deCompressThreads = deCompressThreads; } + public void setCompressLevel(int compressLevel) { + this.compressLevel = compressLevel; + } + + public int getCompressLevel() { + return this.compressLevel; + } + @Override public String toString() { return "RheaKVStoreOptions{" + "clusterId=" + clusterId + ", clusterName='" + clusterName + '\'' @@ -187,7 +199,8 @@ public String toString() { + ", rpcOptions=" + rpcOptions + ", failoverRetries=" + failoverRetries + ", futureTimeoutMillis=" + futureTimeoutMillis + ", useParallelKVExecutor=" + useParallelKVExecutor + ", batchingOptions=" + batchingOptions + ", useParallelCompress=" + useParallelCompress + ", compressThreads=" - + compressThreads + ", deCompressThreads=" + deCompressThreads + '}'; + + compressThreads + ", deCompressThreads=" + deCompressThreads + ", compressLevel=" + compressLevel + + '}'; } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/zip/JDKZipStrategy.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/zip/JDKZipStrategy.java index aa4391e4e..4968dad12 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/zip/JDKZipStrategy.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/zip/JDKZipStrategy.java @@ -18,16 +18,27 @@ import com.alipay.sofa.jraft.rhea.util.ZipUtil; import java.util.zip.Checksum; +import java.util.zip.Deflater; /** * @author hzh */ public class JDKZipStrategy implements ZipStrategy { + private final int level; + + public JDKZipStrategy() { + this(Deflater.DEFAULT_COMPRESSION); + } + + public JDKZipStrategy(int level) { + this.level = level; + } + @Override public void compress(final String rootDir, final String sourceDir, final String outputZipFile, final Checksum checksum) throws Throwable { - ZipUtil.compress(rootDir, sourceDir, outputZipFile, checksum); + ZipUtil.compress(rootDir, sourceDir, outputZipFile, checksum, level); } @Override diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/zip/ZipStrategyManager.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/zip/ZipStrategyManager.java index eee7d635b..0d0d836cf 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/zip/ZipStrategyManager.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/zip/ZipStrategyManager.java @@ -18,6 +18,8 @@ import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions; +import java.util.zip.Deflater; + /** * @author hzh */ @@ -58,6 +60,11 @@ public static void init(final RheaKVStoreOptions opts) { DEFAULT_STRATEGY = PARALLEL_STRATEGY; } } + + if (DEFAULT_STRATEGY == JDK_STRATEGY && opts.getCompressLevel() > Deflater.DEFAULT_COMPRESSION + && opts.getCompressLevel() <= Deflater.BEST_COMPRESSION) { + addZipStrategy(JDK_STRATEGY, new JDKZipStrategy(opts.getCompressLevel())); + } } private ZipStrategyManager() { diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/ZipUtil.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/ZipUtil.java index d630f53ce..ccb13d40a 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/ZipUtil.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/util/ZipUtil.java @@ -22,6 +22,9 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.channels.WritableByteChannel; import java.nio.file.Paths; import java.util.zip.CheckedInputStream; import java.util.zip.CheckedOutputStream; @@ -38,36 +41,44 @@ import com.alipay.sofa.jraft.util.Utils; /** - * * @author jiachun.fjc */ public final class ZipUtil { + private static final int BUFFER_SIZE = 2097152; + public static void compress(final String rootDir, final String sourceDir, final String outputFile, - final Checksum checksum) throws IOException { + final Checksum checksum, int level) throws IOException { try (final FileOutputStream fos = new FileOutputStream(outputFile); final CheckedOutputStream cos = new CheckedOutputStream(fos, checksum); - final ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(cos))) { - ZipUtil.compressDirectoryToZipFile(rootDir, sourceDir, zos); - zos.flush(); + ZipOutputStream zipOutputStream = new ZipOutputStream(new BufferedOutputStream(cos, BUFFER_SIZE));) { + WritableByteChannel writableByteChannel = Channels.newChannel(zipOutputStream); + zipOutputStream.setLevel(level); + compressDirectoryToZipFile(rootDir, sourceDir, zipOutputStream, writableByteChannel); + zipOutputStream.flush(); fos.getFD().sync(); } Utils.fsync(new File(outputFile)); } private static void compressDirectoryToZipFile(final String rootDir, final String sourceDir, - final ZipOutputStream zos) throws IOException { + final ZipOutputStream zos, WritableByteChannel writableByteChannel) + throws IOException { final String dir = Paths.get(rootDir, sourceDir).toString(); final File[] files = Requires.requireNonNull(new File(dir).listFiles(), "files"); for (final File file : files) { final String child = Paths.get(sourceDir, file.getName()).toString(); if (file.isDirectory()) { - compressDirectoryToZipFile(rootDir, child, zos); + compressDirectoryToZipFile(rootDir, child, zos, writableByteChannel); } else { - zos.putNextEntry(new ZipEntry(child)); - try (final FileInputStream fis = new FileInputStream(file); - final BufferedInputStream bis = new BufferedInputStream(fis)) { - IOUtils.copy(bis, zos); + ZipEntry entry = new ZipEntry(child); + zos.putNextEntry(entry); + long length = file.length(); + if (length == 0) { + continue; + } + try (FileChannel fileChannel = new FileInputStream(file).getChannel()) { + fileChannel.transferTo(0, length, writableByteChannel); } } } @@ -77,15 +88,25 @@ public static void decompress(final String sourceFile, final String outputDir, f throws IOException { try (final FileInputStream fis = new FileInputStream(sourceFile); final CheckedInputStream cis = new CheckedInputStream(fis, checksum); - final ZipInputStream zis = new ZipInputStream(new BufferedInputStream(cis))) { + final ZipInputStream zis = new ZipInputStream(new BufferedInputStream(cis, BUFFER_SIZE))) { ZipEntry entry; while ((entry = zis.getNextEntry()) != null) { + if (entry.isDirectory()) { + continue; + } final String fileName = entry.getName(); final File entryFile = new File(Paths.get(outputDir, fileName).toString()); FileUtils.forceMkdir(entryFile.getParentFile()); + long length = entryFile.length(); + int bufferSize = (int) length; + if (length > BUFFER_SIZE) { + bufferSize = BUFFER_SIZE; + } else if (length <= 0) { + bufferSize = 1; + } try (final FileOutputStream fos = new FileOutputStream(entryFile); - final BufferedOutputStream bos = new BufferedOutputStream(fos)) { - IOUtils.copy(zis, bos); + final BufferedOutputStream bos = new BufferedOutputStream(fos, bufferSize)) { + IOUtils.copy(zis, bos, bufferSize); bos.flush(); fos.getFD().sync(); } diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/benchmark/raw/SnapshotBenchmark.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/benchmark/raw/SnapshotBenchmark.java index 38c9459a3..210cfc5eb 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/benchmark/raw/SnapshotBenchmark.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/benchmark/raw/SnapshotBenchmark.java @@ -26,6 +26,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.zip.Checksum; +import java.util.zip.Deflater; import org.apache.commons.io.FileUtils; @@ -305,7 +306,7 @@ private void doCompressSnapshot(final String path, final LocalFileMeta.Builder m final String outputFile = Paths.get(path, SNAPSHOT_ARCHIVE).toString(); try { final Checksum checksum = new CRC64(); - ZipUtil.compress(path, SNAPSHOT_DIR, outputFile, checksum); + ZipUtil.compress(path, SNAPSHOT_DIR, outputFile, checksum, Deflater.DEFAULT_COMPRESSION); metaBuilder.setChecksum(Long.toHexString(checksum.getValue())); } catch (final Throwable t) { t.printStackTrace(); diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/zip/ZipStrategyManagerTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/zip/ZipStrategyManagerTest.java index 3e039e4f4..f8fb57700 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/zip/ZipStrategyManagerTest.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/zip/ZipStrategyManagerTest.java @@ -19,6 +19,8 @@ import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions; import org.junit.Test; +import java.util.zip.Deflater; + import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -30,9 +32,15 @@ public class ZipStrategyManagerTest { @Test public void testInit() { RheaKVStoreOptions opts = new RheaKVStoreOptions(); + opts.setCompressLevel(Deflater.NO_COMPRESSION); + ZipStrategyManager.init(opts); + ZipStrategy zipStrategy = ZipStrategyManager.getZipStrategy(ZipStrategyManager.JDK_STRATEGY); + assertNotNull(zipStrategy); + assertTrue(zipStrategy instanceof JDKZipStrategy); + opts.setUseParallelCompress(true); ZipStrategyManager.init(opts); - ZipStrategy zipStrategy = ZipStrategyManager.getZipStrategy(ZipStrategyManager.PARALLEL_STRATEGY); + zipStrategy = ZipStrategyManager.getZipStrategy(ZipStrategyManager.PARALLEL_STRATEGY); assertNotNull(zipStrategy); assertTrue(zipStrategy instanceof ParallelZipStrategy); } diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/util/ZipUtilTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/util/ZipUtilTest.java index ed5c63553..d0ce4a1cd 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/util/ZipUtilTest.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/util/ZipUtilTest.java @@ -21,6 +21,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.zip.Checksum; +import java.util.zip.Deflater; import org.apache.commons.io.FileUtils; import org.junit.After; @@ -72,7 +73,7 @@ public void zipTest() throws IOException { final String rootPath = this.sourceDir.toPath().toAbsolutePath().getParent().toString(); final Path outPath = Paths.get(rootPath, "kv.zip"); final Checksum c1 = new CRC64(); - ZipUtil.compress(rootPath, "zip_test", outPath.toString(), c1); + ZipUtil.compress(rootPath, "zip_test", outPath.toString(), c1, Deflater.DEFAULT_COMPRESSION); System.out.println(Long.toHexString(c1.getValue()));