Skip to content

Commit

Permalink
HDDS-11453. OmSnapshotPurge should be in a different ozone manager do…
Browse files Browse the repository at this point in the history
…uble buffer batch (apache#7188)
  • Loading branch information
swamirishi authored and sarvekshayr committed Oct 7, 2024
1 parent 92061f7 commit 5895197
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
Expand Down Expand Up @@ -426,28 +427,30 @@ private String addToBatch(Queue<Entry> buffer, BatchOperation batchOperation) {
* in RocksDB callback flush. If multiple operations are flushed in one
* specific batch, we are not sure at the flush of which specific operation
* the callback is coming.
* There could be a possibility of race condition that is exposed to rocksDB
* behaviour for the batch.
* PurgeSnapshot is also considered a barrier, since a purgeSnapshot transaction, on a standalone basis, is an
* idempotent operation. Once the snapshot directory gets deleted, the previous transactions that were performed
* on the snapshotted rocksdb would start failing on replay, since those transactions have not been committed but
* the directory could have been partially or fully deleted. This could also lead to inconsistencies in the DB
* reads from the purged rocksdb if the operations are not performed carefully.
* There could be a possibility of race condition that is exposed to rocksDB behaviour for the batch.
* Hence, we treat createSnapshot and purgeSnapshot each as a separate batch flush.
* <p>
* e.g. requestBuffer = [request1, request2, snapshotRequest1,
* request3, snapshotRequest2, request4]
* response = [[request1, request2], [snapshotRequest1], [request3],
* [snapshotRequest2], [request4]]
*/
private List<Queue<Entry>> splitReadyBufferAtCreateSnapshot() {
private synchronized List<Queue<Entry>> splitReadyBufferAtCreateSnapshot() {
final List<Queue<Entry>> response = new ArrayList<>();

OMResponse previousOmResponse = null;
for (final Entry entry : readyBuffer) {
OMResponse omResponse = entry.getResponse().getOMResponse();
// New queue gets created in three conditions:
// 1. It is first element in the response,
// 2. Current request is createSnapshot request.
// 3. Previous request was createSnapshot request.
if (response.isEmpty() || omResponse.hasCreateSnapshotResponse()
|| (previousOmResponse != null &&
previousOmResponse.hasCreateSnapshotResponse())) {
// 2. Current request is createSnapshot/purgeSnapshot request.
// 3. Previous request was createSnapshot/purgeSnapshot request.
if (response.isEmpty() || isStandaloneBatchCmdTypes(omResponse)
|| isStandaloneBatchCmdTypes(previousOmResponse)) {
response.add(new LinkedList<>());
}

Expand All @@ -458,6 +461,15 @@ private List<Queue<Entry>> splitReadyBufferAtCreateSnapshot() {
return response;
}

/**
 * Tells whether the given response belongs to a command type that must be flushed in a batch of its own.
 *
 * @param response the OM response to inspect; may be {@code null}
 * @return {@code true} if the command type is {@code CreateSnapshot} or {@code SnapshotPurge};
 *         {@code false} for any other type or when {@code response} is {@code null}
 */
private static boolean isStandaloneBatchCmdTypes(OMResponse response) {
  // A missing response (e.g. no previous entry yet) has no command type and is never standalone.
  final OzoneManagerProtocolProtos.Type cmdType = response == null ? null : response.getCmdType();
  return cmdType == OzoneManagerProtocolProtos.Type.CreateSnapshot
      || cmdType == OzoneManagerProtocolProtos.Type.SnapshotPurge;
}

private void addCleanupEntry(Entry entry, Map<String, List<Long>> cleanupEpochs) {
Class<? extends OMClientResponse> responseClass =
entry.getResponse().getClass();
Expand Down Expand Up @@ -612,7 +624,7 @@ int getCurrentBufferSize() {
return currentBuffer.size();
}

int getReadyBufferSize() {
synchronized int getReadyBufferSize() {
// Returns the number of entries currently held in readyBuffer. The read happens while holding this
// instance's monitor, because the method is declared synchronized.
// NOTE(review): presumably this keeps the size consistent with other synchronized users of readyBuffer
// (splitReadyBufferAtCreateSnapshot is synchronized as well) - confirm against the buffer-swap code.
return readyBuffer.size();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@
import org.apache.hadoop.ozone.om.response.bucket.OMBucketCreateResponse;
import org.apache.hadoop.ozone.om.response.key.OMKeyCreateResponse;
import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotCreateResponse;
import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotPurgeResponse;
import org.apache.hadoop.ozone.om.s3.S3SecretCacheProvider;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateSnapshotResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.util.KerberosName;
Expand Down Expand Up @@ -81,12 +81,12 @@ class TestOzoneManagerDoubleBuffer {
private OzoneManagerDoubleBuffer doubleBuffer;
private OzoneManager ozoneManager;
private S3SecretLockedManager secretManager;
private final CreateSnapshotResponse snapshotResponse1 = mock(CreateSnapshotResponse.class);
private final CreateSnapshotResponse snapshotResponse2 = mock(CreateSnapshotResponse.class);
private final OMResponse omKeyResponse = mock(OMResponse.class);
private final OMResponse omBucketResponse = mock(OMResponse.class);
private final OMResponse omSnapshotResponse1 = mock(OMResponse.class);
private final OMResponse omSnapshotResponse2 = mock(OMResponse.class);
private final OMResponse omSnapshotPurgeResponseProto1 = mock(OMResponse.class);
private final OMResponse omSnapshotPurgeResponseProto2 = mock(OMResponse.class);
private static OMClientResponse omKeyCreateResponse =
mock(OMKeyCreateResponse.class);
private static OMClientResponse omBucketCreateResponse =
Expand All @@ -95,6 +95,9 @@ class TestOzoneManagerDoubleBuffer {
mock(OMSnapshotCreateResponse.class);
private static OMClientResponse omSnapshotCreateResponse2 =
mock(OMSnapshotCreateResponse.class);
private static OMClientResponse omSnapshotPurgeResponse1 = mock(OMSnapshotPurgeResponse.class);
private static OMClientResponse omSnapshotPurgeResponse2 = mock(OMSnapshotPurgeResponse.class);

@TempDir
private File tempDir;
private OzoneManagerDoubleBuffer.FlushNotifier flushNotifier;
Expand Down Expand Up @@ -143,26 +146,33 @@ public void setup() throws IOException {
doNothing().when(omBucketCreateResponse).checkAndUpdateDB(any(), any());
doNothing().when(omSnapshotCreateResponse1).checkAndUpdateDB(any(), any());
doNothing().when(omSnapshotCreateResponse2).checkAndUpdateDB(any(), any());
doNothing().when(omSnapshotPurgeResponse1).checkAndUpdateDB(any(), any());
doNothing().when(omSnapshotPurgeResponse2).checkAndUpdateDB(any(), any());

when(omKeyResponse.getTraceID()).thenReturn("keyTraceId");
when(omBucketResponse.getTraceID()).thenReturn("bucketTraceId");
when(omSnapshotResponse1.getTraceID()).thenReturn("snapshotTraceId-1");
when(omSnapshotResponse2.getTraceID()).thenReturn("snapshotTraceId-2");
when(omSnapshotResponse1.hasCreateSnapshotResponse())
.thenReturn(true);
when(omSnapshotResponse2.hasCreateSnapshotResponse())
.thenReturn(true);
when(omSnapshotResponse1.getCreateSnapshotResponse())
.thenReturn(snapshotResponse1);
when(omSnapshotResponse2.getCreateSnapshotResponse())
.thenReturn(snapshotResponse2);
when(omSnapshotPurgeResponseProto1.getTraceID()).thenReturn("snapshotPurgeTraceId-1");
when(omSnapshotPurgeResponseProto2.getTraceID()).thenReturn("snapshotPurgeTraceId-2");

// Stub the command type of every mocked OMResponse: the double buffer inspects getCmdType() to decide
// whether a response has to be flushed in a batch of its own.
when(omKeyResponse.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.CreateKey);
when(omBucketResponse.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.CreateBucket);
when(omSnapshotPurgeResponseProto1.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.SnapshotPurge);
when(omSnapshotPurgeResponseProto2.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.SnapshotPurge);
// omSnapshotResponse1/2 back the OMSnapshotCreateResponse mocks, so they must report CreateSnapshot;
// stubbing them as SnapshotPurge would leave the CreateSnapshot barrier path untested.
when(omSnapshotResponse1.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.CreateSnapshot);
when(omSnapshotResponse2.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.CreateSnapshot);

when(omKeyCreateResponse.getOMResponse()).thenReturn(omKeyResponse);
when(omBucketCreateResponse.getOMResponse()).thenReturn(omBucketResponse);
when(omSnapshotCreateResponse1.getOMResponse())
.thenReturn(omSnapshotResponse1);
when(omSnapshotCreateResponse2.getOMResponse())
.thenReturn(omSnapshotResponse2);
when(omSnapshotPurgeResponse1.getOMResponse())
.thenReturn(omSnapshotPurgeResponseProto1);
when(omSnapshotPurgeResponse2.getOMResponse())
.thenReturn(omSnapshotPurgeResponseProto2);
}

@AfterEach
Expand Down Expand Up @@ -194,8 +204,35 @@ private static Stream<Arguments> doubleBufferFlushCases() {
omSnapshotCreateResponse1,
omSnapshotCreateResponse2,
omBucketCreateResponse),
4L, 4L, 14L, 16L, 1L, 1.142F)
);
4L, 4L, 14L, 16L, 1L, 1.142F),
Arguments.of(Arrays.asList(omSnapshotPurgeResponse1,
omSnapshotPurgeResponse2),
2L, 2L, 16L, 18L, 1L, 1.125F),
Arguments.of(Arrays.asList(omKeyCreateResponse,
omBucketCreateResponse,
omSnapshotPurgeResponse1,
omSnapshotPurgeResponse2),
3L, 4L, 19L, 22L, 2L, 1.157F),
Arguments.of(Arrays.asList(omKeyCreateResponse,
omSnapshotPurgeResponse1,
omBucketCreateResponse,
omSnapshotPurgeResponse2),
4L, 4L, 23L, 26L, 1L, 1.1300F),
Arguments.of(Arrays.asList(omKeyCreateResponse,
omSnapshotPurgeResponse1,
omSnapshotPurgeResponse2,
omBucketCreateResponse),
4L, 4L, 27L, 30L, 1L, 1.111F),
Arguments.of(Arrays.asList(omKeyCreateResponse,
omBucketCreateResponse,
omSnapshotPurgeResponse1,
omSnapshotCreateResponse1,
omSnapshotPurgeResponse2,
omBucketCreateResponse,
omSnapshotCreateResponse2),
6L, 7L, 33L, 37L, 2L, 1.121F)

);
}

/**
Expand Down

0 comments on commit 5895197

Please sign in to comment.