Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,6 @@ public class HoodieMetadataWriteUtils {
// from the metadata payload schema.
public static final String RECORD_KEY_FIELD_NAME = HoodieMetadataPayload.KEY_FIELD_NAME;

// MDT writes are always prepped. Hence, insert and upsert shuffle parallelism are not important to be configured. Same for delete
// parallelism as deletes are not used.
// The finalize, cleaner and rollback tasks will operate on each fileGroup so their parallelism should be as large as the total file groups.
// But it's not possible to accurately get the file group count here so keeping these values large enough. This parallelism would
// any ways be limited by the executor counts.
private static final int MDT_DEFAULT_PARALLELISM = 512;

// File groups in each partition are fixed at creation time and we do not want them to be split into multiple files
// ever. Hence, we use a very large basefile size in metadata table. The actual size of the HFiles created will
// eventually depend on the number of file groups selected for each partition (See estimateFileGroupCount function)
Expand Down Expand Up @@ -164,7 +157,7 @@ public static HoodieWriteConfig createMetadataWriteConfig(
HoodieCleanConfig.Builder cleanConfigBuilder = HoodieCleanConfig.newBuilder()
.withAsyncClean(DEFAULT_METADATA_ASYNC_CLEAN)
.withAutoClean(false)
.withCleanerParallelism(MDT_DEFAULT_PARALLELISM)
.withCleanerParallelism(writeConfig.getMetadataConfig().getCleanerParallelism())
.withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
.withCleanerPolicy(dataTableCleaningPolicy);

Expand Down Expand Up @@ -233,8 +226,8 @@ public static HoodieWriteConfig createMetadataWriteConfig(
.withBloomFilterFpp(writeConfig.getMetadataConfig().getBloomFilterFpp())
.withBloomFilterDynamicMaxEntries(writeConfig.getMetadataConfig().getDynamicBloomFilterMaxNumEntries())
.build())
.withRollbackParallelism(MDT_DEFAULT_PARALLELISM)
.withFinalizeWriteParallelism(MDT_DEFAULT_PARALLELISM)
.withRollbackParallelism(writeConfig.getMetadataConfig().getRollbackParallelism())
.withFinalizeWriteParallelism(writeConfig.getMetadataConfig().getFinalizeWritesParallelism())
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
.withPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS)
.withWriteStatusClass(FailOnFirstErrorWriteStatus.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@ public class TestHoodieMetadataWriteUtils {

@Test
public void testCreateMetadataWriteConfigForCleaner() {
Properties properties = new Properties();
properties.setProperty(HoodieMetadataConfig.CLEANER_PARALLELISM.key(), "1000");
HoodieWriteConfig writeConfig1 = HoodieWriteConfig.newBuilder()
.withPath("/tmp/")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.retainCommits(5).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withProperties(properties).build())
.build();

HoodieWriteConfig metadataWriteConfig1 = HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig1, HoodieFailedWritesCleaningPolicy.EAGER,
Expand All @@ -52,6 +55,7 @@ public void testCreateMetadataWriteConfigForCleaner() {
assertEquals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS, metadataWriteConfig1.getCleanerPolicy());
// default value already greater than data cleaner commits retained * 1.2
assertEquals(HoodieMetadataConfig.DEFAULT_METADATA_CLEANER_COMMITS_RETAINED, metadataWriteConfig1.getCleanerCommitsRetained());
assertEquals(1000, metadataWriteConfig1.getCleanerParallelism());

assertNotEquals(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, metadataWriteConfig1.getCleanerPolicy());
assertNotEquals(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS, metadataWriteConfig1.getCleanerPolicy());
Expand Down Expand Up @@ -112,4 +116,35 @@ private void validateMetadataWriteConfig(HoodieWriteConfig metadataWriteConfig,
assertNull(metadataWriteConfig.getLockProviderClass());
}
}

@Test
public void testParallelismConfigs() {
  // With no overrides, every parallelism knob resolves to the default of 512.
  testMetadataConfig(false, 512, 512, 512);
  // Explicit overrides must flow through for cleaner, rollback and finalize-write parallelism.
  int[][] overrides = {{10, 20, 10}, {1000, 2000, 1000}};
  for (int[] o : overrides) {
    testMetadataConfig(true, o[0], o[1], o[2]);
  }
}

/**
 * Builds a data-table write config (optionally overriding the three metadata-table
 * parallelism settings), derives the metadata-table write config from it, and asserts
 * that the resolved cleaner/rollback/finalize-write parallelism values match.
 */
private void testMetadataConfig(boolean setParallelismConfigs, int cleanerParallelism, int rollbackParallelism, int finalizeWriteParallelism) {
  Properties props = new Properties();
  if (setParallelismConfigs) {
    props.setProperty(HoodieMetadataConfig.CLEANER_PARALLELISM.key(), String.valueOf(cleanerParallelism));
    props.setProperty(HoodieMetadataConfig.ROLLBACK_PARALLELISM.key(), String.valueOf(rollbackParallelism));
    props.setProperty(HoodieMetadataConfig.FINALIZE_WRITES_PARALLELISM.key(), String.valueOf(finalizeWriteParallelism));
  }
  HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().withProperties(props).build();
  HoodieWriteConfig dataWriteConfig = HoodieWriteConfig.newBuilder()
      .withPath("/tmp")
      .withCleanConfig(HoodieCleanConfig.newBuilder()
          .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
          .retainCommits(5).build())
      .withMetadataConfig(metadataConfig)
      .build();
  HoodieWriteConfig mdtWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(dataWriteConfig, HoodieFailedWritesCleaningPolicy.EAGER,
      HoodieTableVersion.EIGHT);

  // The metadata-table write config must reflect the expected parallelism values.
  assertEquals(cleanerParallelism, mdtWriteConfig.getCleanerParallelism());
  assertEquals(rollbackParallelism, mdtWriteConfig.getRollbackParallelism());
  assertEquals(finalizeWriteParallelism, mdtWriteConfig.getFinalizeWriteParallelism());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,27 @@ public final class HoodieMetadataConfig extends HoodieConfig {
+ "honor the set value for number of tasks. If not, number of write status's from data "
+ "table writes will be used for metadata table record preparation");

// MDT writes are always prepped, so insert/upsert/delete shuffle parallelism is irrelevant
// for the metadata table; only the cleaner, rollback and finalize-write tasks fan out, and
// those operate per file group. The 512 defaults are intentionally large because the exact
// file group count is not known up front and the effective parallelism is bounded by the
// executor count anyway.
public static final ConfigProperty<Integer> CLEANER_PARALLELISM = ConfigProperty
.key(METADATA_PREFIX + ".cleaner.parallelism")
.defaultValue(512)
.markAdvanced()
.sinceVersion("0.14.2")
.withDocumentation("Cleaner parallelism to use for metadata table.");

// Parallelism for rollback operations on the metadata table (same per-file-group rationale
// as CLEANER_PARALLELISM above).
public static final ConfigProperty<Integer> ROLLBACK_PARALLELISM = ConfigProperty
.key(METADATA_PREFIX + ".rollback.parallelism")
.defaultValue(512)
.markAdvanced()
.sinceVersion("0.14.2")
.withDocumentation("Rollback parallelism to use for metadata table.");

// Parallelism for finalizing writes on the metadata table (same per-file-group rationale
// as CLEANER_PARALLELISM above).
public static final ConfigProperty<Integer> FINALIZE_WRITES_PARALLELISM = ConfigProperty
.key(METADATA_PREFIX + ".finalize.writes.parallelism")
.defaultValue(512)
.markAdvanced()
.sinceVersion("0.14.2")
.withDocumentation("Finalize writes parallelism to use for metadata table.");

/**
 * Returns the configured maximum log file size in bytes for the metadata table
 * (backed by {@code MAX_LOG_FILE_SIZE_BYTES_PROP}).
 */
public long getMaxLogFileSize() {
return getLong(MAX_LOG_FILE_SIZE_BYTES_PROP);
}
Expand Down Expand Up @@ -909,6 +930,18 @@ public boolean isDropMetadataIndex(String indexName) {
return subIndexNameToDrop.contains(indexName);
}

/** Returns the cleaner parallelism for the metadata table; see {@link #CLEANER_PARALLELISM}. */
public int getCleanerParallelism() {
return getInt(CLEANER_PARALLELISM);
}

/** Returns the rollback parallelism for the metadata table; see {@link #ROLLBACK_PARALLELISM}. */
public int getRollbackParallelism() {
return getInt(ROLLBACK_PARALLELISM);
}

/** Returns the finalize-writes parallelism for the metadata table; see {@link #FINALIZE_WRITES_PARALLELISM}. */
public int getFinalizeWritesParallelism() {
return getInt(FINALIZE_WRITES_PARALLELISM);
}

public static class Builder {

private EngineType engineType = EngineType.SPARK;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* Tests {@link HoodieMetadataConfig}.
*/
class TestHoodieMetadataConfig {

@Test
void testGetRecordPreparationParallelism() {
// Test default value
Expand Down Expand Up @@ -140,4 +141,39 @@ void testRecordIndexMaxFileGroupSizeBytes() {
// Verify that the value is indeed larger than Integer.MAX_VALUE
assertTrue(largeSize > Integer.MAX_VALUE, "Test value should exceed Integer.MAX_VALUE to validate long type");
}


/**
 * Verifies cleaner/rollback/finalize-write parallelism resolution on
 * {@link HoodieMetadataConfig}: the 512 defaults, explicit overrides, and that a zero
 * value is stored verbatim (no minimum is enforced by the config layer).
 *
 * Fix: the original declared an unused {@code Properties propsZero} local and then
 * reassigned {@code props} instead of using it — the dead variable is removed and each
 * scenario now uses its own clearly-named Properties instance.
 */
@Test
public void testCleanerRollbackParallelism() {
  // Test default value
  HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().build();
  assertEquals(512, config.getCleanerParallelism());
  assertEquals(512, config.getRollbackParallelism());
  assertEquals(512, config.getFinalizeWritesParallelism());

  // Test custom value
  Properties customProps = new Properties();
  customProps.setProperty(HoodieMetadataConfig.CLEANER_PARALLELISM.key(), "100");
  customProps.setProperty(HoodieMetadataConfig.ROLLBACK_PARALLELISM.key(), "100");
  customProps.setProperty(HoodieMetadataConfig.FINALIZE_WRITES_PARALLELISM.key(), "100");
  HoodieMetadataConfig configWithCustomValue = HoodieMetadataConfig.newBuilder()
      .fromProperties(customProps)
      .build();
  assertEquals(100, configWithCustomValue.getCleanerParallelism());
  assertEquals(100, configWithCustomValue.getRollbackParallelism());
  assertEquals(100, configWithCustomValue.getFinalizeWritesParallelism());

  // Test zero value — accepted as-is by the config layer
  Properties zeroProps = new Properties();
  zeroProps.setProperty(HoodieMetadataConfig.CLEANER_PARALLELISM.key(), "0");
  zeroProps.setProperty(HoodieMetadataConfig.ROLLBACK_PARALLELISM.key(), "0");
  zeroProps.setProperty(HoodieMetadataConfig.FINALIZE_WRITES_PARALLELISM.key(), "0");
  HoodieMetadataConfig configWithZeroValue = HoodieMetadataConfig.newBuilder()
      .fromProperties(zeroProps)
      .build();
  assertEquals(0, configWithZeroValue.getCleanerParallelism());
  assertEquals(0, configWithZeroValue.getRollbackParallelism());
  assertEquals(0, configWithZeroValue.getFinalizeWritesParallelism());
}
}
Loading