[HUDI-7458] Fix bug with functional index creation (#10792)
There are a couple of issues in how functional indexes are managed:
1. HoodieSparkFunctionalIndexClient::create(...) was failing to register a functional index if a (different) functional
   index had already been created. Fixed this check by looking up the index name in the FunctionalIndexMetadata.
2. HoodieTableConfig `TABLE_METADATA_PARTITIONS` and `TABLE_METADATA_PARTITIONS_INFLIGHT` should actually store the metadata
   partition path. While the path is contained in the `MetadataPartitionType` for most of the indexes, it is not correct for
   the functional index: MetadataPartitionType.FUNCTIONAL_INDEX only stores the prefix (i.e. func_index_). The actual
   partition path needs to be extracted from the index name (see the sketch after this list).
3. Because of #2, most of the helper methods that operate on metadata partitions should take the partition path (and not
   the partition type).
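
A minimal sketch of the resolution rule in issue 2, assuming (as this change does) that the functional
index name is itself the metadata partition path; the helper and the index name "func_index_datestr"
are hypothetical:

    import org.apache.hudi.metadata.MetadataPartitionType;

    final class PartitionPathResolver {
      // FUNCTIONAL_INDEX.getPartitionPath() yields only the "func_index_" prefix,
      // so the full partition path must come from the index name instead.
      static String resolve(MetadataPartitionType type, String indexName) {
        return type == MetadataPartitionType.FUNCTIONAL_INDEX
            ? indexName                 // e.g. "func_index_datestr"
            : type.getPartitionPath();  // e.g. "files", "column_stats"
      }
    }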

This PR addresses the problems listed above. This fix is required to add SQL support for secondary indexes (the configs
for which will be based on the functional-index config).
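
For issue 1, the corrected check amounts to looking up the requested index name in the
FunctionalIndexMetadata instead of failing whenever any functional index exists. A hedged sketch of
that logic (the accessor, map field, and exception are assumptions, not necessarily the PR's exact code):

    // Fail creation only when an index with the SAME name is already registered.
    boolean alreadyExists = metaClient.getFunctionalIndexMetadata()    // assumed accessor
        .map(m -> m.getIndexDefinitions().containsKey(indexName))      // assumed name -> definition map
        .orElse(false);
    if (alreadyExists) {
      throw new HoodieFunctionalIndexException("Functional index already exists: " + indexName);
    }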

Note that some functional-index operations (like drop index / delete partition) still have issues of the same kind;
those will be fixed in a subsequent PR.

Co-authored-by: Vinaykumar Bhat <vinay@onehouse.ai>
bhat-vinay and Vinaykumar Bhat authored Mar 4, 2024
1 parent 5e18e24 commit d23abd3
Showing 20 changed files with 116 additions and 88 deletions.
=== changed file ===
@@ -151,7 +151,7 @@ public String delete(@ShellOption(value = "--backup", help = "Backup the metadat
   public String deleteRecordIndex(@ShellOption(value = "--backup", help = "Backup the record index before delete", defaultValue = "true", arity = 1) final boolean backup) throws Exception {
     HoodieTableMetaClient dataMetaClient = HoodieCLI.getTableMetaClient();
     String backupPath = HoodieTableMetadataUtil.deleteMetadataTablePartition(dataMetaClient, new HoodieSparkEngineContext(jsc),
-        MetadataPartitionType.RECORD_INDEX, backup);
+        MetadataPartitionType.RECORD_INDEX.getPartitionPath(), backup);
     if (backup) {
       return "Record Index has been deleted from the Metadata Table and backed up to " + backupPath;
     } else {

=== changed file ===
@@ -987,10 +987,10 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String
    * @param partitionTypes - list of {@link MetadataPartitionType} which needs to be indexed
    * @return instant time for the requested INDEX action
    */
-  public Option<String> scheduleIndexing(List<MetadataPartitionType> partitionTypes) {
+  public Option<String> scheduleIndexing(List<MetadataPartitionType> partitionTypes, List<String> partitionPaths) {
     String instantTime = createNewInstantTime();
     Option<HoodieIndexPlan> indexPlan = createTable(config, hadoopConf)
-        .scheduleIndexing(context, instantTime, partitionTypes);
+        .scheduleIndexing(context, instantTime, partitionTypes, partitionPaths);
     return indexPlan.isPresent() ? Option.of(instantTime) : Option.empty();
   }
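
A caller-side sketch of the widened API above (the write client variable and index name are hypothetical):

    // For a functional index, the metadata partition path is the index name itself,
    // so it is passed explicitly alongside the partition type.
    List<MetadataPartitionType> types = Collections.singletonList(MetadataPartitionType.FUNCTIONAL_INDEX);
    List<String> paths = Collections.singletonList("func_index_datestr"); // assumed index name
    Option<String> indexInstant = writeClient.scheduleIndexing(types, paths);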

=== changed file ===
@@ -392,7 +392,6 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
     // Already initialized partitions can be ignored
     partitionsToInit.removeIf(metadataPartition -> dataMetaClient.getTableConfig().isMetadataPartitionAvailable((metadataPartition)));
-

     // Get a complete list of files and partitions from the file system or from already initialized FILES partition of MDT
     List<DirectoryInfo> partitionInfoList;
     if (filesPartitionAvailable) {
@@ -462,7 +461,9 @@
     HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
     bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount);
     metadataMetaClient.reloadActiveTimeline();
-    dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, true);
+    String partitionPath = (partitionType == FUNCTIONAL_INDEX) ? dataWriteConfig.getFunctionalIndexConfig().getIndexName() : partitionType.getPartitionPath();
+
+    dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionPath, true);
     // initialize the metadata reader again so the MDT partition can be read after initialization
     initMetadataReader();
     long totalInitTime = partitionInitTimer.endTimer();
@@ -795,7 +796,7 @@ public void dropMetadataPartitions(List<MetadataPartitionType> metadataPartition
     for (MetadataPartitionType partitionType : metadataPartitions) {
       String partitionPath = partitionType.getPartitionPath();
       // first update table config
-      dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, false);
+      dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionPath, false);
       LOG.warn("Deleting Metadata Table partition: " + partitionPath);
       dataMetaClient.getFs().delete(new Path(metadataWriteConfig.getBasePath(), partitionPath), true);
       // delete corresponding pending indexing instant file in the timeline
@@ -900,6 +901,7 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List<Hood
       return;
     }
     String indexUptoInstantTime = indexPartitionInfos.get(0).getIndexUptoInstant();
+    List<String> partitionPaths = new ArrayList<>();
     List<MetadataPartitionType> partitionTypes = new ArrayList<>();
     indexPartitionInfos.forEach(indexPartitionInfo -> {
       String relativePartitionPath = indexPartitionInfo.getMetadataPartitionPath();
@@ -913,10 +915,11 @@
         throw new HoodieIndexException(String.format("Indexing for metadata partition: %s is not enabled", partitionType));
       }
       partitionTypes.add(partitionType);
+      partitionPaths.add(relativePartitionPath);
     });

     // before initialization set these partitions as inflight in table config
-    dataMetaClient.getTableConfig().setMetadataPartitionsInflight(dataMetaClient, partitionTypes);
+    dataMetaClient.getTableConfig().setMetadataPartitionsInflight(dataMetaClient, partitionPaths);

     // initialize partitions
     initializeFromFilesystem(HoodieTableMetadataUtil.createAsyncIndexerTimestamp(indexUptoInstantTime), partitionTypes, Option.empty());
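
With the path-based state updates above, the table config tracks partition paths verbatim. For a
hypothetical table with the files partition, column stats, and one functional index, hoodie.properties
would carry entries along these lines (keys per HoodieTableConfig; index name assumed):

    hoodie.table.metadata.partitions=column_stats,files,func_index_datestr
    hoodie.table.metadata.partitions.inflight=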

=== changed file ===
@@ -569,7 +569,9 @@ public abstract HoodieRollbackMetadata rollback(HoodieEngineContext context,
    * @param partitionsToIndex List of {@link MetadataPartitionType} that should be indexed.
    * @return HoodieIndexPlan containing metadata partitions and instant upto which they should be indexed.
    */
-  public abstract Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex);
+  public abstract Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime,
+                                                           List<MetadataPartitionType> partitionsToIndex,
+                                                           List<String> partitionPaths);

   /**
    * Execute requested index action.
@@ -989,8 +991,8 @@ public void deleteMetadataIndexIfNecessary() {
       if (shouldDeleteMetadataPartition(partitionType)) {
         try {
           LOG.info("Deleting metadata partition because it is disabled in writer: " + partitionType.name());
-          if (metadataPartitionExists(metaClient.getBasePath(), context, partitionType)) {
-            deleteMetadataPartition(metaClient.getBasePath(), context, partitionType);
+          if (metadataPartitionExists(metaClient.getBasePath(), context, partitionType.getPartitionPath())) {
+            deleteMetadataPartition(metaClient.getBasePath(), context, partitionType.getPartitionPath());
           }
           clearMetadataTablePartitionsConfig(Option.of(partitionType), false);
         } catch (HoodieMetadataException e) {

=== changed file ===
@@ -51,7 +51,6 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -72,7 +71,6 @@
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
 import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
 import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
@@ -220,9 +218,8 @@ private void abort(HoodieInstant indexInstant, Set<String> requestedPartitions)

     // delete metadata partition
     requestedPartitions.forEach(partition -> {
-      MetadataPartitionType partitionType = MetadataPartitionType.valueOf(partition.toUpperCase(Locale.ROOT));
-      if (metadataPartitionExists(table.getMetaClient().getBasePathV2().toString(), context, partitionType)) {
-        deleteMetadataPartition(table.getMetaClient().getBasePathV2().toString(), context, partitionType);
+      if (metadataPartitionExists(table.getMetaClient().getBasePathV2().toString(), context, partition)) {
+        deleteMetadataPartition(table.getMetaClient().getBasePathV2().toString(), context, partition);
       }
     });
@@ -320,9 +317,7 @@ private static List<HoodieInstant> getCompletedArchivedAndActiveInstantsAfter(St

   private void updateMetadataPartitionsTableConfig(HoodieTableMetaClient metaClient, Set<String> metadataPartitions) {
     metadataPartitions.forEach(metadataPartition -> {
-      MetadataPartitionType partitionType = metadataPartition.startsWith(PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX) ? MetadataPartitionType.FUNCTIONAL_INDEX :
-          MetadataPartitionType.valueOf(metadataPartition.toUpperCase(Locale.ROOT));
-      metaClient.getTableConfig().setMetadataPartitionState(metaClient, partitionType, true);
+      metaClient.getTableConfig().setMetadataPartitionState(metaClient, metadataPartition, true);
     });
   }
 }

=== changed file ===
@@ -67,15 +67,20 @@ public class ScheduleIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<
   private static final Integer LATEST_INDEX_PLAN_VERSION = INDEX_PLAN_VERSION_1;

   private final List<MetadataPartitionType> partitionIndexTypes;
+
+  private final List<String> partitionPaths;
+
   private final TransactionManager txnManager;

   public ScheduleIndexActionExecutor(HoodieEngineContext context,
                                      HoodieWriteConfig config,
                                      HoodieTable<T, I, K, O> table,
                                      String instantTime,
-                                     List<MetadataPartitionType> partitionIndexTypes) {
+                                     List<MetadataPartitionType> partitionIndexTypes,
+                                     List<String> partitionPaths) {
     super(context, config, table, instantTime);
     this.partitionIndexTypes = partitionIndexTypes;
+    this.partitionPaths = partitionPaths;
     this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
   }
@@ -84,8 +89,11 @@ public Option<HoodieIndexPlan> execute() {
     validateBeforeScheduling();
     // make sure that it is idempotent, check with previously pending index operations.
     Set<String> indexesInflightOrCompleted = getInflightAndCompletedMetadataPartitions(table.getMetaClient().getTableConfig());
+
     Set<String> requestedPartitions = partitionIndexTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
+    requestedPartitions.addAll(partitionPaths);
     requestedPartitions.removeAll(indexesInflightOrCompleted);
+
     if (!requestedPartitions.isEmpty()) {
       LOG.warn(String.format("Following partitions already exist or inflight: %s. Going to schedule indexing of only these partitions: %s",
           indexesInflightOrCompleted, requestedPartitions));
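
To make the idempotency check above concrete, a worked example (partition names hypothetical): if
partitionIndexTypes resolves to {column_stats} and partitionPaths is {func_index_datestr}, while the
table config already lists column_stats as inflight or completed, then requestedPartitions becomes
{column_stats, func_index_datestr} minus {column_stats} = {func_index_datestr}, so only the functional
index is scheduled.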
@@ -142,8 +150,8 @@ private void validateBeforeScheduling() {
   private void abort(HoodieInstant indexInstant) {
     // delete metadata partition
     partitionIndexTypes.forEach(partitionType -> {
-      if (metadataPartitionExists(table.getMetaClient().getBasePath(), context, partitionType)) {
-        deleteMetadataPartition(table.getMetaClient().getBasePath(), context, partitionType);
+      if (metadataPartitionExists(table.getMetaClient().getBasePath(), context, partitionType.getPartitionPath())) {
+        deleteMetadataPartition(table.getMetaClient().getBasePath(), context, partitionType.getPartitionPath());
       }
     });
     // delete requested instant

=== changed file ===
@@ -49,7 +49,7 @@ public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngin
     tablePropsToAdd.put(TABLE_CHECKSUM, String.valueOf(HoodieTableConfig.generateChecksum(config.getProps())));
     // if metadata is enabled and files partition exist then update TABLE_METADATA_INDEX_COMPLETED
     // schema for the files partition is same between the two versions
-    if (config.isMetadataTableEnabled() && metadataPartitionExists(config.getBasePath(), context, MetadataPartitionType.FILES)) {
+    if (config.isMetadataTableEnabled() && metadataPartitionExists(config.getBasePath(), context, MetadataPartitionType.FILES.getPartitionPath())) {
       tablePropsToAdd.put(TABLE_METADATA_PARTITIONS, MetadataPartitionType.FILES.getPartitionPath());
     }
     return tablePropsToAdd;

=== changed file ===
@@ -378,7 +378,7 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollb
   }

   @Override
-  public Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex) {
+  public Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex, List<String> partitionPaths) {
     throw new HoodieNotSupportedException("Metadata indexing is not supported for a Flink table yet.");
   }

=== changed file ===
@@ -247,8 +247,8 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context,
   }

   @Override
-  public Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex) {
-    return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex).execute();
+  public Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex, List<String> partitionPaths) {
+    return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex, partitionPaths).execute();
   }

   @Override

=== changed file ===
@@ -310,8 +310,8 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollb
   }

   @Override
-  public Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex) {
-    return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex).execute();
+  public Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex, List<String> partitionPaths) {
+    return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex, partitionPaths).execute();
   }

   @Override

=== changed file ===
@@ -1098,7 +1098,7 @@ private void revertTableToInflightState(HoodieWriteConfig writeConfig) throws IO
     // Transition the second init commit for record_index partition to inflight in MDT
     deleteMetaFile(metaClient.getFs(), mdtBasePath, mdtInitCommit2, DELTA_COMMIT_EXTENSION);
     metaClient.getTableConfig().setMetadataPartitionState(
-        metaClient, MetadataPartitionType.RECORD_INDEX, false);
+        metaClient, MetadataPartitionType.RECORD_INDEX.getPartitionPath(), false);
     metaClient.getTableConfig().setMetadataPartitionsInflight(
         metaClient, MetadataPartitionType.RECORD_INDEX);
     timeline = metaClient.getActiveTimeline().reload();

=== changed file ===
@@ -329,11 +329,11 @@ public void testLookupIndexWithAndWithoutColumnStats() throws Exception {

     // check column_stats partition exists
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    assertTrue(metadataPartitionExists(metaClient.getBasePath(), context, COLUMN_STATS));
+    assertTrue(metadataPartitionExists(metaClient.getBasePath(), context, COLUMN_STATS.getPartitionPath()));
     assertTrue(metaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));

     // delete the column_stats partition
-    deleteMetadataPartition(metaClient.getBasePath(), context, COLUMN_STATS);
+    deleteMetadataPartition(metaClient.getBasePath(), context, COLUMN_STATS.getPartitionPath());

     // Now tagLocation for these records, they should be tagged correctly despite column_stats being enabled but not present
     hoodieTable = HoodieSparkTable.create(config, context, metaClient);

=== changed file ===
@@ -540,7 +540,7 @@ public void testDowngradeSixToFiveShouldDeleteRecordIndexPartition() throws Exce
         .withEnableRecordIndex(true).build())
         .build();
     for (MetadataPartitionType partitionType : MetadataPartitionType.values()) {
-      metaClient.getTableConfig().setMetadataPartitionState(metaClient, partitionType, true);
+      metaClient.getTableConfig().setMetadataPartitionState(metaClient, partitionType.getPartitionPath(), true);
     }
     metaClient.getTableConfig().setMetadataPartitionsInflight(metaClient, MetadataPartitionType.values());
     String metadataTableBasePath = Paths.get(basePath, METADATA_TABLE_FOLDER_PATH).toString();

(Diff truncated: 13 of the 20 changed files are shown above.)