From db68ff5a330cae630ef565a1dc7700a462563719 Mon Sep 17 00:00:00 2001 From: Owen O'Malley Date: Sat, 9 Jun 2018 23:37:31 +0200 Subject: [PATCH] ORC-386 Add spark benchmarks. I also refactored all of the old benchmarks to reduce the common code. I also split it into three modules so that I could separate the common code, the code that depends on hive, and the code that depends on spark. Avoiding building an uber jar that has both hive and spark made life much easier. --- java/bench/README.md | 22 +- java/bench/core/pom.xml | 141 ++++ java/bench/{ => core}/src/assembly/uber.xml | 0 .../bench/{ => core}/src/findbugs/exclude.xml | 0 .../hadoop/fs/TrackingLocalFileSystem.java | 10 +- .../orc/bench/core/BenchmarkOptions.java | 63 ++ .../orc/bench/core}/CompressionKind.java | 19 +- .../org/apache/orc/bench/core/Driver.java | 65 ++ .../orc/bench/core}/NullFileSystem.java | 15 +- .../apache/orc/bench/core/OrcBenchmark.java | 44 ++ .../orc/bench/core}/RandomGenerator.java | 2 +- .../apache/orc/bench/core/ReadCounters.java | 86 +++ .../apache/orc/bench/core/RecordCounters.java | 52 ++ .../orc/bench/core}/SalesGenerator.java | 4 +- .../org/apache/orc/bench/core}/Utilities.java | 111 ++- .../orc/bench/core}/convert/BatchReader.java | 2 +- .../orc/bench/core}/convert/BatchWriter.java | 5 +- .../bench/core}/convert/GenerateVariants.java | 130 ++-- .../orc/bench/core}/convert/ScanVariants.java | 55 +- .../bench/core}/convert/avro/AvroReader.java | 25 +- .../core}/convert/avro/AvroSchemaUtils.java | 2 +- .../bench/core}/convert/avro/AvroWriter.java | 52 +- .../bench/core}/convert/csv/CsvReader.java | 9 +- .../bench/core}/convert/json/JsonReader.java | 8 +- .../bench/core}/convert/json/JsonWriter.java | 6 +- .../bench/core}/convert/orc/OrcReader.java | 4 +- .../bench/core}/convert/orc/OrcWriter.java | 8 +- .../core/convert/parquet/ParquetReader.java | 66 ++ .../core}/convert/parquet/ParquetWriter.java | 81 ++- .../main => core/src}/resources/github.schema | 0 .../src}/resources/log4j.properties | 0 .../main => core/src}/resources/sales.schema | 0 .../main => core/src}/resources/taxi.schema | 0 java/bench/hive/pom.xml | 138 ++++ java/bench/hive/src/assembly/uber.xml | 33 + java/bench/hive/src/findbugs/exclude.xml | 25 + .../hive/ql/io/orc/OrcBenchmarkUtilities.java | 2 +- .../hive}/ColumnProjectionBenchmark.java | 99 +-- .../apache/orc/bench/hive}/DecimalBench.java | 97 ++- .../orc/bench/hive}/FullReadBenchmark.java | 122 ++-- java/bench/pom.xml | 643 +++++++++++++----- java/bench/spark/pom.xml | 203 ++++++ .../orc/bench/spark/SparkBenchmark.java | 292 ++++++++ .../apache/orc/bench/spark/SparkSchema.java | 95 +++ .../src/java/org/apache/orc/bench/Driver.java | 82 --- .../bench/convert/parquet/ParquetReader.java | 297 -------- java/pom.xml | 4 + 47 files changed, 2207 insertions(+), 1012 deletions(-) create mode 100644 java/bench/core/pom.xml rename java/bench/{ => core}/src/assembly/uber.xml (100%) rename java/bench/{ => core}/src/findbugs/exclude.xml (100%) rename java/bench/{ => core}/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java (93%) create mode 100644 java/bench/core/src/java/org/apache/orc/bench/core/BenchmarkOptions.java rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/CompressionKind.java (85%) create mode 100644 java/bench/core/src/java/org/apache/orc/bench/core/Driver.java rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/NullFileSystem.java (87%) create mode 100644 
java/bench/core/src/java/org/apache/orc/bench/core/OrcBenchmark.java rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/RandomGenerator.java (99%) create mode 100644 java/bench/core/src/java/org/apache/orc/bench/core/ReadCounters.java create mode 100644 java/bench/core/src/java/org/apache/orc/bench/core/RecordCounters.java rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/SalesGenerator.java (99%) rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/Utilities.java (52%) rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/convert/BatchReader.java (96%) rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/convert/BatchWriter.java (90%) rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/convert/GenerateVariants.java (86%) rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/convert/ScanVariants.java (68%) rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/convert/avro/AvroReader.java (94%) rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/convert/avro/AvroSchemaUtils.java (99%) rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/convert/avro/AvroWriter.java (90%) rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/convert/csv/CsvReader.java (95%) rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/convert/json/JsonReader.java (97%) rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/convert/json/JsonWriter.java (98%) rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/convert/orc/OrcReader.java (94%) rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/convert/orc/OrcWriter.java (89%) create mode 100644 java/bench/core/src/java/org/apache/orc/bench/core/convert/parquet/ParquetReader.java rename java/bench/{src/java/org/apache/orc/bench => core/src/java/org/apache/orc/bench/core}/convert/parquet/ParquetWriter.java (53%) rename java/bench/{src/main => core/src}/resources/github.schema (100%) rename java/bench/{src/main => core/src}/resources/log4j.properties (100%) rename java/bench/{src/main => core/src}/resources/sales.schema (100%) rename java/bench/{src/main => core/src}/resources/taxi.schema (100%) create mode 100644 java/bench/hive/pom.xml create mode 100644 java/bench/hive/src/assembly/uber.xml create mode 100644 java/bench/hive/src/findbugs/exclude.xml rename java/bench/{ => hive}/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java (97%) rename java/bench/{src/java/org/apache/orc/bench => hive/src/java/org/apache/orc/bench/hive}/ColumnProjectionBenchmark.java (67%) rename java/bench/{src/java/org/apache/orc/bench => hive/src/java/org/apache/orc/bench/hive}/DecimalBench.java (74%) rename java/bench/{src/java/org/apache/orc/bench => hive/src/java/org/apache/orc/bench/hive}/FullReadBenchmark.java (66%) create mode 100644 java/bench/spark/pom.xml create mode 100644 java/bench/spark/src/java/org/apache/orc/bench/spark/SparkBenchmark.java create mode 100644 java/bench/spark/src/java/org/apache/orc/bench/spark/SparkSchema.java delete mode 100644 
java/bench/src/java/org/apache/orc/bench/Driver.java delete mode 100644 java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetReader.java diff --git a/java/bench/README.md b/java/bench/README.md index 12cedeaf33..f49404d076 100644 --- a/java/bench/README.md +++ b/java/bench/README.md @@ -7,6 +7,12 @@ These big data file format benchmarks, compare: * ORC * Parquet +There are three sub-modules to try to mitigate dependency hell: + +* core - the shared part of the benchmarks +* hive - the Hive benchmarks +* spark - the Spark benchmarks + To build this library: ```% mvn clean package``` @@ -17,17 +23,25 @@ To fetch the source data: To generate the derived data: -```% java -jar target/orc-benchmarks-*-uber.jar generate data``` +```% java -jar core/target/orc-benchmarks-core-*-uber.jar generate data``` To run a scan of all of the data: -```% java -jar target/orc-benchmarks-*-uber.jar scan data``` +```% java -jar core/target/orc-benchmarks-core-*-uber.jar scan data``` To run full read benchmark: -```% java -jar target/orc-benchmarks-*-uber.jar read-all data``` +```% java -jar hive/target/orc-benchmarks-hive-*-uber.jar read-all data``` To run column projection benchmark: -```% java -jar target/orc-benchmarks-*-uber.jar read-some data``` +```% java -jar hive/target/orc-benchmarks-hive-*-uber.jar read-some data``` + +To run decimal/decimal64 benchmark: + +```% java -jar hive/target/orc-benchmarks-hive-*-uber.jar decimal data``` + +To run spark benchmark: + +```% java -jar spark/target/orc-benchmarks-spark-*.jar spark data``` diff --git a/java/bench/core/pom.xml b/java/bench/core/pom.xml new file mode 100644 index 0000000000..d0dcc695c9 --- /dev/null +++ b/java/bench/core/pom.xml @@ -0,0 +1,141 @@ + + + + 4.0.0 + + org.apache.orc + orc-benchmarks + 1.6.0-SNAPSHOT + .. + + + org.apache.orc + orc-benchmarks-core + 1.6.0-SNAPSHOT + jar + ORC Benchmarks Core + + The core parts of the benchmarks for comparing performance across formats. 
+ + + + + com.fasterxml.jackson.core + jackson-core + + + com.google.auto.service + auto-service + + + com.google.code.gson + gson + + + commons-cli + commons-cli + + + io.airlift + aircompressor + + + org.apache.avro + avro + + + org.apache.avro + avro-mapred + hadoop2 + + + org.apache.commons + commons-csv + + + org.apache.hadoop + hadoop-common + + + org.apache.hive + hive-storage-api + + + org.apache.orc + orc-core + + + org.apache.parquet + parquet-avro + + + org.apache.parquet + parquet-hadoop + + + org.openjdk.jmh + jmh-core + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + + + + + ${basedir}/src/java + ${basedir}/src/test + + + src/resources + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-enforcer-plugin + + + maven-assembly-plugin + + + + org.apache.orc.bench.core.Driver + + + + + + + + + + cmake + + ${build.dir}/bench/core + + + + diff --git a/java/bench/src/assembly/uber.xml b/java/bench/core/src/assembly/uber.xml similarity index 100% rename from java/bench/src/assembly/uber.xml rename to java/bench/core/src/assembly/uber.xml diff --git a/java/bench/src/findbugs/exclude.xml b/java/bench/core/src/findbugs/exclude.xml similarity index 100% rename from java/bench/src/findbugs/exclude.xml rename to java/bench/core/src/findbugs/exclude.xml diff --git a/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java b/java/bench/core/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java similarity index 93% rename from java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java rename to java/bench/core/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java index 0440495033..bd3b02748c 100644 --- a/java/bench/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java +++ b/java/bench/core/src/java/org/apache/hadoop/fs/TrackingLocalFileSystem.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,10 +19,13 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.net.URI; public class TrackingLocalFileSystem extends RawLocalFileSystem { + static final URI NAME = URI.create("track:///"); class TrackingFileInputStream extends RawLocalFileSystem.LocalFSFileInputStream { + public TrackingFileInputStream(Path f) throws IOException { super(f); } @@ -51,6 +54,11 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException { new TrackingFileInputStream(f), bufferSize)); } + @Override + public URI getUri() { + return NAME; + } + public FileSystem.Statistics getLocalStatistics() { return statistics; } diff --git a/java/bench/core/src/java/org/apache/orc/bench/core/BenchmarkOptions.java b/java/bench/core/src/java/org/apache/orc/bench/core/BenchmarkOptions.java new file mode 100644 index 0000000000..a64c605759 --- /dev/null +++ b/java/bench/core/src/java/org/apache/orc/bench/core/BenchmarkOptions.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.core; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; + +public class BenchmarkOptions { + + public static final String HELP = "help"; + public static final String ITERATIONS = "iterations"; + public static final String WARMUP_ITERATIONS = "warmup-iterations"; + public static final String FORK = "fork"; + public static final String TIME = "time"; + public static final String MIN_MEMORY = "min-memory"; + public static final String MAX_MEMORY = "max-memory"; + public static final String GC = "gc"; + + public static CommandLine parseCommandLine(String[] args) { + Options options = new Options() + .addOption("h", HELP, false, "Provide help") + .addOption("i", ITERATIONS, true, "Number of iterations") + .addOption("I", WARMUP_ITERATIONS, true, "Number of warmup iterations") + .addOption("f", FORK, true, "How many forks to use") + .addOption("t", TIME, true, "How long each iteration is in seconds") + .addOption("m", MIN_MEMORY, true, "The minimum size of each JVM") + .addOption("M", MAX_MEMORY, true, "The maximum size of each JVM") + .addOption("g", GC, false, "Should GC be profiled"); + CommandLine result; + try { + result = new DefaultParser().parse(options, args, true); + } catch (ParseException pe) { + System.err.println("Argument exception - " + pe.getMessage()); + result = null; + } + if (result == null || result.hasOption(HELP) || result.getArgs().length == 0) { + new HelpFormatter().printHelp("java -jar ", + options); + System.err.println(); + System.exit(1); + } + return result; + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/CompressionKind.java b/java/bench/core/src/java/org/apache/orc/bench/core/CompressionKind.java similarity index 85% rename from java/bench/src/java/org/apache/orc/bench/CompressionKind.java rename to java/bench/core/src/java/org/apache/orc/bench/core/CompressionKind.java index 9274de3ea4..2cd783d547 100644 --- a/java/bench/src/java/org/apache/orc/bench/CompressionKind.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/CompressionKind.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.orc.bench; +package org.apache.orc.bench.core; import io.airlift.compress.snappy.SnappyCodec; import org.apache.hadoop.fs.Path; @@ -31,9 +31,9 @@ * Enum for handling the compression codecs for the benchmark */ public enum CompressionKind { - NONE(".none"), - ZLIB(".gz"), - SNAPPY(".snappy"); + NONE("none"), + ZLIB("gz"), + SNAPPY("snappy"); CompressionKind(String extendsion) { this.extension = extendsion; @@ -77,11 +77,20 @@ public static CompressionKind fromPath(Path path) { if (lastDot >= 0) { String ext = name.substring(lastDot); for (CompressionKind value : values()) { - if (ext.equals(value.getExtension())) { + if (ext.equals("." 
+ value.getExtension())) { return value; } } } return NONE; } + + public static CompressionKind fromExtension(String extension) { + for (CompressionKind value: values()) { + if (value.extension.equals(extension)) { + return value; + } + } + throw new IllegalArgumentException("Unknown compression " + extension); + } } diff --git a/java/bench/core/src/java/org/apache/orc/bench/core/Driver.java b/java/bench/core/src/java/org/apache/orc/bench/core/Driver.java new file mode 100644 index 0000000000..08b1288a0e --- /dev/null +++ b/java/bench/core/src/java/org/apache/orc/bench/core/Driver.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.core; + +import java.util.Arrays; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.TreeMap; + +/** + * A driver tool to call the various benchmark classes. + */ +public class Driver { + private static final ServiceLoader loader = + ServiceLoader.load(OrcBenchmark.class); + + private static Map getBenchmarks() { + Map result = new TreeMap<>(); + for(OrcBenchmark bench: loader) { + result.put(bench.getName(), bench); + } + return result; + } + + private static final String PATTERN = " %10s - %s"; + + private static void printUsageAndExit(Map benchmarks) { + System.err.println("Commands:"); + for(OrcBenchmark bench: benchmarks.values()) { + System.err.println(String.format(PATTERN, bench.getName(), + bench.getDescription())); + } + System.exit(1); + } + + public static void main(String[] args) throws Exception { + Map benchmarks = getBenchmarks(); + if (args.length == 0) { + printUsageAndExit(benchmarks); + } + String command = args[0]; + args = Arrays.copyOfRange(args, 1, args.length); + OrcBenchmark bench = benchmarks.get(command); + if (bench == null) { + printUsageAndExit(benchmarks); + } + bench.run(args); + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/NullFileSystem.java b/java/bench/core/src/java/org/apache/orc/bench/core/NullFileSystem.java similarity index 87% rename from java/bench/src/java/org/apache/orc/bench/NullFileSystem.java rename to java/bench/core/src/java/org/apache/orc/bench/core/NullFileSystem.java index 23d19cceae..0907d623e0 100644 --- a/java/bench/src/java/org/apache/orc/bench/NullFileSystem.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/NullFileSystem.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.orc.bench; +package org.apache.orc.bench.core; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Progressable; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -44,10 +43,10 @@ public URI getUri() { } @Override - public FSDataInputStream open(Path path, int i) throws IOException { + public FSDataInputStream open(Path path, int i) { return new FSDataInputStream(new InputStream() { @Override - public int read() throws IOException { + public int read() { return -1; } }); @@ -74,14 +73,14 @@ public FSDataOutputStream create(Path path, short i1, long l, Progressable progressable) throws IOException { - return new FSDataOutputStream(NULL_OUTPUT); + return new FSDataOutputStream(NULL_OUTPUT, null); } @Override public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException { - return new FSDataOutputStream(NULL_OUTPUT); + return new FSDataOutputStream(NULL_OUTPUT, null); } @Override @@ -110,12 +109,12 @@ public Path getWorkingDirectory() { } @Override - public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException { + public boolean mkdirs(Path path, FsPermission fsPermission) { return false; } @Override - public FileStatus getFileStatus(Path path) throws IOException { + public FileStatus getFileStatus(Path path) { return null; } } diff --git a/java/bench/core/src/java/org/apache/orc/bench/core/OrcBenchmark.java b/java/bench/core/src/java/org/apache/orc/bench/core/OrcBenchmark.java new file mode 100644 index 0000000000..63290fadfe --- /dev/null +++ b/java/bench/core/src/java/org/apache/orc/bench/core/OrcBenchmark.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.core; + +/** + * API to support adding additional benchmarks to the Driver. + */ +public interface OrcBenchmark { + + /** + * Get the name of the subcommand to invoke this benchmark. 
+ * @return a simple string, hopefully lowercase + */ + String getName(); + + /** + * The human readable description of this benchmark + * @return + */ + String getDescription(); + + /** + * Run the benchmark + * @param args the arguments from the user + * @throws Exception + */ + void run(String[] args) throws Exception; +} diff --git a/java/bench/src/java/org/apache/orc/bench/RandomGenerator.java b/java/bench/core/src/java/org/apache/orc/bench/core/RandomGenerator.java similarity index 99% rename from java/bench/src/java/org/apache/orc/bench/RandomGenerator.java rename to java/bench/core/src/java/org/apache/orc/bench/core/RandomGenerator.java index dfe7d43b25..922077544e 100644 --- a/java/bench/src/java/org/apache/orc/bench/RandomGenerator.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/RandomGenerator.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.orc.bench; +package org.apache.orc.bench.core; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; diff --git a/java/bench/core/src/java/org/apache/orc/bench/core/ReadCounters.java b/java/bench/core/src/java/org/apache/orc/bench/core/ReadCounters.java new file mode 100644 index 0000000000..6c07458960 --- /dev/null +++ b/java/bench/core/src/java/org/apache/orc/bench/core/ReadCounters.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.core; + +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; + +/** + * A class to track the number of rows, bytes, and read operations that have + * been read. + */ +@AuxCounters(AuxCounters.Type.EVENTS) +@State(Scope.Thread) +public class ReadCounters { + long bytesRead; + long reads; + RecordCounters recordCounters; + + @Setup(Level.Iteration) + public void setup(RecordCounters records) { + bytesRead = 0; + reads = 0; + recordCounters = records; + } + + @TearDown(Level.Iteration) + public void print() { + if (recordCounters != null) { + recordCounters.print(); + } + System.out.println("Reads: " + reads); + System.out.println("Bytes: " + bytesRead); + } + + public double bytesPerRecord() { + return recordCounters == null || recordCounters.records == 0 ? + 0 : ((double) bytesRead) / recordCounters.records; + } + + public long records() { + return recordCounters == null || recordCounters.invocations == 0 ? + 0 : recordCounters.records / recordCounters.invocations; + } + + public long reads() { + return recordCounters == null || recordCounters.invocations == 0 ? + 0 : reads / recordCounters.invocations; + } + + public void addRecords(long value) { + if (recordCounters != null) { + recordCounters.records += value; + } + } + + public void addInvocation() { + if (recordCounters != null) { + recordCounters.invocations += 1; + } + } + + public void addBytes(long newReads, long newBytes) { + bytesRead += newBytes; + reads += newReads; + } +} diff --git a/java/bench/core/src/java/org/apache/orc/bench/core/RecordCounters.java b/java/bench/core/src/java/org/apache/orc/bench/core/RecordCounters.java new file mode 100644 index 0000000000..7cc079b8f1 --- /dev/null +++ b/java/bench/core/src/java/org/apache/orc/bench/core/RecordCounters.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.core; + +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; + +/** + * A class to track the number of rows that have been read. + */ +@AuxCounters(AuxCounters.Type.OPERATIONS) +@State(Scope.Thread) +public class RecordCounters { + long records; + long invocations; + + @Setup(Level.Iteration) + public void setup() { + records = 0; + invocations = 0; + } + + public long perRecord() { + return records; + } + + public void print() { + System.out.println(); + System.out.println("Records: " + records); + System.out.println("Invocations: " + invocations); + } +} + diff --git a/java/bench/src/java/org/apache/orc/bench/SalesGenerator.java b/java/bench/core/src/java/org/apache/orc/bench/core/SalesGenerator.java similarity index 99% rename from java/bench/src/java/org/apache/orc/bench/SalesGenerator.java rename to java/bench/core/src/java/org/apache/orc/bench/core/SalesGenerator.java index 2be3537ed1..9ac1264aac 100644 --- a/java/bench/src/java/org/apache/orc/bench/SalesGenerator.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/SalesGenerator.java @@ -16,11 +16,11 @@ * limitations under the License. */ -package org.apache.orc.bench; +package org.apache.orc.bench.core; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.TypeDescription; -import org.apache.orc.bench.convert.BatchReader; +import org.apache.orc.bench.core.convert.BatchReader; public class SalesGenerator implements BatchReader { private final RandomGenerator generator; diff --git a/java/bench/src/java/org/apache/orc/bench/Utilities.java b/java/bench/core/src/java/org/apache/orc/bench/core/Utilities.java similarity index 52% rename from java/bench/src/java/org/apache/orc/bench/Utilities.java rename to java/bench/core/src/java/org/apache/orc/bench/core/Utilities.java index 7016f5e07b..dad605c521 100644 --- a/java/bench/src/java/org/apache/orc/bench/Utilities.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/Utilities.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,19 +16,20 @@ * limitations under the License. 
*/ -package org.apache.orc.bench; +package org.apache.orc.bench.core; -import org.apache.hadoop.conf.Configuration; +import org.apache.commons.cli.CommandLine; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.orc.TypeDescription; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.ChainedOptionsBuilder; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.TimeValue; +import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Properties; public class Utilities { @@ -62,66 +63,48 @@ public static org.apache.orc.CompressionKind getCodec(CompressionKind compressio } } - public static Iterable sliceArray(final String[] array, - final int start) { - return new Iterable() { - String[] values = array; - int posn = start; - - @Override - public Iterator iterator() { - return new Iterator() { - @Override - public boolean hasNext() { - return posn < values.length; - } - - @Override - public String next() { - if (posn >= values.length) { - throw new NoSuchElementException("Index off end of array." + - " index = " + posn + " length = " + values.length); - } else { - return values[posn++]; - } - } - - @Override - public void remove() { - throw new UnsupportedOperationException("No remove"); - } - }; - } - }; - } - - public static Properties convertSchemaToHiveConfig(TypeDescription schema) { - Properties result = new Properties(); - if (schema.getCategory() != TypeDescription.Category.STRUCT) { - throw new IllegalArgumentException("Hive requires struct root types" + - " instead of " + schema); - } - StringBuilder columns = new StringBuilder(); - StringBuilder types = new StringBuilder(); - List columnNames = schema.getFieldNames(); - List columnTypes = schema.getChildren(); - for(int c=0; c < columnNames.size(); ++c) { - if (c != 0) { - columns.append(","); - types.append(","); - } - columns.append(columnNames.get(c)); - types.append(columnTypes.get(c)); - } - result.setProperty(serdeConstants.LIST_COLUMNS, columns.toString()); - result.setProperty(serdeConstants.LIST_COLUMN_TYPES, types.toString()); - return result; - } - public static Path getVariant(Path root, String data, String format, String compress) { return new Path(root, "generated/" + data + "/" + format + "." + compress); } + + private static final String ROOT_PROPERTY_NAME = "bench.root.dir"; + + /** + * Get the benchmark data root in the child jvm. + * @return the path to the benchmark data or null if it wasn't found + */ + public static Path getBenchmarkRoot() { + String value = System.getProperty(ROOT_PROPERTY_NAME); + return value == null ? 
null : new Path(value); + } + + public static Options parseOptions(String[] args, + Class cls) throws IOException { + CommandLine options = BenchmarkOptions.parseCommandLine(args); + String dataPath = new File(options.getArgs()[0]).getCanonicalPath(); + OptionsBuilder builder = new OptionsBuilder(); + builder.include(cls.getSimpleName()); + if (options.hasOption(BenchmarkOptions.GC)) { + builder.addProfiler("hs_gc"); + } + builder.measurementIterations(Integer.parseInt(options.getOptionValue( + BenchmarkOptions.ITERATIONS, "5"))); + builder.warmupIterations(Integer.parseInt(options.getOptionValue( + BenchmarkOptions.WARMUP_ITERATIONS, "2"))); + builder.forks(Integer.parseInt(options.getOptionValue( + BenchmarkOptions.FORK, "1"))); + TimeValue iterationTime = TimeValue.seconds(Long.parseLong( + options.getOptionValue(BenchmarkOptions.TIME, "10"))); + builder.measurementTime(iterationTime); + builder.warmupTime(iterationTime); + String minMemory = options.getOptionValue(BenchmarkOptions.MIN_MEMORY, "256m"); + String maxMemory = options.getOptionValue(BenchmarkOptions.MAX_MEMORY, "2g"); + builder.jvmArgs("-server", + "-Xms"+ minMemory, "-Xmx" + maxMemory, + "-D" + ROOT_PROPERTY_NAME + "=" + dataPath); + return builder.build(); + } } diff --git a/java/bench/src/java/org/apache/orc/bench/convert/BatchReader.java b/java/bench/core/src/java/org/apache/orc/bench/core/convert/BatchReader.java similarity index 96% rename from java/bench/src/java/org/apache/orc/bench/convert/BatchReader.java rename to java/bench/core/src/java/org/apache/orc/bench/core/convert/BatchReader.java index b9ea3567ef..9a127ffda9 100644 --- a/java/bench/src/java/org/apache/orc/bench/convert/BatchReader.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/convert/BatchReader.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.orc.bench.convert; +package org.apache.orc.bench.core.convert; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; diff --git a/java/bench/src/java/org/apache/orc/bench/convert/BatchWriter.java b/java/bench/core/src/java/org/apache/orc/bench/core/convert/BatchWriter.java similarity index 90% rename from java/bench/src/java/org/apache/orc/bench/convert/BatchWriter.java rename to java/bench/core/src/java/org/apache/orc/bench/core/convert/BatchWriter.java index c79d93736b..2d75ee1e63 100644 --- a/java/bench/src/java/org/apache/orc/bench/convert/BatchWriter.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/convert/BatchWriter.java @@ -16,16 +16,17 @@ * limitations under the License. */ -package org.apache.orc.bench.convert; +package org.apache.orc.bench.core.convert; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import java.io.Closeable; import java.io.IOException; /** * Generic interface for writing data. 
*/ -public interface BatchWriter extends AutoCloseable { +public interface BatchWriter extends Closeable { void writeBatch(VectorizedRowBatch batch) throws IOException; diff --git a/java/bench/src/java/org/apache/orc/bench/convert/GenerateVariants.java b/java/bench/core/src/java/org/apache/orc/bench/core/convert/GenerateVariants.java similarity index 86% rename from java/bench/src/java/org/apache/orc/bench/convert/GenerateVariants.java rename to java/bench/core/src/java/org/apache/orc/bench/core/convert/GenerateVariants.java index 57cf4c9370..f4c9bc6824 100644 --- a/java/bench/src/java/org/apache/orc/bench/convert/GenerateVariants.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/convert/GenerateVariants.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.orc.bench.convert; +package org.apache.orc.bench.core.convert; +import com.google.auto.service.AutoService; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; @@ -30,25 +31,27 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.TypeDescription; -import org.apache.orc.bench.CompressionKind; -import org.apache.orc.bench.SalesGenerator; -import org.apache.orc.bench.Utilities; -import org.apache.orc.bench.convert.avro.AvroReader; -import org.apache.orc.bench.convert.avro.AvroWriter; -import org.apache.orc.bench.convert.csv.CsvReader; -import org.apache.orc.bench.convert.json.JsonReader; -import org.apache.orc.bench.convert.json.JsonWriter; -import org.apache.orc.bench.convert.orc.OrcReader; -import org.apache.orc.bench.convert.orc.OrcWriter; -import org.apache.orc.bench.convert.parquet.ParquetReader; -import org.apache.orc.bench.convert.parquet.ParquetWriter; +import org.apache.orc.bench.core.CompressionKind; +import org.apache.orc.bench.core.OrcBenchmark; +import org.apache.orc.bench.core.SalesGenerator; +import org.apache.orc.bench.core.Utilities; +import org.apache.orc.bench.core.convert.avro.AvroReader; +import org.apache.orc.bench.core.convert.avro.AvroWriter; +import org.apache.orc.bench.core.convert.csv.CsvReader; +import org.apache.orc.bench.core.convert.json.JsonReader; +import org.apache.orc.bench.core.convert.json.JsonWriter; +import org.apache.orc.bench.core.convert.orc.OrcReader; +import org.apache.orc.bench.core.convert.orc.OrcWriter; +import org.apache.orc.bench.core.convert.parquet.ParquetReader; +import org.apache.orc.bench.core.convert.parquet.ParquetWriter; import java.io.IOException; /** * A tool to create the different variants that we need to benchmark against. 
*/ -public class GenerateVariants { +@AutoService(OrcBenchmark.class) +public class GenerateVariants implements OrcBenchmark { public static BatchWriter createFileWriter(Path file, String format, @@ -95,6 +98,61 @@ public static BatchReader createFileReader(Path file, } } + @Override + public String getName() { + return "generate"; + } + + @Override + public String getDescription() { + return "generate all of the data variants"; + } + + @Override + public void run(String[] args) throws Exception { + CommandLine cli = parseCommandLine(args); + String[] compressList = + cli.getOptionValue("compress", "none,snappy,zlib").split(","); + String[] dataList = + cli.getOptionValue("data", "taxi,sales,github").split(","); + String[] formatList = + cli.getOptionValue("format", "avro,json,orc,parquet").split(","); + long records = Long.parseLong(cli.getOptionValue("sales", "25000000")); + Configuration conf = new Configuration(); + Path root = new Path(cli.getArgs()[0]); + for(String data: dataList) { + // Set up the reader + TypeDescription schema = Utilities.loadSchema(data + ".schema"); + BatchReader reader = createReader(root, data, schema, conf, records); + + // Set up the writers for each combination + BatchWriter[] writers = new BatchWriter[compressList.length * formatList.length]; + for(int compress=0; compress < compressList.length; ++compress) { + CompressionKind compressionKind = + CompressionKind.valueOf(compressList[compress].toUpperCase()); + for(int format=0; format < formatList.length; ++format) { + Path outPath = Utilities.getVariant(root, data, formatList[format], + compressionKind.getExtension()); + writers[compress * formatList.length + format] = + createFileWriter(outPath, formatList[format], schema, conf, + compressionKind); + } + } + + // Copy the rows + VectorizedRowBatch batch = schema.createRowBatch(); + while (reader.nextBatch(batch)) { + for(BatchWriter writer: writers) { + writer.writeBatch(batch); + } + } + reader.close(); + for(BatchWriter writer: writers) { + writer.close(); + } + } + } + public static class RecursiveReader implements BatchReader { private final RemoteIterator filenames; private final String format; @@ -173,48 +231,4 @@ static CommandLine parseCommandLine(String[] args) throws ParseException { } return result; } - - public static void main(String[] args) throws Exception { - CommandLine cli = parseCommandLine(args); - String[] compressList = - cli.getOptionValue("compress", "none,snappy,zlib").split(","); - String[] dataList = - cli.getOptionValue("data", "taxi,sales,github").split(","); - String[] formatList = - cli.getOptionValue("format", "avro,json,orc,parquet").split(","); - long records = Long.parseLong(cli.getOptionValue("sales", "25000000")); - Configuration conf = new Configuration(); - Path root = new Path(cli.getArgs()[0]); - for(String data: dataList) { - // Set up the reader - TypeDescription schema = Utilities.loadSchema(data + ".schema"); - BatchReader reader = createReader(root, data, schema, conf, records); - - // Set up the writers for each combination - BatchWriter[] writers = new BatchWriter[compressList.length * formatList.length]; - for(int compress=0; compress < compressList.length; ++compress) { - CompressionKind compressionKind = - CompressionKind.valueOf(compressList[compress].toUpperCase()); - for(int format=0; format < formatList.length; ++format) { - Path outPath = Utilities.getVariant(root, data, formatList[format], - compressList[compress]); - writers[compress * formatList.length + format] = - createFileWriter(outPath, 
formatList[format], schema, conf, - compressionKind); - } - } - - // Copy the rows - VectorizedRowBatch batch = schema.createRowBatch(); - while (reader.nextBatch(batch)) { - for(BatchWriter writer: writers) { - writer.writeBatch(batch); - } - } - reader.close(); - for(BatchWriter writer: writers) { - writer.close(); - } - } - } } diff --git a/java/bench/src/java/org/apache/orc/bench/convert/ScanVariants.java b/java/bench/core/src/java/org/apache/orc/bench/core/convert/ScanVariants.java similarity index 68% rename from java/bench/src/java/org/apache/orc/bench/convert/ScanVariants.java rename to java/bench/core/src/java/org/apache/orc/bench/core/convert/ScanVariants.java index ae76238e8e..14c570d747 100644 --- a/java/bench/src/java/org/apache/orc/bench/convert/ScanVariants.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/convert/ScanVariants.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.orc.bench.convert; +package org.apache.orc.bench.core.convert; +import com.google.auto.service.AutoService; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; @@ -27,13 +28,15 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.TypeDescription; -import org.apache.orc.bench.CompressionKind; -import org.apache.orc.bench.Utilities; +import org.apache.orc.bench.core.CompressionKind; +import org.apache.orc.bench.core.OrcBenchmark; +import org.apache.orc.bench.core.Utilities; /** * A tool to create the different variants that we need to benchmark against. */ -public class ScanVariants { +@AutoService(OrcBenchmark.class) +public class ScanVariants implements OrcBenchmark { static CommandLine parseCommandLine(String[] args) throws ParseException { @@ -50,10 +53,21 @@ static CommandLine parseCommandLine(String[] args) throws ParseException { return result; } - public static void main(String[] args) throws Exception { + @Override + public String getName() { + return "scan"; + } + + @Override + public String getDescription() { + return "scan all of the data variants"; + } + + @Override + public void run(String[] args) throws Exception { CommandLine cli = parseCommandLine(args); String[] compressList = - cli.getOptionValue("compress", "none,snappy,zlib").split(","); + cli.getOptionValue("compress", "none,snappy,gz").split(","); String[] dataList = cli.getOptionValue("data", "taxi,sales,github").split(","); String[] formatList = @@ -64,22 +78,21 @@ public static void main(String[] args) throws Exception { TypeDescription schema = Utilities.loadSchema(data + ".schema"); VectorizedRowBatch batch = schema.createRowBatch(); for (String compress : compressList) { - CompressionKind compressKind = - CompressionKind.valueOf(compress.toUpperCase()); + CompressionKind compressKind = CompressionKind.fromExtension(compress); for (String format : formatList) { - Path filename = Utilities.getVariant(root, data, format, - compress); - BatchReader reader = GenerateVariants.createFileReader(filename, - format, schema, conf, compressKind); - long rows = 0; - long batches = 0; - while (reader.nextBatch(batch)) { - batches += 1; - rows += batch.size; - } - System.out.println(filename + " rows: " + rows + " batches: " - + batches); - reader.close(); + Path filename = Utilities.getVariant(root, data, format, + compress); + BatchReader reader = GenerateVariants.createFileReader(filename, + format, schema, conf, compressKind); + long rows = 0; + 
long batches = 0; + while (reader.nextBatch(batch)) { + batches += 1; + rows += batch.size; + } + System.out.println(filename + " rows: " + rows + " batches: " + + batches); + reader.close(); } } } diff --git a/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroReader.java b/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroReader.java similarity index 94% rename from java/bench/src/java/org/apache/orc/bench/convert/avro/AvroReader.java rename to java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroReader.java index fc354d6c49..0db7746f51 100644 --- a/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroReader.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroReader.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.orc.bench.convert.avro; +package org.apache.orc.bench.core.convert.avro; import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericData; @@ -38,7 +38,7 @@ import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.TypeDescription; -import org.apache.orc.bench.convert.BatchReader; +import org.apache.orc.bench.core.convert.BatchReader; import java.io.IOException; import java.math.BigInteger; @@ -56,11 +56,7 @@ public AvroReader(Path path, FsInput file = new FsInput(path, conf); DatumReader datumReader = new GenericDatumReader<>(); dataFileReader = new DataFileReader<>(file, datumReader); - List children = schema.getChildren(); - converters = new AvroConverter[children.size()]; - for(int c=0; c < converters.length; ++c) { - converters[c] = createConverter(children.get(c)); - } + converters = buildConverters(schema); } @Override @@ -82,10 +78,19 @@ public void close() throws IOException { dataFileReader.close(); } - interface AvroConverter { + public interface AvroConverter { void convert(ColumnVector vector, int row, Object value); } + public static AvroConverter[] buildConverters(TypeDescription orcType) { + List children = orcType.getChildren(); + AvroConverter[] result = new AvroConverter[children.size()]; + for(int c=0; c < result.length; ++c) { + result[c] = createConverter(children.get(c)); + } + return result; + } + private static class BooleanConverter implements AvroConverter { public void convert(ColumnVector cv, int row, Object value) { if (value == null) { @@ -213,12 +218,12 @@ public void convert(ColumnVector cv, int row, Object value) { cv.isNull[row] = true; } else { ListColumnVector tc = (ListColumnVector) cv; - GenericData.Array array = (GenericData.Array) value; + List array = (List) value; int start = tc.childCount; int len = array.size(); tc.childCount += len; tc.child.ensureSize(tc.childCount, true); - for(int i=0; i < len; ++i) { + for (int i = 0; i < len; ++i) { childConverter.convert(tc.child, start + i, array.get(i)); } } diff --git a/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroSchemaUtils.java b/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroSchemaUtils.java similarity index 99% rename from java/bench/src/java/org/apache/orc/bench/convert/avro/AvroSchemaUtils.java rename to java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroSchemaUtils.java index 6c72a0ee61..96df6b5ba1 100644 --- a/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroSchemaUtils.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroSchemaUtils.java @@ -15,7 +15,7 @@ * See the License for 
the specific language governing permissions and * limitations under the License. */ -package org.apache.orc.bench.convert.avro; +package org.apache.orc.bench.core.convert.avro; import org.apache.avro.Schema; import org.apache.orc.TypeDescription; diff --git a/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroWriter.java b/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroWriter.java similarity index 90% rename from java/bench/src/java/org/apache/orc/bench/convert/avro/AvroWriter.java rename to java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroWriter.java index 44defbf579..13e148e242 100644 --- a/java/bench/src/java/org/apache/orc/bench/convert/avro/AvroWriter.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/convert/avro/AvroWriter.java @@ -16,14 +16,13 @@ * limitations under the License. */ -package org.apache.orc.bench.convert.avro; +package org.apache.orc.bench.core.convert.avro; import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveDecimal; @@ -37,8 +36,8 @@ import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.TypeDescription; -import org.apache.orc.bench.convert.BatchWriter; -import org.apache.orc.bench.CompressionKind; +import org.apache.orc.bench.core.convert.BatchWriter; +import org.apache.orc.bench.core.CompressionKind; import java.io.IOException; import java.nio.Buffer; @@ -48,7 +47,7 @@ public class AvroWriter implements BatchWriter { - interface AvroConverter { + public interface AvroConverter { Object convert(ColumnVector vector, int row); } @@ -195,6 +194,7 @@ private static class ListConverter implements AvroConverter { removeNullable(avroSchema.getElementType())); } + @SuppressWarnings("unchecked") public Object convert(ColumnVector cv, int row) { if (cv.isRepeating) { row = 0; @@ -246,8 +246,8 @@ public Object convert(ColumnVector cv, int row) { } } - static AvroConverter createConverter(TypeDescription types, - Schema avroSchema) { + public static AvroConverter createConverter(TypeDescription types, + Schema avroSchema) { switch (types.getCategory()) { case BINARY: return new BinaryConverter(); @@ -302,22 +302,28 @@ static Schema removeNullable(Schema avro) { } private final AvroConverter[] converters; - private final DataFileWriter writer; - private final GenericRecord record; + private final DataFileWriter writer; + private final GenericData.Record record; + + public static AvroConverter[] buildConverters(TypeDescription orcType, + Schema avroSchema) { + List childTypes = orcType.getChildren(); + List avroFields = avroSchema.getFields(); + AvroConverter[] result = new AvroConverter[childTypes.size()]; + for(int c=0; c < result.length; ++c) { + result[c] = createConverter(childTypes.get(c), + removeNullable(avroFields.get(c).schema())); + } + return result; + } public AvroWriter(Path path, TypeDescription schema, Configuration conf, CompressionKind compression) throws IOException { - List childTypes = schema.getChildren(); Schema avroSchema = AvroSchemaUtils.createAvroSchema(schema); - List avroFields = avroSchema.getFields(); - converters = new AvroConverter[childTypes.size()]; - for(int 
c=0; c < converters.length; ++c) { - converters[c] = createConverter(childTypes.get(c), - removeNullable(avroFields.get(c).schema())); - } - GenericDatumWriter gdw = new GenericDatumWriter(avroSchema); - writer = new DataFileWriter(gdw); + GenericDatumWriter gdw = new GenericDatumWriter<>(avroSchema); + writer = new DataFileWriter<>(gdw); + converters = buildConverters(schema, avroSchema); switch (compression) { case NONE: break; @@ -347,17 +353,11 @@ public void close() throws IOException { writer.close(); } - static Buffer getBufferFromBytes(byte[] input) { - ByteBuffer bb = ByteBuffer.wrap(input); - return bb.rewind(); - } - - public static Buffer getBufferFromDecimal(HiveDecimal dec, int scale) { + static Buffer getBufferFromDecimal(HiveDecimal dec, int scale) { if (dec == null) { return null; } - dec = dec.setScale(scale); - return getBufferFromBytes(dec.unscaledValue().toByteArray()); + return ByteBuffer.wrap(dec.bigIntegerBytesScaled(scale)); } } diff --git a/java/bench/src/java/org/apache/orc/bench/convert/csv/CsvReader.java b/java/bench/core/src/java/org/apache/orc/bench/core/convert/csv/CsvReader.java similarity index 95% rename from java/bench/src/java/org/apache/orc/bench/convert/csv/CsvReader.java rename to java/bench/core/src/java/org/apache/orc/bench/core/convert/csv/CsvReader.java index 3246e69c1e..3c68adc5e5 100644 --- a/java/bench/src/java/org/apache/orc/bench/convert/csv/CsvReader.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/convert/csv/CsvReader.java @@ -15,13 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.orc.bench.convert.csv; +package org.apache.orc.bench.core.convert.csv; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveDecimal; @@ -33,10 +32,9 @@ import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.TypeDescription; -import org.apache.orc.bench.CompressionKind; -import org.apache.orc.bench.convert.BatchReader; +import org.apache.orc.bench.core.CompressionKind; +import org.apache.orc.bench.core.convert.BatchReader; -import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -44,7 +42,6 @@ import java.sql.Timestamp; import java.util.Iterator; import java.util.List; -import java.util.zip.GZIPInputStream; public class CsvReader implements BatchReader { private final Iterator parser; diff --git a/java/bench/src/java/org/apache/orc/bench/convert/json/JsonReader.java b/java/bench/core/src/java/org/apache/orc/bench/core/convert/json/JsonReader.java similarity index 97% rename from java/bench/src/java/org/apache/orc/bench/convert/json/JsonReader.java rename to java/bench/core/src/java/org/apache/orc/bench/core/convert/json/JsonReader.java index b4ff3122bb..e12a36a7f4 100644 --- a/java/bench/src/java/org/apache/orc/bench/convert/json/JsonReader.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/convert/json/JsonReader.java @@ -15,14 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.orc.bench.convert.json; +package org.apache.orc.bench.core.convert.json; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonStreamParser; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveDecimal; @@ -36,8 +35,8 @@ import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.TypeDescription; -import org.apache.orc.bench.CompressionKind; -import org.apache.orc.bench.convert.BatchReader; +import org.apache.orc.bench.core.CompressionKind; +import org.apache.orc.bench.core.convert.BatchReader; import java.io.IOException; import java.io.InputStream; @@ -45,7 +44,6 @@ import java.nio.charset.StandardCharsets; import java.sql.Timestamp; import java.util.List; -import java.util.zip.GZIPInputStream; public class JsonReader implements BatchReader { private final TypeDescription schema; diff --git a/java/bench/src/java/org/apache/orc/bench/convert/json/JsonWriter.java b/java/bench/core/src/java/org/apache/orc/bench/core/convert/json/JsonWriter.java similarity index 98% rename from java/bench/src/java/org/apache/orc/bench/convert/json/JsonWriter.java rename to java/bench/core/src/java/org/apache/orc/bench/core/convert/json/JsonWriter.java index bd411154e4..36b06701dc 100644 --- a/java/bench/src/java/org/apache/orc/bench/convert/json/JsonWriter.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/convert/json/JsonWriter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.orc.bench.convert.json; +package org.apache.orc.bench.core.convert.json; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; @@ -35,8 +35,8 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.orc.TypeDescription; -import org.apache.orc.bench.convert.BatchWriter; -import org.apache.orc.bench.CompressionKind; +import org.apache.orc.bench.core.convert.BatchWriter; +import org.apache.orc.bench.core.CompressionKind; import java.io.IOException; import java.io.OutputStream; diff --git a/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcReader.java b/java/bench/core/src/java/org/apache/orc/bench/core/convert/orc/OrcReader.java similarity index 94% rename from java/bench/src/java/org/apache/orc/bench/convert/orc/OrcReader.java rename to java/bench/core/src/java/org/apache/orc/bench/core/convert/orc/OrcReader.java index e648856577..c87e8f46bc 100644 --- a/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcReader.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/convert/orc/OrcReader.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.orc.bench.convert.orc; +package org.apache.orc.bench.core.convert.orc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -25,7 +25,7 @@ import org.apache.orc.Reader; import org.apache.orc.RecordReader; import org.apache.orc.TypeDescription; -import org.apache.orc.bench.convert.BatchReader; +import org.apache.orc.bench.core.convert.BatchReader; import java.io.IOException; diff --git a/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcWriter.java b/java/bench/core/src/java/org/apache/orc/bench/core/convert/orc/OrcWriter.java similarity index 89% rename from java/bench/src/java/org/apache/orc/bench/convert/orc/OrcWriter.java rename to java/bench/core/src/java/org/apache/orc/bench/core/convert/orc/OrcWriter.java index af5de9b8c0..baa2260d5c 100644 --- a/java/bench/src/java/org/apache/orc/bench/convert/orc/OrcWriter.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/convert/orc/OrcWriter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.orc.bench.convert.orc; +package org.apache.orc.bench.core.convert.orc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -24,9 +24,9 @@ import org.apache.orc.OrcFile; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; -import org.apache.orc.bench.convert.BatchWriter; -import org.apache.orc.bench.CompressionKind; -import org.apache.orc.bench.Utilities; +import org.apache.orc.bench.core.convert.BatchWriter; +import org.apache.orc.bench.core.CompressionKind; +import org.apache.orc.bench.core.Utilities; import java.io.IOException; diff --git a/java/bench/core/src/java/org/apache/orc/bench/core/convert/parquet/ParquetReader.java b/java/bench/core/src/java/org/apache/orc/bench/core/convert/parquet/ParquetReader.java new file mode 100644 index 0000000000..035ee86cea --- /dev/null +++ b/java/bench/core/src/java/org/apache/orc/bench/core/convert/parquet/ParquetReader.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench.core.convert.parquet; + +import org.apache.avro.generic.GenericData; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.core.convert.BatchReader; +import org.apache.orc.bench.core.convert.avro.AvroReader; +import org.apache.parquet.avro.AvroParquetReader; + +import java.io.IOException; + +public class ParquetReader implements BatchReader { + private final org.apache.parquet.hadoop.ParquetReader + reader; + private final AvroReader.AvroConverter[] converters; + + public ParquetReader(Path path, + TypeDescription schema, + Configuration conf) throws IOException { + reader = AvroParquetReader.builder(path) + .withCompatibility(true).build(); + converters = AvroReader.buildConverters(schema); + } + + @Override + public boolean nextBatch(VectorizedRowBatch batch) throws IOException { + batch.reset(); + int maxSize = batch.getMaxSize(); + while (batch.size < maxSize) { + GenericData.Record value = reader.read(); + if (value == null) { + break; + } + int row = batch.size++; + for(int c=0; c < converters.length; ++c) { + converters[c].convert(batch.cols[c], row, value.get(c)); + } + } + return batch.size != 0; + } + + @Override + public void close() throws IOException { + reader.close(); + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetWriter.java b/java/bench/core/src/java/org/apache/orc/bench/core/convert/parquet/ParquetWriter.java similarity index 53% rename from java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetWriter.java rename to java/bench/core/src/java/org/apache/orc/bench/core/convert/parquet/ParquetWriter.java index 075060e042..5077fef5d2 100644 --- a/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetWriter.java +++ b/java/bench/core/src/java/org/apache/orc/bench/core/convert/parquet/ParquetWriter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,71 +16,68 @@ * limitations under the License. 
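The new ParquetReader reads through parquet-avro (AvroParquetReader) and reuses AvroReader's column converters, rather than going through Hive's Parquet input format as the deleted reader did. A minimal caller sketch against the BatchReader interface, with a hypothetical file path and schema literal (only the ParquetReader constructor, nextBatch, and close calls are taken from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.bench.core.convert.parquet.ParquetReader;

    public class ParquetScanSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical inputs: any generated data variant and its matching schema would do.
        Path path = new Path("generated/taxi/taxi.parquet");
        TypeDescription schema = TypeDescription.fromString(
            "struct<vendor_id:bigint,fare_amount:double>");
        ParquetReader reader = new ParquetReader(path, schema, new Configuration());
        VectorizedRowBatch batch = schema.createRowBatch();
        long rows = 0;
        while (reader.nextBatch(batch)) {   // converters fill one column vector per field
          rows += batch.size;
        }
        reader.close();
        System.out.println("rows = " + rows);
      }
    }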
*/ -package org.apache.orc.bench.convert.parquet; +package org.apache.orc.bench.core.convert.parquet; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.io.orc.OrcBenchmarkUtilities; -import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; -import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reporter; import org.apache.orc.TypeDescription; -import org.apache.orc.bench.convert.BatchWriter; -import org.apache.orc.bench.CompressionKind; -import org.apache.orc.bench.Utilities; -import org.apache.parquet.hadoop.ParquetOutputFormat; +import org.apache.orc.bench.core.convert.BatchWriter; +import org.apache.orc.bench.core.CompressionKind; +import org.apache.orc.bench.core.convert.avro.AvroSchemaUtils; +import org.apache.orc.bench.core.convert.avro.AvroWriter; +import org.apache.parquet.avro.AvroParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import java.io.IOException; -import java.util.Properties; public class ParquetWriter implements BatchWriter { - private final FileSinkOperator.RecordWriter writer; - private final TypeDescription schema; - private final ParquetHiveRecord record; + private final org.apache.parquet.hadoop.ParquetWriter + writer; + private final AvroWriter.AvroConverter[] converters; + private final GenericData.Record record; + + static CompressionCodecName getParquetCompression(CompressionKind kind) { + switch (kind) { + case NONE: + return CompressionCodecName.UNCOMPRESSED; + case ZLIB: + return CompressionCodecName.GZIP; + case SNAPPY: + return CompressionCodecName.SNAPPY; + default: + throw new IllegalArgumentException("Unhandled compression type " + kind); + } + } public ParquetWriter(Path path, TypeDescription schema, Configuration conf, CompressionKind compression ) throws IOException { - JobConf jobConf = new JobConf(conf); - Properties tableProperties = Utilities.convertSchemaToHiveConfig(schema); - this.schema = schema; - jobConf.set(ParquetOutputFormat.COMPRESSION, getCodec(compression).name()); - writer = new MapredParquetOutputFormat().getHiveRecordWriter(jobConf, path, - ParquetHiveRecord.class, compression != CompressionKind.NONE, - tableProperties, Reporter.NULL); - record = new ParquetHiveRecord(null, - OrcBenchmarkUtilities.createObjectInspector(schema)); + Schema avroSchema = AvroSchemaUtils.createAvroSchema(schema); + writer = AvroParquetWriter + .builder(path) + .withSchema(avroSchema) + .withConf(conf) + .withCompressionCodec(getParquetCompression(compression)) + .build(); + converters = AvroWriter.buildConverters(schema, avroSchema); + record = new GenericData.Record(avroSchema); } public void writeBatch(VectorizedRowBatch batch) throws IOException { for(int r=0; r < batch.size; ++r) { - record.value = OrcBenchmarkUtilities.nextObject(batch, schema, r, - (Writable) record.value); + for(int f=0; f < batch.cols.length; ++f) { + record.put(f, converters[f].convert(batch.cols[f], r)); + } writer.write(record); } } public void close() throws IOException { - writer.close(false); - } - - public static CompressionCodecName getCodec(CompressionKind kind) { - switch (kind) { - case NONE: - return CompressionCodecName.UNCOMPRESSED; - case ZLIB: - 
return CompressionCodecName.GZIP; - case SNAPPY: - return CompressionCodecName.SNAPPY; - default: - throw new IllegalArgumentException("Unsupported codec " + kind); - } + writer.close(); } } diff --git a/java/bench/src/main/resources/github.schema b/java/bench/core/src/resources/github.schema similarity index 100% rename from java/bench/src/main/resources/github.schema rename to java/bench/core/src/resources/github.schema diff --git a/java/bench/src/main/resources/log4j.properties b/java/bench/core/src/resources/log4j.properties similarity index 100% rename from java/bench/src/main/resources/log4j.properties rename to java/bench/core/src/resources/log4j.properties diff --git a/java/bench/src/main/resources/sales.schema b/java/bench/core/src/resources/sales.schema similarity index 100% rename from java/bench/src/main/resources/sales.schema rename to java/bench/core/src/resources/sales.schema diff --git a/java/bench/src/main/resources/taxi.schema b/java/bench/core/src/resources/taxi.schema similarity index 100% rename from java/bench/src/main/resources/taxi.schema rename to java/bench/core/src/resources/taxi.schema diff --git a/java/bench/hive/pom.xml b/java/bench/hive/pom.xml new file mode 100644 index 0000000000..841821939b --- /dev/null +++ b/java/bench/hive/pom.xml @@ -0,0 +1,138 @@ + + + + 4.0.0 + + org.apache.orc + orc-benchmarks + 1.6.0-SNAPSHOT + .. + + + org.apache.orc + orc-benchmarks-hive + 1.6.0-SNAPSHOT + jar + ORC Benchmarks Hive + + File format benchmarks for Hive. + + + + + com.google.auto.service + auto-service + + + com.google.code.gson + gson + + + commons-cli + commons-cli + + + org.apache.avro + avro + + + org.apache.avro + avro-mapred + hadoop2 + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + org.apache.hive + hive-exec + core + + + org.apache.hive + hive-serde + + + org.apache.hive + hive-storage-api + + + org.apache.orc + orc-benchmarks-core + + + org.apache.orc + orc-core + + + org.apache.parquet + parquet-hadoop + + + org.openjdk.jmh + jmh-core + + + org.openjdk.jmh + jmh-generator-annprocess + + + + + ${basedir}/src/java + ${basedir}/src/test + + + ${basedir}/src/test/resources + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-enforcer-plugin + + + maven-assembly-plugin + + + + org.apache.orc.bench.core.Driver + + + + + + + + + + cmake + + ${build.dir}/bench/hive + + + + diff --git a/java/bench/hive/src/assembly/uber.xml b/java/bench/hive/src/assembly/uber.xml new file mode 100644 index 0000000000..014eab951b --- /dev/null +++ b/java/bench/hive/src/assembly/uber.xml @@ -0,0 +1,33 @@ + + + uber + + jar + + false + + + / + true + true + runtime + + + + + metaInf-services + + + diff --git a/java/bench/hive/src/findbugs/exclude.xml b/java/bench/hive/src/findbugs/exclude.xml new file mode 100644 index 0000000000..dde147124e --- /dev/null +++ b/java/bench/hive/src/findbugs/exclude.xml @@ -0,0 +1,25 @@ + + + + + + + + + + + + + diff --git a/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java b/java/bench/hive/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java similarity index 97% rename from java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java rename to java/bench/hive/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java index 18c5d06776..f75c7f0500 100644 --- a/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java +++ 
b/java/bench/hive/src/java/org/apache/hadoop/hive/ql/io/orc/OrcBenchmarkUtilities.java @@ -27,7 +27,7 @@ import java.util.List; /** - * Utilities that need the non-public methods from Hive. + * HiveUtilities that need the non-public methods from Hive. */ public class OrcBenchmarkUtilities { diff --git a/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/ColumnProjectionBenchmark.java similarity index 67% rename from java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java rename to java/bench/hive/src/java/org/apache/orc/bench/hive/ColumnProjectionBenchmark.java index 4afaaf1d07..146a6e7d32 100644 --- a/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java +++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/ColumnProjectionBenchmark.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.orc.bench; +package org.apache.orc.bench.hive; +import com.google.auto.service.AutoService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -34,85 +35,54 @@ import org.apache.orc.Reader; import org.apache.orc.RecordReader; import org.apache.orc.TypeDescription; +import org.apache.orc.bench.core.OrcBenchmark; +import org.apache.orc.bench.core.ReadCounters; +import org.apache.orc.bench.core.Utilities; import org.apache.parquet.hadoop.ParquetInputFormat; -import org.openjdk.jmh.annotations.AuxCounters; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; -import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.runner.Runner; -import org.openjdk.jmh.runner.options.OptionsBuilder; import java.net.URI; import java.util.List; import java.util.concurrent.TimeUnit; @BenchmarkMode(Mode.AverageTime) -@Warmup(iterations=1, time=10, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations=3, time=10, timeUnit = TimeUnit.SECONDS) -@State(Scope.Thread) @OutputTimeUnit(TimeUnit.MICROSECONDS) -@Fork(1) -public class ColumnProjectionBenchmark { +@State(Scope.Thread) +@AutoService(OrcBenchmark.class) +public class ColumnProjectionBenchmark implements OrcBenchmark { - private static final String ROOT_ENVIRONMENT_NAME = "bench.root.dir"; - private static final Path root; - static { - String value = System.getProperty(ROOT_ENVIRONMENT_NAME); - root = value == null ? 
null : new Path(value); - } + private static final Path root = Utilities.getBenchmarkRoot(); @Param({ "github", "sales", "taxi"}) public String dataset; - @Param({"none", "snappy", "zlib"}) + @Param({"none", "snappy", "gz"}) public String compression; - @AuxCounters - @State(Scope.Thread) - public static class ExtraCounters { - long bytesRead; - long reads; - long records; - long invocations; - - @Setup(Level.Iteration) - public void clean() { - bytesRead = 0; - reads = 0; - records = 0; - invocations = 0; - } - - @TearDown(Level.Iteration) - public void print() { - System.out.println(); - System.out.println("Reads: " + reads); - System.out.println("Bytes: " + bytesRead); - System.out.println("Records: " + records); - System.out.println("Invocations: " + invocations); - } + @Override + public String getName() { + return "read-some"; + } - public long kilobytes() { - return bytesRead / 1024; - } + @Override + public String getDescription() { + return "Benchmark column projection"; + } - public long records() { - return records; - } + @Override + public void run(String[] args) throws Exception { + new Runner(Utilities.parseOptions(args, getClass())).run(); } @Benchmark - public void orc(ExtraCounters counters) throws Exception{ + public void orc(ReadCounters counters) throws Exception{ Configuration conf = new Configuration(); TrackingLocalFileSystem fs = new TrackingLocalFileSystem(); fs.initialize(new URI("file:///"), conf); @@ -132,16 +102,15 @@ public void orc(ExtraCounters counters) throws Exception{ .include(include)); VectorizedRowBatch batch = schema.createRowBatch(); while (rows.nextBatch(batch)) { - counters.records += batch.size; + counters.addRecords(batch.size); } rows.close(); - counters.bytesRead += statistics.getBytesRead(); - counters.reads += statistics.getReadOps(); - counters.invocations += 1; + counters.addBytes(statistics.getReadOps(), statistics.getBytesRead()); + counters.addInvocation(); } @Benchmark - public void parquet(ExtraCounters counters) throws Exception { + public void parquet(ReadCounters counters) throws Exception { JobConf conf = new JobConf(); conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName()); conf.set("fs.defaultFS", "track:///"); @@ -171,18 +140,10 @@ public void parquet(ExtraCounters counters) throws Exception { new ParquetRecordReaderWrapper(inputFormat, split, conf, Reporter.NULL); ArrayWritable value = recordReader.createValue(); while (recordReader.next(nada, value)) { - counters.records += 1; + counters.addRecords(1); } recordReader.close(); - counters.bytesRead += statistics.getBytesRead(); - counters.reads += statistics.getReadOps(); - counters.invocations += 1; - } - public static void main(String[] args) throws Exception { - new Runner(new OptionsBuilder() - .include(ColumnProjectionBenchmark.class.getSimpleName()) - .jvmArgs("-server", "-Xms256m", "-Xmx2g", - "-D" + ROOT_ENVIRONMENT_NAME + "=" + args[0]).build() - ).run(); + counters.addBytes(statistics.getReadOps(), statistics.getBytesRead()); + counters.addInvocation(); } } diff --git a/java/bench/src/java/org/apache/orc/bench/DecimalBench.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/DecimalBench.java similarity index 74% rename from java/bench/src/java/org/apache/orc/bench/DecimalBench.java rename to java/bench/hive/src/java/org/apache/orc/bench/hive/DecimalBench.java index 71a1c33b19..03450353ad 100644 --- a/java/bench/src/java/org/apache/orc/bench/DecimalBench.java +++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/DecimalBench.java @@ -16,77 +16,61 
@@ * limitations under the License. */ -package org.apache.orc.bench; +package org.apache.orc.bench.hive; -import com.google.gson.JsonStreamParser; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; -import org.apache.avro.mapred.FsInput; +import com.google.auto.service.AutoService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.TrackingLocalFileSystem; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; -import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reporter; import org.apache.orc.CompressionKind; import org.apache.orc.OrcFile; import org.apache.orc.Reader; import org.apache.orc.RecordReader; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; -import org.apache.orc.bench.convert.BatchReader; -import org.apache.orc.bench.convert.GenerateVariants; -import org.apache.orc.bench.convert.csv.CsvReader; -import org.apache.parquet.hadoop.ParquetInputFormat; -import org.openjdk.jmh.annotations.AuxCounters; +import org.apache.orc.bench.core.NullFileSystem; +import org.apache.orc.bench.core.OrcBenchmark; +import org.apache.orc.bench.core.Utilities; +import org.apache.orc.bench.core.convert.BatchReader; +import org.apache.orc.bench.core.convert.GenerateVariants; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; -import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; import org.openjdk.jmh.runner.Runner; -import org.openjdk.jmh.runner.options.OptionsBuilder; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.URI; -import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; @BenchmarkMode(Mode.AverageTime) -@Warmup(iterations=2, time=30, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations=10, time=30, timeUnit = TimeUnit.SECONDS) -@State(Scope.Thread) @OutputTimeUnit(TimeUnit.MICROSECONDS) -@Fork(2) -public class DecimalBench { +@AutoService(OrcBenchmark.class) +public class DecimalBench implements OrcBenchmark { - private static final String ROOT_ENVIRONMENT_NAME = "bench.root.dir"; - private static final Path root; - static { - String value = System.getProperty(ROOT_ENVIRONMENT_NAME); - root = value == null ? 
null : new Path(value); + private static final Path root = Utilities.getBenchmarkRoot(); + + @Override + public String getName() { + return "decimal"; + } + + @Override + public String getDescription() { + return "Benchmark new decimal64 read and write"; + } + + @Override + public void run(String[] args) throws Exception { + new Runner(Utilities.parseOptions(args, getClass())).run(); } /** @@ -149,9 +133,9 @@ public void loadData(ColumnVector vector, long[] values, int offset, int length) @State(Scope.Thread) public static class OutputState { - // try both short and long decimals - @Param({"8", "19"}) - public int precision; + // try both DecimalColumnVector and Decimal64ColumnVector + @Param({"ORIGINAL", "USE_DECIMAL64"}) + public TypeDescription.RowBatchVersion version; long[] total_amount = new long[1024 * 1024]; Configuration conf = new Configuration(); @@ -159,15 +143,20 @@ public static class OutputState { TypeDescription schema; VectorizedRowBatch batch; Loader loader; + int precision; @Setup public void setup() throws IOException { + if (version == TypeDescription.RowBatchVersion.ORIGINAL) { + precision = 19; + loader = new DecimalLoader(precision, 2); + } else { + precision = 8; + loader = new Decimal64Loader(precision, 2); + } schema = TypeDescription.createDecimal() .withScale(2) .withPrecision(precision); - loader = precision <= 18 ? - new Decimal64Loader(precision, 2) : - new DecimalLoader(precision, 2); readCsvData(total_amount, root, "total_amount", conf); batch = schema.createRowBatchV2(); } @@ -200,7 +189,7 @@ static void readCsvData(long[] data, int batchPosn = 0; BatchReader reader = new GenerateVariants.RecursiveReader(new Path(root, "sources/taxi"), "csv", - schema, conf, org.apache.orc.bench.CompressionKind.ZLIB); + schema, conf, org.apache.orc.bench.core.CompressionKind.ZLIB); VectorizedRowBatch batch = schema.createRowBatch(); batch.size = 0; TypeDescription columnSchema = schema.findSubtype(column); @@ -208,9 +197,9 @@ static void readCsvData(long[] data, int scale = columnSchema.getScale(); while (row < data.length) { if (batchPosn >= batch.size) { - if (!reader.nextBatch(batch)) { - throw new IllegalArgumentException("Not enough data"); - } + // Read the next batch and ignore eof. If the file is shorter + // than we need, just reuse the current batch over again. + reader.nextBatch(batch); batchPosn = 0; } data[row++] = cv.vector[batchPosn++].serialize64(scale); @@ -261,12 +250,4 @@ public void read(Blackhole blackhole, InputState state) throws Exception { } rows.close(); } - - public static void main(String[] args) throws Exception { - new Runner(new OptionsBuilder() - .include(DecimalBench.class.getSimpleName()) - .jvmArgs("-server", "-Xms256m", "-Xmx2g", - "-D" + ROOT_ENVIRONMENT_NAME + "=" + args[0]).build() - ).run(); - } } diff --git a/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java b/java/bench/hive/src/java/org/apache/orc/bench/hive/FullReadBenchmark.java similarity index 66% rename from java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java rename to java/bench/hive/src/java/org/apache/orc/bench/hive/FullReadBenchmark.java index 952f18dd88..2bbcf60aba 100644 --- a/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java +++ b/java/bench/hive/src/java/org/apache/orc/bench/hive/FullReadBenchmark.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
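DecimalBench's output benchmark now parameterizes on TypeDescription.RowBatchVersion instead of raw precision: ORIGINAL sets up a precision-19 decimal written through DecimalColumnVector, while USE_DECIMAL64 sets up a precision-8 decimal written through the long-backed Decimal64ColumnVector, with createRowBatchV2() choosing the vector class from the schema's precision. A small illustration of that selection; the instanceof checks are only for demonstration, and the expected results assume ORC's usual rule that precisions up to 18 qualify for Decimal64:

    import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.TypeDescription;

    public class DecimalVectorSketch {
      public static void main(String[] args) {
        TypeDescription small = TypeDescription.createDecimal().withPrecision(8).withScale(2);
        TypeDescription large = TypeDescription.createDecimal().withPrecision(19).withScale(2);
        VectorizedRowBatch smallBatch = small.createRowBatchV2();
        VectorizedRowBatch largeBatch = large.createRowBatchV2();
        // Expected output: true, true
        System.out.println(smallBatch.cols[0] instanceof Decimal64ColumnVector);
        System.out.println(largeBatch.cols[0] instanceof DecimalColumnVector);
      }
    }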
See the NOTICE file * distributed with this work for additional information @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.orc.bench; +package org.apache.orc.bench.hive; +import com.google.auto.service.AutoService; import com.google.gson.JsonStreamParser; import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericDatumReader; @@ -40,23 +41,19 @@ import org.apache.orc.Reader; import org.apache.orc.RecordReader; import org.apache.orc.TypeDescription; +import org.apache.orc.bench.core.CompressionKind; +import org.apache.orc.bench.core.OrcBenchmark; +import org.apache.orc.bench.core.ReadCounters; +import org.apache.orc.bench.core.Utilities; import org.apache.parquet.hadoop.ParquetInputFormat; -import org.openjdk.jmh.annotations.AuxCounters; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; -import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.runner.Runner; -import org.openjdk.jmh.runner.options.OptionsBuilder; import java.io.InputStream; import java.io.InputStreamReader; @@ -65,62 +62,36 @@ import java.util.concurrent.TimeUnit; @BenchmarkMode(Mode.AverageTime) -@Warmup(iterations=1, time=10, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations=3, time=10, timeUnit = TimeUnit.SECONDS) -@State(Scope.Thread) @OutputTimeUnit(TimeUnit.MICROSECONDS) -@Fork(1) -public class FullReadBenchmark { +@State(Scope.Thread) +@AutoService(OrcBenchmark.class) +public class FullReadBenchmark implements OrcBenchmark { - private static final String ROOT_ENVIRONMENT_NAME = "bench.root.dir"; - private static final Path root; - static { - String value = System.getProperty(ROOT_ENVIRONMENT_NAME); - root = value == null ? 
null : new Path(value); - } + private static final Path root = Utilities.getBenchmarkRoot(); @Param({"taxi", "sales", "github"}) public String dataset; - @Param({"none", "zlib", "snappy"}) + @Param({"none", "gz", "snappy"}) public String compression; - @AuxCounters - @State(Scope.Thread) - public static class ExtraCounters { - long bytesRead; - long reads; - long records; - long invocations; - - @Setup(Level.Iteration) - public void clean() { - bytesRead = 0; - reads = 0; - records = 0; - invocations = 0; - } - - @TearDown(Level.Iteration) - public void print() { - System.out.println(); - System.out.println("Reads: " + reads); - System.out.println("Bytes: " + bytesRead); - System.out.println("Records: " + records); - System.out.println("Invocations: " + invocations); - } + @Override + public String getName() { + return "read-all"; + } - public long kilobytes() { - return bytesRead / 1024; - } + @Override + public String getDescription() { + return "read all columns and rows"; + } - public long records() { - return records; - } + @Override + public void run(String[] args) throws Exception { + new Runner(Utilities.parseOptions(args, getClass())).run(); } @Benchmark - public void orc(ExtraCounters counters) throws Exception{ + public void orc(ReadCounters counters) throws Exception{ Configuration conf = new Configuration(); TrackingLocalFileSystem fs = new TrackingLocalFileSystem(); fs.initialize(new URI("file:///"), conf); @@ -133,16 +104,15 @@ public void orc(ExtraCounters counters) throws Exception{ RecordReader rows = reader.rows(); VectorizedRowBatch batch = schema.createRowBatch(); while (rows.nextBatch(batch)) { - counters.records += batch.size; + counters.addRecords(batch.size); } rows.close(); - counters.bytesRead += statistics.getBytesRead(); - counters.reads += statistics.getReadOps(); - counters.invocations += 1; + counters.addBytes(statistics.getReadOps(), statistics.getBytesRead()); + counters.addInvocation(); } @Benchmark - public void avro(ExtraCounters counters) throws Exception { + public void avro(ReadCounters counters) throws Exception { Configuration conf = new Configuration(); conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName()); conf.set("fs.defaultFS", "track:///"); @@ -157,15 +127,14 @@ public void avro(ExtraCounters counters) throws Exception { GenericRecord record = null; while (dataFileReader.hasNext()) { record = dataFileReader.next(record); - counters.records += 1; + counters.addRecords(1); } - counters.bytesRead += statistics.getBytesRead(); - counters.reads += statistics.getReadOps(); - counters.invocations += 1; + counters.addBytes(statistics.getReadOps(), statistics.getBytesRead()); + counters.addInvocation(); } @Benchmark - public void parquet(ExtraCounters counters) throws Exception { + public void parquet(ReadCounters counters) throws Exception { JobConf conf = new JobConf(); conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName()); conf.set("fs.defaultFS", "track:///"); @@ -182,42 +151,31 @@ public void parquet(ExtraCounters counters) throws Exception { new ParquetRecordReaderWrapper(inputFormat, split, conf, Reporter.NULL); ArrayWritable value = recordReader.createValue(); while (recordReader.next(nada, value)) { - counters.records += 1; + counters.addRecords(1); } recordReader.close(); - counters.bytesRead += statistics.getBytesRead(); - counters.reads += statistics.getReadOps(); - counters.invocations += 1; + counters.addBytes(statistics.getReadOps(), statistics.getBytesRead()); + counters.addInvocation(); } @Benchmark - public 
void json(ExtraCounters counters) throws Exception { + public void json(ReadCounters counters) throws Exception { Configuration conf = new Configuration(); TrackingLocalFileSystem fs = new TrackingLocalFileSystem(); fs.initialize(new URI("file:///"), conf); FileSystem.Statistics statistics = fs.getLocalStatistics(); statistics.reset(); Path path = Utilities.getVariant(root, dataset, "json", compression); - CompressionKind compress = CompressionKind.valueOf(compression); + CompressionKind compress = CompressionKind.fromExtension(compression); InputStream input = compress.read(fs.open(path)); JsonStreamParser parser = new JsonStreamParser(new InputStreamReader(input, StandardCharsets.UTF_8)); while (parser.hasNext()) { parser.next(); - counters.records += 1; + counters.addRecords(1); } - counters.bytesRead += statistics.getBytesRead(); - counters.reads += statistics.getReadOps(); - counters.invocations += 1; - } - - public static void main(String[] args) throws Exception { - new Runner(new OptionsBuilder() - .include(FullReadBenchmark.class.getSimpleName()) - .addProfiler("hs_gc") - .jvmArgs("-server", "-Xms256m", "-Xmx2g", - "-D" + ROOT_ENVIRONMENT_NAME + "=" + args[0]).build() - ).run(); + counters.addBytes(statistics.getReadOps(), statistics.getBytesRead()); + counters.addInvocation(); } } diff --git a/java/bench/pom.xml b/java/bench/pom.xml index 2cebf1ad7d..aed26b6d7c 100644 --- a/java/bench/pom.xml +++ b/java/bench/pom.xml @@ -26,7 +26,7 @@ org.apache.orc orc-benchmarks 1.6.0-SNAPSHOT - jar + pom ORC Benchmarks Benchmarks for comparing ORC, Parquet, JSON, and Avro performance. @@ -39,178 +39,507 @@ 1.8.2 2.7.3 2.3.3 + 0.1.3 1.20 - 1.6.0-SNAPSHOT - 1.9.0 - 2.5.0 + 1.5.2 + 1.8.3 + 1.7.25 + 2.3.1 + 2.6.1 3.4.6 - - - com.fasterxml.jackson.core - jackson-core - 2.8.4 - - - com.google.code.gson - gson - 2.2.4 - - - commons-cli - commons-cli - 1.3.1 - - - io.airlift - aircompressor - 0.10 - - - io.airlift - slice - - - - - org.apache.avro - avro - ${avro.version} - - - org.apache.avro - avro-mapred - hadoop2 - ${avro.version} - - - org.apache.commons - commons-csv - 1.4 - - - org.apache.hadoop - hadoop-common - ${hadoop.version} - - - org.apache.hadoop - hadoop-hdfs - ${hadoop.version} - runtime - - - org.apache.hadoop - hadoop-mapreduce-client-core - ${hadoop.version} - - - org.apache.hive - hive-exec - core - ${hive.version} - - - org.apache.hive - hive-serde - ${hive.version} - - - org.apache.hive - hive-storage-api - ${storage-api.version} - - - org.apache.orc - orc-core - ${orc.version} - - - org.apache.parquet - parquet-hadoop-bundle - ${parquet.version} - - - org.jodd - jodd-core - 3.5.2 - runtime - - - org.openjdk.jmh - jmh-core - ${jmh.version} - - - org.openjdk.jmh - jmh-generator-annprocess - ${jmh.version} - - + + core + hive + spark + + + + + + com.databricks + spark-avro_2.11 + 3.2.0 + + + com.fasterxml.jackson.core + jackson-core + 2.8.4 + + + com.google.auto.service + auto-service + 1.0-rc4 + true + + + com.google.code.gson + gson + 2.2.4 + + + commons-cli + commons-cli + 1.3.1 + + + io.airlift + aircompressor + 0.10 + + + io.airlift + slice + + + + + com.netflix.iceberg + iceberg-api + ${iceberg.version} + + + org.apache.orc + orc-core + + + + + com.netflix.iceberg + iceberg-core + ${iceberg.version} + + + org.apache.orc + orc-core + + + + + com.netflix.iceberg + iceberg-spark + ${iceberg.version} + + + org.apache.orc + orc-core + + + + + io.netty + netty-all + 4.1.17.Final + runtime + + + io.netty + netty + 3.9.9.Final + runtime + + + org.apache.avro + avro + ${avro.version} 
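The compression parameter values change from codec names ("zlib") to file extensions ("gz") because the benchmarks now resolve them with the new CompressionKind.fromExtension helper and use the resulting kind to wrap the raw stream, as the JSON benchmark above does. A compact sketch of that pairing with a hypothetical file path (fromExtension and read are the calls used in the benchmark):

    import java.io.InputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.bench.core.CompressionKind;

    public class CompressedJsonOpenSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("generated/taxi/taxi.json.gz");  // hypothetical variant file
        FileSystem fs = path.getFileSystem(conf);
        // Map the "gz" suffix onto the benchmark's CompressionKind and decompress on read.
        CompressionKind kind = CompressionKind.fromExtension("gz");
        InputStream input = kind.read(fs.open(path));
        System.out.println("first byte: " + input.read());
        input.close();
      }
    }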
+ + + org.apache.avro + avro-mapred + hadoop2 + ${avro.version} + + + org.mortbay.jetty + servlet-api + + + + + org.apache.commons + commons-csv + 1.4 + + + org.apache.commons + commons-lang3 + 3.7 + runtime + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + com.sun.jersey + jersey-server + + + com.sun.jersey + jersey-core + + + commons-beanutils + commons-beanutils + + + commons-beanutils + commons-beanutils-core + + + javax.servlet + servlet-api + + + org.mortbay.jetty + servlet-api + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + org.apache.hadoop + hadoop-hdfs + ${hadoop.version} + runtime + + + com.sun.jersey + jersey-core + + + com.sun.jersey + jersey-server + + + javax.servlet + servlet-api + + + org.fusesource.leveldbjni + leveldbjni-all + + + org.mortbay.jetty + servlet-api + + + + + org.apache.hadoop + hadoop-yarn-common + ${hadoop.version} + runtime + + + com.sun.jersey + jersey-core + + + com.sun.jersey + jersey-server + + + javax.inject + javax.inject + + + javax.servlet + servlet-api + + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + com.sun.jersey + jersey-core + + + com.sun.jersey + jersey-server + + + javax.inject + javax.inject + + + javax.servlet + servlet-api + + + + + org.apache.hive + hive-exec + core + ${hive.version} + + + org.apache.calcite.avatica + avatica + + + org.apache.hadoop + hadoop-yarn-server-resourcemanager + + + org.apache.logging.log4j + log4j-1.2-api + + + org.apache.logging.log4j + log4j-slf4j-impl + + + stax + stax-api + + + + + org.apache.hive + hive-serde + ${hive.version} + + + javax.servlet + servlet-api + + + org.apache.logging.log4j + log4j-1.2-api + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.apache.logging.log4j + log4j-web + + + org.apache.parquet + parquet-hadoop-bundle + + + org.eclipse.jetty.aggregate + jetty-all + + + org.eclipse.jetty.orbit + javax.servlet + + + + + org.apache.hive + hive-service-rpc + ${hive.version} + + + tomcat + jasper-compiler + + + + + org.apache.hive + hive-storage-api + ${storage-api.version} + + + org.apache.orc + orc-benchmarks-core + 1.6.0-SNAPSHOT + + + org.apache.orc + orc-core + ${orc.version} + + + org.apache.orc + orc-mapreduce + ${orc.version} + runtime + + + org.apache.parquet + parquet-hadoop + ${parquet.version} + + + org.apache.parquet + parquet-avro + ${parquet.version} + + + org.apache.spark + spark-catalyst_2.11 + ${spark.version} + + + org.apache.spark + spark-core_2.11 + ${spark.version} + + + org.glassfish.hk2.external + aopalliance-repackaged + + + org.mortbay.jetty + servlet-api + + + org.slf4j + jcl-over-slf4j + + + org.fusesource.leveldbjni + leveldbjni-all + + + + + org.apache.spark + spark-sql_2.11 + ${spark.version} + + + org.apache.orc + orc-core + + + org.apache.orc + orc-mapreduce + + + + + org.codehaus.janino + janino + 3.0.8 + runtime + + + org.codehaus.janino + commons-compiler + 3.0.8 + runtime + + + org.jodd + jodd-core + 3.5.2 + runtime + + + org.openjdk.jmh + jmh-core + ${jmh.version} + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + + + org.scala-lang + scala-library + 2.11.8 + + + org.slf4j + slf4j-api + ${slf4j.version} + runtime + + + org.slf4j + slf4j-log4j12 + ${slf4j.version} + runtime + + + ${basedir}/src/java ${basedir}/src/test - - - ${basedir}/src/test/resources - - - - org.apache.maven.plugins - maven-enforcer-plugin - 3.0.0-M1 - - - enforce-maven - - enforce - - - - - 2.2.1 - - - - - - org.apache.maven.plugins maven-compiler-plugin - 3.1 - - 1.7 - 1.7 - - 
maven-assembly-plugin - 3.1.0 - - - - org.apache.orc.bench.Driver - - - - src/assembly/uber.xml - - - - - make-assembly - package - - single - - - + org.apache.maven.plugins + maven-enforcer-plugin + + + + org.apache.maven.plugins + maven-enforcer-plugin + 3.0.0-M1 + + + enforce-maven + + enforce + + + + + 2.2.1 + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.7 + 1.7 + + -Xlint:unchecked + + + + + maven-assembly-plugin + + + src/assembly/uber.xml + + + + + make-assembly + package + + single + + + + + + diff --git a/java/bench/spark/pom.xml b/java/bench/spark/pom.xml new file mode 100644 index 0000000000..90e29a419b --- /dev/null +++ b/java/bench/spark/pom.xml @@ -0,0 +1,203 @@ + + + + 4.0.0 + + org.apache.orc + orc-benchmarks + 1.6.0-SNAPSHOT + .. + + + org.apache.orc + orc-benchmarks-spark + 1.6.0-SNAPSHOT + jar + ORC Benchmarks Spark + + Benchmarks for comparing ORC, Parquet, JSON, and Avro performance under + Spark. + + + + UTF-8 + false + + + + + com.databricks + spark-avro_2.11 + + + com.google.auto.service + auto-service + + + commons-cli + commons-cli + + + io.netty + netty-all + + + io.netty + netty + + + org.apache.commons + commons-lang3 + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.hadoop + hadoop-yarn-common + + + org.apache.hive + hive-storage-api + runtime + + + org.apache.orc + orc-benchmarks-core + + + org.apache.orc + orc-core + + + org.apache.orc + orc-mapreduce + + + org.apache.spark + spark-catalyst_2.11 + + + org.apache.spark + spark-core_2.11 + + + org.apache.spark + spark-sql_2.11 + + + org.codehaus.janino + janino + + + org.codehaus.janino + commons-compiler + + + org.jodd + jodd-core + + + org.openjdk.jmh + jmh-core + + + org.openjdk.jmh + jmh-generator-annprocess + + + org.scala-lang + scala-library + + + + + + + org.apache.maven.plugins + maven-enforcer-plugin + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + package + + shade + + + false + + + org.codehaus.janino:janino + + META-INF/DUMMY.SF + META-INF/DUMMY.DSA + + + + org.codehaus.janino:commons-compiler + + META-INF/DUMMY.SF + META-INF/DUMMY.DSA + + + + + + org.apache.orc.storage + org.apache.hadoop.hive + + + + + + org.apache.orc.bench.core.Driver + + + + + + + + + + + + + + + cmake + + ${build.dir}/bench/spark + + + + diff --git a/java/bench/spark/src/java/org/apache/orc/bench/spark/SparkBenchmark.java b/java/bench/spark/src/java/org/apache/orc/bench/spark/SparkBenchmark.java new file mode 100644 index 0000000000..87d3277347 --- /dev/null +++ b/java/bench/spark/src/java/org/apache/orc/bench/spark/SparkBenchmark.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
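Both uber jars set their Main-Class to org.apache.orc.bench.core.Driver, and the individual benchmarks register themselves through @AutoService(OrcBenchmark.class) instead of each carrying its own main() and OptionsBuilder. The new core Driver is not part of this hunk, but the registration implies a java.util.ServiceLoader lookup roughly along these lines (a sketch of the dispatch pattern, not the actual Driver code):

    import java.util.Arrays;
    import java.util.ServiceLoader;
    import org.apache.orc.bench.core.OrcBenchmark;

    public class BenchmarkDispatchSketch {
      public static void main(String[] args) throws Exception {
        String command = args.length == 0 ? "" : args[0];
        for (OrcBenchmark bench : ServiceLoader.load(OrcBenchmark.class)) {
          if (bench.getName().equals(command)) {
            // Hand the remaining arguments to the selected benchmark.
            bench.run(Arrays.copyOfRange(args, 1, args.length));
            return;
          }
        }
        // No match: list the benchmarks discovered on the classpath.
        for (OrcBenchmark bench : ServiceLoader.load(OrcBenchmark.class)) {
          System.err.println("  " + bench.getName() + " - " + bench.getDescription());
        }
      }
    }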
+ */ + +package org.apache.orc.bench.spark; + +import com.google.auto.service.AutoService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.TrackingLocalFileSystem; +import org.apache.orc.TypeDescription; +import org.apache.orc.bench.core.OrcBenchmark; +import org.apache.orc.bench.core.ReadCounters; +import org.apache.orc.bench.core.Utilities; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.datasources.FileFormat; +import org.apache.spark.sql.execution.datasources.PartitionedFile; +import org.apache.spark.sql.execution.datasources.json.JsonFileFormat; +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat; +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat; +import org.apache.spark.sql.sources.And$; +import org.apache.spark.sql.sources.Filter; +import org.apache.spark.sql.sources.GreaterThanOrEqual$; +import org.apache.spark.sql.sources.LessThan$; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import scala.Function1; + +import java.io.IOException; + +import scala.Tuple2; +import scala.collection.Iterator; +import scala.collection.JavaConverters; +import scala.collection.immutable.Map; +import scala.collection.immutable.Map$; +import scala.collection.Seq; + +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@AutoService(OrcBenchmark.class) +public class SparkBenchmark implements OrcBenchmark { + + private static final Path root = Utilities.getBenchmarkRoot(); + + @Override + public String getName() { + return "spark"; + } + + @Override + public String getDescription() { + return "Run Spark benchmarks"; + } + + @Override + public void run(String[] args) throws Exception { + new Runner(Utilities.parseOptions(args, this.getClass())).run(); + } + + @State(Scope.Thread) + public static class InputSource { + SparkSession session; + TrackingLocalFileSystem fs; + Configuration conf; + Path path; + StructType schema; + StructType empty = new StructType(); + FileFormat formatObject; + + @Param({"taxi", "sales", "github"}) + String dataset; + + @Param({"none", "gz", "snappy"}) + String compression; + + @Param({"orc", "parquet", "json"}) + String format; + + @Setup(Level.Trial) + public void setup() { + session = SparkSession.builder().appName("benchmark") + .config("spark.master", "local[4]") + .config("spark.sql.orc.filterPushdown", true) + .config("spark.sql.orc.impl", "native") + .getOrCreate(); + conf = session.sparkContext().hadoopConfiguration(); + conf.set("avro.mapred.ignore.inputs.without.extension","false"); + conf.set("fs.track.impl", TrackingLocalFileSystem.class.getName()); + path = new Path("track://", + Utilities.getVariant(root, dataset, format, compression)); + try { + fs = 
(TrackingLocalFileSystem) path.getFileSystem(conf); + } catch (IOException e) { + throw new IllegalArgumentException("Can't get filesystem", e); + } + try { + TypeDescription orcSchema = Utilities.loadSchema(dataset + ".schema"); + schema = (StructType) SparkSchema.convertToSparkType(orcSchema); + } catch (IOException e) { + throw new IllegalArgumentException("Can't read schema " + dataset, e); + } + switch (format) { + case "avro": + formatObject = new com.databricks.spark.avro.DefaultSource(); + break; + case "orc": + formatObject = new OrcFileFormat(); + break; + case "parquet": + formatObject = new ParquetFileFormat(); + break; + case "json": + formatObject = new JsonFileFormat(); + break; + default: + throw new IllegalArgumentException("Unknown format " + format); + } + } + } + + static void processReader(Iterator reader, + FileSystem.Statistics statistics, + ReadCounters counters, + Blackhole blackhole) { + while (reader.hasNext()) { + Object row = reader.next(); + if (row instanceof ColumnarBatch) { + counters.addRecords(((ColumnarBatch) row).numRows()); + } else { + counters.addRecords(1); + } + blackhole.consume(row); + } + counters.addInvocation(); + counters.addBytes(statistics.getReadOps(), statistics.getBytesRead()); + } + + @Benchmark + public void fullRead(InputSource source, + ReadCounters counters, + Blackhole blackhole) { + FileSystem.Statistics statistics = source.fs.getLocalStatistics(); + statistics.reset(); + List filters = new ArrayList<>(); + List> options = new ArrayList<>(); + switch (source.format) { + case "json": + options.add(new Tuple2<>("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSS")); + break; + default: + break; + } + Seq> optionsScala = JavaConverters + .asScalaBufferConverter(options).asScala().toSeq(); + @SuppressWarnings("unchecked") + Map scalaMap = (Map)Map$.MODULE$.apply(optionsScala); + Function1> factory = + source.formatObject.buildReaderWithPartitionValues(source.session, + source.schema, source.empty, source.schema, + JavaConverters.collectionAsScalaIterableConverter(filters).asScala().toSeq(), + scalaMap, source.conf); + PartitionedFile file = new PartitionedFile(InternalRow.empty(), + source.path.toString(), 0, Long.MAX_VALUE, new String[0]); + processReader(factory.apply(file), statistics, counters, blackhole); + } + + @Benchmark + public void partialRead(InputSource source, + ReadCounters counters, + Blackhole blackhole) { + FileSystem.Statistics statistics = source.fs.getLocalStatistics(); + statistics.reset(); + List filters = new ArrayList<>(); + List> options = new ArrayList<>(); + switch (source.format) { + case "json": + case "avro": + throw new IllegalArgumentException(source.format + " can't handle projection"); + default: + break; + } + TypeDescription readSchema = null; + switch (source.dataset) { + case "taxi": + readSchema = TypeDescription.fromString("struct"); + break; + case "sales": + readSchema = TypeDescription.fromString("struct"); + break; + case "github": + readSchema = TypeDescription.fromString("struct," + + "created_at:timestamp>"); + break; + } + Seq> optionsScala = JavaConverters.asScalaBufferConverter(options).asScala().toSeq(); + @SuppressWarnings("unchecked") + Map scalaMap = (Map)Map$.MODULE$.apply(optionsScala); + Function1> factory = + source.formatObject.buildReaderWithPartitionValues(source.session, + source.schema, source.empty, + (StructType) SparkSchema.convertToSparkType(readSchema), + JavaConverters.collectionAsScalaIterableConverter(filters).asScala().toSeq(), + scalaMap, source.conf); + PartitionedFile 
file = new PartitionedFile(InternalRow.empty(), + source.path.toString(), 0, Long.MAX_VALUE, new String[0]); + processReader(factory.apply(file), statistics, counters, blackhole); + } + + @Benchmark + public void pushDown(InputSource source, + ReadCounters counters, + Blackhole blackhole) { + FileSystem.Statistics statistics = source.fs.getLocalStatistics(); + statistics.reset(); + List filters = new ArrayList<>(); + switch (source.dataset) { + case "taxi": + filters.add(And$.MODULE$.apply( + GreaterThanOrEqual$.MODULE$.apply("pickup_time", + Timestamp.valueOf("2015-11-01 00:00:00.0")), + LessThan$.MODULE$.apply("pickup_time", + Timestamp.valueOf("2015-11-01 00:01:00.0")))); + break; + case "sales": + filters.add(And$.MODULE$.apply( + GreaterThanOrEqual$.MODULE$.apply("sales_id", 1000000000L), + LessThan$.MODULE$.apply("sales_id", 1000001000L))); + break; + case "github": + filters.add(And$.MODULE$.apply( + GreaterThanOrEqual$.MODULE$.apply("created_at", + Timestamp.valueOf("2015-11-01 00:00:00.0")), + LessThan$.MODULE$.apply("created_at", + Timestamp.valueOf("2015-11-01 00:01:00.0")))); + break; + } + List> options = new ArrayList<>(); + switch (source.format) { + case "json": + case "avro": + throw new IllegalArgumentException(source.format + " can't handle pushdown"); + default: + break; + } + Seq> optionsScala = JavaConverters.asScalaBufferConverter(options).asScala().toSeq(); + @SuppressWarnings("unchecked") + Map scalaMap = (Map)Map$.MODULE$.apply(optionsScala); + Function1> factory = + source.formatObject.buildReaderWithPartitionValues(source.session, + source.schema, source.empty, source.schema, + JavaConverters.collectionAsScalaIterableConverter(filters).asScala().toSeq(), + scalaMap, source.conf); + PartitionedFile file = new PartitionedFile(InternalRow.empty(), + source.path.toString(), 0, Long.MAX_VALUE, new String[0]); + processReader(factory.apply(file), statistics, counters, blackhole); + } +} diff --git a/java/bench/spark/src/java/org/apache/orc/bench/spark/SparkSchema.java b/java/bench/spark/src/java/org/apache/orc/bench/spark/SparkSchema.java new file mode 100644 index 0000000000..6d4d2a8d38 --- /dev/null +++ b/java/bench/spark/src/java/org/apache/orc/bench/spark/SparkSchema.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.bench.spark; + +import org.apache.orc.TypeDescription; +import org.apache.spark.sql.types.ArrayType$; +import org.apache.spark.sql.types.BinaryType$; +import org.apache.spark.sql.types.BooleanType$; +import org.apache.spark.sql.types.ByteType$; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DateType$; +import org.apache.spark.sql.types.DecimalType$; +import org.apache.spark.sql.types.DoubleType$; +import org.apache.spark.sql.types.FloatType$; +import org.apache.spark.sql.types.IntegerType$; +import org.apache.spark.sql.types.LongType$; +import org.apache.spark.sql.types.MapType$; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.ShortType$; +import org.apache.spark.sql.types.StringType$; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType$; +import org.apache.spark.sql.types.TimestampType$; + +import java.util.ArrayList; +import java.util.List; + +public class SparkSchema { + + public static DataType convertToSparkType(TypeDescription schema) { + switch (schema.getCategory()) { + case BOOLEAN: + return BooleanType$.MODULE$; + case BYTE: + return ByteType$.MODULE$; + case SHORT: + return ShortType$.MODULE$; + case INT: + return IntegerType$.MODULE$; + case LONG: + return LongType$.MODULE$; + case FLOAT: + return FloatType$.MODULE$; + case DOUBLE: + return DoubleType$.MODULE$; + case BINARY: + return BinaryType$.MODULE$; + case STRING: + case CHAR: + case VARCHAR: + return StringType$.MODULE$; + case DATE: + return DateType$.MODULE$; + case TIMESTAMP: + return TimestampType$.MODULE$; + case DECIMAL: + return DecimalType$.MODULE$.apply(schema.getPrecision(), schema.getScale()); + case LIST: + return ArrayType$.MODULE$.apply( + convertToSparkType(schema.getChildren().get(0)), true); + case MAP: + return MapType$.MODULE$.apply( + convertToSparkType(schema.getChildren().get(0)), + convertToSparkType(schema.getChildren().get(1)), true); + case STRUCT: { + int size = schema.getChildren().size(); + List sparkFields = new ArrayList<>(size); + for(int c=0; c < size; ++c) { + sparkFields.add(StructField.apply(schema.getFieldNames().get(c), + convertToSparkType(schema.getChildren().get(c)), true, + Metadata.empty())); + } + return StructType$.MODULE$.apply(sparkFields); + } + default: + throw new IllegalArgumentException("Unhandled type " + schema); + } + } +} diff --git a/java/bench/src/java/org/apache/orc/bench/Driver.java b/java/bench/src/java/org/apache/orc/bench/Driver.java deleted file mode 100644 index 6a86f90afd..0000000000 --- a/java/bench/src/java/org/apache/orc/bench/Driver.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *
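SparkSchema mirrors an ORC TypeDescription into Spark's Catalyst types; SparkBenchmark relies on it to hand buildReaderWithPartitionValues a StructType that matches the generated data and to build the projected read schemas. A short usage sketch with an illustrative schema string (convertToSparkType and the cast to StructType follow the benchmark's own usage):

    import org.apache.orc.TypeDescription;
    import org.apache.orc.bench.spark.SparkSchema;
    import org.apache.spark.sql.types.StructType;

    public class SchemaConversionSketch {
      public static void main(String[] args) {
        TypeDescription orcSchema = TypeDescription.fromString(
            "struct<vendor_id:bigint,pickup_time:timestamp,fare_amount:decimal(8,2)>");
        // Structs become StructType; lists, maps, and decimals are mapped recursively.
        StructType sparkSchema = (StructType) SparkSchema.convertToSparkType(orcSchema);
        System.out.println(sparkSchema.treeString());
      }
    }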

- *     http://www.apache.org/licenses/LICENSE-2.0
- *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.orc.bench; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.DefaultParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.hadoop.conf.Configuration; -import org.apache.orc.bench.convert.GenerateVariants; -import org.apache.orc.bench.convert.ScanVariants; - -import java.util.Arrays; - -/** - * A driver tool to call the various benchmark classes. - */ -public class Driver { - - static CommandLine parseCommandLine(String[] args) throws ParseException { - Options options = new Options() - .addOption("h", "help", false, "Provide help") - .addOption("D", "define", true, "Change configuration settings"); - CommandLine result = new DefaultParser().parse(options, args, true); - if (result.hasOption("help") || result.getArgs().length == 0) { - new HelpFormatter().printHelp("benchmark ", options); - System.err.println(); - System.err.println("Commands:"); - System.err.println(" generate - Generate data variants"); - System.err.println(" scan - Scan data variants"); - System.err.println(" read-all - Full table scan benchmark"); - System.err.println(" read-some - Column projection benchmark"); - System.err.println(" decimal - Decimal benchmark"); - System.exit(1); - } - return result; - } - - public static void main(String[] args) throws Exception { - CommandLine cli = parseCommandLine(args); - args = cli.getArgs(); - String command = args[0]; - args = Arrays.copyOfRange(args, 1, args.length); - switch (command) { - case "generate": - GenerateVariants.main(args); - break; - case "scan": - ScanVariants.main(args); - break; - case "read-all": - FullReadBenchmark.main(args); - break; - case "read-some": - ColumnProjectionBenchmark.main(args); - break; - case "decimal": - DecimalBench.main(args); - break; - default: - System.err.println("Unknown command " + command); - System.exit(1); - } - } -} diff --git a/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetReader.java b/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetReader.java deleted file mode 100644 index 83f70f45e5..0000000000 --- a/java/bench/src/java/org/apache/orc/bench/convert/parquet/ParquetReader.java +++ /dev/null @@ -1,297 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.orc.bench.convert.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.bench.convert.BatchReader;
-
-import java.io.IOException;
-import java.util.List;
-
-public class ParquetReader implements BatchReader {
-
-  private final NullWritable nada = NullWritable.get();
-  private final RecordReader<NullWritable, ArrayWritable> reader;
-  private final ArrayWritable value;
-  private final Converter[] converters;
-
-  public ParquetReader(Path path,
-                       TypeDescription schema,
-                       Configuration conf) throws IOException {
-    FileSplit split = new FileSplit(path, 0, Long.MAX_VALUE, new String[]{});
-    JobConf jobConf = new JobConf(conf);
-    reader = new MapredParquetInputFormat().getRecordReader(split, jobConf,
-        Reporter.NULL);
-    value = reader.createValue();
-    converters = new Converter[schema.getChildren().size()];
-    List<TypeDescription> children = schema.getChildren();
-    for(int c = 0; c < converters.length; ++c) {
-      converters[c] = createConverter(children.get(c));
-    }
-  }
-
-  @Override
-  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
-    batch.reset();
-    int maxSize = batch.getMaxSize();
-    while (batch.size < maxSize && reader.next(nada, value)) {
-      Writable[] values = value.get();
-      int row = batch.size++;
-      for(int c=0; c < batch.cols.length; ++c) {
-        converters[c].convert(batch.cols[c], row, values[c]);
-      }
-    }
-    return batch.size != 0;
-  }
-
-  @Override
-  public void close() throws IOException {
-    reader.close();
-  }
-
-  interface Converter {
-    void convert(ColumnVector vector, int row, Object value);
-  }
-
-  private static class BooleanConverter implements Converter {
-    public void convert(ColumnVector cv, int row, Object value) {
-      if (value == null) {
-        cv.noNulls = false;
-        cv.isNull[row] = true;
-      } else {
-        ((LongColumnVector) cv).vector[row] =
-            ((BooleanWritable) value).get() ? 1 : 0;
-      }
-    }
-  }
-
-  private static class IntConverter implements Converter {
-    public void convert(ColumnVector cv, int row, Object value) {
-      if (value == null) {
-        cv.noNulls = false;
-        cv.isNull[row] = true;
-      } else {
-        ((LongColumnVector) cv).vector[row] =
-            ((IntWritable) value).get();
-      }
-    }
-  }
-
-  private static class LongConverter implements Converter {
-    public void convert(ColumnVector cv, int row, Object value) {
-      if (value == null) {
-        cv.noNulls = false;
-        cv.isNull[row] = true;
-      } else {
-        ((LongColumnVector) cv).vector[row] =
-            ((LongWritable) value).get();
-      }
-    }
-  }
-
-  private static class FloatConverter implements Converter {
-    public void convert(ColumnVector cv, int row, Object value) {
-      if (value == null) {
-        cv.noNulls = false;
-        cv.isNull[row] = true;
-      } else {
-        ((DoubleColumnVector) cv).vector[row] =
-            ((FloatWritable) value).get();
-      }
-    }
-  }
-
-  private static class DoubleConverter implements Converter {
-    public void convert(ColumnVector cv, int row, Object value) {
-      if (value == null) {
-        cv.noNulls = false;
-        cv.isNull[row] = true;
-      } else {
-        ((DoubleColumnVector) cv).vector[row] =
-            ((DoubleWritable) value).get();
-      }
-    }
-  }
-
-  private static class StringConverter implements Converter {
-    public void convert(ColumnVector cv, int row, Object value) {
-      if (value == null) {
-        cv.noNulls = false;
-        cv.isNull[row] = true;
-      } else {
-        Text castValue = (Text) value;
-        ((BytesColumnVector) cv).setVal(row, castValue.getBytes(), 0,
-            castValue.getLength());
-      }
-    }
-  }
-
-  private static class BinaryConverter implements Converter {
-    public void convert(ColumnVector cv, int row, Object value) {
-      if (value == null) {
-        cv.noNulls = false;
-        cv.isNull[row] = true;
-      } else {
-        BytesWritable buf = (BytesWritable) value;
-        ((BytesColumnVector) cv).setVal(row, buf.getBytes(), 0,
-            buf.getLength());
-      }
-    }
-  }
-
-  private static class TimestampConverter implements Converter {
-    public void convert(ColumnVector cv, int row, Object value) {
-      if (value == null) {
-        cv.noNulls = false;
-        cv.isNull[row] = true;
-      } else {
-        TimestampColumnVector tc = (TimestampColumnVector) cv;
-        tc.time[row] = ((TimestampWritable) value).getSeconds();
-        tc.nanos[row] = ((TimestampWritable) value).getNanos();
-      }
-    }
-  }
-
-  private static class DecimalConverter implements Converter {
-    final int scale;
-    DecimalConverter(int scale) {
-      this.scale = scale;
-    }
-    public void convert(ColumnVector cv, int row, Object value) {
-      if (value == null) {
-        cv.noNulls = false;
-        cv.isNull[row] = true;
-      } else {
-        DecimalColumnVector tc = (DecimalColumnVector) cv;
-        tc.vector[row].set((HiveDecimalWritable) value);
-      }
-    }
-  }
-
-  private static class ListConverter implements Converter {
-    final Converter childConverter;
-
-    ListConverter(TypeDescription schema) {
-      childConverter = createConverter(schema.getChildren().get(0));
-    }
-
-    public void convert(ColumnVector cv, int row, Object value) {
-      if (value == null) {
-        cv.noNulls = false;
-        cv.isNull[row] = true;
-      } else {
-        ListColumnVector tc = (ListColumnVector) cv;
-        Writable[] array = ((ArrayWritable) value).get();
-        int start = tc.childCount;
-        int len = array.length;
-        tc.childCount += len;
-        tc.child.ensureSize(tc.childCount, true);
-        for(int i=0; i < len; ++i) {
-          childConverter.convert(tc.child, start + i, array[i]);
-        }
-      }
-    }
-  }
-
-  private static class StructConverter implements Converter {
-    final Converter[] childConverters;
-
-    StructConverter(TypeDescription schema) {
-      List<TypeDescription> children = schema.getChildren();
-      childConverters = new Converter[children.size()];
-      for(int i=0; i < childConverters.length; ++i) {
-        childConverters[i] = createConverter(children.get(i));
-      }
-    }
-
-    public void convert(ColumnVector cv, int row, Object value) {
-      if (value == null) {
-        cv.noNulls = false;
-        cv.isNull[row] = true;
-      } else {
-        StructColumnVector tc = (StructColumnVector) cv;
-        Writable[] record = ((ArrayWritable) value).get();
-        for(int c=0; c < tc.fields.length; ++c) {
-          childConverters[c].convert(tc.fields[c], row, record[c]);
-        }
-      }
-    }
-  }
-
-  static Converter createConverter(TypeDescription types) {
-    switch (types.getCategory()) {
-      case BINARY:
-        return new BinaryConverter();
-      case BOOLEAN:
-        return new BooleanConverter();
-      case BYTE:
-      case SHORT:
-      case INT:
-        return new IntConverter();
-      case LONG:
-        return new LongConverter();
-      case FLOAT:
-        return new FloatConverter();
-      case DOUBLE:
-        return new DoubleConverter();
-      case CHAR:
-      case VARCHAR:
-      case STRING:
-        return new StringConverter();
-      case TIMESTAMP:
-        return new TimestampConverter();
-      case DECIMAL:
-        return new DecimalConverter(types.getScale());
-      case LIST:
-        return new ListConverter(types);
-      case STRUCT:
-        return new StructConverter(types);
-      default:
-        throw new IllegalArgumentException("Unhandled type " + types);
-    }
-  }
-}
diff --git a/java/pom.xml b/java/pom.xml
index d8ad10c90b..6b30faf066 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -520,6 +520,10 @@
         com.google.code.findbugs
         jsr305
+
+        com.sun.jersey
+        jersey-server
+
         commons-daemon
         commons-daemon