entry = iterator.next();
+ String key = entry.getKey();
+ if (key.contains(dirName)) {
+ dirTable.delete(key);
+ break;
+ }
+ }
+ }
+ }
+
+ private void assertDisconnectedTreePartiallyReadable(String volume, String bucket) throws Exception {
+ Path bucketPath = new Path("/" + volume + "/" + bucket);
+ Path dir1 = new Path(bucketPath, "dir1");
+ Path file1 = new Path(dir1, "file1");
+ Path file2 = new Path(dir1, "file2");
+
+ Path dir2 = new Path(bucketPath, "dir1/dir2");
+ Path file3 = new Path(dir2, "file3");
+
+ Path dir3 = new Path(bucketPath, "dir3");
+ Path file4 = new Path(bucketPath, "file4");
+
+ Assertions.assertFalse(fs.exists(dir1));
+ Assertions.assertFalse(fs.exists(dir2));
+ Assertions.assertTrue(fs.exists(dir3));
+ Assertions.assertFalse(fs.exists(file1));
+ Assertions.assertFalse(fs.exists(file2));
+ Assertions.assertFalse(fs.exists(file3));
+ Assertions.assertTrue(fs.exists(file4));
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/TestS3SDKV1WithRatisStreaming.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/TestS3SDKV1WithRatisStreaming.java
new file mode 100644
index 00000000000..571d4c64908
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/TestS3SDKV1WithRatisStreaming.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.s3.awssdk.v1;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+
+/**
+ * Tests the AWS S3 SDK basic operations with OM Ratis enabled and Streaming Write Pipeline.
+ */
+@Timeout(300)
+public class TestS3SDKV1WithRatisStreaming extends AbstractS3SDKV1Tests {
+
+ @BeforeAll
+ public static void init() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setBoolean(ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE,
+ false);
+ conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
+ conf.setBoolean(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
+ true);
+ conf.setBoolean(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED, true);
+ conf.setBoolean(OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED, true);
+ // Ensure that all writes use datastream
+ conf.set(OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD, "0MB");
+ startCluster(conf);
+ }
+
+ @AfterAll
+ public static void shutdown() throws IOException {
+ shutdownCluster();
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 7532cf8b324..ccda21efc93 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -153,6 +153,8 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DIRECTORY_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT;
import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
@@ -257,8 +259,16 @@ public void start(OzoneConfiguration configuration) {
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
- dirDeletingService = new DirectoryDeletingService(dirDeleteInterval,
- TimeUnit.MILLISECONDS, serviceTimeout, ozoneManager, configuration);
+ int dirDeletingServiceCorePoolSize =
+ configuration.getInt(OZONE_THREAD_NUMBER_DIR_DELETION,
+ OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT);
+ if (dirDeletingServiceCorePoolSize <= 0) {
+ dirDeletingServiceCorePoolSize = 1;
+ }
+ dirDeletingService =
+ new DirectoryDeletingService(dirDeleteInterval, TimeUnit.MILLISECONDS,
+ serviceTimeout, ozoneManager, configuration,
+ dirDeletingServiceCorePoolSize);
dirDeletingService.start();
}
@@ -2052,7 +2062,7 @@ public List getPendingDeletionSubDirs(long volumeId, long bucketId,
parentInfo.getObjectID(), "");
long countEntries = 0;
- Table dirTable = metadataManager.getDirectoryTable();
+ Table dirTable = metadataManager.getDirectoryTable();
try (TableIterator>
iterator = dirTable.iterator()) {
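The hunk above wires the DirectoryDeletingService pool size to the new OZONE_THREAD_NUMBER_DIR_DELETION key and clamps non-positive values to 1. A minimal sketch of raising it before the OM starts, using only the constant shown in the imports above (the concrete property string and its default live in OMConfigKeys and are not part of this hunk):

// Sketch, not part of the patch: choose a larger DirectoryDeletingService pool size.
// Values <= 0 are clamped to a single thread by KeyManagerImpl.start(), as shown above.
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMConfigKeys;

public final class DirDeletionThreadConfigSketch {
  public static OzoneConfiguration withDirDeletionThreads(int threads) {
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setInt(OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION, threads);
    return conf;
  }
}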
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
index 76c16232e39..0ac6c986606 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java
@@ -280,7 +280,7 @@ private void addToMap(Map, List> map, String object
protected void submitPurgePaths(List requests,
String snapTableKey,
- UUID expectedPreviousSnapshotId) {
+ UUID expectedPreviousSnapshotId, long rnCnt) {
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest =
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
@@ -305,7 +305,7 @@ protected void submitPurgePaths(List requests,
// Submit Purge paths request to OM
try {
- OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, runCount.get());
+ OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, rnCnt);
} catch (ServiceException e) {
LOG.error("PurgePaths request failed. Will retry at next run.", e);
}
@@ -400,7 +400,7 @@ public long optimizeDirDeletesAndSubmitRequest(long remainNum,
List purgePathRequestList,
String snapTableKey, long startTime,
int remainingBufLimit, KeyManager keyManager,
- UUID expectedPreviousSnapshotId) {
+ UUID expectedPreviousSnapshotId, long rnCnt) {
// Optimization to handle delete sub-dir and keys to remove quickly
// This case will be useful to handle when depth of directory is high
@@ -442,7 +442,7 @@ public long optimizeDirDeletesAndSubmitRequest(long remainNum,
}
if (!purgePathRequestList.isEmpty()) {
- submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId);
+ submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId, rnCnt);
}
if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) {
@@ -455,7 +455,7 @@ public long optimizeDirDeletesAndSubmitRequest(long remainNum,
"DeletedDirectoryTable, iteration elapsed: {}ms," +
" totalRunCount: {}",
dirNum, subdirDelNum, subFileNum, (subDirNum - subdirDelNum),
- Time.monotonicNow() - startTime, getRunCount());
+ Time.monotonicNow() - startTime, rnCnt);
}
return remainNum;
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index 09f4a8f8a3d..a8270f92f2b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -49,6 +50,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT;
@@ -74,10 +76,10 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService {
public static final Logger LOG =
LoggerFactory.getLogger(DirectoryDeletingService.class);
- // Use only a single thread for DirDeletion. Multiple threads would read
- // or write to same tables and can send deletion requests for same key
- // multiple times.
- private static final int DIR_DELETING_CORE_POOL_SIZE = 1;
+ // Use multiple threads for directory deletion. The threads read parent
+ // directory info from the deleted directory table concurrently and send
+ // deletion requests.
+ private final int dirDeletingCorePoolSize;
private static final int MIN_ERR_LIMIT_PER_TASK = 1000;
// Number of items(dirs/files) to be batched in an iteration.
@@ -86,11 +88,15 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService {
private final AtomicBoolean suspended;
private AtomicBoolean isRunningOnAOS;
+ private final DeletedDirSupplier deletedDirSupplier;
+
+ private AtomicInteger taskCount = new AtomicInteger(0);
+
public DirectoryDeletingService(long interval, TimeUnit unit,
long serviceTimeout, OzoneManager ozoneManager,
- OzoneConfiguration configuration) {
+ OzoneConfiguration configuration, int dirDeletingServiceCorePoolSize) {
super(DirectoryDeletingService.class.getSimpleName(), interval, unit,
- DIR_DELETING_CORE_POOL_SIZE, serviceTimeout, ozoneManager, null);
+ dirDeletingServiceCorePoolSize, serviceTimeout, ozoneManager, null);
this.pathLimitPerTask = configuration
.getInt(OZONE_PATH_DELETING_LIMIT_PER_TASK,
OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT);
@@ -102,6 +108,9 @@ public DirectoryDeletingService(long interval, TimeUnit unit,
this.ratisByteLimit = (int) (limit * 0.9);
this.suspended = new AtomicBoolean(false);
this.isRunningOnAOS = new AtomicBoolean(false);
+ this.dirDeletingCorePoolSize = dirDeletingServiceCorePoolSize;
+ deletedDirSupplier = new DeletedDirSupplier();
+ taskCount.set(0);
}
private boolean shouldRun() {
@@ -116,6 +125,10 @@ public boolean isRunningOnAOS() {
return isRunningOnAOS.get();
}
+ public AtomicInteger getTaskCount() {
+ return taskCount;
+ }
+
/**
* Suspend the service.
*/
@@ -135,10 +148,55 @@ public void resume() {
@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
- queue.add(new DirectoryDeletingService.DirDeletingTask(this));
+ if (taskCount.get() > 0) {
+ LOG.info("{} Directory deleting task(s) already in progress.",
+ taskCount.get());
+ return queue;
+ }
+ try {
+ deletedDirSupplier.reInitItr();
+ } catch (IOException ex) {
+ LOG.error("Unable to get the iterator.", ex);
+ return queue;
+ }
+ taskCount.set(dirDeletingCorePoolSize);
+ for (int i = 0; i < dirDeletingCorePoolSize; i++) {
+ queue.add(new DirectoryDeletingService.DirDeletingTask(this));
+ }
return queue;
}
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ deletedDirSupplier.closeItr();
+ }
+
+ private final class DeletedDirSupplier {
+ private TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+ deleteTableIterator;
+
+ private synchronized Table.KeyValue<String, OmKeyInfo> get()
+ throws IOException {
+ if (deleteTableIterator.hasNext()) {
+ return deleteTableIterator.next();
+ }
+ return null;
+ }
+
+ private synchronized void closeItr() {
+ IOUtils.closeQuietly(deleteTableIterator);
+ deleteTableIterator = null;
+ }
+
+ private synchronized void reInitItr() throws IOException {
+ closeItr();
+ deleteTableIterator =
+ getOzoneManager().getMetadataManager().getDeletedDirTable()
+ .iterator();
+ }
+ }
+
private final class DirDeletingTask implements BackgroundTask {
private final DirectoryDeletingService directoryDeletingService;
@@ -153,89 +211,93 @@ public int getPriority() {
@Override
public BackgroundTaskResult call() {
- if (shouldRun()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Running DirectoryDeletingService");
- }
- isRunningOnAOS.set(true);
- getRunCount().incrementAndGet();
- long dirNum = 0L;
- long subDirNum = 0L;
- long subFileNum = 0L;
- long remainNum = pathLimitPerTask;
- int consumedSize = 0;
- List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
- List<Pair<String, OmKeyInfo>> allSubDirList
- = new ArrayList<>((int) remainNum);
-
- Table.KeyValue<String, OmKeyInfo> pendingDeletedDirInfo;
-
- try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
- deleteTableIterator = getOzoneManager().getMetadataManager().
- getDeletedDirTable().iterator()) {
+ try {
+ if (shouldRun()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Running DirectoryDeletingService");
+ }
+ isRunningOnAOS.set(true);
+ long rnCnt = getRunCount().incrementAndGet();
+ long dirNum = 0L;
+ long subDirNum = 0L;
+ long subFileNum = 0L;
+ long remainNum = pathLimitPerTask;
+ int consumedSize = 0;
+ List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
+ List<Pair<String, OmKeyInfo>> allSubDirList =
+ new ArrayList<>((int) remainNum);
+
+ Table.KeyValue<String, OmKeyInfo> pendingDeletedDirInfo;
// This is to avoid race condition b/w purge request and snapshot chain updation. For AOS taking the global
// snapshotId since AOS could process multiple buckets in one iteration.
- UUID expectedPreviousSnapshotId =
- ((OmMetadataManagerImpl)getOzoneManager().getMetadataManager()).getSnapshotChainManager()
- .getLatestGlobalSnapshotId();
-
- long startTime = Time.monotonicNow();
- while (remainNum > 0 && deleteTableIterator.hasNext()) {
- pendingDeletedDirInfo = deleteTableIterator.next();
- // Do not reclaim if the directory is still being referenced by
- // the previous snapshot.
- if (previousSnapshotHasDir(pendingDeletedDirInfo)) {
- continue;
- }
+ try {
+ UUID expectedPreviousSnapshotId =
+ ((OmMetadataManagerImpl) getOzoneManager().getMetadataManager()).getSnapshotChainManager()
+ .getLatestGlobalSnapshotId();
- PurgePathRequest request = prepareDeleteDirRequest(
- remainNum, pendingDeletedDirInfo.getValue(),
- pendingDeletedDirInfo.getKey(), allSubDirList,
- getOzoneManager().getKeyManager());
- if (isBufferLimitCrossed(ratisByteLimit, consumedSize,
- request.getSerializedSize())) {
- if (purgePathRequestList.size() != 0) {
- // if message buffer reaches max limit, avoid sending further
- remainNum = 0;
+ long startTime = Time.monotonicNow();
+ while (remainNum > 0) {
+ pendingDeletedDirInfo = getPendingDeletedDirInfo();
+ if (pendingDeletedDirInfo == null) {
break;
}
- // if directory itself is having a lot of keys / files,
- // reduce capacity to minimum level
- remainNum = MIN_ERR_LIMIT_PER_TASK;
- request = prepareDeleteDirRequest(
- remainNum, pendingDeletedDirInfo.getValue(),
+ // Do not reclaim if the directory is still being referenced by
+ // the previous snapshot.
+ if (previousSnapshotHasDir(pendingDeletedDirInfo)) {
+ continue;
+ }
+
+ PurgePathRequest request = prepareDeleteDirRequest(remainNum,
+ pendingDeletedDirInfo.getValue(),
pendingDeletedDirInfo.getKey(), allSubDirList,
getOzoneManager().getKeyManager());
+ if (isBufferLimitCrossed(ratisByteLimit, consumedSize,
+ request.getSerializedSize())) {
+ if (purgePathRequestList.size() != 0) {
+ // if message buffer reaches max limit, avoid sending further
+ remainNum = 0;
+ break;
+ }
+ // if directory itself is having a lot of keys / files,
+ // reduce capacity to minimum level
+ remainNum = MIN_ERR_LIMIT_PER_TASK;
+ request = prepareDeleteDirRequest(remainNum,
+ pendingDeletedDirInfo.getValue(),
+ pendingDeletedDirInfo.getKey(), allSubDirList,
+ getOzoneManager().getKeyManager());
+ }
+ consumedSize += request.getSerializedSize();
+ purgePathRequestList.add(request);
+ // reduce remain count for self, sub-files, and sub-directories
+ remainNum = remainNum - 1;
+ remainNum = remainNum - request.getDeletedSubFilesCount();
+ remainNum = remainNum - request.getMarkDeletedSubDirsCount();
+ // Count up the purgeDeletedDir, subDirs and subFiles
+ if (request.getDeletedDir() != null && !request.getDeletedDir()
+ .isEmpty()) {
+ dirNum++;
+ }
+ subDirNum += request.getMarkDeletedSubDirsCount();
+ subFileNum += request.getDeletedSubFilesCount();
}
- consumedSize += request.getSerializedSize();
- purgePathRequestList.add(request);
- // reduce remain count for self, sub-files, and sub-directories
- remainNum = remainNum - 1;
- remainNum = remainNum - request.getDeletedSubFilesCount();
- remainNum = remainNum - request.getMarkDeletedSubDirsCount();
- // Count up the purgeDeletedDir, subDirs and subFiles
- if (request.getDeletedDir() != null
- && !request.getDeletedDir().isEmpty()) {
- dirNum++;
- }
- subDirNum += request.getMarkDeletedSubDirsCount();
- subFileNum += request.getDeletedSubFilesCount();
- }
+ optimizeDirDeletesAndSubmitRequest(remainNum, dirNum, subDirNum,
+ subFileNum, allSubDirList, purgePathRequestList, null,
+ startTime, ratisByteLimit - consumedSize,
+ getOzoneManager().getKeyManager(), expectedPreviousSnapshotId,
+ rnCnt);
- optimizeDirDeletesAndSubmitRequest(
- remainNum, dirNum, subDirNum, subFileNum,
- allSubDirList, purgePathRequestList, null, startTime,
- ratisByteLimit - consumedSize,
- getOzoneManager().getKeyManager(), expectedPreviousSnapshotId);
-
- } catch (IOException e) {
- LOG.error("Error while running delete directories and files " +
- "background task. Will retry at next run.", e);
- }
- isRunningOnAOS.set(false);
- synchronized (directoryDeletingService) {
- this.directoryDeletingService.notify();
+ } catch (IOException e) {
+ LOG.error(
+ "Error while running delete directories and files " + "background task. Will retry at next run.",
+ e);
+ }
+ isRunningOnAOS.set(false);
+ synchronized (directoryDeletingService) {
+ this.directoryDeletingService.notify();
+ }
}
+ } finally {
+ taskCount.getAndDecrement();
}
// place holder by returning empty results of this call back.
return BackgroundTaskResult.EmptyTaskResult.newResult();
@@ -301,4 +363,9 @@ private boolean previousSnapshotHasDir(
}
}
+ public KeyValue<String, OmKeyInfo> getPendingDeletedDirInfo()
+ throws IOException {
+ return deletedDirSupplier.get();
+ }
+
}
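The heart of the change above: getTasks() now enqueues dirDeletingCorePoolSize DirDeletingTask instances per run, the taskCount guard keeps a new batch from being queued while the previous one is still draining, and every task pulls entries from the single synchronized DeletedDirSupplier, so each pending directory is handed to exactly one task. A minimal, self-contained sketch of that fan-out pattern with illustrative names (these are stand-ins, not the Ozone classes):

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// SharedDirSupplier plays the role of DeletedDirSupplier, DirTask the role of DirDeletingTask.
final class SharedDirSupplier {
  private final Iterator<String> it;
  SharedDirSupplier(Iterator<String> it) { this.it = it; }
  synchronized String next() {              // like DeletedDirSupplier.get(): one entry per call
    return it.hasNext() ? it.next() : null;
  }
}

final class DirTask implements Runnable {
  private final SharedDirSupplier supplier;
  DirTask(SharedDirSupplier supplier) { this.supplier = supplier; }
  @Override public void run() {
    for (String dir = supplier.next(); dir != null; dir = supplier.next()) {
      System.out.println(Thread.currentThread().getName() + " purging " + dir);
    }
  }
}

public final class DirDeletionFanOutSketch {
  public static void main(String[] args) throws InterruptedException {
    List<String> pending = Arrays.asList("/vol/bucket/p1/dirA", "/vol/bucket/p1/dirB", "/vol/bucket/p2/dirC");
    SharedDirSupplier supplier = new SharedDirSupplier(pending.iterator());
    Thread t1 = new Thread(new DirTask(supplier), "task-1");
    Thread t2 = new Thread(new DirTask(supplier), "task-2");
    t1.start(); t2.start();
    t1.join(); t2.join();
  }
}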
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
index 04e8efa7b79..681b24b8e42 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
@@ -51,6 +51,9 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -202,14 +205,19 @@ public void testDeleteDirectoryFlatDirsHavingNoChilds() throws Exception {
.setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE))
.setDataSize(0).setRecursive(true).build();
writeClient.deleteKey(delArgs);
+ int pathDelLimit = conf.getInt(OZONE_PATH_DELETING_LIMIT_PER_TASK,
+ OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT);
+ int numThread = conf.getInt(OZONE_THREAD_NUMBER_DIR_DELETION,
+ OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT);
// check if difference between each run should not cross the directory deletion limit
// and wait till all dir is removed
GenericTestUtils.waitFor(() -> {
delDirCnt[1] = dirDeletingService.getDeletedDirsCount();
- assertTrue(delDirCnt[1] - delDirCnt[0] <= OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT,
+ assertTrue(
+ delDirCnt[1] - delDirCnt[0] <= ((long) pathDelLimit * numThread),
"base: " + delDirCnt[0] + ", new: " + delDirCnt[1]);
- delDirCnt[0] = delDirCnt[1];
+ delDirCnt[0] = delDirCnt[1];
return dirDeletingService.getDeletedDirsCount() >= dirCreatesCount;
}, 500, 300000);
}
diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml
index a8b32c686a0..22cd10085dd 100644
--- a/hadoop-ozone/pom.xml
+++ b/hadoop-ozone/pom.xml
@@ -49,13 +49,6 @@
<module>s3-secret-store</module>
- <repositories>
- <repository>
- <id>apache.snapshots.https</id>
- <url>https://repository.apache.org/content/repositories/snapshots</url>
- </repository>
- </repositories>
-
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 0a928981613..9311fb7fa4b 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -323,7 +323,7 @@ public Response put(
perf.appendStreamMode();
Pair<String, Long> keyWriteResult = ObjectEndpointStreaming
.put(bucket, keyPath, length, replicationConfig, chunkSize,
- customMetadata, digestInputStream, perf);
+ customMetadata, tags, digestInputStream, perf);
eTag = keyWriteResult.getKey();
putLength = keyWriteResult.getValue();
} else {
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index cb9499aa20d..f5d185fc76b 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -61,12 +61,13 @@ public static Pair put(
OzoneBucket bucket, String keyPath,
long length, ReplicationConfig replicationConfig,
int chunkSize, Map<String, String> keyMetadata,
+ Map<String, String> tags,
DigestInputStream body, PerformanceStringBuilder perf)
throws IOException, OS3Exception {
try {
return putKeyWithStream(bucket, keyPath,
- length, chunkSize, replicationConfig, keyMetadata, body, perf);
+ length, chunkSize, replicationConfig, keyMetadata, tags, body, perf);
} catch (IOException ex) {
LOG.error("Exception occurred in PutObject", ex);
if (ex instanceof OMException) {
@@ -97,13 +98,14 @@ public static Pair putKeyWithStream(
int bufferSize,
ReplicationConfig replicationConfig,
Map<String, String> keyMetadata,
+ Map<String, String> tags,
DigestInputStream body, PerformanceStringBuilder perf)
throws IOException {
long startNanos = Time.monotonicNowNanos();
long writeLen;
String eTag;
try (OzoneDataStreamOutput streamOutput = bucket.createStreamKey(keyPath,
- length, replicationConfig, keyMetadata)) {
+ length, replicationConfig, keyMetadata, tags)) {
long metadataLatencyNs = METRICS.updatePutKeyMetadataStats(startNanos);
writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length);
eTag = DatatypeConverter.printHexBinary(body.getMessageDigest().digest())
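With the extra parameter threaded through above, S3 object tags now reach OzoneBucket.createStreamKey alongside the user metadata on the streaming write path. A hedged sketch of a call against the new put signature; bucket, keyPath, length, replicationConfig, chunkSize, digestInputStream and perf are assumed to be in scope exactly as they are in ObjectEndpoint.put:

// Sketch only: metadata and tags both travel with the streaming put.
Map<String, String> customMetadata = new HashMap<>();
customMetadata.put("owner", "analytics");
Map<String, String> tags = new HashMap<>();   // tags as parsed by the endpoint (e.g. from the S3 tagging header)
tags.put("project", "ozone");

Pair<String, Long> keyWriteResult = ObjectEndpointStreaming.put(
    bucket, keyPath, length, replicationConfig, chunkSize,
    customMetadata, tags, digestInputStream, perf);
String eTag = keyWriteResult.getKey();
long putLength = keyWriteResult.getValue();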
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/FSORepairCLI.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/FSORepairCLI.java
new file mode 100644
index 00000000000..5a217e9f2de
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/FSORepairCLI.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.repair.om;
+
+import picocli.CommandLine;
+
+import java.util.concurrent.Callable;
+
+/**
+ * CLI command to identify and repair disconnected FSO trees in the OM DB.
+ */
+@CommandLine.Command(
+ name = "fso-tree",
+ description = "Identify and repair a disconnected FSO tree by marking unreferenced entries for deletion. " +
+ "OM should be stopped while this tool is run."
+)
+public class FSORepairCLI implements Callable<Void> {
+
+ @CommandLine.Option(names = {"--db"},
+ required = true,
+ description = "Path to OM RocksDB")
+ private String dbPath;
+
+ @CommandLine.Option(names = {"-r", "--repair"},
+ defaultValue = "false",
+ description = "Run in repair mode to move unreferenced files and directories to deleted tables.")
+ private boolean repair;
+
+ @CommandLine.Option(names = {"-v", "--volume"},
+ description = "Filter by volume name. Add '/' before the volume name.")
+ private String volume;
+
+ @CommandLine.Option(names = {"-b", "--bucket"},
+ description = "Filter by bucket name")
+ private String bucket;
+
+ @CommandLine.Option(names = {"--verbose"},
+ description = "Verbose output. Show all intermediate steps and deleted keys info.")
+ private boolean verbose;
+
+ @Override
+ public Void call() throws Exception {
+ if (repair) {
+ System.out.println("FSO Repair Tool is running in repair mode");
+ } else {
+ System.out.println("FSO Repair Tool is running in debug mode");
+ }
+ try {
+ FSORepairTool
+ repairTool = new FSORepairTool(dbPath, repair, volume, bucket, verbose);
+ repairTool.run();
+ } catch (Exception ex) {
+ throw new IllegalArgumentException("FSO repair failed: " + ex.getMessage());
+ }
+
+ if (verbose) {
+ System.out.println("FSO repair finished.");
+ }
+
+ return null;
+ }
+}
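FSORepairCLI is a thin wrapper: it builds an FSORepairTool from the parsed options and runs it, so the same pass can be driven from tests or scripts. Assuming the standard repair wiring (OMRepair registers the `om` group below, and this command is named `fso-tree`), a report-only run is roughly `ozone repair om fso-tree --db <path to om.db>`, and adding `--repair` applies the changes. A sketch of the equivalent programmatic call, with a hypothetical DB path:

// Sketch: what call() effectively does when --repair, --volume, --bucket and --verbose are set.
// Exceptions from the constructor and run() propagate to the caller; the OM must be stopped first.
FSORepairTool repairTool = new FSORepairTool(
    "/data/metadata/om.db",  // --db
    true,                    // --repair; false keeps the tool in report-only (debug) mode
    "/vol1",                 // --volume filter, with the leading '/'
    "bucket1",               // --bucket filter
    true);                   // --verbose
FSORepairTool.Report report = repairTool.run();
System.out.println(report);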
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/FSORepairTool.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/FSORepairTool.java
new file mode 100644
index 00000000000..7e0fb23f5aa
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/FSORepairTool.java
@@ -0,0 +1,710 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.repair.om;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.helpers.WithObjectID;
+import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Stack;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * Base Tool to identify and repair disconnected FSO trees across all buckets.
+ * This tool logs information about reachable, unreachable and unreferenced files and directories in debug mode
+ * and moves these unreferenced files and directories to the deleted tables in repair mode.
+ *
+ * If deletes are still in progress (the deleted directory table is not empty), the tool
+ * reports the affected objects as unreachable, even though the pending deletes would eventually
+ * remove them. Otherwise it reports them as unreferenced and, in repair mode, deletes them.
+ *
+ * Before using the tool, make sure all OMs are stopped, and that all Ratis logs have been flushed to the OM DB.
+ * This can be done using `ozone admin prepare` before running the tool, and `ozone admin
+ * cancelprepare` when done.
+ *
+ * The tool will run a DFS from each bucket, and save all reachable directories as keys in a new temporary RocksDB
+ * instance called "reachable.db" in the same directory as om.db.
+ * It will then scan the entire file and directory tables for each bucket to see if each object's parent is in the
+ * reachable table of reachable.db. The reachable table will be dropped and recreated for each bucket.
+ * The tool is idempotent. reachable.db will not be deleted automatically when the tool finishes,
+ * in case users want to manually inspect it. It can be safely deleted once the tool finishes.
+ */
+public class FSORepairTool {
+ public static final Logger LOG = LoggerFactory.getLogger(FSORepairTool.class);
+
+ private final String omDBPath;
+ private final DBStore store;
+ private final Table<String, OmVolumeArgs> volumeTable;
+ private final Table<String, OmBucketInfo> bucketTable;
+ private final Table<String, OmDirectoryInfo> directoryTable;
+ private final Table<String, OmKeyInfo> fileTable;
+ private final Table<String, OmKeyInfo> deletedDirectoryTable;
+ private final Table<String, RepeatedOmKeyInfo> deletedTable;
+ private final Table<String, SnapshotInfo> snapshotInfoTable;
+ private final String volumeFilter;
+ private final String bucketFilter;
+ private static final String REACHABLE_TABLE = "reachable";
+ private DBStore reachableDB;
+ private final ReportStatistics reachableStats;
+ private final ReportStatistics unreachableStats;
+ private final ReportStatistics unreferencedStats;
+ private final boolean repair;
+ private final boolean verbose;
+
+ public FSORepairTool(String dbPath, boolean repair, String volume, String bucket, boolean verbose)
+ throws IOException {
+ this(getStoreFromPath(dbPath), dbPath, repair, volume, bucket, verbose);
+ }
+
+ /**
+ * Allows passing RocksDB instance from a MiniOzoneCluster directly to this class for testing.
+ */
+ public FSORepairTool(DBStore dbStore, String dbPath, boolean repair, String volume, String bucket, boolean verbose)
+ throws IOException {
+ this.reachableStats = new ReportStatistics(0, 0, 0);
+ this.unreachableStats = new ReportStatistics(0, 0, 0);
+ this.unreferencedStats = new ReportStatistics(0, 0, 0);
+
+ this.store = dbStore;
+ this.omDBPath = dbPath;
+ this.repair = repair;
+ this.volumeFilter = volume;
+ this.bucketFilter = bucket;
+ this.verbose = verbose;
+ volumeTable = store.getTable(OmMetadataManagerImpl.VOLUME_TABLE,
+ String.class,
+ OmVolumeArgs.class);
+ bucketTable = store.getTable(OmMetadataManagerImpl.BUCKET_TABLE,
+ String.class,
+ OmBucketInfo.class);
+ directoryTable = store.getTable(OmMetadataManagerImpl.DIRECTORY_TABLE,
+ String.class,
+ OmDirectoryInfo.class);
+ fileTable = store.getTable(OmMetadataManagerImpl.FILE_TABLE,
+ String.class,
+ OmKeyInfo.class);
+ deletedDirectoryTable = store.getTable(OmMetadataManagerImpl.DELETED_DIR_TABLE,
+ String.class,
+ OmKeyInfo.class);
+ deletedTable = store.getTable(OmMetadataManagerImpl.DELETED_TABLE,
+ String.class,
+ RepeatedOmKeyInfo.class);
+ snapshotInfoTable = store.getTable(OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE,
+ String.class,
+ SnapshotInfo.class);
+ }
+
+ protected static DBStore getStoreFromPath(String dbPath) throws IOException {
+ File omDBFile = new File(dbPath);
+ if (!omDBFile.exists() || !omDBFile.isDirectory()) {
+ throw new IOException(String.format("Specified OM DB instance %s does " +
+ "not exist or is not a RocksDB directory.", dbPath));
+ }
+ // Load RocksDB and tables needed.
+ return OmMetadataManagerImpl.loadDB(new OzoneConfiguration(), new File(dbPath).getParentFile(), -1);
+ }
+
+ public FSORepairTool.Report run() throws Exception {
+ try {
+ if (bucketFilter != null && volumeFilter == null) {
+ System.out.println("--bucket flag cannot be used without specifying --volume.");
+ return null;
+ }
+
+ if (volumeFilter != null) {
+ OmVolumeArgs volumeArgs = volumeTable.getIfExist(volumeFilter);
+ if (volumeArgs == null) {
+ System.out.println("Volume '" + volumeFilter + "' does not exist.");
+ return null;
+ }
+ }
+
+ // Iterate all volumes or a specific volume if specified
+ try (TableIterator<String, ? extends Table.KeyValue<String, OmVolumeArgs>>
+ volumeIterator = volumeTable.iterator()) {
+ try {
+ openReachableDB();
+ } catch (IOException e) {
+ System.out.println("Failed to open reachable database: " + e.getMessage());
+ throw e;
+ }
+ while (volumeIterator.hasNext()) {
+ Table.KeyValue<String, OmVolumeArgs> volumeEntry = volumeIterator.next();
+ String volumeKey = volumeEntry.getKey();
+
+ if (volumeFilter != null && !volumeFilter.equals(volumeKey)) {
+ continue;
+ }
+
+ System.out.println("Processing volume: " + volumeKey);
+
+ if (bucketFilter != null) {
+ OmBucketInfo bucketInfo = bucketTable.getIfExist(volumeKey + "/" + bucketFilter);
+ if (bucketInfo == null) {
+ //Bucket does not exist in the volume
+ System.out.println("Bucket '" + bucketFilter + "' does not exist in volume '" + volumeKey + "'.");
+ return null;
+ }
+
+ if (bucketInfo.getBucketLayout() != BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+ System.out.println("Skipping non-FSO bucket " + bucketFilter);
+ continue;
+ }
+
+ processBucket(volumeEntry.getValue(), bucketInfo);
+ } else {
+
+ // Iterate all buckets in the volume.
+ try (TableIterator<String, ? extends Table.KeyValue<String, OmBucketInfo>>
+ bucketIterator = bucketTable.iterator()) {
+ bucketIterator.seek(volumeKey);
+ while (bucketIterator.hasNext()) {
+ Table.KeyValue<String, OmBucketInfo> bucketEntry = bucketIterator.next();
+ String bucketKey = bucketEntry.getKey();
+ OmBucketInfo bucketInfo = bucketEntry.getValue();
+
+ if (bucketInfo.getBucketLayout() != BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+ System.out.println("Skipping non-FSO bucket " + bucketKey);
+ continue;
+ }
+
+ // Stop this loop once we have seen all buckets in the current
+ // volume.
+ if (!bucketKey.startsWith(volumeKey)) {
+ break;
+ }
+
+ processBucket(volumeEntry.getValue(), bucketInfo);
+ }
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+ System.out.println("An error occurred while processing" + e.getMessage());
+ throw e;
+ } finally {
+ closeReachableDB();
+ store.close();
+ }
+
+ return buildReportAndLog();
+ }
+
+ private boolean checkIfSnapshotExistsForBucket(String volumeName, String bucketName) throws IOException {
+ if (snapshotInfoTable == null) {
+ return false;
+ }
+
+ try (TableIterator<String, ? extends Table.KeyValue<String, SnapshotInfo>> iterator =
+ snapshotInfoTable.iterator()) {
+ while (iterator.hasNext()) {
+ SnapshotInfo snapshotInfo = iterator.next().getValue();
+ String snapshotPath = (volumeName + "/" + bucketName).replaceFirst("^/", "");
+ if (snapshotInfo.getSnapshotPath().equals(snapshotPath)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private void processBucket(OmVolumeArgs volume, OmBucketInfo bucketInfo) throws IOException {
+ System.out.println("Processing bucket: " + volume.getVolume() + "/" + bucketInfo.getBucketName());
+ if (checkIfSnapshotExistsForBucket(volume.getVolume(), bucketInfo.getBucketName())) {
+ if (!repair) {
+ System.out.println(
+ "Snapshot detected in bucket '" + volume.getVolume() + "/" + bucketInfo.getBucketName() + "'. ");
+ } else {
+ System.out.println(
+ "Skipping repair for bucket '" + volume.getVolume() + "/" + bucketInfo.getBucketName() + "' " +
+ "due to snapshot presence.");
+ return;
+ }
+ }
+ markReachableObjectsInBucket(volume, bucketInfo);
+ handleUnreachableAndUnreferencedObjects(volume, bucketInfo);
+ }
+
+ private Report buildReportAndLog() {
+ Report report = new Report.Builder()
+ .setReachable(reachableStats)
+ .setUnreachable(unreachableStats)
+ .setUnreferenced(unreferencedStats)
+ .build();
+
+ System.out.println("\n" + report);
+ return report;
+ }
+
+ private void markReachableObjectsInBucket(OmVolumeArgs volume, OmBucketInfo bucket) throws IOException {
+ // Only put directories in the stack.
+ // Directory keys should have the form /volumeID/bucketID/parentID/name.
+ Stack<String> dirKeyStack = new Stack<>();
+
+ // Since the tool uses parent directories to check for reachability, add
+ // a reachable entry for the bucket as well.
+ addReachableEntry(volume, bucket, bucket);
+ // Initialize the stack with all immediate child directories of the
+ // bucket, and mark them all as reachable.
+ Collection<String> childDirs = getChildDirectoriesAndMarkAsReachable(volume, bucket, bucket);
+ dirKeyStack.addAll(childDirs);
+
+ while (!dirKeyStack.isEmpty()) {
+ // Get one directory and process its immediate children.
+ String currentDirKey = dirKeyStack.pop();
+ OmDirectoryInfo currentDir = directoryTable.get(currentDirKey);
+ if (currentDir == null) {
+ System.out.println("Directory key" + currentDirKey + "to be processed was not found in the directory table.");
+ continue;
+ }
+
+ // TODO revisit this for a more memory efficient implementation,
+ // possibly making better use of RocksDB iterators.
+ childDirs = getChildDirectoriesAndMarkAsReachable(volume, bucket, currentDir);
+ dirKeyStack.addAll(childDirs);
+ }
+ }
+
+ private boolean isDirectoryInDeletedDirTable(String dirKey) throws IOException {
+ return deletedDirectoryTable.isExist(dirKey);
+ }
+
+ private boolean isFileKeyInDeletedTable(String fileKey) throws IOException {
+ return deletedTable.isExist(fileKey);
+ }
+
+ private void handleUnreachableAndUnreferencedObjects(OmVolumeArgs volume, OmBucketInfo bucket) throws IOException {
+ // Check for unreachable and unreferenced directories in the bucket.
+ String bucketPrefix = OM_KEY_PREFIX +
+ volume.getObjectID() +
+ OM_KEY_PREFIX +
+ bucket.getObjectID();
+
+ try (TableIterator<String, ? extends Table.KeyValue<String, OmDirectoryInfo>> dirIterator =
+ directoryTable.iterator()) {
+ dirIterator.seek(bucketPrefix);
+ while (dirIterator.hasNext()) {
+ Table.KeyValue<String, OmDirectoryInfo> dirEntry = dirIterator.next();
+ String dirKey = dirEntry.getKey();
+
+ // Only search directories in this bucket.
+ if (!dirKey.startsWith(bucketPrefix)) {
+ break;
+ }
+
+ if (!isReachable(dirKey)) {
+ if (!isDirectoryInDeletedDirTable(dirKey)) {
+ System.out.println("Found unreferenced directory: " + dirKey);
+ unreferencedStats.addDir();
+
+ if (!repair) {
+ if (verbose) {
+ System.out.println("Marking unreferenced directory " + dirKey + " for deletion.");
+ }
+ } else {
+ System.out.println("Deleting unreferenced directory " + dirKey);
+ OmDirectoryInfo dirInfo = dirEntry.getValue();
+ markDirectoryForDeletion(volume.getVolume(), bucket.getBucketName(), dirKey, dirInfo);
+ }
+ } else {
+ unreachableStats.addDir();
+ }
+ }
+ }
+ }
+
+ // Check for unreachable and unreferenced files
+ try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
+ fileIterator = fileTable.iterator()) {
+ fileIterator.seek(bucketPrefix);
+ while (fileIterator.hasNext()) {
+ Table.KeyValue<String, OmKeyInfo> fileEntry = fileIterator.next();
+ String fileKey = fileEntry.getKey();
+ // Only search files in this bucket.
+ if (!fileKey.startsWith(bucketPrefix)) {
+ break;
+ }
+
+ OmKeyInfo fileInfo = fileEntry.getValue();
+ if (!isReachable(fileKey)) {
+ if (!isFileKeyInDeletedTable(fileKey)) {
+ System.out.println("Found unreferenced file: " + fileKey);
+ unreferencedStats.addFile(fileInfo.getDataSize());
+
+ if (!repair) {
+ if (verbose) {
+ System.out.println("Marking unreferenced file " + fileKey + " for deletion." + fileKey);
+ }
+ } else {
+ System.out.println("Deleting unreferenced file " + fileKey);
+ markFileForDeletion(fileKey, fileInfo);
+ }
+ } else {
+ unreachableStats.addFile(fileInfo.getDataSize());
+ }
+ } else {
+ // NOTE: We are deserializing the proto of every reachable file
+ // just to log its size. If we don't need this information we could
+ // save time by skipping this step.
+ reachableStats.addFile(fileInfo.getDataSize());
+ }
+ }
+ }
+ }
+
+ protected void markFileForDeletion(String fileKey, OmKeyInfo fileInfo) throws IOException {
+ try (BatchOperation batch = store.initBatchOperation()) {
+ fileTable.deleteWithBatch(batch, fileKey);
+
+ RepeatedOmKeyInfo originalRepeatedKeyInfo = deletedTable.get(fileKey);
+ RepeatedOmKeyInfo updatedRepeatedOmKeyInfo = OmUtils.prepareKeyForDelete(
+ fileInfo, fileInfo.getUpdateID(), true);
+ // NOTE: The FSO code seems to write the open key entry with the whole
+ // path, using the object's names instead of their ID. This would only
+ // be possible when the file is deleted explicitly, and not part of a
+ // directory delete. It is also not possible here if the file's parent
+ // is gone. The name of the key does not matter so just use IDs.
+ deletedTable.putWithBatch(batch, fileKey, updatedRepeatedOmKeyInfo);
+ if (verbose) {
+ System.out.println("Added entry " + fileKey + " to open key table: " + updatedRepeatedOmKeyInfo);
+ }
+ store.commitBatchOperation(batch);
+ }
+ }
+
+ protected void markDirectoryForDeletion(String volumeName, String bucketName,
+ String dirKeyName, OmDirectoryInfo dirInfo) throws IOException {
+ try (BatchOperation batch = store.initBatchOperation()) {
+ directoryTable.deleteWithBatch(batch, dirKeyName);
+ // HDDS-7592: Make directory entries in deleted dir table unique.
+ String deleteDirKeyName = dirKeyName + OM_KEY_PREFIX + dirInfo.getObjectID();
+
+ // Convert the directory to OmKeyInfo for deletion.
+ OmKeyInfo dirAsKeyInfo = OMFileRequest.getOmKeyInfo(volumeName, bucketName, dirInfo, dirInfo.getName());
+ deletedDirectoryTable.putWithBatch(batch, deleteDirKeyName, dirAsKeyInfo);
+
+ store.commitBatchOperation(batch);
+ }
+ }
+
+ private Collection<String> getChildDirectoriesAndMarkAsReachable(OmVolumeArgs volume, OmBucketInfo bucket,
+ WithObjectID currentDir) throws IOException {
+
+ Collection<String> childDirs = new ArrayList<>();
+
+ try (TableIterator<String, ? extends Table.KeyValue<String, OmDirectoryInfo>>
+ dirIterator = directoryTable.iterator()) {
+ String dirPrefix = buildReachableKey(volume, bucket, currentDir);
+ // Start searching the directory table at the current directory's
+ // prefix to get its immediate children.
+ dirIterator.seek(dirPrefix);
+ while (dirIterator.hasNext()) {
+ Table.KeyValue<String, OmDirectoryInfo> childDirEntry = dirIterator.next();
+ String childDirKey = childDirEntry.getKey();
+ // Stop processing once we have seen all immediate children of this
+ // directory.
+ if (!childDirKey.startsWith(dirPrefix)) {
+ break;
+ }
+ // This directory was reached by search.
+ addReachableEntry(volume, bucket, childDirEntry.getValue());
+ childDirs.add(childDirKey);
+ reachableStats.addDir();
+ }
+ }
+
+ return childDirs;
+ }
+
+ /**
+ * Add the specified object to the reachable table, indicating it is part
+ * of the connected FSO tree.
+ */
+ private void addReachableEntry(OmVolumeArgs volume, OmBucketInfo bucket, WithObjectID object) throws IOException {
+ String reachableKey = buildReachableKey(volume, bucket, object);
+ // No value is needed for this table.
+ reachableDB.getTable(REACHABLE_TABLE, String.class, byte[].class).put(reachableKey, new byte[]{});
+ }
+
+ /**
+ * Build an entry in the reachable table for the current object, which
+ * could be a bucket, file or directory.
+ */
+ private static String buildReachableKey(OmVolumeArgs volume, OmBucketInfo bucket, WithObjectID object) {
+ return OM_KEY_PREFIX +
+ volume.getObjectID() +
+ OM_KEY_PREFIX +
+ bucket.getObjectID() +
+ OM_KEY_PREFIX +
+ object.getObjectID();
+ }
+
+ /**
+ *
+ * @param fileOrDirKey The key of a file or directory in RocksDB.
+ * @return true if the entry's parent is in the reachable table.
+ */
+ protected boolean isReachable(String fileOrDirKey) throws IOException {
+ String reachableParentKey = buildReachableParentKey(fileOrDirKey);
+
+ return reachableDB.getTable(REACHABLE_TABLE, String.class, byte[].class).get(reachableParentKey) != null;
+ }
+
+ /**
+ * Build an entry in the reachable table for the current object's parent
+ * object. The object could be a file or directory.
+ */
+ private static String buildReachableParentKey(String fileOrDirKey) {
+ String[] keyParts = fileOrDirKey.split(OM_KEY_PREFIX);
+ // Should be /volID/bucketID/parentID/name
+ // The first part will be blank since key begins with a slash.
+ Preconditions.assertTrue(keyParts.length >= 4);
+ String volumeID = keyParts[1];
+ String bucketID = keyParts[2];
+ String parentID = keyParts[3];
+
+ return OM_KEY_PREFIX +
+ volumeID +
+ OM_KEY_PREFIX +
+ bucketID +
+ OM_KEY_PREFIX +
+ parentID;
+ }
+
+ private void openReachableDB() throws IOException {
+ File reachableDBFile = new File(new File(omDBPath).getParentFile(), "reachable.db");
+ System.out.println("Creating database of reachable directories at " + reachableDBFile);
+ // Delete the DB from the last run if it exists.
+ if (reachableDBFile.exists()) {
+ FileUtils.deleteDirectory(reachableDBFile);
+ }
+
+ ConfigurationSource conf = new OzoneConfiguration();
+ reachableDB = DBStoreBuilder.newBuilder(conf)
+ .setName("reachable.db")
+ .setPath(reachableDBFile.getParentFile().toPath())
+ .addTable(REACHABLE_TABLE)
+ .build();
+ }
+
+ private void closeReachableDB() throws IOException {
+ if (reachableDB != null) {
+ reachableDB.close();
+ }
+ File reachableDBFile = new File(new File(omDBPath).getParentFile(), "reachable.db");
+ if (reachableDBFile.exists()) {
+ FileUtils.deleteDirectory(reachableDBFile);
+ }
+ }
+
+ /**
+ * Define a Report to be created.
+ */
+ public static class Report {
+ private final ReportStatistics reachable;
+ private final ReportStatistics unreachable;
+ private final ReportStatistics unreferenced;
+
+ /**
+ * Builds one report that is the aggregate of multiple others.
+ */
+ public Report(FSORepairTool.Report... reports) {
+ reachable = new ReportStatistics();
+ unreachable = new ReportStatistics();
+ unreferenced = new ReportStatistics();
+
+ for (FSORepairTool.Report report : reports) {
+ reachable.add(report.reachable);
+ unreachable.add(report.unreachable);
+ unreferenced.add(report.unreferenced);
+ }
+ }
+
+ private Report(FSORepairTool.Report.Builder builder) {
+ this.reachable = builder.reachable;
+ this.unreachable = builder.unreachable;
+ this.unreferenced = builder.unreferenced;
+ }
+
+ public ReportStatistics getReachable() {
+ return reachable;
+ }
+
+ public ReportStatistics getUnreachable() {
+ return unreachable;
+ }
+
+ public ReportStatistics getUnreferenced() {
+ return unreferenced;
+ }
+
+ public String toString() {
+ return "Reachable:" + reachable + "\nUnreachable:" + unreachable + "\nUnreferenced:" + unreferenced;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == this) {
+ return true;
+ }
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+ FSORepairTool.Report report = (FSORepairTool.Report) other;
+
+ // Useful for testing.
+ System.out.println("Comparing reports\nExpect:\n" + this + "\nActual:\n" + report);
+
+ return reachable.equals(report.reachable) && unreachable.equals(report.unreachable) &&
+ unreferenced.equals(report.unreferenced);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(reachable, unreachable, unreferenced);
+ }
+
+ /**
+ * Builder class for a Report.
+ */
+ public static final class Builder {
+ private ReportStatistics reachable = new ReportStatistics();
+ private ReportStatistics unreachable = new ReportStatistics();
+ private ReportStatistics unreferenced = new ReportStatistics();
+
+ public Builder() {
+ }
+
+ public Builder setReachable(ReportStatistics reachable) {
+ this.reachable = reachable;
+ return this;
+ }
+
+ public Builder setUnreachable(ReportStatistics unreachable) {
+ this.unreachable = unreachable;
+ return this;
+ }
+
+ public Builder setUnreferenced(ReportStatistics unreferenced) {
+ this.unreferenced = unreferenced;
+ return this;
+ }
+
+ public Report build() {
+ return new Report(this);
+ }
+ }
+ }
+
+ /**
+ * Represents the statistics of reachable and unreachable data.
+ * This gives the count of dirs, files and bytes.
+ */
+
+ public static class ReportStatistics {
+ private long dirs;
+ private long files;
+ private long bytes;
+
+ public ReportStatistics() { }
+
+ public ReportStatistics(long dirs, long files, long bytes) {
+ this.dirs = dirs;
+ this.files = files;
+ this.bytes = bytes;
+ }
+
+ public void add(ReportStatistics other) {
+ this.dirs += other.dirs;
+ this.files += other.files;
+ this.bytes += other.bytes;
+ }
+
+ public long getDirs() {
+ return dirs;
+ }
+
+ public long getFiles() {
+ return files;
+ }
+
+ public long getBytes() {
+ return bytes;
+ }
+
+ @Override
+ public String toString() {
+ return "\n\tDirectories: " + dirs +
+ "\n\tFiles: " + files +
+ "\n\tBytes: " + bytes;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == this) {
+ return true;
+ }
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+ ReportStatistics stats = (ReportStatistics) other;
+
+ return bytes == stats.bytes && files == stats.files && dirs == stats.dirs;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(bytes, files, dirs);
+ }
+
+ public void addDir() {
+ dirs++;
+ }
+
+ public void addFile(long size) {
+ files++;
+ bytes += size;
+ }
+ }
+}
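The reachability logic above leans on the FSO key layout: file and directory table keys look like /volumeId/bucketId/parentId/name, and buildReachableParentKey keeps only the first three components. A small worked example with made-up object IDs:

// Hypothetical IDs for illustration; OM_KEY_PREFIX is "/".
String fileKey = "/1001/2002/3003/file1";          // /volumeId/bucketId/parentId/name
String[] parts = fileKey.split("/");               // parts[0] is "" because of the leading '/'
String reachableParentKey = "/" + parts[1] + "/" + parts[2] + "/" + parts[3];
// reachableParentKey == "/1001/2002/3003"
// isReachable(fileKey) simply checks whether this key is present in reachable.db,
// i.e. whether the parent directory (or the bucket itself) was marked reachable by the DFS.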
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/OMRepair.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/OMRepair.java
new file mode 100644
index 00000000000..56d42d23f49
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/repair/om/OMRepair.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.repair.om;
+
+import org.apache.hadoop.hdds.cli.GenericCli;
+import org.apache.hadoop.hdds.cli.RepairSubcommand;
+import org.kohsuke.MetaInfServices;
+import picocli.CommandLine;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Ozone Repair CLI for OM.
+ */
+@CommandLine.Command(name = "om",
+ subcommands = {
+ FSORepairCLI.class,
+ },
+ description = "Operational tool to repair OM.")
+@MetaInfServices(RepairSubcommand.class)
+public class OMRepair implements Callable<Void>, RepairSubcommand {
+
+ @CommandLine.Spec
+ private CommandLine.Model.CommandSpec spec;
+
+ @Override
+ public Void call() {
+ GenericCli.missingSubcommand(spec);
+ return null;
+ }
+}
diff --git a/pom.xml b/pom.xml
index 869afebf493..6f27c486c15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,9 +44,28 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
- <id>${distMgmtSnapshotsId}</id>
- <name>${distMgmtSnapshotsName}</name>
- <url>${distMgmtSnapshotsUrl}</url>
+ <id>apache.snapshots</id>
+ <url>https://repository.apache.org/snapshots</url>
+ <releases>
+ <enabled>false</enabled>
+ <updatePolicy>never</updatePolicy>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ <updatePolicy>never</updatePolicy>
+ </snapshots>
+ </repository>
+ <repository>
+ <id>apache.snapshots.https</id>
+ <url>https://repository.apache.org/content/repositories/snapshots</url>
+ <releases>
+ <enabled>false</enabled>
+ <updatePolicy>never</updatePolicy>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ <updatePolicy>never</updatePolicy>
+ </snapshots>
@@ -277,7 +296,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
4.2.2
0.45.1
3.5.0
- 2.4.0
+ 2.5.0
1.0-beta-1
1.0-M1
3.6.0
@@ -1974,7 +1993,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<phase>package</phase>
- <goal>makeAggregateBom</goal>
+ <goal>makeBom</goal>