Hive: Filter pushdown #1326

Merged
rdblue merged 21 commits into apache:master from ExpediaGroup:hive-filter-pushdown on Sep 1, 2020

Conversation

@cmathiesen (Contributor)

Hello! This is a follow-up to the Hive IF (InputFormat) PRs that were merged recently; it adds filter pushdown to HiveIcebergInputFormat. I've added a filter factory that converts the Hive filter into an Iceberg Expression, then uses the InputFormatConfig to set the filter expression for IcebergInputFormat to apply to the table scan.
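
For anyone new to the flow, here is a rough sketch of the path a pushed-down predicate takes; the first three statements mirror the InputFormat code discussed later in this thread, while the example query translation is only an illustrative assumption:

// Hive serializes the pushed filter into the job conf; the InputFormat deserializes it
// back into a SearchArgument and the new factory translates it to an Iceberg Expression.
ExprNodeGenericFuncDesc exprNodeDesc =
    SerializationUtilities.deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
SearchArgument sarg = ConvertAstToSearchArg.create(job, exprNodeDesc);
Expression filter = HiveIcebergFilterFactory.generateFilterExpression(sarg);

// For example, WHERE item_id = 100 AND item_count > 5 would become roughly:
Expression equivalent = Expressions.and(
    Expressions.equal("item_id", 100L),
    Expressions.greaterThan("item_count", 5L));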

cc: @rdblue @guilload @massdosage @pvary @rdsr @shardulm94

Thanks :D

case CONSTANT:
//We are unsure of how the CONSTANT case works, so using the approach of:
//https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/
// ParquetFilterPredicateConverter.java#L116
Member

This comment seems to be at odds with the exception being thrown? It looks like in the Hive code it just does nothing? Maybe I'm reading it wrong.

Contributor Author

Ah yes, thanks for the spot - I changed the approach here during another review and didn't remove the comment. Will remove :)

private static Object leafToIcebergType(PredicateLeaf leaf) {
switch (leaf.getType()) {
case LONG:
return leaf.getLiteral() != null ? leaf.getLiteral() : leaf.getLiteralList();
Member

I'm new to this code, so when reading this I wonder why we want to get the literal as a list if getLiteral is null. Does getLiteral() returning null mean that there is a collection type?

Contributor Author

Yeah, that's sort of what's going on. It looks like you'd only be using getLiteralList if the operator for the leaf was IN or BETWEEN, and getLiteral for all other operator types. It would either be one or the other, so it seemed easiest to check for a null rather than calling getOperator and switching through all the different operators, if that makes sense.
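
For illustration, the same relationship could be written out explicitly against the operator (a hypothetical helper, not the PR code):

// Only IN and BETWEEN populate getLiteralList(); every other operator populates getLiteral(),
// so exactly one of the two accessors returns a non-null value for a given leaf.
private static Object leafValue(PredicateLeaf leaf) {
  if (leaf.getOperator() == PredicateLeaf.Operator.IN ||
      leaf.getOperator() == PredicateLeaf.Operator.BETWEEN) {
    return leaf.getLiteralList();
  }
  return leaf.getLiteral();
}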

case TIMESTAMP:
if (leaf.getLiteral() != null) {
Timestamp timestamp = (Timestamp) leaf.getLiteral();
return timestamp.toInstant().getEpochSecond() * 1000000 + timestamp.getNanos() / 1000;
@RussellSpitzer (Member), Aug 12, 2020

Not a big deal, but I tend to make constants for numbers that can be misread like this, e.g. MILLION or MICROS_PER_SECOND.

Contributor Author

A very good point, I'll update these!

* specific language governing permissions and limitations
* under the License.
*/

@RussellSpitzer (Member), Aug 12, 2020

I think we should probably have tests for all the filter literal types here. It seems like we are only checking longs, especially given the special code around other specific types.

Contributor

I agree we should test as many types as we can. We also don't need to test every predicate for every type. I think it's fine to test each predicate with a long and then to test each type with equals, for example.
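
As a sketch of the "each type with equals" idea (the helper and assertion calls here are assumptions for illustration, not the PR's actual test code):

@Test
public void testStringEqualsPredicate() {
  SearchArgument arg = SearchArgumentFactory.newBuilder()
      .startAnd().equals("name", PredicateLeaf.Type.STRING, "Bob").end().build();

  UnboundPredicate expected = Expressions.equal("name", "Bob");
  UnboundPredicate actual = (UnboundPredicate) HiveIcebergFilterFactory.generateFilterExpression(arg);

  Assert.assertEquals(expected.op(), actual.op());
  Assert.assertEquals(expected.literal(), actual.literal());
}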

}
}

private static Object leafToIcebergType(PredicateLeaf leaf) {
Contributor

I think it would be better to split this into two methods: one for a single literal and one for a list of literals. Returning either one as Object doesn't allow us to make sure we're calling getLiteralList for the correct predicates.
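
Roughly along these lines, as a sketch of the suggested split (names assumed; the PR later added a leafToLiteralList in this spirit):

// One accessor per shape, so callers can't accidentally ask for a list on a scalar predicate.
private static Object leafToLiteral(PredicateLeaf leaf) {
  switch (leaf.getType()) {
    case LONG:
    case BOOLEAN:
    case STRING:
    case FLOAT:
      return leaf.getLiteral();
    default:
      throw new UnsupportedOperationException("Unsupported type: " + leaf.getType());
  }
}

private static List<Object> leafToLiteralList(PredicateLeaf leaf) {
  switch (leaf.getType()) {
    case LONG:
    case BOOLEAN:
    case STRING:
    case FLOAT:
      return leaf.getLiteralList();
    default:
      throw new UnsupportedOperationException("Unsupported type: " + leaf.getType());
  }
}

(DATE, TIMESTAMP, and DECIMAL would get their own cases with the conversions discussed below.)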

public class HiveIcebergFilterFactory {

private static final int MICROS_PER_SECOND = 1000000;
private static final int NANOSECS_PER_MICROSEC = 1000;
@rdsr (Contributor), Aug 18, 2020

nit: to be consistent with MICROS_PER_SECOND, maybe we can rename NANOSECS_PER_MICROSEC to NANOS_PER_MICROSEC

Contributor

What about using TimeUnit for the conversion and getting rid of those variables altogether?

//Hive converts a Date type to a Timestamp internally when retrieving literal
return ((Timestamp) leaf.getLiteral()).toLocalDateTime().toLocalDate().toEpochDay();
case DECIMAL:
return BigDecimal.valueOf(((HiveDecimalWritable) leaf.getLiteral()).doubleValue());
Contributor

I'm unsure if this is correct. I think the scale of the BigDecimal will always be 0 here, irrespective of the underlying data.

Contributor

Yes, this is not correct because it discards the scale and precision.

This should follow the examples from ORC, which also convert decimals: https://github.com/apache/iceberg/blob/master/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java#L163

dateValues.replaceAll(value -> ((Date) value).toLocalDate().toEpochDay());
return dateValues;
case DECIMAL:
List<Object> decimalValues = leaf.getLiteralList();
Contributor

nit: I think it is clearer not to modify the returned list but to use standard idioms like leaf.getLiteralList().stream().map(...) or Lists.transform(...)
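
For example (a sketch of the suggested idiom, not the final PR code):

case DATE:
  return leaf.getLiteralList().stream()
      .map(value -> ((Date) value).toLocalDate().toEpochDay())
      .collect(Collectors.toList());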

@@ -51,6 +58,17 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {

forwardConfigSettings(job);
Contributor

@cmathiesen the latest HiveIcebergInputFormat has changed substantially. Can you please rebase?

Contributor

> rebase

Shouldn't we be following the Golden Rule of Rebasing and not do this on public branches? It has the potential to cause all kinds of inconsistencies on other people's checkouts. Surely we should be doing a merge? It all gets squash merged at the end, so having a pristine history isn't worth the downsides of inconsistencies IMHO.

Contributor

I think that either rebasing or merging master into a PR is okay.

As a reviewer, I don't really consider PR branches to be public because GitHub handles force-pushes well. If I have a PR checked out, I also don't mind resetting to the PR's current state because I like keeping history clean.

That said, if you're sharing a PR branch between people, that can be disruptive, so I think it is up to the author and collaborators whether to merge or to rebase to stay up to date with master.

Is that reasonable?

Contributor

That sounds reasonable as long as the PR is rebased to master before it is committed. Having a linear history makes it much, much easier to track where a change happened.

Contributor

Agreed. We always merge by squashing the entire PR into a commit, so we do get a linear history in master.

SearchArgument arg = builder.startAnd().equals("date", PredicateLeaf.Type.DATE,
Date.valueOf("2015-11-12")).end().build();

UnboundPredicate expected = Expressions.equal("date", LocalDate.of(2015,11,12));
Contributor Author

@rdblue @rdsr I added tests for the Date and Timestamp types but when these are run I get errors like:

java.lang.IllegalArgumentException: Cannot create expression literal from java.time.LocalDate: 2015-11-12
	at org.apache.iceberg.expressions.Literals.from(Literals.java:83)
	at org.apache.iceberg.expressions.UnboundPredicate.<init>(UnboundPredicate.java:39)
	at org.apache.iceberg.expressions.Expressions.equal(Expressions.java:159)
        at org.apache.iceberg.mr.hive.TestHiveIcebergFilterFactory.testDateType(TestHiveIcebergFilterFactory.java:211)

I noticed here in another test that Dates etc. are actually passed as Strings - is that the correct option to be using in this case?

Contributor

Yes, it is a good idea to use a string instead of passing a LocalDate. The intent was to avoid tying the API to date/time representations from a specific library.

case IN:
return in(column, leafToLiteralList(leaf));
case BETWEEN:
List<Object> icebergLiterals = leafToLiteralList(leaf);
Contributor

Do we need to validate that there are only two literals here, or is this reliable?

Contributor Author

I believe that we can expect there to be 2, as we're using the BETWEEN operator and Hive wouldn't accept more than 2 arguments.
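
For context, since Iceberg has no BETWEEN predicate, the two bounds presumably end up as an AND of two comparisons; a rough sketch (not necessarily the exact PR code, using the same statically imported Expressions helpers as the snippet above):

case BETWEEN:
  List<Object> bounds = leafToLiteralList(leaf);
  return and(
      greaterThanOrEqual(column, bounds.get(0)),
      lessThanOrEqual(column, bounds.get(1)));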

case STRING:
return leaf.getLiteralList();
case DATE:
return leaf.getLiteralList().stream().map(value -> dateToString((Date) value))
Contributor Author

I found a small quirk with the Hive Date type: if you call getLiteral you get a Timestamp back, and if you call getLiteralList you get Date objects, which is why there are two separate methods for DATE.

  return leaf.getLiteralList().stream()
-     .map(value -> ((Timestamp) value).toInstant().getEpochSecond() * MICROS_PER_SECOND +
-         ((Timestamp) value).getNanos() / NANOS_PER_MICROSEC).collect(Collectors.toList());
+     .map(value -> timestampToTimestampString((Timestamp) value))
Contributor

This shouldn't convert to a string. Instead, it should convert the Timestamp value directly to microseconds from the unix epoch. String conversion in expressions is only for convenience in tests and for people using the API directly with generics. If an engine passes a predicate, we don't want to needlessly convert to string and back because it is much, much more likely to corrupt the value.

Contributor Author

Ah sure, thank you for explaining that, I think I misunderstood what to do from the last comment - should hopefully be fixed now :)

}

private static BigDecimal hiveDecimalToBigDecimal(HiveDecimalWritable hiveDecimalWritable) {
return new BigDecimal(hiveDecimalWritable.toString()).setScale(hiveDecimalWritable.scale());
Contributor

I don't think we want to convert to String here, either. Can you use the same logic from ORC?
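
Presumably something like the ORC readers do, going through HiveDecimal instead of a String round-trip (a sketch, assuming getHiveDecimal().bigDecimalValue() preserves the original scale):

private static BigDecimal hiveDecimalToBigDecimal(HiveDecimalWritable hiveDecimalWritable) {
  // HiveDecimal already carries precision and scale, so no String conversion is needed.
  return hiveDecimalWritable.getHiveDecimal().bigDecimalValue();
}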

ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities
.deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
SearchArgument sarg = ConvertAstToSearchArg.create(job, exprNodeDesc);
Expression filter = HiveIcebergFilterFactory.generateFilterExpression(sarg);
Contributor

HiveIcebergFilterFactory.generateFilterExpression might throw UnsupportedOperationException.
Maybe it would be good to catch the exception and continue without filters if there is an error.
Hive runs the filters later anyway, so it will not cause an issue.
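
Something like this in the InputFormat, as a sketch of the suggestion (logger name assumed):

Expression filter = null;
try {
  filter = HiveIcebergFilterFactory.generateFilterExpression(sarg);
} catch (UnsupportedOperationException e) {
  // Fall back to a full scan; Hive re-applies the filter on the rows it reads,
  // so skipping the pushdown only costs performance, not correctness.
  LOG.warn("Unable to push down filter, scanning without it", e);
}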


private static long timestampToUnixEpoch(Timestamp timestamp) {
return timestamp.toInstant().getEpochSecond() * TimeUnit.SECONDS.toMicros(1) +
timestamp.getNanos() / TimeUnit.MICROSECONDS.toNanos(1);
Contributor

This seems odd to me. Why not call TimeUnit.SECONDS.toMicros(timestamp.toInstant().getEpochSecond())? Using the toMicros function to get the conversion factor, but not actually using it for conversion is strange.

Contributor

+1; similarly, timestamp.getNanos() / TimeUnit.MICROSECONDS.toNanos(1) could be TimeUnit.NANOSECONDS.toMicros(timestamp.getNanos()).

Contributor Author

Ah yep, probably should have spotted that one 😅
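
Putting both suggestions together, the conversion would read roughly:

private static long timestampToUnixEpoch(Timestamp timestamp) {
  return TimeUnit.SECONDS.toMicros(timestamp.toInstant().getEpochSecond()) +
      TimeUnit.NANOSECONDS.toMicros(timestamp.getNanos());
}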

}

private static String timestampToDateString(Timestamp timestamp) {
return timestamp.toLocalDateTime().toLocalDate().toString();
Contributor

Dates also need to be converted directly to a value, not a string. You can use DateTimeUtil if you need to.
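
For example, via Iceberg's DateTimeUtil for the list case, where the literals are java.sql.Date objects (a sketch, assuming DateTimeUtil.daysFromDate):

private static int daysFromDate(Date date) {
  // Iceberg date predicates expect the number of days since the unix epoch.
  return DateTimeUtil.daysFromDate(date.toLocalDate());
}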

SearchArgument arg = builder.startAnd().equals("date", PredicateLeaf.Type.DATE,
Date.valueOf("2015-11-12")).end().build();

UnboundPredicate expected = Expressions.equal("date", "2015-11-12");
Contributor

I think this expression should use an integer value instead of a String.


}

@Test
public void testAndOperand() {
@rdsr (Contributor), Aug 25, 2020

Do we need a test using HiveRunner? Since Hive stores the table's schema in lowercase, I think we might have to support a case-insensitive match on the Iceberg side.
cc @pvary, @guilload

Contributor

We definitely need to test for the lowercase column names, since Hive uses those. It might be worth doing it for the InputFormat checks as well. On the other hand, I am not sure whether HiveRunner helps here or not.

Contributor Author

I've been working on a HiveRunner test to see what happens in this case:

I've got an Iceberg table with a schema like:

private static final Schema STOCK_LIST_SCHEMA = new Schema(
          required(1, "ITEM_ID", Types.LongType.get()),
          required(2, "ITEM_COUNT", Types.LongType.get())
  );

If I run a regular query like either SELECT ITEM_ID from default.stock_table or SELECT item_id from default.stock_table, then this error occurs:

Caused by: java.lang.RuntimeException: cannot find field item_id from [org.apache.iceberg.mr.hive.serde.objectinspector.IcebergRecordObjectInspector$IcebergRecordStructField@c0fc462a, org.apache.iceberg.mr.hive.serde.objectinspector.IcebergRecordObjectInspector$IcebergRecordStructField@275a564e]
at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.getStandardStructFieldRef(ObjectInspectorUtils.java:523)
at org.apache.iceberg.mr.hive.serde.objectinspector.IcebergRecordObjectInspector.getStructFieldRef(IcebergRecordObjectInspector.java:68)
at org.apache.hadoop.hive.ql.exec.ExprNodeColumnEvaluator.initialize(ExprNodeColumnEvaluator.java:56)
at org.apache.hadoop.hive.ql.exec.Operator.initEvaluators(Operator.java:1033)
at org.apache.hadoop.hive.ql.exec.Operator.initEvaluatorsAndReturnStruct(Operator.java:1059)
at org.apache.hadoop.hive.ql.exec.SelectOperator.initializeOp(SelectOperator.java:75)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:366)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:556)
at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:508)
at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:376)
at org.apache.hadoop.hive.ql.exec.FetchTask.initialize(FetchTask.java:88)
... 29 more

which looks like the case sensitivity issues @rdsr mentioned.

I haven't pushed this test yet but I can do so if others want to reproduce the issue (I've just added a test to HiveIcebergStorageHandlerBaseTest).

Where would be the best place to put in a fix for this? This also doesn't rely on predicate pushdown, so it could be done in another PR if needed.

Contributor

That doesn't look like a pushdown problem, so I'd open a separate PR to fix it and add the tests.

Contributor Author

Sure, sounds good! I think I've addressed all the other comments on this PR so do you have time for another review? @rdblue @rdsr

}

public static Expression generateFilterExpression(SearchArgument sarg) {
return translate(sarg.getExpression(), sarg.getLeaves());
Contributor

Maybe logging would be nice here, minimally at DEBUG level, but maybe at INFO level, like:

LOG.info("Translated sarg=[{}] to expression=[{}]", sarg, expression);

Not sure about the toString implementations, but the general idea would be to see what went in and what came out.
Also, we can add this later; just noting here so we do not forget :D

Contributor

Pushed filters are logged in the scan, so the translated expression is already logged. I assume that Hive also logs the filters that it is pushing, so I don't think this is necessary.

@rdblue (Contributor) commented Aug 30, 2020

@cmathiesen, I had a closer look at the date/time and decimal conversion and found that there were a few bugs. I opened ExpediaGroup#16 with the fixes for those problems. Could you review that and merge?

One major take-away was that it is not safe to call Timestamp.toLocalDate for conversion because that conversion is in local time, not UTC. FYI @massdosage, @rdsr, and @guilload.

This also hits HIVE-19726, which erases milliseconds. It was fixed in Hive 2.4.0, but I've added a work-around in the PR.
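
For anyone running into the same thing elsewhere, the difference is that toLocalDateTime() interprets the Timestamp in the JVM's default zone, whereas going through Instant keeps the value in UTC; a generic illustration (not necessarily the exact fix in ExpediaGroup#16):

// Local-zone dependent: the derived date can shift by a day depending on the machine's timezone.
LocalDate localDate = timestamp.toLocalDateTime().toLocalDate();
// Zone-independent: derive the date from the UTC instant instead.
LocalDate utcDate = timestamp.toInstant().atOffset(ZoneOffset.UTC).toLocalDate();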

@rdblue merged commit c801a2c into apache:master on Sep 1, 2020
@rdblue (Contributor) commented Sep 1, 2020

Thanks @cmathiesen! Merged. And thanks to all the reviewers!
