[HUDI-7458] Fix bug with functional index creation #10792

Status: Merged (1 commit, Mar 4, 2024)
@@ -151,7 +151,7 @@ public String delete(@ShellOption(value = "--backup", help = "Backup the metadat
   public String deleteRecordIndex(@ShellOption(value = "--backup", help = "Backup the record index before delete", defaultValue = "true", arity = 1) final boolean backup) throws Exception {
     HoodieTableMetaClient dataMetaClient = HoodieCLI.getTableMetaClient();
     String backupPath = HoodieTableMetadataUtil.deleteMetadataTablePartition(dataMetaClient, new HoodieSparkEngineContext(jsc),
-        MetadataPartitionType.RECORD_INDEX, backup);
+        MetadataPartitionType.RECORD_INDEX.getPartitionPath(), backup);
     if (backup) {
       return "Record Index has been deleted from the Metadata Table and backed up to " + backupPath;
     } else {
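This first hunk sets the pattern for the whole PR: metadata-table helpers that used to be keyed by a MetadataPartitionType enum constant are now keyed by the partition path string, with call sites converting through getPartitionPath(). A minimal standalone mimic of that enum-to-path mapping (not Hudi source; the literal path values are assumptions):

    public class PartitionPathMappingSketch {
      // Sketch only: models how a metadata partition type exposes the fixed
      // path of its partition inside the metadata table.
      enum Type {
        FILES("files"), COLUMN_STATS("column_stats"), RECORD_INDEX("record_index");

        private final String partitionPath;

        Type(String partitionPath) {
          this.partitionPath = partitionPath;
        }

        String getPartitionPath() {
          return partitionPath;
        }
      }

      public static void main(String[] args) {
        // Call sites now pass the string, not the enum constant.
        System.out.println(Type.RECORD_INDEX.getPartitionPath()); // record_index
      }
    }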
@@ -987,10 +987,10 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String
    * @param partitionTypes - list of {@link MetadataPartitionType} which needs to be indexed
    * @return instant time for the requested INDEX action
    */
-  public Option<String> scheduleIndexing(List<MetadataPartitionType> partitionTypes) {
+  public Option<String> scheduleIndexing(List<MetadataPartitionType> partitionTypes, List<String> partitionPaths) {
     String instantTime = createNewInstantTime();
     Option<HoodieIndexPlan> indexPlan = createTable(config, hadoopConf)
-        .scheduleIndexing(context, instantTime, partitionTypes);
+        .scheduleIndexing(context, instantTime, partitionTypes, partitionPaths);
     return indexPlan.isPresent() ? Option.of(instantTime) : Option.empty();
   }
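The write-client entry point now takes the concrete partition paths alongside the types, since several functional indexes can share the FUNCTIONAL_INDEX type while living under distinct metadata partitions. A hedged usage sketch of the new overload (the signature matches the diff above; the wrapper class, method, and index path here are illustrative assumptions, and an already-configured Spark write client is assumed):

    import java.util.Collections;
    import java.util.List;

    import org.apache.hudi.client.SparkRDDWriteClient;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.metadata.MetadataPartitionType;

    public class ScheduleIndexingSketch {
      // Schedules metadata indexing for one functional index; returns the
      // instant time if an index plan was generated, Option.empty() otherwise.
      public static Option<String> scheduleFunctionalIndex(SparkRDDWriteClient<?> client,
                                                           String indexPartitionPath) {
        List<MetadataPartitionType> types =
            Collections.singletonList(MetadataPartitionType.FUNCTIONAL_INDEX);
        List<String> paths = Collections.singletonList(indexPartitionPath); // e.g. "func_index_ts" (assumed)
        return client.scheduleIndexing(types, paths);
      }
    }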
@@ -392,7 +392,6 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
     // Already initialized partitions can be ignored
     partitionsToInit.removeIf(metadataPartition -> dataMetaClient.getTableConfig().isMetadataPartitionAvailable((metadataPartition)));

-
     // Get a complete list of files and partitions from the file system or from already initialized FILES partition of MDT
     List<DirectoryInfo> partitionInfoList;
     if (filesPartitionAvailable) {
@@ -462,7 +461,9 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
     HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue();
     bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount);
     metadataMetaClient.reloadActiveTimeline();
-    dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, true);
+    String partitionPath = (partitionType == FUNCTIONAL_INDEX) ? dataWriteConfig.getFunctionalIndexConfig().getIndexName() : partitionType.getPartitionPath();
+
+    dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionPath, true);
     // initialize the metadata reader again so the MDT partition can be read after initialization
     initMetadataReader();
     long totalInitTime = partitionInitTimer.endTimer();
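This hunk is the heart of the fix: a functional index has no 1:1 enum constant, so the table-config entry must be keyed by the index's own partition path (taken from the functional-index config) rather than by the enum's fixed path. A standalone sketch of that branch (not Hudi source; the index name is hypothetical):

    import java.util.Locale;

    public class FunctionalIndexPathSketch {
      enum Type { RECORD_INDEX, FUNCTIONAL_INDEX }

      // Mirrors the branch in the diff: FUNCTIONAL_INDEX resolves to the
      // user-defined index name; built-in partitions keep their fixed paths.
      static String partitionPath(Type type, String functionalIndexName) {
        return (type == Type.FUNCTIONAL_INDEX)
            ? functionalIndexName
            : type.name().toLowerCase(Locale.ROOT);
      }

      public static void main(String[] args) {
        System.out.println(partitionPath(Type.RECORD_INDEX, null));                     // record_index
        System.out.println(partitionPath(Type.FUNCTIONAL_INDEX, "func_index_ts_hour")); // func_index_ts_hour
      }
    }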
@@ -795,7 +796,7 @@ public void dropMetadataPartitions(List<MetadataPartitionType> metadataPartition
     for (MetadataPartitionType partitionType : metadataPartitions) {
       String partitionPath = partitionType.getPartitionPath();
       // first update table config
-      dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, false);
+      dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionPath, false);
       LOG.warn("Deleting Metadata Table partition: " + partitionPath);
       dataMetaClient.getFs().delete(new Path(metadataWriteConfig.getBasePath(), partitionPath), true);
       // delete corresponding pending indexing instant file in the timeline
@@ -900,6 +901,7 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List<Hood
       return;
     }
     String indexUptoInstantTime = indexPartitionInfos.get(0).getIndexUptoInstant();
+    List<String> partitionPaths = new ArrayList<>();
     List<MetadataPartitionType> partitionTypes = new ArrayList<>();
     indexPartitionInfos.forEach(indexPartitionInfo -> {
       String relativePartitionPath = indexPartitionInfo.getMetadataPartitionPath();
@@ -913,10 +915,11 @@
         throw new HoodieIndexException(String.format("Indexing for metadata partition: %s is not enabled", partitionType));
       }
       partitionTypes.add(partitionType);
+      partitionPaths.add(relativePartitionPath);
     });

     // before initialization set these partitions as inflight in table config
-    dataMetaClient.getTableConfig().setMetadataPartitionsInflight(dataMetaClient, partitionTypes);
+    dataMetaClient.getTableConfig().setMetadataPartitionsInflight(dataMetaClient, partitionPaths);

     // initialize partitions
     initializeFromFilesystem(HoodieTableMetadataUtil.createAsyncIndexerTimestamp(indexUptoInstantTime), partitionTypes, Option.empty());
@@ -564,7 +564,9 @@ public abstract HoodieRollbackMetadata rollback(HoodieEngineContext context,
    * @param partitionsToIndex List of {@link MetadataPartitionType} that should be indexed.
    * @return HoodieIndexPlan containing metadata partitions and instant upto which they should be indexed.
    */
-  public abstract Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex);
+  public abstract Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime,
+                                                           List<MetadataPartitionType> partitionsToIndex,
+                                                           List<String> partitionPaths);

   /**
    * Execute requested index action.
@@ -984,8 +986,8 @@ public void deleteMetadataIndexIfNecessary() {
       if (shouldDeleteMetadataPartition(partitionType)) {
         try {
           LOG.info("Deleting metadata partition because it is disabled in writer: " + partitionType.name());
-          if (metadataPartitionExists(metaClient.getBasePath(), context, partitionType)) {
-            deleteMetadataPartition(metaClient.getBasePath(), context, partitionType);
+          if (metadataPartitionExists(metaClient.getBasePath(), context, partitionType.getPartitionPath())) {
+            deleteMetadataPartition(metaClient.getBasePath(), context, partitionType.getPartitionPath());
           }
           clearMetadataTablePartitionsConfig(Option.of(partitionType), false);
         } catch (HoodieMetadataException e) {
@@ -51,7 +51,6 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -72,7 +71,6 @@
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
 import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
 import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
@@ -220,9 +218,8 @@ private void abort(HoodieInstant indexInstant, Set<String> requestedPartitions)

     // delete metadata partition
     requestedPartitions.forEach(partition -> {
-      MetadataPartitionType partitionType = MetadataPartitionType.valueOf(partition.toUpperCase(Locale.ROOT));
-      if (metadataPartitionExists(table.getMetaClient().getBasePathV2().toString(), context, partitionType)) {
-        deleteMetadataPartition(table.getMetaClient().getBasePathV2().toString(), context, partitionType);
+      if (metadataPartitionExists(table.getMetaClient().getBasePathV2().toString(), context, partition)) {
+        deleteMetadataPartition(table.getMetaClient().getBasePathV2().toString(), context, partition);
       }
     });

@@ -320,9 +317,7 @@ private static List<HoodieInstant> getCompletedArchivedAndActiveInstantsAfter(St

   private void updateMetadataPartitionsTableConfig(HoodieTableMetaClient metaClient, Set<String> metadataPartitions) {
     metadataPartitions.forEach(metadataPartition -> {
-      MetadataPartitionType partitionType = metadataPartition.startsWith(PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX) ? MetadataPartitionType.FUNCTIONAL_INDEX :
-          MetadataPartitionType.valueOf(metadataPartition.toUpperCase(Locale.ROOT));
-      metaClient.getTableConfig().setMetadataPartitionState(metaClient, partitionType, true);
+      metaClient.getTableConfig().setMetadataPartitionState(metaClient, metadataPartition, true);
     });
   }
 }
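The removed valueOf round-trip is where functional indexes previously broke: a partition path such as "func_index_ts" has no matching enum constant, so the string-to-enum lookup threw during abort and table-config updates. A standalone sketch of the failure mode (not Hudi source; the partition name is hypothetical):

    import java.util.Locale;

    public class ValueOfFailureSketch {
      enum Type { FILES, COLUMN_STATS, RECORD_INDEX, FUNCTIONAL_INDEX }

      public static void main(String[] args) {
        String partition = "func_index_ts"; // hypothetical functional-index partition path
        try {
          Type t = Type.valueOf(partition.toUpperCase(Locale.ROOT));
          System.out.println("resolved: " + t);
        } catch (IllegalArgumentException e) {
          // No enum constant FUNC_INDEX_TS exists, so the pre-fix code path died here;
          // the fix passes the raw partition path through instead.
          System.out.println("lookup failed: " + e.getMessage());
        }
      }
    }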
@@ -67,15 +67,20 @@ public class ScheduleIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<
   private static final Integer LATEST_INDEX_PLAN_VERSION = INDEX_PLAN_VERSION_1;

   private final List<MetadataPartitionType> partitionIndexTypes;
+
+  private final List<String> partitionPaths;
+
   private final TransactionManager txnManager;

   public ScheduleIndexActionExecutor(HoodieEngineContext context,
                                      HoodieWriteConfig config,
                                      HoodieTable<T, I, K, O> table,
                                      String instantTime,
-                                     List<MetadataPartitionType> partitionIndexTypes) {
+                                     List<MetadataPartitionType> partitionIndexTypes,
+                                     List<String> partitionPaths) {
     super(context, config, table, instantTime);
     this.partitionIndexTypes = partitionIndexTypes;
+    this.partitionPaths = partitionPaths;
     this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
   }
@@ -84,8 +89,11 @@ public Option<HoodieIndexPlan> execute() {
     validateBeforeScheduling();
     // make sure that it is idempotent, check with previously pending index operations.
     Set<String> indexesInflightOrCompleted = getInflightAndCompletedMetadataPartitions(table.getMetaClient().getTableConfig());
+
     Set<String> requestedPartitions = partitionIndexTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
+    requestedPartitions.addAll(partitionPaths);
     requestedPartitions.removeAll(indexesInflightOrCompleted);
+
     if (!requestedPartitions.isEmpty()) {
       LOG.warn(String.format("Following partitions already exist or inflight: %s. Going to schedule indexing of only these partitions: %s",
           indexesInflightOrCompleted, requestedPartitions));
@@ -142,8 +150,8 @@ private void validateBeforeScheduling() {
   private void abort(HoodieInstant indexInstant) {
     // delete metadata partition
     partitionIndexTypes.forEach(partitionType -> {
-      if (metadataPartitionExists(table.getMetaClient().getBasePath(), context, partitionType)) {
-        deleteMetadataPartition(table.getMetaClient().getBasePath(), context, partitionType);
+      if (metadataPartitionExists(table.getMetaClient().getBasePath(), context, partitionType.getPartitionPath())) {
+        deleteMetadataPartition(table.getMetaClient().getBasePath(), context, partitionType.getPartitionPath());
       }
     });
     // delete requested instant
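execute() now builds the requested set from both sources before de-duplicating against inflight or completed indexes. A standalone sketch of that set arithmetic (not Hudi source; partition names are hypothetical):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class RequestedPartitionsSketch {
      public static void main(String[] args) {
        // Paths derived from the requested MetadataPartitionType values.
        Set<String> requested = new HashSet<>(Arrays.asList("column_stats"));
        // Explicitly requested functional-index paths join the same set.
        requested.addAll(Arrays.asList("func_index_ts"));
        // Anything already inflight or completed is dropped, keeping
        // scheduling idempotent across retries.
        Set<String> inflightOrCompleted = new HashSet<>(Arrays.asList("column_stats"));
        requested.removeAll(inflightOrCompleted);
        System.out.println(requested); // [func_index_ts]
      }
    }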
@@ -49,7 +49,7 @@ public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngin
     tablePropsToAdd.put(TABLE_CHECKSUM, String.valueOf(HoodieTableConfig.generateChecksum(config.getProps())));
     // if metadata is enabled and files partition exist then update TABLE_METADATA_INDEX_COMPLETED
     // schema for the files partition is same between the two versions
-    if (config.isMetadataTableEnabled() && metadataPartitionExists(config.getBasePath(), context, MetadataPartitionType.FILES)) {
+    if (config.isMetadataTableEnabled() && metadataPartitionExists(config.getBasePath(), context, MetadataPartitionType.FILES.getPartitionPath())) {
       tablePropsToAdd.put(TABLE_METADATA_PARTITIONS, MetadataPartitionType.FILES.getPartitionPath());
     }
     return tablePropsToAdd;
@@ -378,7 +378,7 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollb
   }

   @Override
-  public Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex) {
+  public Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex, List<String> partitionPaths) {
     throw new HoodieNotSupportedException("Metadata indexing is not supported for a Flink table yet.");
   }
@@ -242,8 +242,8 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context,
   }

   @Override
-  public Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex) {
-    return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex).execute();
+  public Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex, List<String> partitionPaths) {
+    return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex, partitionPaths).execute();
   }

   @Override
@@ -299,8 +299,8 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollb
   }

   @Override
-  public Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex) {
-    return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex).execute();
+  public Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex, List<String> partitionPaths) {
+    return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex, partitionPaths).execute();
   }

   @Override
@@ -1098,7 +1098,7 @@ private void revertTableToInflightState(HoodieWriteConfig writeConfig) throws IO
     // Transition the second init commit for record_index partition to inflight in MDT
     deleteMetaFile(metaClient.getFs(), mdtBasePath, mdtInitCommit2, DELTA_COMMIT_EXTENSION);
     metaClient.getTableConfig().setMetadataPartitionState(
-        metaClient, MetadataPartitionType.RECORD_INDEX, false);
+        metaClient, MetadataPartitionType.RECORD_INDEX.getPartitionPath(), false);
     metaClient.getTableConfig().setMetadataPartitionsInflight(
         metaClient, MetadataPartitionType.RECORD_INDEX);
     timeline = metaClient.getActiveTimeline().reload();
@@ -329,11 +329,11 @@ public void testLookupIndexWithAndWithoutColumnStats() throws Exception {

     // check column_stats partition exists
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    assertTrue(metadataPartitionExists(metaClient.getBasePath(), context, COLUMN_STATS));
+    assertTrue(metadataPartitionExists(metaClient.getBasePath(), context, COLUMN_STATS.getPartitionPath()));
     assertTrue(metaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath()));

     // delete the column_stats partition
-    deleteMetadataPartition(metaClient.getBasePath(), context, COLUMN_STATS);
+    deleteMetadataPartition(metaClient.getBasePath(), context, COLUMN_STATS.getPartitionPath());

     // Now tagLocation for these records, they should be tagged correctly despite column_stats being enabled but not present
     hoodieTable = HoodieSparkTable.create(config, context, metaClient);
@@ -540,7 +540,7 @@ public void testDowngradeSixToFiveShouldDeleteRecordIndexPartition() throws Exce
             .withEnableRecordIndex(true).build())
         .build();
     for (MetadataPartitionType partitionType : MetadataPartitionType.values()) {
-      metaClient.getTableConfig().setMetadataPartitionState(metaClient, partitionType, true);
+      metaClient.getTableConfig().setMetadataPartitionState(metaClient, partitionType.getPartitionPath(), true);
     }
     metaClient.getTableConfig().setMetadataPartitionsInflight(metaClient, MetadataPartitionType.values());
     String metadataTableBasePath = Paths.get(basePath, METADATA_TABLE_FOLDER_PATH).toString();