diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java b/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java
index 41ce738b33..151e0f2aa3 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java
@@ -30,9 +30,13 @@
 import static org.apache.parquet.pig.TupleReadSupport.PARQUET_COLUMN_INDEX_ACCESS;
 import static org.apache.parquet.pig.TupleReadSupport.getPigSchemaFromMultipleFiles;
 
+import static org.apache.parquet.filter2.predicate.FilterApi.*;
+
 import java.io.IOException;
 import java.lang.ref.Reference;
 import java.lang.ref.SoftReference;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.WeakHashMap;
@@ -42,9 +46,13 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.apache.parquet.io.api.Binary;
 import org.apache.pig.Expression;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPredicatePushdown;
 import org.apache.pig.LoadPushDown;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
@@ -57,6 +65,11 @@
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.parser.ParserException;
 
+import static org.apache.pig.Expression.BinaryExpression;
+import static org.apache.pig.Expression.Column;
+import static org.apache.pig.Expression.Const;
+import static org.apache.pig.Expression.OpType;
+
 import org.apache.parquet.Log;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.metadata.GlobalMetaData;
@@ -70,9 +83,12 @@
  * @author Julien Le Dem
  *
  */
-public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
+public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDown, LoadPredicatePushdown {
   private static final Log LOG = Log.getLog(ParquetLoader.class);
 
+  public static final String ENABLE_PREDICATE_FILTER_PUSHDOWN = "parquet.pig.predicate.pushdown.enable";
+  private static final boolean DEFAULT_PREDICATE_PUSHDOWN_ENABLED = false;
+
   // Using a weak hash map will ensure that the cache will be gc'ed when there is memory pressure
   static final Map<String, Reference<ParquetInputFormat<Tuple>>> inputFormatCache =
       new WeakHashMap<String, Reference<ParquetInputFormat<Tuple>>>();
@@ -169,6 +185,11 @@ private void setInput(String location, Job job) throws IOException {
     getConfiguration(job).set(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
     getConfiguration(job).set(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
     getConfiguration(job).set(PARQUET_COLUMN_INDEX_ACCESS, Boolean.toString(columnIndexAccess));
+
+    FilterPredicate filterPredicate = (FilterPredicate) getFromUDFContext(ParquetInputFormat.FILTER_PREDICATE);
+    if (filterPredicate != null) {
+      ParquetInputFormat.setFilterPredicate(getConfiguration(job), filterPredicate);
+    }
   }
 
   @Override
@@ -380,4 +401,149 @@ private Schema getSchemaFromRequiredFieldList(Schema schema, List<RequiredField>
     return s;
   }
 
+  @Override
+  public List<String> getPredicateFields(String s, Job job) throws IOException {
+    if (!job.getConfiguration().getBoolean(ENABLE_PREDICATE_FILTER_PUSHDOWN, DEFAULT_PREDICATE_PUSHDOWN_ENABLED)) {
+      return null;
+    }
+
+    List<String> fields = new ArrayList<String>();
+
+    for (FieldSchema field : schema.getFields()) {
+      switch (field.type) {
+        case DataType.BOOLEAN:
+        case DataType.INTEGER:
+        case DataType.LONG:
+        case DataType.FLOAT:
+        case DataType.DOUBLE:
+        case DataType.CHARARRAY:
+          fields.add(field.alias);
+          break;
+        default:
+          // Skip BYTEARRAY, TUPLE, MAP, BAG, DATETIME, BIGINTEGER, BIGDECIMAL
+          break;
+      }
+    }
+
+    return fields;
+  }
+
+  @Override
+  public List<OpType> getSupportedExpressionTypes() {
+    OpType supportedTypes[] = {
+        OpType.OP_EQ,
+        OpType.OP_GT,
+        OpType.OP_GE,
+        OpType.OP_LT,
+        OpType.OP_LE,
+        OpType.OP_AND,
+        OpType.OP_OR
+    };
+
+    return Arrays.asList(supportedTypes);
+  }
+
+  @Override
+  public void setPushdownPredicate(Expression e) throws IOException {
+    LOG.info("Pig pushdown expression: " + e);
+
+    FilterPredicate pred = buildFilter(e);
+    LOG.info("Parquet filter predicate expression: " + pred);
+
+    storeInUDFContext(ParquetInputFormat.FILTER_PREDICATE, pred);
+  }
+
+  // Recursively translates the Pig expression tree into a Parquet filter2
+  // predicate. Returns null for any subtree that cannot be translated; in
+  // that case nothing is pushed down and Pig evaluates the filter itself.
+  private FilterPredicate buildFilter(Expression e) {
+    if (e instanceof BinaryExpression) {
+      Expression lhs = ((BinaryExpression) e).getLhs();
+      Expression rhs = ((BinaryExpression) e).getRhs();
+      OpType op = e.getOpType();
+
+      FilterPredicate lfp;
+      FilterPredicate rfp;
+      switch (op) {
+        case OP_AND:
+          lfp = buildFilter(lhs);
+          rfp = buildFilter(rhs);
+          // Be conservative: drop the whole subtree if either side is
+          // unsupported. Pig still applies the original filter afterwards.
+          if (lfp == null || rfp == null) {
+            return null;
+          }
+          return and(lfp, rfp);
+        case OP_OR:
+          lfp = buildFilter(lhs);
+          rfp = buildFilter(rhs);
+          if (lfp == null || rfp == null) {
+            return null;
+          }
+          return or(lfp, rfp);
+      }
+
+      if (lhs instanceof Column && rhs instanceof Const) {
+        return buildFilter(op, (Column) lhs, (Const) rhs);
+      } else if (lhs instanceof Const && rhs instanceof Column) {
+        return buildFilter(mirror(op), (Column) rhs, (Const) lhs);
+      }
+    }
+
+    return null;
+  }
+
+  // A comparison must be mirrored when its operands are swapped to put the
+  // column on the left: (5 > c1) is (c1 < 5), not (c1 > 5).
+  private OpType mirror(OpType op) {
+    switch (op) {
+      case OP_GT: return OpType.OP_LT;
+      case OP_GE: return OpType.OP_LE;
+      case OP_LT: return OpType.OP_GT;
+      case OP_LE: return OpType.OP_GE;
+      default: return op; // OP_EQ is symmetric
+    }
+  }
+
+  private FilterPredicate buildFilter(OpType op, Column col, Const value) {
+    String name = col.getName();
+    try {
+      FieldSchema f = schema.getField(name);
+      switch (f.type) {
+        case DataType.BOOLEAN:
+          Operators.BooleanColumn boolCol = booleanColumn(name);
+          switch (op) {
+            case OP_EQ: return eq(boolCol, getValue(value, boolCol.getColumnType()));
+            case OP_NE: return notEq(boolCol, getValue(value, boolCol.getColumnType()));
+          }
+          break; // booleans support equality only; don't fall through to int
+        case DataType.INTEGER:
+          Operators.IntColumn intCol = intColumn(name);
+          return op(op, intCol, value);
+        case DataType.LONG:
+          Operators.LongColumn longCol = longColumn(name);
+          return op(op, longCol, value);
+        case DataType.FLOAT:
+          Operators.FloatColumn floatCol = floatColumn(name);
+          return op(op, floatCol, value);
+        case DataType.DOUBLE:
+          Operators.DoubleColumn doubleCol = doubleColumn(name);
+          return op(op, doubleCol, value);
+        case DataType.CHARARRAY:
+          Operators.BinaryColumn binaryCol = binaryColumn(name);
+          return op(op, binaryCol, value);
+      }
+    } catch (FrontendException e) {
+      throw new RuntimeException("Error processing pushdown for column:" + col, e);
+    }
+
+    return null;
+  }
+
+  // Builds a single comparison for column types that support ordering.
+  private <C extends Comparable<C>, COL extends Operators.Column<C> & Operators.SupportsLtGt>
+      FilterPredicate op(Expression.OpType op, COL col, Expression valueExpr) {
+    C value = getValue(valueExpr, col.getColumnType());
+    switch (op) {
+      case OP_EQ: return eq(col, value);
+      case OP_GT: return gt(col, value);
+      case OP_GE: return gtEq(col, value);
+      case OP_LT: return lt(col, value);
+      case OP_LE: return ltEq(col, value);
+    }
+    return null;
+  }
+
+  private <C extends Comparable<C>> C getValue(Expression expr, Class<C> type) {
+    Comparable value = (Comparable) ((Const) expr).getValue();
+
+    // Pig hands chararray constants over as String; Parquet binary columns
+    // compare Binary values.
+    if (value instanceof String) {
+      value = Binary.fromString((String) value);
+    }
+
+    return type.cast(value);
+  }
+
 }
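Note (illustration, not part of the patch): setPushdownPredicate() receives the
filter expression that Pig has already restricted to the fields and operators
advertised by getPredicateFields() and getSupportedExpressionTypes(), and
buildFilter() walks that tree recursively. For the filter exercised by the new
test below, the loader ends up registering the equivalent of this sketch,
assuming only the public filter2 FilterApi; the class name is hypothetical:

    import static org.apache.parquet.filter2.predicate.FilterApi.*;

    import org.apache.parquet.filter2.predicate.FilterPredicate;

    public class PushdownIllustration {
      public static void main(String[] args) {
        // What buildFilter() derives from the Pig filter `c1 == 1 or c1 == 5`:
        // an OP_OR node over two column-vs-constant OP_EQ leaves.
        FilterPredicate pred = or(
            eq(intColumn("c1"), 1),
            eq(intColumn("c1"), 5));
        System.out.println(pred);
      }
    }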
diff --git a/parquet-pig/src/test/java/org/apache/parquet/pig/TestParquetLoader.java b/parquet-pig/src/test/java/org/apache/parquet/pig/TestParquetLoader.java
index 6f11538d85..8e57424498 100644
--- a/parquet-pig/src/test/java/org/apache/parquet/pig/TestParquetLoader.java
+++ b/parquet-pig/src/test/java/org/apache/parquet/pig/TestParquetLoader.java
@@ -22,17 +22,21 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.ExecType;
 import org.apache.pig.LoadPushDown.RequiredField;
 import org.apache.pig.LoadPushDown.RequiredFieldList;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.DataType;
 import static org.apache.pig.data.DataType.*;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.tools.pigstats.JobStats;
 import org.junit.Assert;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
@@ -175,11 +179,11 @@ public void testReqestedSchemaColumnPruning() throws Exception {
     for (int i = 0; i < rows; i++) {
       list.add(Storage.tuple(i, "a"+i, i*2));
     }
-    data.set("in", "i:int, a:chararray, b:int", list );
+    data.set("in", "i:int, a:chararray, b:int", list);
     pigServer.setBatchOn();
     pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
     pigServer.deleteFile(out);
-    pigServer.registerQuery("Store A into '"+out+"' using " + ParquetStorer.class.getName()+"();");
+    pigServer.registerQuery("Store A into '" + out + "' using " + ParquetStorer.class.getName() + "();");
     pigServer.executeBatch();
 
     //Test Null Padding at the end
@@ -212,7 +216,7 @@ public void testTypePersuasion() throws Exception {
     for (int i = 0; i < rows; i++) {
       list.add(Storage.tuple(i, (long)i, (float)i, (double)i, Integer.toString(i), Boolean.TRUE));
     }
-    data.set("in", "i:int, l:long, f:float, d:double, s:chararray, b:boolean", list );
+    data.set("in", "i:int, l:long, f:float, d:double, s:chararray, b:boolean", list);
     pigServer.setBatchOn();
     pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
     pigServer.deleteFile(out);
@@ -268,11 +272,11 @@ public void testColumnIndexAccess() throws Exception {
     pigServer.setBatchOn();
     pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
     pigServer.deleteFile(out);
-    pigServer.registerQuery("Store A into '"+out+"' using " + ParquetStorer.class.getName()+"();");
+    pigServer.registerQuery("Store A into '" + out + "' using " + ParquetStorer.class.getName() + "();");
     pigServer.executeBatch();
 
     //Test Null Padding at the end
-    pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName()+"('n1:int, n2:double, n3:long, n4:chararray', 'true');");
+    pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName() + "('n1:int, n2:double, n3:long, n4:chararray', 'true');");
     pigServer.registerQuery("STORE B into 'out' using mock.Storage();");
     pigServer.executeBatch();
 
@@ -285,7 +289,7 @@
       assertEquals(4, t.size());
 
       assertEquals(i, t.get(0));
-      assertEquals(i*1.0, t.get(1));
+      assertEquals(i * 1.0, t.get(1));
       assertEquals(i*2L, t.get(2));
       assertEquals("v"+i, t.get(3));
     }
@@ -306,10 +310,10 @@ public void testColumnIndexAccessProjection() throws Exception {
     pigServer.setBatchOn();
     pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
     pigServer.deleteFile(out);
-    pigServer.registerQuery("Store A into '"+out+"' using " + ParquetStorer.class.getName()+"();");
+    pigServer.registerQuery("Store A into '" + out + "' using " + ParquetStorer.class.getName() + "();");
     pigServer.executeBatch();
 
-    pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName()+"('n1:int, n2:double, n3:long, n4:chararray', 'true');");
+    pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName() + "('n1:int, n2:double, n3:long, n4:chararray', 'true');");
     pigServer.registerQuery("C = foreach B generate n1, n3;");
     pigServer.registerQuery("STORE C into 'out' using mock.Storage();");
     pigServer.executeBatch();
@@ -325,10 +329,39 @@
       assertEquals(i, t.get(0));
       assertEquals(i*2L, t.get(1));
     }
-  }
-  
+  }
+
   @Test
-  public void testRead() {
-    
+  public void testPredicatePushdown() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(ParquetLoader.ENABLE_PREDICATE_FILTER_PUSHDOWN, true);
+
+    PigServer pigServer = new PigServer(ExecType.LOCAL, conf);
+    pigServer.setValidateEachStatement(true);
+
+    String out = "target/out";
+    String out2 = "target/out2";
+    int rows = 10;
+    Data data = Storage.resetData(pigServer);
+    List<Tuple> list = new ArrayList<Tuple>();
+    for (int i = 0; i < rows; i++) {
+      list.add(Storage.tuple(i, i*1.0, i*2L, "v"+i));
+    }
+    data.set("in", "c1:int, c2:double, c3:long, c4:chararray", list);
+    pigServer.setBatchOn();
+    pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+    pigServer.deleteFile(out);
+    pigServer.registerQuery("Store A into '" + out + "' using " + ParquetStorer.class.getName() + "();");
+    pigServer.executeBatch();
+
+    pigServer.deleteFile(out2);
+    pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName() + "('c1:int, c2:double, c3:long, c4:chararray');");
+    pigServer.registerQuery("C = FILTER B by c1 == 1 or c1 == 5;");
+    pigServer.registerQuery("STORE C into '" + out2 + "' using mock.Storage();");
+    List<ExecJob> jobs = pigServer.executeBatch();
+
+    // With record-level filtering applied during read, only the two matching
+    // records should be counted on the input side.
+    long recordsRead = jobs.get(0).getStatistics().getInputStats().get(0).getNumberRecords();
+
+    assertEquals(2, recordsRead);
   }
 }
diff --git a/pom.xml b/pom.xml
index c769ad3696..b2a760cd61 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,7 +87,7 @@
     <scala.binary.version>2.10</scala.binary.version>
     <scala.maven.test.skip>false</scala.maven.test.skip>
-    <pig.version>0.11.1</pig.version>
+    <pig.version>0.14.0</pig.version>
     <thrift.version>0.7.0</thrift.version>
     <fastutil.version>6.5.7</fastutil.version>
@@ -489,7 +489,7 @@
       <properties>
         <hadoop.version>2.3.0</hadoop.version>
-        <pig.version>0.13.0</pig.version>
+        <pig.version>0.14.0</pig.version>
         <pig.classifier>h2</pig.classifier>
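Usage note (illustration, not part of the patch): pushdown stays disabled unless
parquet.pig.predicate.pushdown.enable is set to true in the job configuration.
From a Pig script this would look roughly like the sketch below, assuming SET
propagates the property into the job configuration; the paths are placeholders:

    SET parquet.pig.predicate.pushdown.enable true;

    A = LOAD '/tmp/events' USING org.apache.parquet.pig.ParquetLoader('c1:int, c2:double, c3:long, c4:chararray');
    B = FILTER A BY c1 == 1 OR c1 == 5;
    STORE B INTO '/tmp/events_filtered';

Only plain comparisons combined with AND/OR, over boolean, int, long, float,
double, and chararray columns, are pushed down; any other predicate is simply
evaluated by Pig after the load.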