diff --git a/.gitignore b/.gitignore index cd3c0669ca..aa67d3d37a 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,6 @@ dependency-reduced-pom.xml parquet-scrooge/.cache .idea/* target/ +.cache +*~ +mvn_install.log diff --git a/README.md b/README.md index 2d9a50a124..9bb0be661c 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,7 @@ sudo make install Once protobuf and thrift are available in your path, you can build the project by running: ``` -mvn clean install +LC_ALL=C mvn clean install ``` ## Features diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java similarity index 100% rename from parquet-cascading/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java rename to parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/SchemaIntersection.java diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java similarity index 100% rename from parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java rename to parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleReadSupport.java diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java similarity index 100% rename from parquet-cascading/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java rename to parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/TupleWriteSupport.java diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java similarity index 100% rename from parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java rename to parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java similarity index 100% rename from parquet-cascading/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java rename to parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleRecordMaterializer.java diff --git a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java b/parquet-cascading-common23/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java similarity index 100% rename from parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java rename to parquet-cascading-common23/src/test/java/org/apache/parquet/cascading/TestParquetTupleScheme.java diff --git a/parquet-cascading/src/test/resources/names.txt b/parquet-cascading-common23/src/test/resources/names.txt similarity index 100% rename from parquet-cascading/src/test/resources/names.txt rename to parquet-cascading-common23/src/test/resources/names.txt diff --git a/parquet-cascading/src/test/thrift/test.thrift b/parquet-cascading-common23/src/test/thrift/test.thrift similarity index 100% rename from parquet-cascading/src/test/thrift/test.thrift rename to parquet-cascading-common23/src/test/thrift/test.thrift diff --git a/parquet-cascading/pom.xml b/parquet-cascading/pom.xml index 0cd858886e..cabb0038f7 100644 --- a/parquet-cascading/pom.xml +++ b/parquet-cascading/pom.xml @@ -102,6 +102,51 @@ + + org.codehaus.mojo + build-helper-maven-plugin + 1.7 + + + add-source + generate-sources + + add-source + + + + ../parquet-cascading-common23/src/main/java + + + + + add-test-source + generate-test-sources + + add-test-source + + + + ../parquet-cascading-common23/src/test/java + + + + + add-test-resource + generate-test-resources + + add-test-resource + + + + + ../parquet-cascading-common23/src/test/resources + + + + + + maven-enforcer-plugin @@ -115,6 +160,8 @@ 0.1.10 ${thrift.executable} + ../parquet-cascading-common23/src/main/thrift + ../parquet-cascading-common23/src/test/thrift diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java index ea70d43f8e..b34ee7d24a 100644 --- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java +++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java @@ -33,6 +33,7 @@ import org.apache.parquet.hadoop.thrift.TBaseWriteSupport; import org.apache.parquet.thrift.TBaseRecordConverter; +@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x public class ParquetTBaseScheme> extends ParquetValueScheme { // In the case of reads, we can read the thrift class from the file metadata diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java index 41b56d0fcb..3b7d715273 100644 --- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java +++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java @@ -59,6 +59,7 @@ * @author Avi Bryant */ +@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x public class ParquetTupleScheme extends Scheme{ private static final long serialVersionUID = 0L; diff --git a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java index 9549ef43f6..6c34a8494b 100644 --- a/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java +++ b/parquet-cascading/src/main/java/org/apache/parquet/cascading/ParquetValueScheme.java @@ -47,6 +47,7 @@ * This is an abstract class; implementations are expected to set up their Input/Output Formats * correctly in the respective Init methods. */ +@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x public abstract class ParquetValueScheme extends Scheme{ public static final class Config implements Serializable { diff --git a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java index 841314ca7c..e0f33e1161 100644 --- a/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java +++ b/parquet-cascading/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java @@ -58,8 +58,9 @@ import java.util.HashMap; import java.util.Map; +@Deprecated // The parquet-cascading module depends on Cascading 2.x, and is being superseded with parquet-cascading3 for Cascading 3.x public class TestParquetTBaseScheme { - final String txtInputPath = "src/test/resources/names.txt"; + final String txtInputPath = "target/test-classes/names.txt"; final String parquetInputPath = "target/test/ParquetTBaseScheme/names-parquet-in"; final String parquetOutputPath = "target/test/ParquetTBaseScheme/names-parquet-out"; final String txtOutputPath = "target/test/ParquetTBaseScheme/names-txt-out"; diff --git a/parquet-cascading3/REVIEWERS.md b/parquet-cascading3/REVIEWERS.md new file mode 100644 index 0000000000..f7972357e9 --- /dev/null +++ b/parquet-cascading3/REVIEWERS.md @@ -0,0 +1,27 @@ + + +The following reviewers had reviewed the parquet-cascading (pre-Cascading 3.0) project: + +| Name | Apache Id | github id | +|--------------------|------------|-------------| +| Dmitriy Ryaboy | dvryaboy | dvryaboy | +| Tianshuo Deng | tianshuo | tsdeng | + + diff --git a/parquet-cascading3/pom.xml b/parquet-cascading3/pom.xml new file mode 100644 index 0000000000..ea552ad004 --- /dev/null +++ b/parquet-cascading3/pom.xml @@ -0,0 +1,178 @@ + + + + org.apache.parquet + parquet + ../pom.xml + 1.8.2-SNAPSHOT + + + 4.0.0 + + parquet-cascading3 + jar + + Apache Parquet Cascading (for Cascading 3.0 onwards) + https://parquet.apache.org + + + + conjars.org + http://conjars.org/repo + + + + + + org.apache.parquet + parquet-column + ${project.version} + + + org.apache.parquet + parquet-hadoop + ${project.version} + + + org.apache.parquet + parquet-thrift + ${project.version} + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + provided + + + org.apache.parquet + parquet-column + ${project.version} + test-jar + test + + + org.mockito + mockito-all + 1.9.5 + test + + + cascading + cascading-hadoop + ${cascading3.version} + provided + + + + + + + + maven-enforcer-plugin + + + none + + + + true + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.7 + + + add-source + generate-sources + + add-source + + + + ../parquet-cascading-common23/src/main/java + + + + + add-test-source + generate-test-sources + + add-test-source + + + + ../parquet-cascading-common23/src/test/java + + + + + add-test-resource + generate-test-resources + + add-test-resource + + + + + ../parquet-cascading-common23/src/test/resources + + + + + + + + maven-enforcer-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + org.apache.thrift.tools + maven-thrift-plugin + 0.1.10 + + ${thrift.executable} + ../parquet-cascading-common23/src/main/thrift + ../parquet-cascading-common23/src/test/thrift + + + + thrift-sources + generate-test-sources + + testCompile + + + + + + + diff --git a/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java new file mode 100644 index 0000000000..af04b47c8e --- /dev/null +++ b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTBaseScheme.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.cascading; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.thrift.TBase; + +import cascading.flow.FlowProcess; +import cascading.tap.Tap; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat; +import org.apache.parquet.hadoop.thrift.ThriftReadSupport; +import org.apache.parquet.hadoop.thrift.TBaseWriteSupport; +import org.apache.parquet.thrift.TBaseRecordConverter; + +public class ParquetTBaseScheme> extends ParquetValueScheme { + + // In the case of reads, we can read the thrift class from the file metadata + public ParquetTBaseScheme() { + this(new Config()); + } + + public ParquetTBaseScheme(Class thriftClass) { + this(new Config().withRecordClass(thriftClass)); + } + + public ParquetTBaseScheme(FilterPredicate filterPredicate) { + this(new Config().withFilterPredicate(filterPredicate)); + } + + public ParquetTBaseScheme(FilterPredicate filterPredicate, Class thriftClass) { + this(new Config().withRecordClass(thriftClass).withFilterPredicate(filterPredicate)); + } + + public ParquetTBaseScheme(Config config) { + super(config); + } + + @Override + public void sourceConfInit(FlowProcess fp, + Tap tap, JobConf jobConf) { + super.sourceConfInit(fp, tap, jobConf); + jobConf.setInputFormat(DeprecatedParquetInputFormat.class); + ParquetInputFormat.setReadSupportClass(jobConf, ThriftReadSupport.class); + ThriftReadSupport.setRecordConverterClass(jobConf, TBaseRecordConverter.class); + } + + @Override + public void sinkConfInit(FlowProcess fp, + Tap tap, JobConf jobConf) { + + if (this.config.getKlass() == null) { + throw new IllegalArgumentException("To use ParquetTBaseScheme as a sink, you must specify a thrift class in the constructor"); + } + + DeprecatedParquetOutputFormat.setAsOutputFormat(jobConf); + DeprecatedParquetOutputFormat.setWriteSupportClass(jobConf, TBaseWriteSupport.class); + TBaseWriteSupport.setThriftClass(jobConf, this.config.getKlass()); + } +} diff --git a/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java new file mode 100644 index 0000000000..4532d3b3f8 --- /dev/null +++ b/parquet-cascading3/src/main/java/org/apache/parquet/cascading/ParquetTupleScheme.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.parquet.cascading; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; + +import cascading.flow.FlowProcess; +import cascading.scheme.Scheme; +import cascading.scheme.SinkCall; +import cascading.scheme.SourceCall; +import cascading.tap.CompositeTap; +import cascading.tap.Tap; +import cascading.tap.TapException; +import cascading.tap.hadoop.Hfs; +import cascading.tuple.Fields; +import cascading.tuple.Tuple; +import cascading.tuple.TupleEntry; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.Footer; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.ParquetOutputFormat; +import org.apache.parquet.hadoop.mapred.Container; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetInputFormat; +import org.apache.parquet.hadoop.mapred.DeprecatedParquetOutputFormat; +import org.apache.parquet.schema.MessageType; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * A Cascading Scheme that converts Parquet groups into Cascading tuples. + * If you provide it with sourceFields, it will selectively materialize only the columns for those fields. + * The names must match the names in the Parquet schema. + * If you do not provide sourceFields, or use Fields.ALL or Fields.UNKNOWN, it will create one from the + * Parquet schema. + * Currently, only primitive types are supported. TODO: allow nested fields in the Parquet schema to be + * flattened to a top-level field in the Cascading tuple. + * + * @author Avi Bryant + */ + +public class ParquetTupleScheme extends Scheme{ + + private static final long serialVersionUID = 0L; + private String parquetSchema; + private final FilterPredicate filterPredicate; + + public ParquetTupleScheme() { + super(); + this.filterPredicate = null; + } + + public ParquetTupleScheme(Fields sourceFields) { + super(sourceFields); + this.filterPredicate = null; + } + + public ParquetTupleScheme(FilterPredicate filterPredicate) { + this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate"); + } + + public ParquetTupleScheme(FilterPredicate filterPredicate, Fields sourceFields) { + super(sourceFields); + this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate"); + } + + /** + * ParquetTupleScheme constructor used a sink need to be implemented + * + * @param sourceFields used for the reading step + * @param sinkFields used for the writing step + * @param schema is mandatory if you add sinkFields and needs to be the + * toString() from a MessageType. This value is going to be parsed when the + * parquet file will be created. + */ + public ParquetTupleScheme(Fields sourceFields, Fields sinkFields, final String schema) { + super(sourceFields, sinkFields); + parquetSchema = schema; + this.filterPredicate = null; + } + + @SuppressWarnings("rawtypes") + @Override + public void sourceConfInit(FlowProcess fp, + Tap tap, JobConf jobConf) { + + if (filterPredicate != null) { + ParquetInputFormat.setFilterPredicate(jobConf, filterPredicate); + } + + jobConf.setInputFormat(DeprecatedParquetInputFormat.class); + ParquetInputFormat.setReadSupportClass(jobConf, TupleReadSupport.class); + TupleReadSupport.setRequestedFields(jobConf, getSourceFields()); + } + + @Override + public Fields retrieveSourceFields(FlowProcess flowProcess, Tap tap) { + MessageType schema = readSchema(flowProcess, tap); + SchemaIntersection intersection = new SchemaIntersection(schema, getSourceFields()); + + setSourceFields(intersection.getSourceFields()); + + return getSourceFields(); + } + + private MessageType readSchema(FlowProcess flowProcess, Tap tap) { + try { + Hfs hfs; + + if( tap instanceof CompositeTap ) + hfs = (Hfs) ( (CompositeTap) tap ).getChildTaps().next(); + else + hfs = (Hfs) tap; + + List