From 88bff7d5b06a73899fc3ee96d7de8f6004dcd50b Mon Sep 17 00:00:00 2001 From: Shardul Mahadik Date: Mon, 24 Feb 2020 21:21:16 -0800 Subject: [PATCH 1/7] Support reading tables with Hive metadata if Iceberg metadata is not available --- .../org/apache/iceberg/BaseFileScanTask.java | 4 +- .../apache/iceberg/BaseMetastoreCatalog.java | 4 +- .../iceberg/BaseMetastoreTableOperations.java | 9 + .../org/apache/iceberg/hive/HiveCatalogs.java | 13 + .../iceberg/hive/legacy/DirectoryInfo.java | 51 ++++ .../iceberg/hive/legacy/FileSystemUtils.java | 60 ++++ .../HiveCatalogWithLegacyReadFallback.java | 73 +++++ .../iceberg/hive/legacy/HiveExpressions.java | 271 ++++++++++++++++++ .../iceberg/hive/legacy/LegacyHiveTable.java | 209 ++++++++++++++ .../legacy/LegacyHiveTableOperations.java | 226 +++++++++++++++ .../hive/legacy/LegacyHiveTableScan.java | 116 ++++++++ .../hive/legacy/LegacyHiveTableUtils.java | 144 ++++++++++ .../hive/legacy/TestHiveExpressions.java | 108 +++++++ 13 files changed, 1284 insertions(+), 4 deletions(-) create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/DirectoryInfo.java create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/HiveCatalogWithLegacyReadFallback.java create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTable.java create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java create mode 100644 hive/src/test/java/org/apache/iceberg/hive/legacy/TestHiveExpressions.java diff --git a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java index 988b07eb05..f9950047c1 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java +++ b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java @@ -29,7 +29,7 @@ import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.ResidualEvaluator; -class BaseFileScanTask implements FileScanTask { +public class BaseFileScanTask implements FileScanTask { private final DataFile file; private final String schemaString; private final String specString; @@ -37,7 +37,7 @@ class BaseFileScanTask implements FileScanTask { private transient PartitionSpec spec = null; - BaseFileScanTask(DataFile file, String schemaString, String specString, ResidualEvaluator residuals) { + public BaseFileScanTask(DataFile file, String schemaString, String specString, ResidualEvaluator residuals) { this.file = file; this.schemaString = schemaString; this.specString = specString; diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index fd6a817ba9..72d387da49 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -177,7 +177,7 @@ private Table loadMetadataTable(TableIdentifier identifier) { } } - private boolean isValidMetadataIdentifier(TableIdentifier identifier) { + protected boolean isValidMetadataIdentifier(TableIdentifier identifier) { return MetadataTableType.from(identifier.name()) != null && 
isValidIdentifier(TableIdentifier.of(identifier.namespace().levels())); } @@ -275,7 +275,7 @@ private static void deleteFiles(FileIO io, Set allManifests) { }); } - private static String fullTableName(String catalogName, TableIdentifier identifier) { + protected static String fullTableName(String catalogName, TableIdentifier identifier) { StringBuilder sb = new StringBuilder(); if (catalogName.contains("/") || catalogName.contains(":")) { diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 0d4432e860..d02b7e7873 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -270,4 +270,13 @@ private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metada .run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file())); } } + + protected void setCurrentMetadata(TableMetadata currentMetadata) { + this.currentMetadata = currentMetadata; + } + + protected void setShouldRefresh(boolean shouldRefresh) { + this.shouldRefresh = shouldRefresh; + } + } diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java index d282c99012..4bcb901ace 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java +++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.hive.legacy.HiveCatalogWithLegacyReadFallback; public final class HiveCatalogs { @@ -39,6 +40,11 @@ public final class HiveCatalogs { .removalListener((RemovalListener) (uri, catalog, cause) -> catalog.close()) .build(); + private static final Cache CATALOG_WITH_LEGACY_READ_FALLBACK_CACHE = Caffeine.newBuilder() + .expireAfterAccess(10, TimeUnit.MINUTES) + .removalListener((RemovalListener) (uri, catalog, cause) -> catalog.close()) + .build(); + private HiveCatalogs() {} public static HiveCatalog loadCatalog(Configuration conf) { @@ -47,6 +53,13 @@ public static HiveCatalog loadCatalog(Configuration conf) { return CATALOG_CACHE.get(metastoreUri, uri -> new HiveCatalog(conf)); } + public static HiveCatalog loadCatalogWithLegacyReadFallback(Configuration conf) { + // metastore URI can be null in local mode + String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, ""); + return CATALOG_WITH_LEGACY_READ_FALLBACK_CACHE.get(metastoreUri, + uri -> new HiveCatalogWithLegacyReadFallback(conf)); + } + /** * @deprecated Use {@link #loadHiveMetadataPreservingCatalog(Configuration)} instead */ diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/DirectoryInfo.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/DirectoryInfo.java new file mode 100644 index 0000000000..c1d490167c --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/DirectoryInfo.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive.legacy; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.StructLike; + + +/** + * Metadata for a data directory referenced by either a Hive table or a partition + */ +class DirectoryInfo { + private String location; + private FileFormat format; + private StructLike partitionData; + + DirectoryInfo(String location, FileFormat format, StructLike partitionData) { + this.location = location; + this.format = format; + this.partitionData = partitionData; + } + + public String location() { + return location; + } + + public FileFormat format() { + return format; + } + + public StructLike partitionData() { + return partitionData; + } +} diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java new file mode 100644 index 0000000000..4bd7a135c0 --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.hive.legacy; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; + + +class FileSystemUtils { + + private FileSystemUtils() {} + + /** + * Lists all non-hidden files for the given directory + */ + static List listFiles(String directory) { + + final Path directoryPath = new Path(directory); + final FileStatus[] files; + try { + FileSystem fs = directoryPath.getFileSystem(new Configuration()); + files = fs.listStatus(directoryPath, HiddenPathFilter.INSTANCE); + } catch (IOException e) { + throw new RuntimeException("Error listing files for directory: " + directory, e); + } + return Arrays.asList(files); + } + + private enum HiddenPathFilter implements PathFilter { + INSTANCE; + + @Override + public boolean accept(Path path) { + return !path.getName().startsWith("_") && !path.getName().startsWith("."); + } + } +} diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveCatalogWithLegacyReadFallback.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveCatalogWithLegacyReadFallback.java new file mode 100644 index 0000000000..a7e03df91b --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveCatalogWithLegacyReadFallback.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive.legacy; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hive.HiveCatalog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link HiveCatalog} which uses falls back to using Hive metadata to read tables when Iceberg metadata is not + * available. If the table is read through Hive metadata, features like time travel, snapshot isolation and incremental + * computation are not supported along with any WRITE operations to either the data or metadata. + */ +public class HiveCatalogWithLegacyReadFallback extends HiveCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogWithLegacyReadFallback.class); + + public HiveCatalogWithLegacyReadFallback(Configuration conf) { + super(conf); + } + + @Override + @SuppressWarnings("CatchBlockLogException") + public Table loadTable(TableIdentifier identifier) { + // Try to load the table using Iceberg metadata first. 
If it fails, use Hive metadata + try { + return super.loadTable(identifier); + } catch (NoSuchTableException e) { + TableOperations ops = legacyTableOps(identifier); + if (ops.current() == null) { + if (isValidMetadataIdentifier(identifier)) { + throw new UnsupportedOperationException( + "Metadata tables not supported for Hive tables without Iceberg metadata. Table: " + identifier); + } + throw new NoSuchTableException("Table does not exist: %s", identifier); + } else { + LOG.info( + "Iceberg metadata does not exist for {}; Falling back to Hive metadata. Time travel, snapshot isolation," + + " incremental computation features will not be available", identifier); + return new LegacyHiveTable(ops, fullTableName(name(), identifier)); + } + } + } + + private TableOperations legacyTableOps(TableIdentifier tableIdentifier) { + String dbName = tableIdentifier.namespace().level(0); + String tableName = tableIdentifier.name(); + return new LegacyHiveTableOperations(conf(), clientPool(), dbName, tableName); + } +} diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java new file mode 100644 index 0000000000..54b523933c --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive.legacy; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.expressions.BoundPredicate; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionVisitors; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.expressions.UnboundPredicate; +import org.apache.iceberg.expressions.UnboundTerm; + + +class HiveExpressions { + + private HiveExpressions() {} + + /** + * Simplifies the {@link Expression} so that it fits the restrictions of the expression that can be passed + * to the Hive metastore. 
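 + * For example, given a hypothetical table partitioned only by {@code datepartition}, a filter like
 + * {@code datepartition = '2020-02-24' AND userId > 100} would simplify to {@code datepartition = '2020-02-24'},
 + * since {@code userId} is not a partition column.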
For details about the simplification, please see {@link Simplify} + * @param expr The {@link Expression} to be simplified + * @param partitionColumnNames The set of partition column names + * @return TRUE if the simplified expression results in an always true expression or if there are no predicates on + * partition columns in the simplified expression, + * FALSE if the simplified expression results in an always false expression, + * otherwise returns the simplified expression + */ + static Expression simplifyPartitionFilter(Expression expr, Set partitionColumnNames) { + Expression simplified = ExpressionVisitors.visit(expr, new Simplify(partitionColumnNames)); + if (simplified != null) { + // During simplification of IN, NOTIN, NULL, and NOT NULL expressions we introduce additional NOT, TRUE, and FALSE + // expressions; so we call Simplify again to remove them + simplified = ExpressionVisitors.visit(simplified, new Simplify(partitionColumnNames)); + } + return (simplified == null) ? Expressions.alwaysTrue() : simplified; + } + + /** + * Converts an {@link Expression} into a filter string which can be passed to the Hive metastore + * @param expr The {@link Expression} to be converted into a filter string. This expression must fit the restrictions + * on Hive metastore partition filters. For more details, see {@link Simplify} + * @return a filter string equivalent to the given {@link Expression} which can be passed to the Hive metastore + */ + static String toPartitionFilterString(Expression expr) { + return ExpressionVisitors.visit(expr, ExpressionToPartitionFilterString.get()); + } + + /** + * Simplifies the {@link Expression} so that it fits the restrictions of the expression that can be passed + * to the Hive metastore. It performs the following changes: + * 1. Removes any predicates and non-partition columns + * 2. Rewrites NOT operators + * 3. Removes IS NULL and IS NOT NULL predicates (Replaced with FALSE and TRUE respectively as partition column values + * are always non null for Hive) + * 4. Expands IN and NOT IN operators into ORs of EQUAL operations and ANDs of NOT EQUAL operations respectively + * 5. Removes any children TRUE and FALSE expressions (Note that the simplified expression still can be TRUE and FALSE + * at the root) + */ + private static class Simplify extends ExpressionVisitors.ExpressionVisitor { + + private final Set partitionColumnNamesLowerCase; + + Simplify(Set partitionColumnNames) { + this.partitionColumnNamesLowerCase = + partitionColumnNames.stream().map(String::toLowerCase).collect(Collectors.toSet()); + } + + @Override + public Expression alwaysTrue() { + return Expressions.alwaysTrue(); + } + + @Override + public Expression alwaysFalse() { + return Expressions.alwaysFalse(); + } + + @Override + public Expression not(Expression result) { + return (result == null) ? 
null : result.negate(); + } + + @Override + public Expression and(Expression leftResult, Expression rightResult) { + if (leftResult == null && rightResult == null) { + return null; + } else if (leftResult == null) { + return rightResult; + } else if (rightResult == null) { + return leftResult; + } else if (leftResult.op() == Expression.Operation.FALSE || rightResult.op() == Expression.Operation.FALSE) { + return Expressions.alwaysFalse(); + } else if (leftResult.op() == Expression.Operation.TRUE) { + return rightResult; + } else if (rightResult.op() == Expression.Operation.TRUE) { + return leftResult; + } else { + return Expressions.and(leftResult, rightResult); + } + } + + @Override + public Expression or(Expression leftResult, Expression rightResult) { + if (leftResult == null && rightResult == null) { + return null; + } else if (leftResult == null || rightResult == null) { + throw new IllegalStateException( + "A filter on a partition column was ORed with a filter on a non-partition column which is not supported" + + " yet"); + } else if (leftResult.op() == Expression.Operation.TRUE || rightResult.op() == Expression.Operation.TRUE) { + return Expressions.alwaysTrue(); + } else if (leftResult.op() == Expression.Operation.FALSE) { + return rightResult; + } else if (rightResult.op() == Expression.Operation.FALSE) { + return leftResult; + } else { + return Expressions.or(leftResult, rightResult); + } + } + + Expression in(UnboundTerm expr, List> literals) { + Expression in = alwaysFalse(); + for (Literal literal : literals) { + in = Expressions.or(in, Expressions.equal(expr, literal.value())); + } + return in; + } + + @Override + public Expression predicate(BoundPredicate pred) { + throw new IllegalStateException("Bound predicate not expected: " + pred.getClass().getName()); + } + + @Override + public Expression predicate(UnboundPredicate pred) { + if (!partitionColumnNamesLowerCase.contains(pred.ref().name().toLowerCase())) { + return null; + } + + switch (pred.op()) { + case LT: + case LT_EQ: + case GT: + case GT_EQ: + case EQ: + case NOT_EQ: + return pred; + case IS_NULL: + return Expressions.alwaysFalse(); + case NOT_NULL: + return Expressions.alwaysTrue(); + case IN: + return in(pred.term(), pred.literals()); + case NOT_IN: + return Expressions.not(in(pred.term(), pred.literals())); + case STARTS_WITH: + throw new UnsupportedOperationException("STARTS_WITH predicate not supported in partition filter expression"); + default: + throw new IllegalStateException("Unexpected predicate: " + pred.op()); + } + } + } + + private static class ExpressionToPartitionFilterString extends ExpressionVisitors.ExpressionVisitor { + private static final ExpressionToPartitionFilterString INSTANCE = new ExpressionToPartitionFilterString(); + + private ExpressionToPartitionFilterString() { + } + + static ExpressionToPartitionFilterString get() { + return INSTANCE; + } + + @Override + public String alwaysTrue() { + throw new IllegalStateException("TRUE literal not allowed in Hive partition filter string"); + } + + @Override + public String alwaysFalse() { + throw new IllegalStateException("FALSE literal not allowed in Hive partition filter string"); + } + + @Override + public String not(String result) { + throw new IllegalStateException("NOT operator not allowed in Hive partition filter string"); + } + + @Override + public String and(String leftResult, String rightResult) { + return String.format("((%s) AND (%s))", leftResult, rightResult); + } + + @Override + public String or(String leftResult, String 
rightResult) { + return String.format("((%s) OR (%s))", leftResult, rightResult); + } + + @Override + public String predicate(BoundPredicate pred) { + throw new IllegalStateException("Bound predicate not expected: " + pred.getClass().getName()); + } + + @Override + public String predicate(UnboundPredicate pred) { + switch (pred.op()) { + case LT: + case LT_EQ: + case GT: + case GT_EQ: + case EQ: + case NOT_EQ: + return getBinaryExpressionString(pred.ref().name(), pred.op(), pred.literal()); + default: + throw new IllegalStateException("Unexpected operator in Hive partition filter string: " + pred.op()); + } + } + + private String getBinaryExpressionString(String columnName, Expression.Operation op, Literal lit) { + return String.format("( %s %s %s )", columnName, getOperationString(op), getLiteralValue(lit)); + } + + private String getOperationString(Expression.Operation op) { + switch (op) { + case LT: + return "<"; + case LT_EQ: + return "<="; + case GT: + return ">"; + case GT_EQ: + return ">="; + case EQ: + return "="; + case NOT_EQ: + return "!="; + default: + throw new IllegalStateException("Unexpected operator in Hive partition filter string: " + op); + } + } + + private String getLiteralValue(Literal lit) { + Object value = lit.value(); + if (value instanceof String) { + return String.format("'%s'", value); + } else { + return String.valueOf(value); + } + } + } +} diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTable.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTable.java new file mode 100644 index 0000000000..d481e5937f --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTable.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.hive.legacy; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.HistoryEntry; +import org.apache.iceberg.ManageSnapshots; +import org.apache.iceberg.OverwriteFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.ReplacePartitions; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.RewriteManifests; +import org.apache.iceberg.Rollback; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateLocation; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; + + +/** + * A {@link Table} which uses Hive table/partition metadata to perform scans using {@link LegacyHiveTableScan}. + * This table does not provide any time travel, snapshot isolation, incremental computation benefits. + * It also does not allow any WRITE operations to either the data or metadata. + */ +public class LegacyHiveTable implements Table, HasTableOperations { + private final TableOperations ops; + private final String name; + + LegacyHiveTable(TableOperations ops, String name) { + this.ops = ops; + this.name = name; + } + + @Override + public TableOperations operations() { + return ops; + } + + @Override + public void refresh() { + ops.refresh(); + } + + @Override + public TableScan newScan() { + return new LegacyHiveTableScan(ops, this); + } + + @Override + public Schema schema() { + return ops.current().schema(); + } + + @Override + public PartitionSpec spec() { + return ops.current().spec(); + } + + @Override + public Map specs() { + throw new UnsupportedOperationException( + "Multiple partition specs not supported for Hive tables without Iceberg metadata"); + } + + @Override + public Map properties() { + return ops.current().properties(); + } + + @Override + public String location() { + return ops.current().location(); + } + + @Override + public Snapshot currentSnapshot() { + throw new UnsupportedOperationException("Snapshots not supported for Hive tables without Iceberg metadata"); + } + + @Override + public Snapshot snapshot(long snapshotId) { + throw new UnsupportedOperationException("Snapshots not supported for Hive tables without Iceberg metadata"); + } + + @Override + public Iterable snapshots() { + throw new UnsupportedOperationException("Snapshots not supported for Hive tables without Iceberg metadata"); + } + + @Override + public List history() { + throw new UnsupportedOperationException("History not available for Hive tables without Iceberg metadata"); + } + + @Override + public UpdateSchema updateSchema() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public UpdateProperties updateProperties() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public UpdateLocation updateLocation() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + 
public AppendFiles newAppend() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public RewriteFiles newRewrite() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public RewriteManifests rewriteManifests() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public OverwriteFiles newOverwrite() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public ReplacePartitions newReplacePartitions() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public DeleteFiles newDelete() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public ExpireSnapshots expireSnapshots() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public Rollback rollback() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public ManageSnapshots manageSnapshots() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public Transaction newTransaction() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public FileIO io() { + return ops.io(); + } + + @Override + public EncryptionManager encryption() { + return ops.encryption(); + } + + @Override + public LocationProvider locationProvider() { + return ops.locationProvider(); + } + + @Override + public String toString() { + return name; + } +} diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java new file mode 100644 index 0000000000..1255d6e945 --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.hive.legacy; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataOperations; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.hive.HiveClientPool; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class LegacyHiveTableOperations extends BaseMetastoreTableOperations { + + private static final Logger LOG = LoggerFactory.getLogger(LegacyHiveTableOperations.class); + + private final HiveClientPool metaClients; + private final String database; + private final String tableName; + private final Configuration conf; + + private FileIO fileIO; + + protected LegacyHiveTableOperations(Configuration conf, HiveClientPool metaClients, String database, String table) { + this.conf = conf; + this.metaClients = metaClients; + this.database = database; + this.tableName = table; + } + + @Override + public FileIO io() { + if (fileIO == null) { + fileIO = new HadoopFileIO(conf); + } + + return fileIO; + } + + @Override + protected void doRefresh() { + try { + org.apache.hadoop.hive.metastore.api.Table hiveTable = + metaClients.run(client -> client.getTable(database, tableName)); + + Schema schema = LegacyHiveTableUtils.getSchema(hiveTable); + PartitionSpec spec = LegacyHiveTableUtils.getPartitionSpec(hiveTable, schema); + + TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, hiveTable.getSd().getLocation(), + LegacyHiveTableUtils.getTableProperties(hiveTable)); + metadata = metadata.replaceCurrentSnapshot(getDummySnapshot()); + setCurrentMetadata(metadata); + } catch (TException e) { + String errMsg = String.format("Failed to get table info from metastore %s.%s", database, tableName); + throw new RuntimeException(errMsg, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during refresh", e); + } + setShouldRefresh(false); + } + + DirectoryInfo getDirectoryInfo() { + Preconditions.checkArgument(current().spec().fields().isEmpty(), + "getDirectoryInfo only allowed for unpartitioned tables"); + try { + org.apache.hadoop.hive.metastore.api.Table hiveTable = + metaClients.run(client -> client.getTable(database, tableName)); + + return LegacyHiveTableUtils.toDirectoryInfo(hiveTable); + } catch (TException e) { + String errMsg = String.format("Failed to get table info from metastore %s.%s", database, tableName); + throw new RuntimeException(errMsg, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during refresh", e); + } + } + + List getDirectoryInfosByFilter(Expression expression) { + Preconditions.checkArgument(!current().spec().fields().isEmpty(), + "getDirectoryInfosByFilter only allowed for 
partitioned tables"); + try { + LOG.info("Fetching partitions for {}.{} with expression: {}", database, tableName, expression); + Set partitionColumnNames = current().spec() + .identitySourceIds() + .stream() + .map(id -> current().schema().findColumnName(id)) + .collect(Collectors.toSet()); + Expression simplified = HiveExpressions.simplifyPartitionFilter(expression, partitionColumnNames); + LOG.info("Simplified expression for {}.{} to {}", database, tableName, simplified); + final List partitions; + // If simplifyPartitionFilter returns TRUE, there are no filters on partition columns or the filter expression is + // going to match all partitions + if (simplified.op() == Expression.Operation.TRUE) { + partitions = metaClients.run(client -> client.listPartitionsByFilter(database, tableName, null, (short) -1)); + } else if (simplified.op() == Expression.Operation.FALSE) { + // If simplifyPartitionFilter returns FALSE, no partitions are going to match the filter expression + partitions = ImmutableList.of(); + } else { + String partitionFilterString = HiveExpressions.toPartitionFilterString(simplified); + LOG.info("Listing partitions for {}.{} with filter string: {}", database, tableName, partitionFilterString); + partitions = metaClients.run( + client -> client.listPartitionsByFilter(database, tableName, partitionFilterString, (short) -1)); + } + + return LegacyHiveTableUtils.toDirectoryInfos(partitions); + } catch (TException e) { + String errMsg = String.format("Failed to get partition info from metastore for %s.%s", database, tableName); + throw new RuntimeException(errMsg, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to getPartitionsByFilter", e); + } + } + + @Override + public void commit(TableMetadata base, TableMetadata metadata) { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + @Override + public String metadataFileLocation(String filename) { + throw new UnsupportedOperationException( + "Metadata file location not available for Hive tables without Iceberg metadata"); + } + + @Override + public LocationProvider locationProvider() { + throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); + } + + /** + * A dummy snapshot to represent a table. 
Without a snapshot, Iceberg will not scan the table and instead eagerly exit + */ + private Snapshot getDummySnapshot() { + final long currentTime = System.currentTimeMillis(); + final long snapshotId = newSnapshotId(); + + return new Snapshot() { + @Override + public long snapshotId() { + return snapshotId; + } + + @Override + public Long parentId() { + return null; + } + + @Override + public long timestampMillis() { + return currentTime; + } + + @Override + public List manifests() { + throw new UnsupportedOperationException( + "Manifest files not available for Hive Tables without Iceberg metadata"); + } + + @Override + public String operation() { + return DataOperations.APPEND; + } + + @Override + public Map summary() { + return ImmutableMap.of(); + } + + @Override + public Iterable addedFiles() { + throw new UnsupportedOperationException( + "Added files information not available for Hive Tables without Iceberg metadata"); + } + + @Override + public Iterable deletedFiles() { + throw new UnsupportedOperationException( + "Deleted files information not available for Hive Tables without Iceberg metadata"); + } + + @Override + public String manifestListLocation() { + throw new UnsupportedOperationException("Manifest list not available for Hive Tables without Iceberg metadata"); + } + }; + } +} diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java new file mode 100644 index 0000000000..ab30e743b8 --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iceberg.hive.legacy;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.iceberg.BaseFileScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DataTableScan;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.ThreadPools;
+
+
+/**
+ * A {@link DataTableScan} which uses Hive table and partition metadata to read tables.
+ * This scan does not provide any time travel, snapshot isolation, or incremental computation benefits.
+ */
+public class LegacyHiveTableScan extends DataTableScan {
+
+  protected LegacyHiveTableScan(TableOperations ops, Table table) {
+    super(ops, table);
+  }
+
+  protected LegacyHiveTableScan(TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+      boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+      ImmutableMap<String, String> options) {
+    super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+  }
+
+  @Override
+  @SuppressWarnings("checkstyle:HiddenField")
+  protected TableScan newRefinedScan(TableOperations ops, Table table, Long snapshotId, Schema schema,
+      Expression rowFilter, boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+      ImmutableMap<String, String> options) {
+    return new LegacyHiveTableScan(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns,
+        options);
+  }
+
+  @Override
+  public CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot, Expression rowFilter,
+      boolean caseSensitive, boolean colStats) {
+    PartitionSpec spec = ops.current().spec();
+    String schemaString = SchemaParser.toJson(spec.schema());
+    String specString = PartitionSpecParser.toJson(spec);
+    // TODO: Consider returning the whole rowFilter as residual since we may not be able to guarantee that all
+    // predicates for the partition columns are supported by Hive's listPartitionsByFilter
+    ResidualEvaluator residuals = ResidualEvaluator.of(spec, rowFilter, caseSensitive);
+
+    LegacyHiveTableOperations hiveOps = (LegacyHiveTableOperations) ops;
+    final Iterable<DirectoryInfo> matchingDirectories;
+    if (ops.current().spec().fields().isEmpty()) {
+      matchingDirectories = ImmutableList.of(hiveOps.getDirectoryInfo());
+    } else {
+      matchingDirectories = hiveOps.getDirectoryInfosByFilter(rowFilter);
+    }
+
+    // Each directory yields a lazy iterable of file scan tasks; directories are listed in parallel below
+    Iterable<Iterable<FileScanTask>> readers = Iterables.transform(matchingDirectories, directory -> {
+      return Iterables.transform(FileSystemUtils.listFiles(directory.location()),
+          file -> new BaseFileScanTask(createDataFile(file, spec, directory.partitionData(), directory.format()),
+              schemaString, specString, residuals));
+    });
+
+    return new ParallelIterable<>(readers, ThreadPools.getWorkerPool());
+ } + + private static DataFile createDataFile(FileStatus fileStatus, PartitionSpec partitionSpec, StructLike partitionData, + FileFormat format) { + DataFiles.Builder builder = DataFiles.builder(partitionSpec) + .withPath(fileStatus.getPath().toString()) + .withFormat(format) + .withFileSizeInBytes(fileStatus.getLen()) + .withMetrics(new Metrics(10000L, null, null, null, null, null)); + + if (partitionSpec.fields().isEmpty()) { + return builder.build(); + } else { + return builder.withPartition(partitionData).build(); + } + } +} diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java new file mode 100644 index 0000000000..6d84260112 --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive.legacy; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.types.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +class LegacyHiveTableUtils { + + private LegacyHiveTableUtils() {} + + private static final Logger LOG = LoggerFactory.getLogger(LegacyHiveTableUtils.class); + + static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) { + Map props = getTableProperties(table); + String schemaStr = props.get("avro.schema.literal"); + Schema schema; + if (schemaStr == null) { + LOG.warn("Table {}.{} does not have an avro.schema.literal set; using Hive schema instead. 
The schema will not" + + " have case sensitivity and nullability information", table.getDbName(), table.getTableName()); + // TODO: Add support for tables without avro.schema.literal + throw new UnsupportedOperationException("Reading tables without avro.schema.literal not implemented yet"); + } else { + schema = new Schema( + AvroSchemaUtil.convert(new org.apache.avro.Schema.Parser().parse(schemaStr)).asStructType().fields()); + } + + List partCols = table.getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList()); + return addPartitionColumnsIfRequired(schema, partCols); + } + + private static Schema addPartitionColumnsIfRequired(Schema schema, List partitionColumns) { + List fields = new ArrayList<>(schema.columns()); + AtomicInteger fieldId = new AtomicInteger(10000); + partitionColumns.stream().forEachOrdered(column -> { + Types.NestedField field = schema.findField(column); + if (field == null) { + // TODO: Support partition fields with non-string types + fields.add(Types.NestedField.required(fieldId.incrementAndGet(), column, Types.StringType.get())); + } else { + Preconditions.checkArgument(field.type().equals(Types.StringType.get()), + "Tables with non-string partition columns not supported yet"); + } + }); + return new Schema(fields); + } + + static Map getTableProperties(org.apache.hadoop.hive.metastore.api.Table table) { + Map props = new HashMap<>(); + props.putAll(table.getSd().getSerdeInfo().getParameters()); + props.putAll(table.getSd().getParameters()); + props.putAll(table.getParameters()); + return props; + } + + static PartitionSpec getPartitionSpec(org.apache.hadoop.hive.metastore.api.Table table, Schema schema) { + PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); + + table.getPartitionKeys().forEach(fieldSchema -> { + // TODO: Support partition fields with non-string types + Preconditions.checkArgument(fieldSchema.getType().equals("string"), + "Tables with non-string partition columns not supported yet"); + builder.identity(fieldSchema.getName()); + }); + + return builder.build(); + } + + static DirectoryInfo toDirectoryInfo(org.apache.hadoop.hive.metastore.api.Table table) { + return new DirectoryInfo(table.getSd().getLocation(), + serdeToFileFormat(table.getSd().getSerdeInfo().getSerializationLib()), null); + } + + static List toDirectoryInfos(List partitions) { + return partitions.stream().map(p -> { + return new DirectoryInfo(p.getSd().getLocation(), + serdeToFileFormat(p.getSd().getSerdeInfo().getSerializationLib()), buildPartitionStructLike(p.getValues())); + }).collect(Collectors.toList()); + } + + private static StructLike buildPartitionStructLike(List partitionValues) { + return new StructLike() { + + @Override + public int size() { + return partitionValues.size(); + } + + @Override + public T get(int pos, Class javaClass) { + return javaClass.cast(partitionValues.get(pos)); + } + + @Override + public void set(int pos, T value) { + throw new IllegalStateException("Read-only"); + } + }; + } + + private static FileFormat serdeToFileFormat(String serde) { + switch (serde) { + case "org.apache.hadoop.hive.serde2.avro.AvroSerDe": + return FileFormat.AVRO; + case "org.apache.hadoop.hive.ql.io.orc.OrcSerde": + return FileFormat.ORC; + default: + throw new IllegalArgumentException("Unrecognized serde: " + serde); + } + } +} diff --git a/hive/src/test/java/org/apache/iceberg/hive/legacy/TestHiveExpressions.java b/hive/src/test/java/org/apache/iceberg/hive/legacy/TestHiveExpressions.java new file mode 100644 index 
0000000000..0a17e9b450 --- /dev/null +++ b/hive/src/test/java/org/apache/iceberg/hive/legacy/TestHiveExpressions.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive.legacy; + +import com.google.common.collect.ImmutableSet; +import org.apache.iceberg.expressions.Expression; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.iceberg.expressions.Expressions.alwaysFalse; +import static org.apache.iceberg.expressions.Expressions.alwaysTrue; +import static org.apache.iceberg.expressions.Expressions.and; +import static org.apache.iceberg.expressions.Expressions.equal; +import static org.apache.iceberg.expressions.Expressions.in; +import static org.apache.iceberg.expressions.Expressions.isNull; +import static org.apache.iceberg.expressions.Expressions.not; +import static org.apache.iceberg.expressions.Expressions.notEqual; +import static org.apache.iceberg.expressions.Expressions.notIn; +import static org.apache.iceberg.expressions.Expressions.notNull; +import static org.apache.iceberg.expressions.Expressions.or; +import static org.apache.iceberg.hive.legacy.HiveExpressions.simplifyPartitionFilter; + + +public class TestHiveExpressions { + + @Test + public void testSimplifyRemoveNonPartitionColumns() { + Expression input = and(and(equal("pCol", 1), equal("nonpCol", 2)), isNull("nonpCol")); + Expression expected = equal("pCol", 1); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + } + + @Test + public void testSimplifyRemoveNot() { + Expression input = not(and(equal("pCol", 1), equal("pCol", 2))); + Expression expected = or(notEqual("pCol", 1), notEqual("pCol", 2)); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + } + + @Test + public void testSimplifyRemoveIsNull() { + Expression input = isNull("pcol"); + Expression expected = alwaysFalse(); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + } + + @Test + public void testSimplifyRemoveNotNull() { + Expression input = notNull("pcol"); + Expression expected = alwaysTrue(); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + } + + @Test + public void testSimplifyExpandIn() { + Expression input = in("pcol", 1, 2, 3); + Expression expected = or(or(equal("pcol", 1), equal("pcol", 2)), equal("pcol", 3)); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + } + + @Test + public void testSimplifyExpandNotIn() { + Expression input = notIn("pcol", 1, 2, 3); + Expression expected = and(and(notEqual("pcol", 1), 
notEqual("pcol", 2)), notEqual("pcol", 3)); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + } + + @Test + public void testSimplifyRemoveAlwaysTrueChildren() { + Expression input = and(alwaysTrue(), equal("pcol", 1)); + Expression expected = equal("pcol", 1); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + + input = or(alwaysTrue(), equal("pcol", 1)); + expected = alwaysTrue(); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + } + + @Test + public void testSimplifyRemoveAlwaysFalseChildren() { + Expression input = and(alwaysFalse(), equal("pcol", 1)); + Expression expected = alwaysFalse(); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + + input = or(alwaysFalse(), equal("pcol", 1)); + expected = equal("pcol", 1); + Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); + } + + +} From 0f9bd27379e87924da5b9783666844a3a035882f Mon Sep 17 00:00:00 2001 From: Shardul Mahadik Date: Mon, 16 Mar 2020 13:23:23 -0700 Subject: [PATCH 2/7] Address RB comments --- ...HiveMetadataPreservingTableOperations.java | 1 + .../iceberg/hive/HiveTableOperations.java | 5 + .../iceberg/hive/legacy/DirectoryInfo.java | 6 +- .../iceberg/hive/legacy/FileSystemUtils.java | 3 +- .../HiveCatalogWithLegacyReadFallback.java | 8 +- .../iceberg/hive/legacy/HiveExpressions.java | 115 +++++--- .../legacy/LegacyHiveTableOperations.java | 98 ++----- .../hive/legacy/LegacyHiveTableScan.java | 23 +- .../hive/legacy/LegacyHiveTableUtils.java | 3 +- .../hive/legacy/TestHiveExpressions.java | 7 +- .../hive/legacy/TestLegacyHiveTableScan.java | 248 ++++++++++++++++++ 11 files changed, 376 insertions(+), 141 deletions(-) create mode 100644 hive/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java index d6d9f79ead..41c09374f3 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java +++ b/hive/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java @@ -83,6 +83,7 @@ protected void doRefresh() { String errMsg = String.format("%s.%s is missing %s property", database, tableName, METADATA_LOCATION_PROP); throw new IllegalArgumentException(errMsg); } + if (!io().newInputFile(metadataLocation).exists()) { String errMsg = String.format("%s property for %s.%s points to a non-existent file %s", METADATA_LOCATION_PROP, database, tableName, metadataLocation); diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index d25fcf9ae6..80bc0b850d 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -113,6 +113,11 @@ protected void doRefresh() { throw new IllegalArgumentException(errMsg); } + if (!io().newInputFile(metadataLocation).exists()) { + String errMsg = String.format("%s property for %s.%s points to a non-existent file %s", + METADATA_LOCATION_PROP, database, tableName, metadataLocation); + throw new IllegalArgumentException(errMsg); + } } catch 
(NoSuchObjectException e) { if (currentMetadataLocation() != null) { throw new NoSuchTableException(String.format("No such table: %s.%s", database, tableName)); diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/DirectoryInfo.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/DirectoryInfo.java index c1d490167c..50de7957ce 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/legacy/DirectoryInfo.java +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/DirectoryInfo.java @@ -27,9 +27,9 @@ * Metadata for a data directory referenced by either a Hive table or a partition */ class DirectoryInfo { - private String location; - private FileFormat format; - private StructLike partitionData; + private final String location; + private final FileFormat format; + private final StructLike partitionData; DirectoryInfo(String location, FileFormat format, StructLike partitionData) { this.location = location; diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java index 4bd7a135c0..14a744197c 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.iceberg.exceptions.RuntimeIOException; class FileSystemUtils { @@ -44,7 +45,7 @@ static List listFiles(String directory) { FileSystem fs = directoryPath.getFileSystem(new Configuration()); files = fs.listStatus(directoryPath, HiddenPathFilter.INSTANCE); } catch (IOException e) { - throw new RuntimeException("Error listing files for directory: " + directory, e); + throw new RuntimeIOException(e, "Error listing files for directory: " + directory); } return Arrays.asList(files); } diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveCatalogWithLegacyReadFallback.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveCatalogWithLegacyReadFallback.java index a7e03df91b..38f5520b09 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveCatalogWithLegacyReadFallback.java +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveCatalogWithLegacyReadFallback.java @@ -30,7 +30,7 @@ /** - * A {@link HiveCatalog} which uses falls back to using Hive metadata to read tables when Iceberg metadata is not + * A {@link HiveCatalog} which falls back to using Hive metadata to read tables when Iceberg metadata is not * available. If the table is read through Hive metadata, features like time travel, snapshot isolation and incremental * computation are not supported along with any WRITE operations to either the data or metadata. */ @@ -48,7 +48,7 @@ public Table loadTable(TableIdentifier identifier) { // Try to load the table using Iceberg metadata first. If it fails, use Hive metadata try { return super.loadTable(identifier); - } catch (NoSuchTableException e) { + } catch (NoSuchTableException | IllegalArgumentException e) { TableOperations ops = legacyTableOps(identifier); if (ops.current() == null) { if (isValidMetadataIdentifier(identifier)) { @@ -57,9 +57,9 @@ public Table loadTable(TableIdentifier identifier) { } throw new NoSuchTableException("Table does not exist: %s", identifier); } else { - LOG.info( + LOG.warn( "Iceberg metadata does not exist for {}; Falling back to Hive metadata. 
Time travel, snapshot isolation," + - " incremental computation features will not be available", identifier); + " and incremental computation features will not be available", identifier); return new LegacyHiveTable(ops, fullTableName(name(), identifier)); } } diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java index 54b523933c..3eba13e164 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java @@ -37,7 +37,8 @@ private HiveExpressions() {} /** * Simplifies the {@link Expression} so that it fits the restrictions of the expression that can be passed - * to the Hive metastore. For details about the simplification, please see {@link Simplify} + * to the Hive metastore. For details about the simplification, please see {@link RemoveNonPartitionPredicates} and + * {@link RewriteUnsupportedOperators} * @param expr The {@link Expression} to be simplified * @param partitionColumnNames The set of partition column names * @return TRUE if the simplified expression results in an always true expression or if there are no predicates on @@ -46,19 +47,27 @@ private HiveExpressions() {} * otherwise returns the simplified expression */ static Expression simplifyPartitionFilter(Expression expr, Set partitionColumnNames) { - Expression simplified = ExpressionVisitors.visit(expr, new Simplify(partitionColumnNames)); - if (simplified != null) { - // During simplification of IN, NOTIN, NULL, and NOT NULL expressions we introduce additional NOT, TRUE, and FALSE - // expressions; so we call Simplify again to remove them - simplified = ExpressionVisitors.visit(simplified, new Simplify(partitionColumnNames)); + try { + Expression partitionPredicatesOnly = ExpressionVisitors.visit(expr, + new RemoveNonPartitionPredicates(partitionColumnNames)); + if (partitionPredicatesOnly == null) { + return Expressions.alwaysTrue(); + } else { + Expression rewritten = ExpressionVisitors.visit(partitionPredicatesOnly, new RewriteUnsupportedOperators()); + // During rewrite of IN, NOTIN, NULL, and NOT NULL expressions we introduce additional NOT, TRUE, and FALSE + // expressions; so we call RewriteUnsupportedOperators again to remove them + rewritten = ExpressionVisitors.visit(rewritten, new RewriteUnsupportedOperators()); + return rewritten; + } + } catch (Exception e) { + throw new RuntimeException("Error while processing expression: " + expr, e); } - return (simplified == null) ? Expressions.alwaysTrue() : simplified; } /** * Converts an {@link Expression} into a filter string which can be passed to the Hive metastore * @param expr The {@link Expression} to be converted into a filter string. This expression must fit the restrictions - * on Hive metastore partition filters. For more details, see {@link Simplify} + * on Hive metastore partition filters. For more details, see {@link RewriteUnsupportedOperators} * @return a filter string equivalent to the given {@link Expression} which can be passed to the Hive metastore */ static String toPartitionFilterString(Expression expr) { @@ -66,21 +75,13 @@ static String toPartitionFilterString(Expression expr) { } /** - * Simplifies the {@link Expression} so that it fits the restrictions of the expression that can be passed - * to the Hive metastore. It performs the following changes: - * 1. Removes any predicates and non-partition columns - * 2. Rewrites NOT operators - * 3. 
Removes IS NULL and IS NOT NULL predicates (Replaced with FALSE and TRUE respectively as partition column values - * are always non null for Hive) - * 4. Expands IN and NOT IN operators into ORs of EQUAL operations and ANDs of NOT EQUAL operations respectively - * 5. Removes any children TRUE and FALSE expressions (Note that the simplified expression still can be TRUE and FALSE - * at the root) + * Removes any predicates on non-partition columns from the given {@link Expression} */ - private static class Simplify extends ExpressionVisitors.ExpressionVisitor { + private static class RemoveNonPartitionPredicates extends ExpressionVisitors.ExpressionVisitor { private final Set partitionColumnNamesLowerCase; - Simplify(Set partitionColumnNames) { + RemoveNonPartitionPredicates(Set partitionColumnNames) { this.partitionColumnNamesLowerCase = partitionColumnNames.stream().map(String::toLowerCase).collect(Collectors.toSet()); } @@ -97,7 +98,7 @@ public Expression alwaysFalse() { @Override public Expression not(Expression result) { - return (result == null) ? null : result.negate(); + return (result == null) ? null : Expressions.not(result); } @Override @@ -108,12 +109,6 @@ public Expression and(Expression leftResult, Expression rightResult) { return rightResult; } else if (rightResult == null) { return leftResult; - } else if (leftResult.op() == Expression.Operation.FALSE || rightResult.op() == Expression.Operation.FALSE) { - return Expressions.alwaysFalse(); - } else if (leftResult.op() == Expression.Operation.TRUE) { - return rightResult; - } else if (rightResult.op() == Expression.Operation.TRUE) { - return leftResult; } else { return Expressions.and(leftResult, rightResult); } @@ -125,23 +120,64 @@ public Expression or(Expression leftResult, Expression rightResult) { return null; } else if (leftResult == null || rightResult == null) { throw new IllegalStateException( - "A filter on a partition column was ORed with a filter on a non-partition column which is not supported" + - " yet"); - } else if (leftResult.op() == Expression.Operation.TRUE || rightResult.op() == Expression.Operation.TRUE) { - return Expressions.alwaysTrue(); - } else if (leftResult.op() == Expression.Operation.FALSE) { - return rightResult; - } else if (rightResult.op() == Expression.Operation.FALSE) { - return leftResult; + "A filter on a partition column was ORed with a filter on a non-partition column which is not supported"); } else { return Expressions.or(leftResult, rightResult); } } - Expression in(UnboundTerm expr, List> literals) { + @Override + public Expression predicate(BoundPredicate pred) { + throw new IllegalStateException("Bound predicate not expected: " + pred.getClass().getName()); + } + + @Override + public Expression predicate(UnboundPredicate pred) { + return (partitionColumnNamesLowerCase.contains(pred.ref().name().toLowerCase())) ? pred : null; + } + } + + /** + * Rewrites the {@link Expression} so that it fits the restrictions of the expression that can be passed + * to the Hive metastore. It performs the following changes: + * 1. Rewrites NOT operators by inverting binary operators, negating unary literals and De Morgan's laws + * 2. Removes IS NULL and IS NOT NULL predicates (Replaced with FALSE and TRUE respectively as partition column values + * are always non null for Hive) + * 3. Expands IN and NOT IN operators into ORs of EQUAL operations and ANDs of NOT EQUAL operations respectively + * 4. 
Removes any children TRUE and FALSE expressions (Note that the rewritten expression still can be TRUE and FALSE + * at the root and will have to be handled appropriately by the caller) + */ + private static class RewriteUnsupportedOperators extends ExpressionVisitors.ExpressionVisitor { + + @Override + public Expression alwaysTrue() { + return Expressions.alwaysTrue(); + } + + @Override + public Expression alwaysFalse() { + return Expressions.alwaysFalse(); + } + + @Override + public Expression not(Expression result) { + return result.negate(); + } + + @Override + public Expression and(Expression leftResult, Expression rightResult) { + return Expressions.and(leftResult, rightResult); + } + + @Override + public Expression or(Expression leftResult, Expression rightResult) { + return Expressions.or(leftResult, rightResult); + } + + Expression in(UnboundTerm term, List> literals) { Expression in = alwaysFalse(); for (Literal literal : literals) { - in = Expressions.or(in, Expressions.equal(expr, literal.value())); + in = Expressions.or(in, Expressions.equal(term, literal.value())); } return in; } @@ -153,10 +189,6 @@ public Expression predicate(BoundPredicate pred) { @Override public Expression predicate(UnboundPredicate pred) { - if (!partitionColumnNamesLowerCase.contains(pred.ref().name().toLowerCase())) { - return null; - } - switch (pred.op()) { case LT: case LT_EQ: @@ -262,7 +294,8 @@ private String getOperationString(Expression.Operation op) { private String getLiteralValue(Literal lit) { Object value = lit.value(); if (value instanceof String) { - return String.format("'%s'", value); + String escapedString = ((String) value).replace("'", "\\'"); + return String.format("'%s'", escapedString); } else { return String.valueOf(value); } diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java index 1255d6e945..3d55a8dd0b 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java @@ -21,22 +21,17 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.iceberg.BaseMetastoreTableOperations; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DataOperations; -import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; -import org.apache.iceberg.Snapshot; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.hive.HiveClientPool; import org.apache.iceberg.io.FileIO; @@ -51,7 +46,7 @@ public class LegacyHiveTableOperations extends BaseMetastoreTableOperations { private static final Logger LOG = LoggerFactory.getLogger(LegacyHiveTableOperations.class); private final HiveClientPool metaClients; - private final String database; + private final String databaseName; private final String tableName; private final Configuration conf; @@ -60,7 +55,7 @@ public class LegacyHiveTableOperations extends BaseMetastoreTableOperations { protected 
LegacyHiveTableOperations(Configuration conf, HiveClientPool metaClients, String database, String table) { this.conf = conf; this.metaClients = metaClients; - this.database = database; + this.databaseName = database; this.tableName = table; } @@ -77,17 +72,16 @@ public FileIO io() { protected void doRefresh() { try { org.apache.hadoop.hive.metastore.api.Table hiveTable = - metaClients.run(client -> client.getTable(database, tableName)); + metaClients.run(client -> client.getTable(databaseName, tableName)); Schema schema = LegacyHiveTableUtils.getSchema(hiveTable); PartitionSpec spec = LegacyHiveTableUtils.getPartitionSpec(hiveTable, schema); TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, hiveTable.getSd().getLocation(), LegacyHiveTableUtils.getTableProperties(hiveTable)); - metadata = metadata.replaceCurrentSnapshot(getDummySnapshot()); setCurrentMetadata(metadata); } catch (TException e) { - String errMsg = String.format("Failed to get table info from metastore %s.%s", database, tableName); + String errMsg = String.format("Failed to get table info from metastore %s.%s", databaseName, tableName); throw new RuntimeException(errMsg, e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -101,15 +95,15 @@ DirectoryInfo getDirectoryInfo() { "getDirectoryInfo only allowed for unpartitioned tables"); try { org.apache.hadoop.hive.metastore.api.Table hiveTable = - metaClients.run(client -> client.getTable(database, tableName)); + metaClients.run(client -> client.getTable(databaseName, tableName)); return LegacyHiveTableUtils.toDirectoryInfo(hiveTable); } catch (TException e) { - String errMsg = String.format("Failed to get table info from metastore %s.%s", database, tableName); + String errMsg = String.format("Failed to get table info for %s.%s from metastore", databaseName, tableName); throw new RuntimeException(errMsg, e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted during refresh", e); + throw new RuntimeException("Interrupted in call to getDirectoryInfo", e); } } @@ -117,32 +111,34 @@ List getDirectoryInfosByFilter(Expression expression) { Preconditions.checkArgument(!current().spec().fields().isEmpty(), "getDirectoryInfosByFilter only allowed for partitioned tables"); try { - LOG.info("Fetching partitions for {}.{} with expression: {}", database, tableName, expression); + LOG.info("Fetching partitions for {}.{} with expression: {}", databaseName, tableName, expression); Set partitionColumnNames = current().spec() .identitySourceIds() .stream() .map(id -> current().schema().findColumnName(id)) .collect(Collectors.toSet()); Expression simplified = HiveExpressions.simplifyPartitionFilter(expression, partitionColumnNames); - LOG.info("Simplified expression for {}.{} to {}", database, tableName, simplified); + LOG.info("Simplified expression for {}.{} to {}", databaseName, tableName, simplified); final List partitions; // If simplifyPartitionFilter returns TRUE, there are no filters on partition columns or the filter expression is // going to match all partitions - if (simplified.op() == Expression.Operation.TRUE) { - partitions = metaClients.run(client -> client.listPartitionsByFilter(database, tableName, null, (short) -1)); - } else if (simplified.op() == Expression.Operation.FALSE) { + if (simplified.equals(Expressions.alwaysTrue())) { + partitions = metaClients.run(client -> client.listPartitionsByFilter( + databaseName, tableName, null, (short) -1)); + } else if 
(simplified.equals(Expressions.alwaysFalse())) { // If simplifyPartitionFilter returns FALSE, no partitions are going to match the filter expression partitions = ImmutableList.of(); } else { String partitionFilterString = HiveExpressions.toPartitionFilterString(simplified); - LOG.info("Listing partitions for {}.{} with filter string: {}", database, tableName, partitionFilterString); + LOG.info("Listing partitions for {}.{} with filter string: {}", databaseName, tableName, partitionFilterString); partitions = metaClients.run( - client -> client.listPartitionsByFilter(database, tableName, partitionFilterString, (short) -1)); + client -> client.listPartitionsByFilter(databaseName, tableName, partitionFilterString, (short) -1)); } return LegacyHiveTableUtils.toDirectoryInfos(partitions); } catch (TException e) { - String errMsg = String.format("Failed to get partition info from metastore for %s.%s", database, tableName); + String errMsg = String.format("Failed to get partition info for %s.%s with expression %s from metastore", + databaseName, tableName, expression); throw new RuntimeException(errMsg, e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -165,62 +161,4 @@ public String metadataFileLocation(String filename) { public LocationProvider locationProvider() { throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); } - - /** - * A dummy snapshot to represent a table. Without a snapshot, Iceberg will not scan the table and instead eagerly exit - */ - private Snapshot getDummySnapshot() { - final long currentTime = System.currentTimeMillis(); - final long snapshotId = newSnapshotId(); - - return new Snapshot() { - @Override - public long snapshotId() { - return snapshotId; - } - - @Override - public Long parentId() { - return null; - } - - @Override - public long timestampMillis() { - return currentTime; - } - - @Override - public List manifests() { - throw new UnsupportedOperationException( - "Manifest files not available for Hive Tables without Iceberg metadata"); - } - - @Override - public String operation() { - return DataOperations.APPEND; - } - - @Override - public Map summary() { - return ImmutableMap.of(); - } - - @Override - public Iterable addedFiles() { - throw new UnsupportedOperationException( - "Added files information not available for Hive Tables without Iceberg metadata"); - } - - @Override - public Iterable deletedFiles() { - throw new UnsupportedOperationException( - "Deleted files information not available for Hive Tables without Iceberg metadata"); - } - - @Override - public String manifestListLocation() { - throw new UnsupportedOperationException("Manifest list not available for Hive Tables without Iceberg metadata"); - } - }; - } } diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java index ab30e743b8..4a89106baf 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java @@ -73,21 +73,20 @@ protected TableScan newRefinedScan(TableOperations ops, Table table, Long snapsh } @Override - public CloseableIterable planFiles(TableOperations ops, Snapshot snapshot, Expression rowFilter, - boolean caseSensitive, boolean colStats) { - PartitionSpec spec = ops.current().spec(); + public CloseableIterable planFiles() { + LegacyHiveTableOperations hiveOps = (LegacyHiveTableOperations)
tableOps(); + PartitionSpec spec = hiveOps.current().spec(); String schemaString = SchemaParser.toJson(spec.schema()); String specString = PartitionSpecParser.toJson(spec); - // TODO: Consider returning the whole rowFilter as residual since we many not be able to guarantee that all + // TODO: Consider returning the whole filter() as residual since we may not be able to guarantee that all // predicates for the partition columns are supported by Hive's listPartitionsByFilter - ResidualEvaluator residuals = ResidualEvaluator.of(spec, rowFilter, caseSensitive); + ResidualEvaluator residuals = ResidualEvaluator.of(spec, filter(), isCaseSensitive()); - LegacyHiveTableOperations hiveOps = (LegacyHiveTableOperations) ops; - final Iterable matchingDirectories; - if (ops.current().spec().fields().isEmpty()) { + Iterable matchingDirectories; + if (hiveOps.current().spec().fields().isEmpty()) { matchingDirectories = ImmutableList.of(hiveOps.getDirectoryInfo()); } else { - matchingDirectories = hiveOps.getDirectoryInfosByFilter(rowFilter); + matchingDirectories = hiveOps.getDirectoryInfosByFilter(filter()); } Iterable> readers = Iterables.transform(matchingDirectories, directory -> { @@ -99,6 +98,12 @@ public CloseableIterable planFiles(TableOperations ops, Snapshot s return new ParallelIterable<>(readers, ThreadPools.getWorkerPool()); } + @Override + public CloseableIterable planFiles(TableOperations ops, Snapshot snapshot, + Expression rowFilter, boolean caseSensitive, boolean colStats) { + throw new IllegalStateException("Control flow should never reach here"); + } + private static DataFile createDataFile(FileStatus fileStatus, PartitionSpec partitionSpec, StructLike partitionData, FileFormat format) { DataFiles.Builder builder = DataFiles.builder(partitionSpec) diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java index 6d84260112..f2fcc31db7 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java @@ -54,8 +54,7 @@ static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) { // TODO: Add support for tables without avro.schema.literal throw new UnsupportedOperationException("Reading tables without avro.schema.literal not implemented yet"); } else { - schema = new Schema( - AvroSchemaUtil.convert(new org.apache.avro.Schema.Parser().parse(schemaStr)).asStructType().fields()); + schema = AvroSchemaUtil.toIceberg(new org.apache.avro.Schema.Parser().parse(schemaStr)); } List partCols = table.getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList()); diff --git a/hive/src/test/java/org/apache/iceberg/hive/legacy/TestHiveExpressions.java b/hive/src/test/java/org/apache/iceberg/hive/legacy/TestHiveExpressions.java index 0a17e9b450..4489e003db 100644 --- a/hive/src/test/java/org/apache/iceberg/hive/legacy/TestHiveExpressions.java +++ b/hive/src/test/java/org/apache/iceberg/hive/legacy/TestHiveExpressions.java @@ -36,6 +36,7 @@ import static org.apache.iceberg.expressions.Expressions.notNull; import static org.apache.iceberg.expressions.Expressions.or; import static org.apache.iceberg.hive.legacy.HiveExpressions.simplifyPartitionFilter; +import static org.apache.iceberg.hive.legacy.HiveExpressions.toPartitionFilterString; public class TestHiveExpressions { @@ -104,5 +105,9 @@ public void testSimplifyRemoveAlwaysFalseChildren() {
Assert.assertEquals(expected.toString(), simplifyPartitionFilter(input, ImmutableSet.of("pcol")).toString()); } - + @Test + public void testToPartitionFilterStringEscapeStringLiterals() { + Expression input = equal("pcol", "s'1"); + Assert.assertEquals("( pcol = 's\\'1' )", toPartitionFilterString(input)); + } } diff --git a/hive/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java b/hive/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java new file mode 100644 index 0000000000..8fa1378972 --- /dev/null +++ b/hive/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive.legacy; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.avro.AvroSerDe; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.HiveMetastoreTest; +import org.apache.iceberg.io.CloseableIterable; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.iceberg.FileFormat.AVRO; +import static org.apache.iceberg.FileFormat.ORC; + + +public class TestLegacyHiveTableScan extends HiveMetastoreTest { + + private static final List DATA_COLUMNS = ImmutableList.of( + new FieldSchema("strCol", "string", ""), + new FieldSchema("intCol", "int", "")); + private static final List PARTITION_COLUMNS = ImmutableList.of( + new FieldSchema("pcol", "string", "")); + private static HiveCatalog readFallbackCatalog; + private static Path dbPath; + + @BeforeClass + public static void beforeClass() 
throws Exception { + readFallbackCatalog = new HiveCatalogWithLegacyReadFallback(HiveMetastoreTest.hiveConf); + dbPath = Paths.get(URI.create(metastoreClient.getDatabase(DB_NAME).getLocationUri())); + } + + @AfterClass + public static void afterClass() { + readFallbackCatalog.close(); + TestLegacyHiveTableScan.readFallbackCatalog = null; + } + + @Test + public void testHiveScanUnpartitioned() throws Exception { + String tableName = "unpartitioned"; + Table table = createTable(tableName, DATA_COLUMNS, ImmutableList.of()); + addFiles(table, AVRO, "A", "B"); + filesMatch(ImmutableMap.of("A", AVRO, "B", AVRO), hiveScan(table)); + } + + @Test + public void testHiveScanSinglePartition() throws Exception { + String tableName = "single_partition"; + Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS); + addPartition(table, ImmutableList.of("1"), AVRO, "A", "B"); + filesMatch(ImmutableMap.of("pcol=1/A", AVRO, "pcol=1/B", AVRO), hiveScan(table)); + } + + @Test + public void testHiveScanMultiPartition() throws Exception { + String tableName = "multi_partition"; + Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS); + addPartition(table, ImmutableList.of("1"), AVRO, "A"); + addPartition(table, ImmutableList.of("2"), AVRO, "B"); + filesMatch(ImmutableMap.of("pcol=1/A", AVRO, "pcol=2/B", AVRO), hiveScan(table)); + } + + @Test + public void testHiveScanMultiPartitionWithFilter() throws Exception { + String tableName = "multi_partition_with_filter"; + Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS); + addPartition(table, ImmutableList.of("1"), AVRO, "A"); + addPartition(table, ImmutableList.of("2"), AVRO, "B"); + filesMatch(ImmutableMap.of("pcol=2/B", AVRO), hiveScan(table, Expressions.equal("pcol", "2"))); + } + + @Test + public void testHiveScanMultiPartitionWithNonPartitionFilter() throws Exception { + String tableName = "multi_partition_with_non_partition_filter"; + Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS); + addPartition(table, ImmutableList.of("1"), AVRO, "A"); + addPartition(table, ImmutableList.of("2"), AVRO, "B"); + filesMatch(ImmutableMap.of("pcol=1/A", AVRO, "pcol=2/B", AVRO), hiveScan(table, Expressions.equal("intCol", 1))); + } + + @Test + public void testHiveScanHybridTable() throws Exception { + String tableName = "hybrid_table"; + Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS); + addPartition(table, ImmutableList.of("1"), AVRO, "A"); + addPartition(table, ImmutableList.of("2"), ORC, "B"); + filesMatch(ImmutableMap.of("pcol=1/A", AVRO, "pcol=2/B", ORC), hiveScan(table)); + } + + private static Table createTable(String tableName, List columns, List partitionColumns) + throws Exception { + long currentTimeMillis = System.currentTimeMillis(); + Path tableLocation = dbPath.resolve(tableName); + Files.createDirectories(tableLocation); + Table tbl = new Table(tableName, + DB_NAME, + System.getProperty("user.name"), + (int) (currentTimeMillis / 1000), + (int) (currentTimeMillis / 1000), + Integer.MAX_VALUE, + storageDescriptor(columns, tableLocation.toString(), AVRO), + partitionColumns, + new HashMap<>(), + null, + null, + TableType.EXTERNAL_TABLE.toString()); + tbl.getParameters().put("EXTERNAL", "TRUE"); + metastoreClient.createTable(tbl); + return tbl; + } + + private static StorageDescriptor storageDescriptor(List columns, String location, FileFormat format) { + final StorageDescriptor storageDescriptor = new StorageDescriptor(); + storageDescriptor.setCols(columns); +
storageDescriptor.setLocation(location); + SerDeInfo serDeInfo = new SerDeInfo(); + switch (format) { + case AVRO: + storageDescriptor.setOutputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"); + storageDescriptor.setInputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"); + serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.avro.AvroSerDe"); + break; + case ORC: + storageDescriptor.setOutputFormat("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"); + storageDescriptor.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"); + serDeInfo.setSerializationLib("org.apache.hadoop.hive.ql.io.orc.OrcSerde"); + break; + default: + throw new UnsupportedOperationException("Unsupported file format: " + format); + } + storageDescriptor.setSerdeInfo(serDeInfo); + storageDescriptor.setParameters(ImmutableMap.of( + AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), schemaLiteral(columns))); + return storageDescriptor; + } + + private static String schemaLiteral(List columns) { + List columnNames = columns.stream().map(FieldSchema::getName).collect(Collectors.toList()); + List columnTypes = columns.stream().map(c -> TypeInfoUtils.getTypeInfoFromTypeString(c.getType())) + .collect(Collectors.toList()); + return AvroSerDe.getSchemaFromCols(new Properties(), columnNames, columnTypes, null).toString(); + } + + private static Path location(Table table) { + return Paths.get(table.getSd().getLocation()); + } + + private static Path location(Table table, List partitionValues) { + Path partitionLocation = location(table); + for (int i = 0; i < table.getPartitionKeysSize(); i++) { + partitionLocation = partitionLocation.resolve( + table.getPartitionKeys().get(i).getName() + "=" + partitionValues.get(i)); + } + return partitionLocation; + } + + private void addFiles(Table table, FileFormat format, String... fileNames) throws IOException { + Path tableLocation = location(table); + for (String fileName : fileNames) { + Path filePath = tableLocation.resolve(format.addExtension(fileName)); + Files.createFile(filePath); + } + } + + private void addPartition(Table table, List partitionValues, FileFormat format, String... 
fileNames) + throws Exception { + Path partitionLocation = location(table, partitionValues); + Files.createDirectories(partitionLocation); + long currentTimeMillis = System.currentTimeMillis(); + metastoreClient.add_partition(new Partition( + partitionValues, + table.getDbName(), + table.getTableName(), + (int) (currentTimeMillis / 1000), + (int) (currentTimeMillis / 1000), + storageDescriptor(table.getSd().getCols(), partitionLocation.toString(), format), + new HashMap<>() + )); + for (String fileName : fileNames) { + Path filePath = partitionLocation.resolve(format.addExtension(fileName)); + Files.createFile(filePath); + } + } + + private Map hiveScan(Table table) { + return hiveScan(table, Expressions.alwaysTrue()); + } + + private Map hiveScan(Table table, Expression filter) { + Path tableLocation = location(table); + CloseableIterable fileScanTasks = readFallbackCatalog + .loadTable(TableIdentifier.of(table.getDbName(), table.getTableName())) + .newScan().filter(filter).planFiles(); + return StreamSupport.stream(fileScanTasks.spliterator(), false).collect(Collectors.toMap( + f -> tableLocation.relativize(Paths.get(URI.create(f.file().path().toString()))).toString().split("\\.")[0], + f -> f.file().format())); + } + + private static void filesMatch(Map expected, Map actual) { + Assert.assertEquals(expected, actual); + } +} From cb15c65cc1e09de7a75caf539bfc89af74b68125 Mon Sep 17 00:00:00 2001 From: Shardul Mahadik Date: Tue, 17 Mar 2020 03:18:31 -0700 Subject: [PATCH 3/7] Address PR comments Take 2 --- .../org/apache/iceberg/hive/HiveCatalogs.java | 9 +- .../iceberg/hive/legacy/FileSystemUtils.java | 4 +- .../HiveCatalogWithLegacyReadFallback.java | 73 ------------ .../hive/legacy/LegacyHiveCatalog.java | 105 ++++++++++++++++++ .../legacy/LegacyHiveTableOperations.java | 51 ++++++++- .../hive/legacy/LegacyHiveTableScan.java | 38 +------ .../hive/legacy/TestLegacyHiveTableScan.java | 10 +- 7 files changed, 168 insertions(+), 122 deletions(-) delete mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/HiveCatalogWithLegacyReadFallback.java create mode 100644 hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveCatalog.java diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java index 4bcb901ace..6026ba63fa 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java +++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java @@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.iceberg.hive.legacy.HiveCatalogWithLegacyReadFallback; +import org.apache.iceberg.hive.legacy.LegacyHiveCatalog; public final class HiveCatalogs { @@ -40,7 +40,7 @@ public final class HiveCatalogs { .removalListener((RemovalListener) (uri, catalog, cause) -> catalog.close()) .build(); - private static final Cache CATALOG_WITH_LEGACY_READ_FALLBACK_CACHE = Caffeine.newBuilder() + private static final Cache LEGACY_CATALOG_CACHE = Caffeine.newBuilder() .expireAfterAccess(10, TimeUnit.MINUTES) .removalListener((RemovalListener) (uri, catalog, cause) -> catalog.close()) .build(); @@ -53,11 +53,10 @@ public static HiveCatalog loadCatalog(Configuration conf) { return CATALOG_CACHE.get(metastoreUri, uri -> new HiveCatalog(conf)); } - public static HiveCatalog loadCatalogWithLegacyReadFallback(Configuration conf) { + public static HiveCatalog loadLegacyCatalog(Configuration conf) { // metastore URI can
be null in local mode String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, ""); - return CATALOG_WITH_LEGACY_READ_FALLBACK_CACHE.get(metastoreUri, - uri -> new HiveCatalogWithLegacyReadFallback(conf)); + return LEGACY_CATALOG_CACHE.get(metastoreUri, uri -> new LegacyHiveCatalog(conf)); } /** diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java index 14a744197c..81cad5f5bc 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/FileSystemUtils.java @@ -37,12 +37,12 @@ private FileSystemUtils() {} /** * Lists all non-hidden files for the given directory */ - static List listFiles(String directory) { + static List listFiles(String directory, Configuration conf) { final Path directoryPath = new Path(directory); final FileStatus[] files; try { - FileSystem fs = directoryPath.getFileSystem(new Configuration()); + FileSystem fs = directoryPath.getFileSystem(conf); files = fs.listStatus(directoryPath, HiddenPathFilter.INSTANCE); } catch (IOException e) { throw new RuntimeIOException(e, "Error listing files for directory: " + directory); diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveCatalogWithLegacyReadFallback.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveCatalogWithLegacyReadFallback.java deleted file mode 100644 index 38f5520b09..0000000000 --- a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveCatalogWithLegacyReadFallback.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.hive.legacy; - -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableOperations; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.hive.HiveCatalog; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * A {@link HiveCatalog} which falls back to using Hive metadata to read tables when Iceberg metadata is not - * available. If the table is read through Hive metadata, features like time travel, snapshot isolation and incremental - * computation are not supported along with any WRITE operations to either the data or metadata. 
- */ -public class HiveCatalogWithLegacyReadFallback extends HiveCatalog { - - private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogWithLegacyReadFallback.class); - - public HiveCatalogWithLegacyReadFallback(Configuration conf) { - super(conf); - } - - @Override - @SuppressWarnings("CatchBlockLogException") - public Table loadTable(TableIdentifier identifier) { - // Try to load the table using Iceberg metadata first. If it fails, use Hive metadata - try { - return super.loadTable(identifier); - } catch (NoSuchTableException | IllegalArgumentException e) { - TableOperations ops = legacyTableOps(identifier); - if (ops.current() == null) { - if (isValidMetadataIdentifier(identifier)) { - throw new UnsupportedOperationException( - "Metadata tables not supported for Hive tables without Iceberg metadata. Table: " + identifier); - } - throw new NoSuchTableException("Table does not exist: %s", identifier); - } else { - LOG.warn( - "Iceberg metadata does not exist for {}; Falling back to Hive metadata. Time travel, snapshot isolation," + - " and incremental computation features will not be available", identifier); - return new LegacyHiveTable(ops, fullTableName(name(), identifier)); - } - } - } - - private TableOperations legacyTableOps(TableIdentifier tableIdentifier) { - String dbName = tableIdentifier.namespace().level(0); - String tableName = tableIdentifier.name(); - return new LegacyHiveTableOperations(conf(), clientPool(), dbName, tableName); - } -} diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveCatalog.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveCatalog.java new file mode 100644 index 0000000000..c97d29a94c --- /dev/null +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveCatalog.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive.legacy; + +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hive.HiveCatalog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link HiveCatalog} which uses Hive metadata to read tables. Features like time travel, snapshot isolation and + * incremental computation are not supported along with any WRITE operations to either the data or metadata. 
+ */ +public class LegacyHiveCatalog extends HiveCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(LegacyHiveCatalog.class); + + public LegacyHiveCatalog(Configuration conf) { + super(conf); + } + + @Override + @SuppressWarnings("CatchBlockLogException") + public Table loadTable(TableIdentifier identifier) { + if (isValidIdentifier(identifier)) { + TableOperations ops = newTableOps(identifier); + if (ops.current() == null) { + throw new NoSuchTableException("Table does not exist: %s", identifier); + } + + return new LegacyHiveTable(ops, fullTableName(name(), identifier)); + } else if (isValidMetadataIdentifier(identifier)) { + throw new UnsupportedOperationException( + "Metadata views not supported for Hive tables without Iceberg metadata. Table: " + identifier); + } else { + throw new NoSuchTableException("Invalid table identifier: %s", identifier); + } + } + + @Override + public TableOperations newTableOps(TableIdentifier tableIdentifier) { + String dbName = tableIdentifier.namespace().level(0); + String tableName = tableIdentifier.name(); + return new LegacyHiveTableOperations(conf(), clientPool(), dbName, tableName); + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + throw new UnsupportedOperationException( + "Dropping tables not supported through legacy Hive catalog. Table: " + identifier); + } + + @Override + public void renameTable(TableIdentifier from, TableIdentifier to) { + throw new UnsupportedOperationException( + "Renaming tables not supported through legacy Hive catalog. From: " + from + " To: " + to); + } + + @Override + public Table createTable(TableIdentifier identifier, Schema schema, PartitionSpec spec, String location, + Map properties) { + throw new UnsupportedOperationException( + "Creating tables not supported through legacy Hive catalog. Table: " + identifier); + } + + @Override + public Transaction newCreateTableTransaction(TableIdentifier identifier, Schema schema, PartitionSpec spec, + String location, Map properties) { + throw new UnsupportedOperationException( + "Creating tables not supported through legacy Hive catalog. Table: " + identifier); + } + + @Override + public Transaction newReplaceTableTransaction(TableIdentifier identifier, Schema schema, PartitionSpec spec, + String location, Map properties, boolean orCreate) { + throw new UnsupportedOperationException( + "Replacing tables not supported through legacy Hive catalog. 
Table: " + identifier); + } +} diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java index 3d55a8dd0b..5344dcab66 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java @@ -21,14 +21,21 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import java.util.List; import java.util.Set; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -90,7 +97,32 @@ protected void doRefresh() { setShouldRefresh(false); } - DirectoryInfo getDirectoryInfo() { + /** + * Returns an {@link Iterable} of {@link Iterable}s of {@link DataFile}s which belong to the current table and + * match the partition predicates from the given expression. + * + * Each element in the outer {@link Iterable} maps to an {@link Iterable} of {@link DataFile}s originating from the + * same directory + */ + Iterable> getFilesByFilter(Expression expression) { + Iterable matchingDirectories; + if (current().spec().fields().isEmpty()) { + matchingDirectories = ImmutableList.of(getDirectoryInfo()); + } else { + matchingDirectories = getDirectoryInfosByFilter(expression); + } + + Iterable> filesPerDirectory = Iterables.transform(matchingDirectories, directory -> { + return Iterables.transform(FileSystemUtils.listFiles(directory.location(), conf), + file -> createDataFile(file, current().spec(), directory.partitionData(), directory.format())); + }); + + // Note that we return an Iterable of Iterables here so that the TableScan can process iterables of individual + // directories in parallel hence resulting in a parallel file listing + return filesPerDirectory; + } + + private DirectoryInfo getDirectoryInfo() { Preconditions.checkArgument(current().spec().fields().isEmpty(), "getDirectoryInfo only allowed for unpartitioned tables"); try { @@ -107,7 +139,7 @@ DirectoryInfo getDirectoryInfo() { } } - List getDirectoryInfosByFilter(Expression expression) { + private List getDirectoryInfosByFilter(Expression expression) { Preconditions.checkArgument(!current().spec().fields().isEmpty(), "getDirectoryInfosByFilter only allowed for partitioned tables"); try { @@ -146,6 +178,21 @@ List getDirectoryInfosByFilter(Expression expression) { } } + private static DataFile createDataFile(FileStatus fileStatus, PartitionSpec partitionSpec, StructLike partitionData, + FileFormat format) { + DataFiles.Builder builder = DataFiles.builder(partitionSpec) + .withPath(fileStatus.getPath().toString()) + .withFormat(format) + .withFileSizeInBytes(fileStatus.getLen()) + .withMetrics(new Metrics(10000L, null, null, null, null, null)); + + if (partitionSpec.fields().isEmpty()) { + return builder.build(); + } else { + return 
builder.withPartition(partitionData).build(); + } + } + @Override public void commit(TableMetadata base, TableMetadata metadata) { throw new UnsupportedOperationException("Writes not supported for Hive tables without Iceberg metadata"); diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java index 4a89106baf..37f3821225 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java @@ -19,24 +19,17 @@ package org.apache.iceberg.hive.legacy; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import java.util.Collection; -import org.apache.hadoop.fs.FileStatus; import org.apache.iceberg.BaseFileScanTask; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DataFiles; import org.apache.iceberg.DataTableScan; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.Metrics; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Snapshot; -import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; import org.apache.iceberg.TableScan; @@ -82,20 +75,10 @@ public CloseableIterable planFiles() { // predicates for the partition columns are supported by Hive's listPartitionsByFilter ResidualEvaluator residuals = ResidualEvaluator.of(spec, filter(), isCaseSensitive()); - Iterable matchingDirectories; - if (hiveOps.current().spec().fields().isEmpty()) { - matchingDirectories = ImmutableList.of(hiveOps.getDirectoryInfo()); - } else { - matchingDirectories = hiveOps.getDirectoryInfosByFilter(filter()); - } + Iterable> tasks = Iterables.transform(hiveOps.getFilesByFilter(filter()), fileIterable -> + Iterables.transform(fileIterable, file -> new BaseFileScanTask(file, schemaString, specString, residuals))); - Iterable> readers = Iterables.transform(matchingDirectories, directory -> { - return Iterables.transform(FileSystemUtils.listFiles(directory.location()), - file -> new BaseFileScanTask(createDataFile(file, spec, directory.partitionData(), directory.format()), - schemaString, specString, residuals)); - }); - - return new ParallelIterable<>(readers, ThreadPools.getWorkerPool()); + return new ParallelIterable<>(tasks, ThreadPools.getWorkerPool()); } @Override @@ -103,19 +86,4 @@ public CloseableIterable planFiles(TableOperations ops, Snapshot s Expression rowFilter, boolean caseSensitive, boolean colStats) { throw new IllegalStateException("Control flow should never reach here"); } - - private static DataFile createDataFile(FileStatus fileStatus, PartitionSpec partitionSpec, StructLike partitionData, - FileFormat format) { - DataFiles.Builder builder = DataFiles.builder(partitionSpec) - .withPath(fileStatus.getPath().toString()) - .withFormat(format) - .withFileSizeInBytes(fileStatus.getLen()) - .withMetrics(new Metrics(10000L, null, null, null, null, null)); - - if (partitionSpec.fields().isEmpty()) { - return builder.build(); - } else { - return builder.withPartition(partitionData).build(); - } - } } diff --git a/hive/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java 
b/hive/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java index 8fa1378972..277ab75f1a 100644 --- a/hive/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java +++ b/hive/src/test/java/org/apache/iceberg/hive/legacy/TestLegacyHiveTableScan.java @@ -66,19 +66,19 @@ public class TestLegacyHiveTableScan extends HiveMetastoreTest { new FieldSchema("intCol", "int", "")); private static final List PARTITION_COLUMNS = ImmutableList.of( new FieldSchema("pcol", "string", "")); - private static HiveCatalog readFallbackCatalog; + private static HiveCatalog legacyCatalog; private static Path dbPath; @BeforeClass public static void beforeClass() throws Exception { - readFallbackCatalog = new HiveCatalogWithLegacyReadFallback(HiveMetastoreTest.hiveConf); + legacyCatalog = new LegacyHiveCatalog(HiveMetastoreTest.hiveConf); dbPath = Paths.get(URI.create(metastoreClient.getDatabase(DB_NAME).getLocationUri())); } @AfterClass public static void afterClass() { - readFallbackCatalog.close(); - TestLegacyHiveTableScan.readFallbackCatalog = null; + legacyCatalog.close(); + TestLegacyHiveTableScan.legacyCatalog = null; } @Test @@ -234,7 +234,7 @@ private Map hiveScan(Table table) { private Map hiveScan(Table table, Expression filter) { Path tableLocation = location(table); - CloseableIterable fileScanTasks = readFallbackCatalog + CloseableIterable fileScanTasks = legacyCatalog .loadTable(TableIdentifier.of(table.getDbName(), table.getTableName())) .newScan().filter(filter).planFiles(); return StreamSupport.stream(fileScanTasks.spliterator(), false).collect(Collectors.toMap( From 162125fc203211ed036e62659d474f919693661b Mon Sep 17 00:00:00 2001 From: Shardul Mahadik Date: Tue, 17 Mar 2020 16:49:33 -0700 Subject: [PATCH 4/7] Address PR comments [Take 3] --- .../org/apache/iceberg/hive/legacy/HiveExpressions.java | 8 ++++++-- .../apache/iceberg/hive/legacy/LegacyHiveTableScan.java | 2 -- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java index 3eba13e164..aedd393141 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java @@ -144,8 +144,12 @@ public Expression predicate(UnboundPredicate pred) { * 2. Removes IS NULL and IS NOT NULL predicates (Replaced with FALSE and TRUE respectively as partition column values * are always non null for Hive) * 3. Expands IN and NOT IN operators into ORs of EQUAL operations and ANDs of NOT EQUAL operations respectively - * 4. Removes any children TRUE and FALSE expressions (Note that the rewritten expression still can be TRUE and FALSE - * at the root and will have to be handled appropriately by the caller) + * 4. Removes any children TRUE and FALSE expressions.
The checks to remove these are happening inside + * {@link Expressions#and(Expression, Expression)} and {@link Expressions#or(Expression, Expression)} + * (Note that the rewritten expression still can be TRUE and FALSE at the root and will have to be handled + * appropriately by the caller) + * + * For examples, take a look at the tests in {@code TestHiveExpressions} */ private static class RewriteUnsupportedOperators extends ExpressionVisitors.ExpressionVisitor { diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java index 37f3821225..7e375caecb 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableScan.java @@ -71,8 +71,6 @@ public CloseableIterable planFiles() { PartitionSpec spec = hiveOps.current().spec(); String schemaString = SchemaParser.toJson(spec.schema()); String specString = PartitionSpecParser.toJson(spec); - // TODO: Consider returning the whole filter() as residual since we may not be able to guarantee that all - // predicates for the partition columns are supported by Hive's listPartitionsByFilter ResidualEvaluator residuals = ResidualEvaluator.of(spec, filter(), isCaseSensitive()); Iterable> tasks = Iterables.transform(hiveOps.getFilesByFilter(filter()), fileIterable -> From 0f0c4d9f9e633fe381dbf1a69230a2647e2b1483 Mon Sep 17 00:00:00 2001 From: Shardul Mahadik Date: Tue, 17 Mar 2020 20:10:52 -0700 Subject: [PATCH 5/7] Address PR comments --- .../iceberg/hive/legacy/HiveExpressions.java | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java index aedd393141..609ff60d29 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java @@ -53,11 +53,7 @@ static Expression simplifyPartitionFilter(Expression expr, Set partition if (partitionPredicatesOnly == null) { return Expressions.alwaysTrue(); } else { - Expression rewritten = ExpressionVisitors.visit(partitionPredicatesOnly, new RewriteUnsupportedOperators()); - // During rewrite of IN, NOTIN, NULL, and NOT NULL expressions we introduce additional NOT, TRUE, and FALSE - // expressions; so we call RewriteUnsupportedOperators again to remove them - rewritten = ExpressionVisitors.visit(rewritten, new RewriteUnsupportedOperators()); - return rewritten; + return ExpressionVisitors.visit(partitionPredicatesOnly, new RewriteUnsupportedOperators()); } } catch (Exception e) { throw new RuntimeException("Error while processing expression: " + expr, e); @@ -139,11 +135,20 @@ public Expression predicate(UnboundPredicate pred) { /** * Rewrites the {@link Expression} so that it fits the restrictions of the expression that can be passed - * to the Hive metastore. It performs the following changes: - * 1. Rewrites NOT operators by inverting binary operators, negating unary literals and De Morgan's laws + * to the Hive metastore. + * + * This visitor assumes that all predicates are on partition columns. Predicates on non-partition columns should be + * removed using {@link RemoveNonPartitionPredicates} before calling this visitor. It performs the following changes: + * 1.
Rewrites NOT operators by inverting binary operators, negating unary literals and using De Morgan's laws + * e.g. NOT(value > 0 AND TRUE) => value <= 0 OR FALSE + * NOT(value < 0 OR value > 10) => value >= 0 AND value <= 10 * 2. Removes IS NULL and IS NOT NULL predicates (Replaced with FALSE and TRUE respectively as partition column values * are always non null for Hive) + * e.g. partitionColumn IS NULL => FALSE + * partitionColumn IS NOT NULL => TRUE * 3. Expands IN and NOT IN operators into ORs of EQUAL operations and ANDs of NOT EQUAL operations respectively + * e.g. value IN (1, 2, 3) => value = 1 OR value = 2 OR value = 3 + * value NOT IN (1, 2, 3) => value != 1 AND value != 2 AND value != 3 * 4. Removes any children TRUE and FALSE expressions. The checks to remove these are happening inside * {@link Expressions#and(Expression, Expression)} and {@link Expressions#or(Expression, Expression)} * (Note that the rewritten expression still can be TRUE and FALSE at the root and will have to be handled @@ -186,6 +191,14 @@ Expression in(UnboundTerm term, List> literals) { return in; } + Expression notIn(UnboundTerm term, List> literals) { + Expression notIn = alwaysTrue(); + for (Literal literal : literals) { + notIn = Expressions.and(notIn, Expressions.notEqual(term, literal.value())); + } + return notIn; + } + @Override public Expression predicate(BoundPredicate pred) { throw new IllegalStateException("Bound predicate not expected: " + pred.getClass().getName()); @@ -208,7 +221,7 @@ public Expression predicate(UnboundPredicate pred) { case IN: return in(pred.term(), pred.literals()); case NOT_IN: - return Expressions.not(in(pred.term(), pred.literals())); + return notIn(pred.term(), pred.literals()); case STARTS_WITH: throw new UnsupportedOperationException("STARTS_WITH predicate not supported in partition filter expression"); default: From ce52e3e1c7002908b74d02c94bc0428cc52eef2e Mon Sep 17 00:00:00 2001 From: Shardul Mahadik Date: Tue, 17 Mar 2020 20:39:10 -0700 Subject: [PATCH 6/7] Address PR comments --- .../java/org/apache/iceberg/hive/legacy/HiveExpressions.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java index 609ff60d29..c1ad52d280 100644 --- a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java +++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java @@ -223,7 +223,8 @@ public Expression predicate(UnboundPredicate pred) { case NOT_IN: return notIn(pred.term(), pred.literals()); case STARTS_WITH: - throw new UnsupportedOperationException("STARTS_WITH predicate not supported in partition filter expression"); + throw new UnsupportedOperationException("STARTS_WITH predicate not supported in partition filter " + + "expression. 
From ce52e3e1c7002908b74d02c94bc0428cc52eef2e Mon Sep 17 00:00:00 2001
From: Shardul Mahadik
Date: Tue, 17 Mar 2020 20:39:10 -0700
Subject: [PATCH 6/7] Address PR comments

---
 .../java/org/apache/iceberg/hive/legacy/HiveExpressions.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java
index 609ff60d29..c1ad52d280 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java
@@ -223,7 +223,8 @@ public <T> Expression predicate(UnboundPredicate<T> pred) {
       case NOT_IN:
         return notIn(pred.term(), pred.literals());
       case STARTS_WITH:
-        throw new UnsupportedOperationException("STARTS_WITH predicate not supported in partition filter expression");
+        throw new UnsupportedOperationException("STARTS_WITH predicate not supported in partition filter " +
+            "expression. Please use a combination of greater-than AND less-than predicates instead.");
       default:
         throw new IllegalStateException("Unexpected predicate: " + pred.op());
       }

From c2a3f4498664d2ce1172bf2c8eadcbfa70080d59 Mon Sep 17 00:00:00 2001
From: Shardul Mahadik
Date: Wed, 18 Mar 2020 12:53:42 -0700
Subject: [PATCH 7/7] Address PR comments

---
 .../iceberg/hive/legacy/HiveExpressions.java  | 37 ++++++-------------
 .../legacy/LegacyHiveTableOperations.java     | 12 +++---
 2 files changed, 17 insertions(+), 32 deletions(-)

diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java
index c1ad52d280..7d545f71b2 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/HiveExpressions.java
@@ -50,11 +50,7 @@ static Expression simplifyPartitionFilter(Expression expr, Set<String> partition
     try {
       Expression partitionPredicatesOnly = ExpressionVisitors.visit(expr, new RemoveNonPartitionPredicates(partitionColumnNames));
-      if (partitionPredicatesOnly == null) {
-        return Expressions.alwaysTrue();
-      } else {
-        return ExpressionVisitors.visit(partitionPredicatesOnly, new RewriteUnsupportedOperators());
-      }
+      return ExpressionVisitors.visit(partitionPredicatesOnly, new RewriteUnsupportedOperators());
     } catch (Exception e) {
       throw new RuntimeException("Error while processing expression: " + expr, e);
     }
@@ -62,8 +58,11 @@ static Expression simplifyPartitionFilter(Expression expr, Set<String> partition
   /**
    * Converts an {@link Expression} into a filter string which can be passed to the Hive metastore
-   * @param expr The {@link Expression} to be converted into a filter string. This expression must fit the restrictions
-   *             on Hive metastore partition filters. For more details, see {@link RewriteUnsupportedOperators}
+   *
+   * It is expected that the caller handles TRUE and FALSE expressions before calling this method. The given
+   * {@link Expression} must also be passed through {@link #simplifyPartitionFilter(Expression, Set)} first to
+   * remove any unsupported predicates.
+   * @param expr The {@link Expression} to be converted into a filter string
    * @return a filter string equivalent to the given {@link Expression} which can be passed to the Hive metastore
    */
   static String toPartitionFilterString(Expression expr) {
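The rewritten Javadoc above amounts to a calling contract, which is easier to read as code. A sketch of a compliant caller follows; it is illustrative only and not part of the patch, with "ds" an assumed partition column and "filter" an arbitrary scan filter:

    // Illustrative sketch; ImmutableSet is Guava's, as used elsewhere in this module.
    Set<String> partitionColumns = ImmutableSet.of("ds");
    Expression simplified = HiveExpressions.simplifyPartitionFilter(filter, partitionColumns);
    if (simplified.equals(Expressions.alwaysFalse())) {
      // nothing can match: skip the metastore call entirely
    } else if (simplified.equals(Expressions.alwaysTrue())) {
      // everything matches: list partitions without a filter string
    } else {
      // only now is it safe to render a metastore filter string
      String filterString = HiveExpressions.toPartitionFilterString(simplified);
    }

This is exactly the shape the LegacyHiveTableOperations change at the end of this patch settles on.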
@@ -94,32 +93,17 @@ public Expression alwaysFalse() {
 
     @Override
     public Expression not(Expression result) {
-      return (result == null) ? null : Expressions.not(result);
+      return Expressions.not(result);
     }
 
     @Override
     public Expression and(Expression leftResult, Expression rightResult) {
-      if (leftResult == null && rightResult == null) {
-        return null;
-      } else if (leftResult == null) {
-        return rightResult;
-      } else if (rightResult == null) {
-        return leftResult;
-      } else {
-        return Expressions.and(leftResult, rightResult);
-      }
+      return Expressions.and(leftResult, rightResult);
     }
 
     @Override
     public Expression or(Expression leftResult, Expression rightResult) {
-      if (leftResult == null && rightResult == null) {
-        return null;
-      } else if (leftResult == null || rightResult == null) {
-        throw new IllegalStateException(
-            "A filter on a partition column was ORed with a filter on a non-partition column which is not supported");
-      } else {
-        return Expressions.or(leftResult, rightResult);
-      }
+      return Expressions.or(leftResult, rightResult);
     }
 
     @Override
@@ -129,7 +113,8 @@ public <T> Expression predicate(BoundPredicate<T> pred) {
 
     @Override
     public <T> Expression predicate(UnboundPredicate<T> pred) {
-      return (partitionColumnNamesLowerCase.contains(pred.ref().name().toLowerCase())) ? pred : null;
+      return (partitionColumnNamesLowerCase.contains(pred.ref().name().toLowerCase())) ? pred
+          : Expressions.alwaysTrue();
     }
   }

diff --git a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java
index 5344dcab66..52b01b698c 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableOperations.java
@@ -151,15 +151,15 @@ private List<DirectoryInfo> getDirectoryInfosByFilter(Expression expression) {
         .collect(Collectors.toSet());
     Expression simplified = HiveExpressions.simplifyPartitionFilter(expression, partitionColumnNames);
     LOG.info("Simplified expression for {}.{} to {}", databaseName, tableName, simplified);
+
     final List<Partition> partitions;
-    // If simplifyPartitionFilter returns TRUE, there are no filters on partition columns or the filter expression is
-    // going to match all partitions
-    if (simplified.equals(Expressions.alwaysTrue())) {
-      partitions = metaClients.run(client -> client.listPartitionsByFilter(
-          databaseName, tableName, null, (short) -1));
-    } else if (simplified.equals(Expressions.alwaysFalse())) {
+    if (simplified.equals(Expressions.alwaysFalse())) {
       // If simplifyPartitionFilter returns FALSE, no partitions are going to match the filter expression
       partitions = ImmutableList.of();
+    } else if (simplified.equals(Expressions.alwaysTrue())) {
+      // If simplifyPartitionFilter returns TRUE, all partitions are going to match the filter expression
+      partitions = metaClients.run(client -> client.listPartitionsByFilter(
+          databaseName, tableName, null, (short) -1));
     } else {
       String partitionFilterString = HiveExpressions.toPartitionFilterString(simplified);
       LOG.info("Listing partitions for {}.{} with filter string: {}", databaseName, tableName, partitionFilterString);
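To ground the filtered branch above: the filter string ultimately reaches the Hive metastore through IMetaStoreClient.listPartitionsByFilter, where (short) -1 means "no cap on the number of partitions returned". A sketch of that terminal call follows; it is illustrative only and not part of the patch. The database, table, and filter string are invented for the example, and the real call also throws checked TException subtypes that a caller must handle.

    // Illustrative sketch; uses java.util.List, org.apache.hadoop.hive.metastore.IMetaStoreClient,
    // and org.apache.hadoop.hive.metastore.api.Partition. Assumes "client" is a connected client.
    List<Partition> partitions = client.listPartitionsByFilter(
        "my_db", "my_table",
        "(ds = '2020-03-18')",   // e.g. an output of toPartitionFilterString (format assumed)
        (short) -1);             // -1 sentinel: return all matching partitions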