Skip to content

Commit

Permalink
HDDS-11605. Directory deletion service should support multiple threads (
Browse files Browse the repository at this point in the history
  • Loading branch information
aryangupta1998 authored Dec 9, 2024
1 parent 055b13c commit 23197e2
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,11 @@ private OMConfigKeys() {
// resulting 24MB
public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 6000;

public static final String OZONE_THREAD_NUMBER_DIR_DELETION =
"ozone.thread.number.dir.deletion";

public static final int OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT = 10;

public static final String SNAPSHOT_SST_DELETING_LIMIT_PER_TASK =
"ozone.snapshot.filtering.limit.per.task";
public static final int SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution()
when(ozoneManager.getOmSnapshotManager()).thenAnswer(i -> omSnapshotManager);
DirectoryDeletingService service = Mockito.spy(new DirectoryDeletingService(1000, TimeUnit.MILLISECONDS, 1000,
ozoneManager,
cluster.getConf()));
cluster.getConf(), 1));
service.shutdown();
final int initialSnapshotCount =
(int) cluster.getOzoneManager().getMetadataManager().countRowsInTable(snapshotInfoTable);
Expand Down Expand Up @@ -563,7 +563,7 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution()
}
return i.callRealMethod();
}).when(service).optimizeDirDeletesAndSubmitRequest(anyLong(), anyLong(), anyLong(),
anyLong(), anyList(), anyList(), eq(null), anyLong(), anyInt(), Mockito.any(), any());
anyLong(), anyList(), anyList(), eq(null), anyLong(), anyInt(), Mockito.any(), any(), anyLong());

Mockito.doAnswer(i -> {
store.createSnapshot(testVolumeName, testBucketName, snap2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ private void addPropertiesNotInXml() {
OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY,
OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER,
OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD,
OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION,
ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
ScmConfigKeys.OZONE_SCM_HA_PREFIX,
S3GatewayConfigKeys.OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,22 +480,22 @@ public void testSnapshotWithFSO() throws Exception {

private DirectoryDeletingService getMockedDirectoryDeletingService(AtomicBoolean dirDeletionWaitStarted,
AtomicBoolean dirDeletionStarted)
throws InterruptedException, TimeoutException {
throws InterruptedException, TimeoutException, IOException {
OzoneManager ozoneManager = Mockito.spy(om);
om.getKeyManager().getDirDeletingService().shutdown();
GenericTestUtils.waitFor(() -> om.getKeyManager().getDirDeletingService().getThreadCount() == 0, 1000,
100000);
DirectoryDeletingService directoryDeletingService = Mockito.spy(new DirectoryDeletingService(10000,
TimeUnit.MILLISECONDS, 100000, ozoneManager, cluster.getConf()));
TimeUnit.MILLISECONDS, 100000, ozoneManager, cluster.getConf(), 1));
directoryDeletingService.shutdown();
GenericTestUtils.waitFor(() -> directoryDeletingService.getThreadCount() == 0, 1000,
100000);
when(ozoneManager.getMetadataManager()).thenAnswer(i -> {
doAnswer(i -> {
// Wait for SDS to reach DDS wait block before processing any deleted directories.
GenericTestUtils.waitFor(dirDeletionWaitStarted::get, 1000, 100000);
dirDeletionStarted.set(true);
return i.callRealMethod();
});
}).when(directoryDeletingService).getPendingDeletedDirInfo();
return directoryDeletingService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT;
import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
Expand Down Expand Up @@ -257,8 +259,16 @@ public void start(OzoneConfiguration configuration) {
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
dirDeletingService = new DirectoryDeletingService(dirDeleteInterval,
TimeUnit.MILLISECONDS, serviceTimeout, ozoneManager, configuration);
int dirDeletingServiceCorePoolSize =
configuration.getInt(OZONE_THREAD_NUMBER_DIR_DELETION,
OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT);
if (dirDeletingServiceCorePoolSize <= 0) {
dirDeletingServiceCorePoolSize = 1;
}
dirDeletingService =
new DirectoryDeletingService(dirDeleteInterval, TimeUnit.MILLISECONDS,
serviceTimeout, ozoneManager, configuration,
dirDeletingServiceCorePoolSize);
dirDeletingService.start();
}

Expand Down Expand Up @@ -2052,7 +2062,7 @@ public List<OmKeyInfo> getPendingDeletionSubDirs(long volumeId, long bucketId,
parentInfo.getObjectID(), "");
long countEntries = 0;

Table dirTable = metadataManager.getDirectoryTable();
Table<String, OmDirectoryInfo> dirTable = metadataManager.getDirectoryTable();
try (TableIterator<String,
? extends Table.KeyValue<String, OmDirectoryInfo>>
iterator = dirTable.iterator()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ private void addToMap(Map<Pair<String, String>, List<String>> map, String object

protected void submitPurgePaths(List<PurgePathRequest> requests,
String snapTableKey,
UUID expectedPreviousSnapshotId) {
UUID expectedPreviousSnapshotId, long rnCnt) {
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest =
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();

Expand All @@ -305,7 +305,7 @@ protected void submitPurgePaths(List<PurgePathRequest> requests,

// Submit Purge paths request to OM
try {
OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, rnCnt);
} catch (ServiceException e) {
LOG.error("PurgePaths request failed. Will retry at next run.", e);
}
Expand Down Expand Up @@ -400,7 +400,7 @@ public long optimizeDirDeletesAndSubmitRequest(long remainNum,
List<PurgePathRequest> purgePathRequestList,
String snapTableKey, long startTime,
int remainingBufLimit, KeyManager keyManager,
UUID expectedPreviousSnapshotId) {
UUID expectedPreviousSnapshotId, long rnCnt) {

// Optimization to handle delete sub-dir and keys to remove quickly
// This case will be useful to handle when depth of directory is high
Expand Down Expand Up @@ -442,7 +442,7 @@ public long optimizeDirDeletesAndSubmitRequest(long remainNum,
}

if (!purgePathRequestList.isEmpty()) {
submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId);
submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId, rnCnt);
}

if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) {
Expand All @@ -455,7 +455,7 @@ public long optimizeDirDeletesAndSubmitRequest(long remainNum,
"DeletedDirectoryTable, iteration elapsed: {}ms," +
" totalRunCount: {}",
dirNum, subdirDelNum, subFileNum, (subDirNum - subdirDelNum),
Time.monotonicNow() - startTime, getRunCount());
Time.monotonicNow() - startTime, rnCnt);
}
return remainNum;
}
Expand Down
Loading

0 comments on commit 23197e2

Please sign in to comment.