diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index a3e955709ffaf..99521486b7ede 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -124,13 +124,6 @@ public class HoodieMetadataWriteUtils {
   // from the metadata payload schema.
   public static final String RECORD_KEY_FIELD_NAME = HoodieMetadataPayload.KEY_FIELD_NAME;
 
-  // MDT writes are always prepped. Hence, insert and upsert shuffle parallelism are not important to be configured. Same for delete
-  // parallelism as deletes are not used.
-  // The finalize, cleaner and rollback tasks will operate on each fileGroup so their parallelism should be as large as the total file groups.
-  // But it's not possible to accurately get the file group count here so keeping these values large enough. This parallelism would
-  // any ways be limited by the executor counts.
-  private static final int MDT_DEFAULT_PARALLELISM = 512;
-
   // File groups in each partition are fixed at creation time and we do not want them to be split into multiple files
   // ever. Hence, we use a very large basefile size in metadata table. The actual size of the HFiles created will
   // eventually depend on the number of file groups selected for each partition (See estimateFileGroupCount function)
@@ -164,7 +157,7 @@ public static HoodieWriteConfig createMetadataWriteConfig(
     HoodieCleanConfig.Builder cleanConfigBuilder = HoodieCleanConfig.newBuilder()
         .withAsyncClean(DEFAULT_METADATA_ASYNC_CLEAN)
         .withAutoClean(false)
-        .withCleanerParallelism(MDT_DEFAULT_PARALLELISM)
+        .withCleanerParallelism(writeConfig.getMetadataConfig().getCleanerParallelism())
         .withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
         .withCleanerPolicy(dataTableCleaningPolicy);
 
@@ -233,8 +226,8 @@ public static HoodieWriteConfig createMetadataWriteConfig(
             .withBloomFilterFpp(writeConfig.getMetadataConfig().getBloomFilterFpp())
             .withBloomFilterDynamicMaxEntries(writeConfig.getMetadataConfig().getDynamicBloomFilterMaxNumEntries())
             .build())
-        .withRollbackParallelism(MDT_DEFAULT_PARALLELISM)
-        .withFinalizeWriteParallelism(MDT_DEFAULT_PARALLELISM)
+        .withRollbackParallelism(writeConfig.getMetadataConfig().getRollbackParallelism())
+        .withFinalizeWriteParallelism(writeConfig.getMetadataConfig().getFinalizeWritesParallelism())
         .withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
         .withPopulateMetaFields(DEFAULT_METADATA_POPULATE_META_FIELDS)
         .withWriteStatusClass(FailOnFirstErrorWriteStatus.class)
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
index 3b343d3dc12a6..37ee8f78a3e27 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
@@ -39,11 +39,14 @@ public class TestHoodieMetadataWriteUtils {
 
   @Test
   public void testCreateMetadataWriteConfigForCleaner() {
+    Properties properties = new Properties();
+    properties.setProperty(HoodieMetadataConfig.CLEANER_PARALLELISM.key(), "1000");
     HoodieWriteConfig writeConfig1 =
         HoodieWriteConfig.newBuilder()
             .withPath("/tmp/")
             .withCleanConfig(HoodieCleanConfig.newBuilder()
                 .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
                 .retainCommits(5).build())
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withProperties(properties).build())
             .build();
     HoodieWriteConfig metadataWriteConfig1 = HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig1, HoodieFailedWritesCleaningPolicy.EAGER,
@@ -52,6 +55,7 @@ public void testCreateMetadataWriteConfigForCleaner() {
     assertEquals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS, metadataWriteConfig1.getCleanerPolicy());
     // default value already greater than data cleaner commits retained * 1.2
     assertEquals(HoodieMetadataConfig.DEFAULT_METADATA_CLEANER_COMMITS_RETAINED, metadataWriteConfig1.getCleanerCommitsRetained());
+    assertEquals(1000, metadataWriteConfig1.getCleanerParallelism());
     assertNotEquals(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, metadataWriteConfig1.getCleanerPolicy());
     assertNotEquals(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS, metadataWriteConfig1.getCleanerPolicy());
 
@@ -112,4 +116,35 @@ private void validateMetadataWriteConfig(HoodieWriteConfig metadataWriteConfig,
       assertNull(metadataWriteConfig.getLockProviderClass());
     }
   }
+
+  @Test
+  public void testParallelismConfigs() {
+    // default
+    testMetadataConfig(false, 512, 512, 512);
+    // overrides
+    testMetadataConfig(true, 10, 20, 10);
+    testMetadataConfig(true, 1000, 2000, 1000);
+  }
+
+  private void testMetadataConfig(boolean setParallelismConfigs, int cleanerParallelism, int rollbackParallelism, int finalizeWriteParallelism) {
+    Properties properties = new Properties();
+    if (setParallelismConfigs) {
+      properties.setProperty(HoodieMetadataConfig.CLEANER_PARALLELISM.key(), Integer.toString(cleanerParallelism));
+      properties.setProperty(HoodieMetadataConfig.ROLLBACK_PARALLELISM.key(), Integer.toString(rollbackParallelism));
+      properties.setProperty(HoodieMetadataConfig.FINALIZE_WRITES_PARALLELISM.key(), Integer.toString(finalizeWriteParallelism));
+    }
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath("/tmp")
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .retainCommits(5).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withProperties(properties).build())
+        .build();
+    HoodieWriteConfig metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig, HoodieFailedWritesCleaningPolicy.EAGER,
+        HoodieTableVersion.EIGHT);
+
+    assertEquals(cleanerParallelism, metadataWriteConfig.getCleanerParallelism());
+    assertEquals(rollbackParallelism, metadataWriteConfig.getRollbackParallelism());
+    assertEquals(finalizeWriteParallelism, metadataWriteConfig.getFinalizeWriteParallelism());
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index ec688f7768d68..acad0cefd47f9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -604,6 +604,27 @@ public final class HoodieMetadataConfig extends HoodieConfig {
           + "honor the set value for number of tasks. If not, number of write status's from data "
           + "table writes will be used for metadata table record preparation");
 
+  public static final ConfigProperty<Integer> CLEANER_PARALLELISM = ConfigProperty
+      .key(METADATA_PREFIX + ".cleaner.parallelism")
+      .defaultValue(512)
+      .markAdvanced()
+      .sinceVersion("0.14.2")
+      .withDocumentation("Cleaner parallelism to use for metadata table.");
+
+  public static final ConfigProperty<Integer> ROLLBACK_PARALLELISM = ConfigProperty
+      .key(METADATA_PREFIX + ".rollback.parallelism")
+      .defaultValue(512)
+      .markAdvanced()
+      .sinceVersion("0.14.2")
+      .withDocumentation("Rollback parallelism to use for metadata table.");
+
+  public static final ConfigProperty<Integer> FINALIZE_WRITES_PARALLELISM = ConfigProperty
+      .key(METADATA_PREFIX + ".finalize.writes.parallelism")
+      .defaultValue(512)
+      .markAdvanced()
+      .sinceVersion("0.14.2")
+      .withDocumentation("Finalize writes parallelism to use for metadata table.");
+
   public long getMaxLogFileSize() {
     return getLong(MAX_LOG_FILE_SIZE_BYTES_PROP);
   }
@@ -909,6 +930,18 @@ public boolean isDropMetadataIndex(String indexName) {
     return subIndexNameToDrop.contains(indexName);
   }
 
+  public int getCleanerParallelism() {
+    return getInt(CLEANER_PARALLELISM);
+  }
+
+  public int getRollbackParallelism() {
+    return getInt(ROLLBACK_PARALLELISM);
+  }
+
+  public int getFinalizeWritesParallelism() {
+    return getInt(FINALIZE_WRITES_PARALLELISM);
+  }
+
   public static class Builder {
 
     private EngineType engineType = EngineType.SPARK;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
index 89a1d07618283..ff859886e1e44 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
@@ -31,6 +31,7 @@
  * Tests {@link HoodieMetadataConfig}.
  */
 class TestHoodieMetadataConfig {
+
   @Test
   void testGetRecordPreparationParallelism() {
     // Test default value
@@ -140,4 +141,37 @@ void testRecordIndexMaxFileGroupSizeBytes() {
     // Verify that the value is indeed larger than Integer.MAX_VALUE
     assertTrue(largeSize > Integer.MAX_VALUE, "Test value should exceed Integer.MAX_VALUE to validate long type");
   }
+
+  @Test
+  public void testCleanerRollbackParallelism() {
+    // Test default value
+    HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().build();
+    assertEquals(512, config.getCleanerParallelism());
+    assertEquals(512, config.getRollbackParallelism());
+    assertEquals(512, config.getFinalizeWritesParallelism());
+
+    // Test custom value
+    Properties props = new Properties();
+    props.put(HoodieMetadataConfig.CLEANER_PARALLELISM.key(), "100");
+    props.put(HoodieMetadataConfig.ROLLBACK_PARALLELISM.key(), "100");
+    props.put(HoodieMetadataConfig.FINALIZE_WRITES_PARALLELISM.key(), "100");
+    HoodieMetadataConfig configWithCustomValue = HoodieMetadataConfig.newBuilder()
+        .fromProperties(props)
+        .build();
+    assertEquals(100, configWithCustomValue.getCleanerParallelism());
+    assertEquals(100, configWithCustomValue.getRollbackParallelism());
+    assertEquals(100, configWithCustomValue.getFinalizeWritesParallelism());
+
+    // Test zero value
+    props = new Properties();
+    props.put(HoodieMetadataConfig.CLEANER_PARALLELISM.key(), "0");
+    props.put(HoodieMetadataConfig.ROLLBACK_PARALLELISM.key(), "0");
+    props.put(HoodieMetadataConfig.FINALIZE_WRITES_PARALLELISM.key(), "0");
+    configWithCustomValue = HoodieMetadataConfig.newBuilder()
+        .fromProperties(props)
+        .build();
+    assertEquals(0, configWithCustomValue.getCleanerParallelism());
+    assertEquals(0, configWithCustomValue.getRollbackParallelism());
+    assertEquals(0, configWithCustomValue.getFinalizeWritesParallelism());
+  }
 }
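
For reviewers, a minimal usage sketch of the three knobs this patch introduces, mirroring the tests above. The class name `MetadataParallelismExample` and the value 256 are illustrative only, and the printed keys assume `METADATA_PREFIX` resolves to `hoodie.metadata`, giving `hoodie.metadata.cleaner.parallelism`, `hoodie.metadata.rollback.parallelism`, and `hoodie.metadata.finalize.writes.parallelism`.

```java
import java.util.Properties;

import org.apache.hudi.common.config.HoodieMetadataConfig;

public class MetadataParallelismExample {
  public static void main(String[] args) {
    // Override the metadata-table parallelism knobs; 256 is an arbitrary example.
    // These should be scaled with the expected metadata table file group count.
    Properties props = new Properties();
    props.setProperty(HoodieMetadataConfig.CLEANER_PARALLELISM.key(), "256");
    props.setProperty(HoodieMetadataConfig.ROLLBACK_PARALLELISM.key(), "256");
    props.setProperty(HoodieMetadataConfig.FINALIZE_WRITES_PARALLELISM.key(), "256");

    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
        .fromProperties(props)
        .build();

    // The getters added by this patch surface the configured values.
    System.out.println(metadataConfig.getCleanerParallelism());        // 256
    System.out.println(metadataConfig.getRollbackParallelism());       // 256
    System.out.println(metadataConfig.getFinalizeWritesParallelism()); // 256
  }
}
```

Any key left unset falls back to 512, the same value as the removed `MDT_DEFAULT_PARALLELISM` constant, so tables that do not set these configs see no behavior change.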