From cfc8573e1e4b93af9698d6da7349affe24b5d5eb Mon Sep 17 00:00:00 2001
From: Vinaykumar Bhat
Date: Fri, 1 Mar 2024 13:04:17 +0530
Subject: [PATCH] [HUDI-7458] Fix bug with functional index creation

There are a couple of issues in how functional indexes are managed:

1. HoodieSparkFunctionalIndexClient::create(...) was failing to register a functional index if a (different) functional index had already been created. Fixed this check by looking up the index name in the FunctionalIndexMetadata.
2. HoodieTableConfig `TABLE_METADATA_PARTITIONS` and `TABLE_METADATA_PARTITIONS_INFLIGHT` should store the metadata partition path. While the path is contained in the `MetadataPartitionType` for most indexes, that is not correct for a functional index: MetadataPartitionType.FUNCTIONAL_INDEX only stores the prefix (i.e., `func_index_`), so the actual partition path needs to be extracted from the index name.
3. Because of #2, most of the helper methods that operate on metadata partitions should take a partition path (and not a partition type).

This PR addresses the problems listed above. The fix is required to add SQL support for secondary indexes (whose configs will be based on the functional-index config). Note that there are still issues with some functional-index operations (like drop index / delete partition) because of the issues listed here; those will be fixed in a subsequent PR. A minimal sketch of the partition-path resolution appears after the diff.
---
 .../hudi/cli/commands/MetadataCommand.java | 2 +-
 .../hudi/client/BaseHoodieWriteClient.java | 4 +-
 .../HoodieBackedTableMetadataWriter.java | 11 ++++--
 .../org/apache/hudi/table/HoodieTable.java | 8 ++--
 .../action/index/RunIndexActionExecutor.java | 11 ++----
 .../index/ScheduleIndexActionExecutor.java | 14 +++++--
 .../upgrade/ThreeToFourUpgradeHandler.java | 2 +-
 .../table/HoodieFlinkCopyOnWriteTable.java | 2 +-
 .../table/HoodieJavaCopyOnWriteTable.java | 4 +-
 .../table/HoodieSparkCopyOnWriteTable.java | 4 +-
 .../functional/TestHoodieBackedMetadata.java | 2 +-
 .../client/functional/TestHoodieIndex.java | 4 +-
 .../table/upgrade/TestUpgradeDowngrade.java | 2 +-
 .../hudi/common/table/HoodieTableConfig.java | 38 +++++++++----------
 .../metadata/HoodieTableMetadataUtil.java | 32 ++++++++--------
 .../HoodieSparkFunctionalIndexClient.java | 10 +++--
 .../functional/TestRecordLevelIndex.scala | 2 +-
 .../command/index/TestFunctionalIndex.scala | 22 +++++++++--
 .../apache/hudi/utilities/HoodieIndexer.java | 4 +-
 .../hudi/utilities/TestHoodieIndexer.java | 26 ++++++-------
 20 files changed, 116 insertions(+), 88 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java index d106d8375e7a..7cebf43db029 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java @@ -151,7 +151,7 @@ public String delete(@ShellOption(value = "--backup", help = "Backup the metadat public String deleteRecordIndex(@ShellOption(value = "--backup", help = "Backup the record index before delete", defaultValue = "true", arity = 1) final boolean backup) throws Exception { HoodieTableMetaClient dataMetaClient = HoodieCLI.getTableMetaClient(); String backupPath = HoodieTableMetadataUtil.deleteMetadataTablePartition(dataMetaClient, new HoodieSparkEngineContext(jsc), - MetadataPartitionType.RECORD_INDEX, backup); + MetadataPartitionType.RECORD_INDEX.getPartitionPath(), backup); if (backup) { return "Record Index has been deleted from the 
Metadata Table and backed up to " + backupPath; } else { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 9b69d819e712..33aad54ee5af 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -987,10 +987,10 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option scheduleIndexing(List partitionTypes) { + public Option scheduleIndexing(List partitionTypes, List partitionPaths) { String instantTime = createNewInstantTime(); Option indexPlan = createTable(config, hadoopConf) - .scheduleIndexing(context, instantTime, partitionTypes); + .scheduleIndexing(context, instantTime, partitionTypes, partitionPaths); return indexPlan.isPresent() ? Option.of(instantTime) : Option.empty(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 8d40fc240952..446dbbba9a51 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -392,7 +392,6 @@ private boolean initializeFromFilesystem(String initializationTime, List dataMetaClient.getTableConfig().isMetadataPartitionAvailable((metadataPartition))); - // Get a complete list of files and partitions from the file system or from already initialized FILES partition of MDT List partitionInfoList; if (filesPartitionAvailable) { @@ -462,7 +461,9 @@ private boolean initializeFromFilesystem(String initializationTime, List records = fileGroupCountAndRecordsPair.getValue(); bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount); metadataMetaClient.reloadActiveTimeline(); - dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, true); + String partitionPath = (partitionType == FUNCTIONAL_INDEX) ? 
dataWriteConfig.getFunctionalIndexConfig().getIndexName() : partitionType.getPartitionPath(); + + dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionPath, true); // initialize the metadata reader again so the MDT partition can be read after initialization initMetadataReader(); long totalInitTime = partitionInitTimer.endTimer(); @@ -795,7 +796,7 @@ public void dropMetadataPartitions(List metadataPartition for (MetadataPartitionType partitionType : metadataPartitions) { String partitionPath = partitionType.getPartitionPath(); // first update table config - dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, false); + dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionPath, false); LOG.warn("Deleting Metadata Table partition: " + partitionPath); dataMetaClient.getFs().delete(new Path(metadataWriteConfig.getBasePath(), partitionPath), true); // delete corresponding pending indexing instant file in the timeline @@ -900,6 +901,7 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List partitionPaths = new ArrayList<>(); List partitionTypes = new ArrayList<>(); indexPartitionInfos.forEach(indexPartitionInfo -> { String relativePartitionPath = indexPartitionInfo.getMetadataPartitionPath(); @@ -913,10 +915,11 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List partitionsToIndex); + public abstract Option scheduleIndexing(HoodieEngineContext context, String indexInstantTime, + List partitionsToIndex, + List partitionPaths); /** * Execute requested index action. @@ -984,8 +986,8 @@ public void deleteMetadataIndexIfNecessary() { if (shouldDeleteMetadataPartition(partitionType)) { try { LOG.info("Deleting metadata partition because it is disabled in writer: " + partitionType.name()); - if (metadataPartitionExists(metaClient.getBasePath(), context, partitionType)) { - deleteMetadataPartition(metaClient.getBasePath(), context, partitionType); + if (metadataPartitionExists(metaClient.getBasePath(), context, partitionType.getPartitionPath())) { + deleteMetadataPartition(metaClient.getBasePath(), context, partitionType.getPartitionPath()); } clearMetadataTablePartitionsConfig(Option.of(partitionType), false); } catch (HoodieMetadataException e) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java index fc0c320b4406..94c4296e470e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java @@ -51,7 +51,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -72,7 +71,6 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION; import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE; import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath; -import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX; import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions; @@ -220,9 +218,8 @@ private void abort(HoodieInstant indexInstant, Set requestedPartitions) // delete metadata partition requestedPartitions.forEach(partition -> { - MetadataPartitionType partitionType = MetadataPartitionType.valueOf(partition.toUpperCase(Locale.ROOT)); - if (metadataPartitionExists(table.getMetaClient().getBasePathV2().toString(), context, partitionType)) { - deleteMetadataPartition(table.getMetaClient().getBasePathV2().toString(), context, partitionType); + if (metadataPartitionExists(table.getMetaClient().getBasePathV2().toString(), context, partition)) { + deleteMetadataPartition(table.getMetaClient().getBasePathV2().toString(), context, partition); } }); @@ -320,9 +317,7 @@ private static List getCompletedArchivedAndActiveInstantsAfter(St private void updateMetadataPartitionsTableConfig(HoodieTableMetaClient metaClient, Set metadataPartitions) { metadataPartitions.forEach(metadataPartition -> { - MetadataPartitionType partitionType = metadataPartition.startsWith(PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX) ? MetadataPartitionType.FUNCTIONAL_INDEX : - MetadataPartitionType.valueOf(metadataPartition.toUpperCase(Locale.ROOT)); - metaClient.getTableConfig().setMetadataPartitionState(metaClient, partitionType, true); + metaClient.getTableConfig().setMetadataPartitionState(metaClient, metadataPartition, true); }); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java index 7b27d7ef6e1c..da85fc4d6340 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java @@ -67,15 +67,20 @@ public class ScheduleIndexActionExecutor extends BaseActionExecutor< private static final Integer LATEST_INDEX_PLAN_VERSION = INDEX_PLAN_VERSION_1; private final List partitionIndexTypes; + + private final List partitionPaths; + private final TransactionManager txnManager; public ScheduleIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, - List partitionIndexTypes) { + List partitionIndexTypes, + List partitionPaths) { super(context, config, table, instantTime); this.partitionIndexTypes = partitionIndexTypes; + this.partitionPaths = partitionPaths; this.txnManager = new TransactionManager(config, table.getMetaClient().getFs()); } @@ -84,8 +89,11 @@ public Option execute() { validateBeforeScheduling(); // make sure that it is idempotent, check with previously pending index operations. Set indexesInflightOrCompleted = getInflightAndCompletedMetadataPartitions(table.getMetaClient().getTableConfig()); + Set requestedPartitions = partitionIndexTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()); + requestedPartitions.addAll(partitionPaths); requestedPartitions.removeAll(indexesInflightOrCompleted); + if (!requestedPartitions.isEmpty()) { LOG.warn(String.format("Following partitions already exist or inflight: %s. 
Going to schedule indexing of only these partitions: %s", indexesInflightOrCompleted, requestedPartitions)); @@ -142,8 +150,8 @@ private void validateBeforeScheduling() { private void abort(HoodieInstant indexInstant) { // delete metadata partition partitionIndexTypes.forEach(partitionType -> { - if (metadataPartitionExists(table.getMetaClient().getBasePath(), context, partitionType)) { - deleteMetadataPartition(table.getMetaClient().getBasePath(), context, partitionType); + if (metadataPartitionExists(table.getMetaClient().getBasePath(), context, partitionType.getPartitionPath())) { + deleteMetadataPartition(table.getMetaClient().getBasePath(), context, partitionType.getPartitionPath()); } }); // delete requested instant diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java index c7cb544aec94..edc2d19cf4bc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java @@ -49,7 +49,7 @@ public Map upgrade(HoodieWriteConfig config, HoodieEngin tablePropsToAdd.put(TABLE_CHECKSUM, String.valueOf(HoodieTableConfig.generateChecksum(config.getProps()))); // if metadata is enabled and files partition exist then update TABLE_METADATA_INDEX_COMPLETED // schema for the files partition is same between the two versions - if (config.isMetadataTableEnabled() && metadataPartitionExists(config.getBasePath(), context, MetadataPartitionType.FILES)) { + if (config.isMetadataTableEnabled() && metadataPartitionExists(config.getBasePath(), context, MetadataPartitionType.FILES.getPartitionPath())) { tablePropsToAdd.put(TABLE_METADATA_PARTITIONS, MetadataPartitionType.FILES.getPartitionPath()); } return tablePropsToAdd; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 0f73b0bce05d..0167294deaf7 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -378,7 +378,7 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollb } @Override - public Option scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List partitionsToIndex) { + public Option scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List partitionsToIndex, List partitionPaths) { throw new HoodieNotSupportedException("Metadata indexing is not supported for a Flink table yet."); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index 4c080f2f6635..c387f609e020 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -242,8 +242,8 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context, } @Override - public Option scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List 
partitionsToIndex) { - return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex).execute(); + public Option scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List partitionsToIndex, List partitionPaths) { + return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex, partitionPaths).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index e9d21350c212..cb82e6e2e385 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -299,8 +299,8 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollb } @Override - public Option scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List partitionsToIndex) { - return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex).execute(); + public Option scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List partitionsToIndex, List partitionPaths) { + return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex, partitionPaths).execute(); } @Override diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 32264bbf35fa..db10d5c88c3a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -1098,7 +1098,7 @@ private void revertTableToInflightState(HoodieWriteConfig writeConfig) throws IO // Transition the second init commit for record_index partition to inflight in MDT deleteMetaFile(metaClient.getFs(), mdtBasePath, mdtInitCommit2, DELTA_COMMIT_EXTENSION); metaClient.getTableConfig().setMetadataPartitionState( - metaClient, MetadataPartitionType.RECORD_INDEX, false); + metaClient, MetadataPartitionType.RECORD_INDEX.getPartitionPath(), false); metaClient.getTableConfig().setMetadataPartitionsInflight( metaClient, MetadataPartitionType.RECORD_INDEX); timeline = metaClient.getActiveTimeline().reload(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java index 709572325f8c..557905ae85a8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java @@ -329,11 +329,11 @@ public void testLookupIndexWithAndWithoutColumnStats() throws Exception { // check column_stats partition exists metaClient = HoodieTableMetaClient.reload(metaClient); - assertTrue(metadataPartitionExists(metaClient.getBasePath(), context, COLUMN_STATS)); + assertTrue(metadataPartitionExists(metaClient.getBasePath(), context, COLUMN_STATS.getPartitionPath())); 
assertTrue(metaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath())); // delete the column_stats partition - deleteMetadataPartition(metaClient.getBasePath(), context, COLUMN_STATS); + deleteMetadataPartition(metaClient.getBasePath(), context, COLUMN_STATS.getPartitionPath()); // Now tagLocation for these records, they should be tagged correctly despite column_stats being enabled but not present hoodieTable = HoodieSparkTable.create(config, context, metaClient); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java index ca26cad75d10..0fa28f2ef422 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java @@ -540,7 +540,7 @@ public void testDowngradeSixToFiveShouldDeleteRecordIndexPartition() throws Exce .withEnableRecordIndex(true).build()) .build(); for (MetadataPartitionType partitionType : MetadataPartitionType.values()) { - metaClient.getTableConfig().setMetadataPartitionState(metaClient, partitionType, true); + metaClient.getTableConfig().setMetadataPartitionState(metaClient, partitionType.getPartitionPath(), true); } metaClient.getTableConfig().setMetadataPartitionsInflight(metaClient, MetadataPartitionType.values()); String metadataTableBasePath = Paths.get(basePath, METADATA_TABLE_FOLDER_PATH).toString(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index f70f9456a868..86311778353a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -747,52 +747,52 @@ public boolean isMetadataPartitionAvailable(MetadataPartitionType partition) { /** * Enables or disables the specified metadata table partition. 
* - * @param partitionType The partition + * @param partitionPath The partition * @param enabled If true, the partition is enabled, else disabled */ - public void setMetadataPartitionState(HoodieTableMetaClient metaClient, MetadataPartitionType partitionType, boolean enabled) { - ValidationUtils.checkArgument(!partitionType.getPartitionPath().contains(CONFIG_VALUES_DELIMITER), - "Metadata Table partition path cannot contain a comma: " + partitionType.getPartitionPath()); + public void setMetadataPartitionState(HoodieTableMetaClient metaClient, String partitionPath, boolean enabled) { + ValidationUtils.checkArgument(!partitionPath.contains(CONFIG_VALUES_DELIMITER), + "Metadata Table partition path cannot contain a comma: " + partitionPath); Set partitions = getMetadataPartitions(); Set partitionsInflight = getMetadataPartitionsInflight(); if (enabled) { - partitions.add(partitionType.getPartitionPath()); - partitionsInflight.remove(partitionType.getPartitionPath()); - } else if (partitionType.equals(MetadataPartitionType.FILES)) { + partitions.add(partitionPath); + partitionsInflight.remove(partitionPath); + } else if (partitionPath.equals(MetadataPartitionType.FILES.getPartitionPath())) { // file listing partition is required for all other partitions to work // Disabling file partition will also disable all partitions partitions.clear(); partitionsInflight.clear(); } else { - partitions.remove(partitionType.getPartitionPath()); - partitionsInflight.remove(partitionType.getPartitionPath()); + partitions.remove(partitionPath); + partitionsInflight.remove(partitionPath); } setValue(TABLE_METADATA_PARTITIONS, partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER))); setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER))); update(metaClient.getFs(), new Path(metaClient.getMetaPath()), getProps()); - LOG.info(String.format("MDT %s partition %s has been %s", metaClient.getBasePathV2(), partitionType.name(), enabled ? "enabled" : "disabled")); + LOG.info(String.format("MDT %s partition %s has been %s", metaClient.getBasePathV2(), partitionPath, enabled ? "enabled" : "disabled")); } /** * Enables the specified metadata table partition as inflight. * - * @param partitionTypes The list of partitions to enable as inflight. + * @param partitionPaths The list of partitions to enable as inflight. 
*/ - public void setMetadataPartitionsInflight(HoodieTableMetaClient metaClient, List partitionTypes) { + public void setMetadataPartitionsInflight(HoodieTableMetaClient metaClient, List partitionPaths) { Set partitionsInflight = getMetadataPartitionsInflight(); - partitionTypes.forEach(t -> { - ValidationUtils.checkArgument(!t.getPartitionPath().contains(CONFIG_VALUES_DELIMITER), - "Metadata Table partition path cannot contain a comma: " + t.getPartitionPath()); - partitionsInflight.add(t.getPartitionPath()); + partitionPaths.forEach(partitionPath -> { + ValidationUtils.checkArgument(!partitionPath.contains(CONFIG_VALUES_DELIMITER), + "Metadata Table partition path cannot contain a comma: " + partitionPath); + partitionsInflight.add(partitionPath); }); setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER))); update(metaClient.getFs(), new Path(metaClient.getMetaPath()), getProps()); - LOG.info(String.format("MDT %s partitions %s have been set to inflight", metaClient.getBasePathV2(), partitionTypes)); + LOG.info(String.format("MDT %s partitions %s have been set to inflight", metaClient.getBasePathV2(), partitionPaths)); } public void setMetadataPartitionsInflight(HoodieTableMetaClient metaClient, MetadataPartitionType... partitionTypes) { - setMetadataPartitionsInflight(metaClient, Arrays.stream(partitionTypes).collect(Collectors.toList())); + setMetadataPartitionsInflight(metaClient, Arrays.stream(partitionTypes).map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList())); } /** @@ -800,7 +800,7 @@ public void setMetadataPartitionsInflight(HoodieTableMetaClient metaClient, Meta * {@link HoodieTableConfig#TABLE_METADATA_PARTITIONS_INFLIGHT}. */ public void clearMetadataPartitions(HoodieTableMetaClient metaClient) { - setMetadataPartitionState(metaClient, MetadataPartitionType.FILES, false); + setMetadataPartitionState(metaClient, MetadataPartitionType.FILES.getPartitionPath(), false); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index e19570443054..e3267d736028 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -328,11 +328,11 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont * * @param basePath - base path of the dataset * @param context - instance of {@link HoodieEngineContext} - * @param partitionType - {@link MetadataPartitionType} of the partition to delete + * @param partitionPath - Partition path of the partition to delete */ - public static void deleteMetadataPartition(String basePath, HoodieEngineContext context, MetadataPartitionType partitionType) { + public static void deleteMetadataPartition(String basePath, HoodieEngineContext context, String partitionPath) { HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(context.getHadoopConf().get()).build(); - deleteMetadataTablePartition(dataMetaClient, context, partitionType, false); + deleteMetadataTablePartition(dataMetaClient, context, partitionPath, false); } /** @@ -341,13 +341,13 @@ public static void deleteMetadataPartition(String basePath, HoodieEngineContext * @param basePath base path of the dataset * @param context instance of {@link HoodieEngineContext}. 
*/ - public static boolean metadataPartitionExists(String basePath, HoodieEngineContext context, MetadataPartitionType partitionType) { + public static boolean metadataPartitionExists(String basePath, HoodieEngineContext context, String partitionPath) { final String metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); FileSystem fs = HadoopFSUtils.getFs(metadataTablePath, context.getHadoopConf().get()); try { - return fs.exists(new Path(metadataTablePath, partitionType.getPartitionPath())); + return fs.exists(new Path(metadataTablePath, partitionPath)); } catch (Exception e) { - throw new HoodieIOException(String.format("Failed to check metadata partition %s exists.", partitionType.getPartitionPath())); + throw new HoodieIOException(String.format("Failed to check metadata partition %s exists.", partitionPath)); } } @@ -1500,41 +1500,41 @@ public static String deleteMetadataTable(HoodieTableMetaClient dataMetaClient, H * @param context instance of {@code HoodieEngineContext}. * @param backup Whether metadata table should be backed up before deletion. If true, the table is backed up to the * directory with name metadata_. - * @param partitionType The partition to delete + * @param partitionPath The partition to delete * @return The backup directory if backup was requested, null otherwise */ public static String deleteMetadataTablePartition(HoodieTableMetaClient dataMetaClient, HoodieEngineContext context, - MetadataPartitionType partitionType, boolean backup) { - if (partitionType.equals(MetadataPartitionType.FILES)) { + String partitionPath, boolean backup) { + if (partitionPath.equals(MetadataPartitionType.FILES.getPartitionPath())) { return deleteMetadataTable(dataMetaClient, context, backup); } - final Path metadataTablePartitionPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath()), partitionType.getPartitionPath()); + final Path metadataTablePartitionPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath()), partitionPath); FileSystem fs = HadoopFSUtils.getFs(metadataTablePartitionPath.toString(), context.getHadoopConf().get()); - dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, false); + dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionPath, false); try { if (!fs.exists(metadataTablePartitionPath)) { return null; } } catch (FileNotFoundException e) { // Ignoring exception as metadata table already does not exist - LOG.debug("Metadata table partition " + partitionType + " not found at path " + metadataTablePartitionPath); + LOG.debug("Metadata table partition " + partitionPath + " not found at path " + metadataTablePartitionPath); return null; } catch (Exception e) { - throw new HoodieMetadataException(String.format("Failed to check existence of MDT partition %s at path %s: ", partitionType, metadataTablePartitionPath), e); + throw new HoodieMetadataException(String.format("Failed to check existence of MDT partition %s at path %s: ", partitionPath, metadataTablePartitionPath), e); } if (backup) { final Path metadataPartitionBackupPath = new Path(metadataTablePartitionPath.getParent().getParent(), - String.format(".metadata_%s_%s", partitionType.getPartitionPath(), dataMetaClient.createNewInstantTime(false))); - LOG.info(String.format("Backing up MDT partition %s to %s before deletion", partitionType, metadataPartitionBackupPath)); + String.format(".metadata_%s_%s", partitionPath, 
dataMetaClient.createNewInstantTime(false))); + LOG.info(String.format("Backing up MDT partition %s to %s before deletion", partitionPath, metadataPartitionBackupPath)); try { if (fs.rename(metadataTablePartitionPath, metadataPartitionBackupPath)) { return metadataPartitionBackupPath.toString(); } } catch (Exception e) { // If rename fails, we will try to delete the table instead - LOG.error(String.format("Failed to backup MDT partition %s using rename", partitionType), e); + LOG.error(String.format("Failed to backup MDT partition %s using rename", partitionPath), e); } } else { LOG.info("Deleting metadata table partition from " + metadataTablePartitionPath); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndexClient.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndexClient.java index 542b76e8dd16..541a0d272a43 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndexClient.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieSparkFunctionalIndexClient.java @@ -82,7 +82,9 @@ public void create(HoodieTableMetaClient metaClient, String indexName, String in throw new HoodieFunctionalIndexException("Index already exists: " + indexName); } - if (!metaClient.getTableConfig().getIndexDefinitionPath().isPresent() || !metaClient.getFunctionalIndexMetadata().isPresent()) { + if (!metaClient.getTableConfig().getIndexDefinitionPath().isPresent() + || !metaClient.getFunctionalIndexMetadata().isPresent() + || !metaClient.getFunctionalIndexMetadata().get().getIndexDefinitions().containsKey(indexName)) { LOG.info("Index definition is not present. Registering the index first"); register(metaClient, indexName, indexType, columns, options); } @@ -94,7 +96,7 @@ public void create(HoodieTableMetaClient metaClient, String indexName, String in try (SparkRDDWriteClient writeClient = HoodieCLIUtils.createHoodieWriteClient( sparkSession, metaClient.getBasePathV2().toString(), mapAsScalaImmutableMap(buildWriteConfig(metaClient, functionalIndexDefinition)), toScalaOption(Option.empty()))) { // generate index plan - Option indexInstantTime = doSchedule(writeClient, metaClient); + Option indexInstantTime = doSchedule(writeClient, metaClient, indexName); if (indexInstantTime.isPresent()) { // build index writeClient.index(indexInstantTime.get()); @@ -104,13 +106,13 @@ public void create(HoodieTableMetaClient metaClient, String indexName, String in } } - private static Option doSchedule(SparkRDDWriteClient client, HoodieTableMetaClient metaClient) { + private static Option doSchedule(SparkRDDWriteClient client, HoodieTableMetaClient metaClient, String indexName) { List partitionTypes = Collections.singletonList(MetadataPartitionType.FUNCTIONAL_INDEX); checkArgument(partitionTypes.size() == 1, "Currently, only one index type can be scheduled at a time."); if (metaClient.getTableConfig().getMetadataPartitions().isEmpty()) { throw new HoodieException("Metadata table is not yet initialized. 
Initialize FILES partition before any other partition " + Arrays.toString(partitionTypes.toArray())); } - return client.scheduleIndexing(partitionTypes); + return client.scheduleIndexing(partitionTypes, Collections.singletonList(indexName)); } private static boolean indexExists(HoodieTableMetaClient metaClient, String indexName) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala index 56866e7bf40a..b5304cd2e23c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala @@ -367,7 +367,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase { saveMode = SaveMode.Append) hudiOpts += (HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "false") - metaClient.getTableConfig.setMetadataPartitionState(metaClient, MetadataPartitionType.RECORD_INDEX, false) + metaClient.getTableConfig.setMetadataPartitionState(metaClient, MetadataPartitionType.RECORD_INDEX.getPartitionPath, false) doWriteAndValidateDataAndRecordIndex(hudiOpts, operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala index a555378713a6..34f79fa45b52 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala @@ -202,8 +202,9 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase { var resolvedLogicalPlan = analyzer.execute(logicalPlan) assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[ShowIndexesCommand].table, databaseName, tableName) - val createIndexSql = s"create index idx_datestr on $tableName using column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')" + var createIndexSql = s"create index idx_datestr on $tableName using column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')" logicalPlan = sqlParser.parsePlan(createIndexSql) + resolvedLogicalPlan = analyzer.execute(logicalPlan) assertTableIdentifier(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].table, databaseName, tableName) assertResult("idx_datestr")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName) @@ -211,14 +212,29 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase { assertResult(false)(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].ignoreIfExists) spark.sql(createIndexSql) - val metaClient = HoodieTableMetaClient.builder() + var metaClient = HoodieTableMetaClient.builder() .setBasePath(basePath) .setConf(spark.sessionState.newHadoopConf()) .build() assertTrue(metaClient.getFunctionalIndexMetadata.isPresent) - val functionalIndexMetadata = metaClient.getFunctionalIndexMetadata.get() + var functionalIndexMetadata = metaClient.getFunctionalIndexMetadata.get() assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size()) assertEquals("func_index_idx_datestr", functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName) + + // Verify one can create more than one functional index + createIndexSql = s"create 
index name_lower on $tableName using column_stats(ts) options(func='identity')" + spark.sql(createIndexSql) + metaClient = HoodieTableMetaClient.builder() + .setBasePath(basePath) + .setConf(spark.sessionState.newHadoopConf()) + .build() + functionalIndexMetadata = metaClient.getFunctionalIndexMetadata.get() + assertEquals(2, functionalIndexMetadata.getIndexDefinitions.size()) + assertEquals("func_index_name_lower", functionalIndexMetadata.getIndexDefinitions.get("func_index_name_lower").getIndexName) + + // Ensure that both the indexes are tracked correctly in metadata partition config + val mdtPartitions = metaClient.getTableConfig.getMetadataPartitions + assert(mdtPartitions.contains("func_index_name_lower") && mdtPartitions.contains("func_index_idx_datestr")) } } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java index 5c626a53ae7e..03b6d934b5f8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java @@ -43,6 +43,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Set; @@ -240,7 +241,8 @@ private Option doSchedule(SparkRDDWriteClient clien if (indexExists(partitionTypes)) { return Option.empty(); } - Option indexingInstant = client.scheduleIndexing(partitionTypes); + + Option indexingInstant = client.scheduleIndexing(partitionTypes, Collections.emptyList()); if (!indexingInstant.isPresent()) { LOG.error("Scheduling of index action did not return any instant."); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java index e3724aee48a7..4fea9be1942a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java @@ -373,7 +373,7 @@ public void testIndexerForExceptionWithNonFilesPartition() { assertFalse(metaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath())); assertFalse(metaClient.getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath())); // validate metadata partitions actually exist - assertFalse(metadataPartitionExists(basePath(), context(), FILES)); + assertFalse(metadataPartitionExists(basePath(), context(), FILES.getPartitionPath())); // trigger FILES partition and indexing should succeed. 
indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS}), tableName, "streamer-config/indexer.properties"); @@ -432,8 +432,8 @@ private void indexMetadataPartitionsAndAssert(MetadataPartitionType partitionTyp nonExistentPartitions.forEach(entry -> assertFalse(completedPartitions.contains(entry.getPartitionPath()))); // validate metadata partitions actually exist - assertTrue(metadataPartitionExists(basePath(), context(), partitionTypeToIndex)); - alreadyCompletedPartitions.forEach(entry -> assertTrue(metadataPartitionExists(basePath(), context(), entry))); + assertTrue(metadataPartitionExists(basePath(), context(), partitionTypeToIndex.getPartitionPath())); + alreadyCompletedPartitions.forEach(entry -> assertTrue(metadataPartitionExists(basePath(), context(), entry.getPartitionPath()))); } @Test @@ -455,9 +455,9 @@ public void testIndexerDropPartitionDeletesInstantFromTimeline() { // validate partitions built successfully assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath())); - assertTrue(metadataPartitionExists(basePath(), context(), FILES)); + assertTrue(metadataPartitionExists(basePath(), context(), FILES.getPartitionPath())); assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath())); - assertTrue(metadataPartitionExists(basePath(), context(), BLOOM_FILTERS)); + assertTrue(metadataPartitionExists(basePath(), context(), BLOOM_FILTERS.getPartitionPath())); // build indexer config which has only column_stats enabled (files is enabled by default) HoodieIndexer.Config config = new HoodieIndexer.Config(); @@ -481,13 +481,13 @@ public void testIndexerDropPartitionDeletesInstantFromTimeline() { assertEquals(0, indexer.start(0)); indexInstantInTimeline = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant(); assertFalse(indexInstantInTimeline.isPresent()); - assertFalse(metadataPartitionExists(basePath(), context(), COLUMN_STATS)); + assertFalse(metadataPartitionExists(basePath(), context(), COLUMN_STATS.getPartitionPath())); // check other partitions are intact assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath())); - assertTrue(metadataPartitionExists(basePath(), context(), FILES)); + assertTrue(metadataPartitionExists(basePath(), context(), FILES.getPartitionPath())); assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath())); - assertTrue(metadataPartitionExists(basePath(), context(), BLOOM_FILTERS)); + assertTrue(metadataPartitionExists(basePath(), context(), BLOOM_FILTERS.getPartitionPath())); } @Test @@ -509,7 +509,7 @@ public void testTwoIndexersOneCreateOneDropPartition() { // validate files partition built successfully assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath())); - assertTrue(metadataPartitionExists(basePath(), context(), FILES)); + assertTrue(metadataPartitionExists(basePath(), context(), FILES.getPartitionPath())); // build indexer config which has only bloom_filters enabled HoodieIndexer.Config config = getHoodieIndexConfig(BLOOM_FILTERS.name(), SCHEDULE_AND_EXECUTE, "streamer-config/indexer-only-bloom.properties", tableName); @@ -517,7 +517,7 @@ public void testTwoIndexersOneCreateOneDropPartition() { HoodieIndexer indexer = new HoodieIndexer(jsc(), config); assertEquals(0, indexer.start(0)); 
assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath())); - assertTrue(metadataPartitionExists(basePath(), context(), BLOOM_FILTERS)); + assertTrue(metadataPartitionExists(basePath(), context(), BLOOM_FILTERS.getPartitionPath())); // completed index timeline for later validation Option bloomIndexInstant = metaClient.reloadActiveTimeline().filterCompletedIndexTimeline().lastInstant(); @@ -540,9 +540,9 @@ public void testTwoIndexersOneCreateOneDropPartition() { // check other partitions are intact assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath())); - assertTrue(metadataPartitionExists(basePath(), context(), FILES)); + assertTrue(metadataPartitionExists(basePath(), context(), FILES.getPartitionPath())); assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath())); - assertTrue(metadataPartitionExists(basePath(), context(), BLOOM_FILTERS)); + assertTrue(metadataPartitionExists(basePath(), context(), BLOOM_FILTERS.getPartitionPath())); // drop bloom filter partition. timeline files should not be deleted since the index building is complete. dropIndexAndAssert(BLOOM_FILTERS, "streamer-config/indexer-only-bloom.properties", bloomIndexInstant, tableName); @@ -554,7 +554,7 @@ private void dropIndexAndAssert(MetadataPartitionType indexType, String resource assertEquals(0, indexer.start(0)); Option pendingFlights = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant(); assertFalse(pendingFlights.isPresent()); - assertFalse(metadataPartitionExists(basePath(), context(), indexType)); + assertFalse(metadataPartitionExists(basePath(), context(), indexType.getPartitionPath())); if (completedIndexInstant.isPresent()) { assertEquals(completedIndexInstant, metaClient.reloadActiveTimeline().filterCompletedIndexTimeline().lastInstant()); }
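
For illustration, here is a minimal, self-contained sketch of the partition-path resolution described in item 2 of the summary. The enum and the `resolvePartitionPath` helper below are simplified stand-ins written for this sketch, not the actual Hudi API; the real resolution lives in HoodieBackedTableMetadataWriter and HoodieTableConfig as shown in the diff above.

    public class PartitionPathResolutionSketch {

      // Simplified stand-in for org.apache.hudi.metadata.MetadataPartitionType.
      enum MetadataPartitionType {
        FILES("files"),
        RECORD_INDEX("record_index"),
        FUNCTIONAL_INDEX("func_index_"); // prefix only, not a full partition path

        private final String partitionPath;

        MetadataPartitionType(String partitionPath) {
          this.partitionPath = partitionPath;
        }

        String getPartitionPath() {
          return partitionPath;
        }
      }

      // Table config (TABLE_METADATA_PARTITIONS) must record the full partition
      // path. For a functional index that path is the index name itself
      // (e.g. "func_index_idx_datestr"), since the enum only holds the prefix.
      static String resolvePartitionPath(MetadataPartitionType type, String indexName) {
        return type == MetadataPartitionType.FUNCTIONAL_INDEX ? indexName : type.getPartitionPath();
      }

      public static void main(String[] args) {
        System.out.println(resolvePartitionPath(MetadataPartitionType.RECORD_INDEX, null));
        // -> record_index
        System.out.println(resolvePartitionPath(MetadataPartitionType.FUNCTIONAL_INDEX, "func_index_idx_datestr"));
        // -> func_index_idx_datestr
      }
    }

With the path resolved up front, HoodieTableConfig.setMetadataPartitionState and the HoodieTableMetadataUtil helpers can operate on the resolved path uniformly, which is why their signatures change from MetadataPartitionType to String throughout this patch.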