From 1775f51e2505e32e5db24bf6f18a3cfefa5d69e0 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 16 Jan 2024 16:13:19 -0800 Subject: [PATCH] ORC-XXX: Support orc.compression.zstd.workers --- .../core/src/java/org/apache/orc/OrcConf.java | 3 +++ .../core/src/java/org/apache/orc/OrcFile.java | 12 ++++++++++ .../org/apache/orc/impl/PhysicalFsWriter.java | 1 + .../java/org/apache/orc/impl/ZstdCodec.java | 24 ++++++++++++++----- 4 files changed, 34 insertions(+), 6 deletions(-) diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java index d4bebe2cd4..59ac2c8364 100644 --- a/java/core/src/java/org/apache/orc/OrcConf.java +++ b/java/core/src/java/org/apache/orc/OrcConf.java @@ -80,6 +80,9 @@ public enum OrcConf { "hive.exec.orc.compression.zstd.windowlog", 0, "Set the maximum allowed back-reference distance for " + "ZStandard codec, expressed as power of 2."), + COMPRESSION_ZSTD_WORKERS("orc.compression.zstd.workers", + "hive.exec.orc.compression.zstd.workers", 0, + "Define the number of workers to use with ZStandard codec while writing data."), BLOCK_PADDING_TOLERANCE("orc.block.padding.tolerance", "hive.exec.orc.block.padding.tolerance", 0.05, "Define the tolerance for block padding as a decimal fraction of\n" + diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java index dfe3088fbc..f2bb68c6ab 100644 --- a/java/core/src/java/org/apache/orc/OrcFile.java +++ b/java/core/src/java/org/apache/orc/OrcFile.java @@ -430,6 +430,8 @@ public static class ZstdCompressOptions { private int compressionZstdLevel; private int compressionZstdWindowLog; + private int compressionZstdWorkers; + public int getCompressionZstdLevel() { return compressionZstdLevel; } @@ -445,6 +447,14 @@ public int getCompressionZstdWindowLog() { public void setCompressionZstdWindowLog(int compressionZstdWindowLog) { this.compressionZstdWindowLog = compressionZstdWindowLog; } + + public int getCompressionZstdWorkers() { + return compressionZstdWorkers; + } + + public void setCompressionZstdWorkers(int compressionZstdWorkers) { + this.compressionZstdWorkers = compressionZstdWorkers; + } } /** @@ -520,6 +530,8 @@ protected WriterOptions(Properties tableProperties, Configuration conf) { OrcConf.COMPRESSION_ZSTD_LEVEL.getInt(tableProperties, conf)); zstdCompressOptions.setCompressionZstdWindowLog( OrcConf.COMPRESSION_ZSTD_WINDOWLOG.getInt(tableProperties, conf)); + zstdCompressOptions.setCompressionZstdWorkers( + OrcConf.COMPRESSION_ZSTD_WORKERS.getInt(tableProperties, conf)); paddingTolerance = OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf); diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java index 4eb5f85623..39b71a8771 100644 --- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java +++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java @@ -122,6 +122,7 @@ public PhysicalFsWriter(FSDataOutputStream outputStream, if (zstdCompressOptions != null) { options.setLevel(zstdCompressOptions.getCompressionZstdLevel()); options.setWindowLog(zstdCompressOptions.getCompressionZstdWindowLog()); + options.setWorkers(zstdCompressOptions.getCompressionZstdWorkers()); } } compress.withCodec(codec, tempOptions); diff --git a/java/core/src/java/org/apache/orc/impl/ZstdCodec.java b/java/core/src/java/org/apache/orc/impl/ZstdCodec.java index cdbf1f3fda..60182c9e34 100644 --- a/java/core/src/java/org/apache/orc/impl/ZstdCodec.java +++ b/java/core/src/java/org/apache/orc/impl/ZstdCodec.java @@ -29,12 +29,12 @@ public class ZstdCodec implements CompressionCodec { private ZstdOptions zstdOptions = null; private ZstdCompressCtx zstdCompressCtx = null; - public ZstdCodec(int level, int windowLog) { - this.zstdOptions = new ZstdOptions(level, windowLog); + public ZstdCodec(int level, int windowLog, int workers) { + this.zstdOptions = new ZstdOptions(level, windowLog, workers); } public ZstdCodec() { - this(1, 0); + this(1, 0, 0); } public ZstdOptions getZstdOptions() { @@ -58,14 +58,17 @@ static class ZstdOptions implements Options { private int level; private int windowLog; - ZstdOptions(int level, int windowLog) { + private int workers; + + ZstdOptions(int level, int windowLog, int workers) { this.level = level; this.windowLog = windowLog; + this.workers = workers; } @Override public ZstdOptions copy() { - return new ZstdOptions(level, windowLog); + return new ZstdOptions(level, windowLog, workers); } @Override @@ -123,6 +126,14 @@ public ZstdOptions setLevel(int newValue) { return this; } + public ZstdOptions setWorkers(int newValue) { + if (newValue < 0) { + throw new IllegalArgumentException("The number of workers should be non-negative."); + } + workers = newValue; + return this; + } + @Override public ZstdOptions setData(DataKind newValue) { return this; // We don't support setting DataKind in ZstdCodec. @@ -148,7 +159,7 @@ public int hashCode() { } private static final ZstdOptions DEFAULT_OPTIONS = - new ZstdOptions(1, 0); + new ZstdOptions(1, 0, 0); @Override public Options getDefaultOptions() { @@ -177,6 +188,7 @@ public boolean compress(ByteBuffer in, ByteBuffer out, zstdCompressCtx.setLevel(zso.level); zstdCompressCtx.setLong(zso.windowLog); zstdCompressCtx.setChecksum(false); + zstdCompressCtx.setWorkers(zso.workers); try { int inBytes = in.remaining();