Skip to content

Commit 6039e0e

Browse files
Add javadocs
1 parent f932c8d commit 6039e0e

File tree

5 files changed

+70
-83
lines changed

5 files changed

+70
-83
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
3131
import org.apache.hadoop.fs.store.DataBlocks;
3232

33+
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMA;
34+
3335
/**
3436
* Manages Azure Blob blocks for append operations.
3537
*/
@@ -61,7 +63,7 @@ public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream,
6163
if (abfsOutputStream.getPosition() > 0 && !abfsOutputStream.isAppendBlob()) {
6264
List<String> committedBlocks = getBlockList(abfsOutputStream.getTracingContext());
6365
if (!committedBlocks.isEmpty()) {
64-
committedBlockEntries.append(String.join(",", committedBlocks)).append(",");
66+
committedBlockEntries.append(String.join(COMMA, committedBlocks));
6567
}
6668
}
6769
LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance {} for path {}",
@@ -182,7 +184,7 @@ protected synchronized boolean hasBlocksToCommit() throws IOException {
182184
}
183185
// Append the current block's ID to the committedBlockBuilder
184186
if (committedBlockEntries.length() > 0) {
185-
committedBlockEntries.append(",");
187+
committedBlockEntries.append(COMMA);
186188
}
187189
committedBlockEntries.append(current.getBlockId());
188190
LOG.debug("Block {} added to committed entries.", current.getBlockId());

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ protected InvalidIngressServiceException getIngressHandlerSwitchException(
198198
*
199199
* @return the block manager
200200
*/
201-
public abstract AzureBlockManager getBlockManager();
201+
protected abstract AzureBlockManager getBlockManager();
202202

203203
/**
204204
* Gets the client associated with this handler.

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,8 +221,7 @@ void createRenamePendingJson(Path path, byte[] bytes)
221221
abfsClient.append(path.toUri().getPath(), bytes,
222222
appendRequestParameters, null, null, tracingContext);
223223

224-
//List<String> blockIdList = new ArrayList<>(Collections.singleton(blockId));
225-
String blockList = generateBlockListXml(blockId);
224+
String blockList = generateBlockListXml(blockId);
226225
// PutBlockList on the path.
227226
abfsClient.flush(blockList.getBytes(StandardCharsets.UTF_8),
228227
path.toUri().getPath(), true, null, null, eTag, null, tracingContext);

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java

Lines changed: 62 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,6 @@
8787
import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;
8888
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
8989
import static org.mockito.ArgumentMatchers.anyString;
90-
import static org.mockito.Mockito.doReturn;
91-
import static org.mockito.Mockito.spy;
92-
import static org.mockito.Mockito.when;
9390

9491
/**
9592
* Test append operations.
@@ -169,16 +166,16 @@ public void testCloseOfDataBlockOnAppendComplete() throws Exception {
169166
for (String blockBufferType : blockBufferTypes) {
170167
Configuration configuration = new Configuration(getRawConfiguration());
171168
configuration.set(DATA_BLOCKS_BUFFER, blockBufferType);
172-
try (AzureBlobFileSystem fs = spy(
169+
try (AzureBlobFileSystem fs = Mockito.spy(
173170
(AzureBlobFileSystem) FileSystem.newInstance(configuration))) {
174-
AzureBlobFileSystemStore store = spy(fs.getAbfsStore());
175-
doReturn(store).when(fs).getAbfsStore();
171+
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
172+
Mockito.doReturn(store).when(fs).getAbfsStore();
176173
DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1];
177174
Mockito.doAnswer(getBlobFactoryInvocation -> {
178-
DataBlocks.BlockFactory factory = spy(
175+
DataBlocks.BlockFactory factory = Mockito.spy(
179176
(DataBlocks.BlockFactory) getBlobFactoryInvocation.callRealMethod());
180177
Mockito.doAnswer(factoryCreateInvocation -> {
181-
dataBlock[0] = spy(
178+
dataBlock[0] = Mockito.spy(
182179
(DataBlocks.DataBlock) factoryCreateInvocation.callRealMethod());
183180
return dataBlock[0];
184181
})
@@ -247,8 +244,11 @@ public void testCreateOverDfsAppendOverBlob() throws IOException {
247244
.isInstanceOf(AbfsDfsClient.class);
248245
}
249246

247+
/**
248+
* This test verifies that if multiple appends qualify for switch, no appends should fail.
249+
*/
250250
@Test
251-
public void testMultipleAppendSwitches() throws Exception {
251+
public void testMultipleAppendsQualifyForSwitch() throws Exception {
252252
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
253253
final AzureBlobFileSystem fs = getFileSystem();
254254
Path testPath = path(TEST_FILE_PATH);
@@ -309,8 +309,11 @@ public void testMultipleAppendSwitches() throws Exception {
309309
.isInstanceOf(AbfsDfsClient.class);
310310
}
311311

312+
/**
313+
* This test verifies that parallel writes on dfs and blob endpoint should not fail.
314+
*/
312315
@Test
313-
public void testParallelDfsBlob() throws Exception {
316+
public void testParallelWritesOnDfsAndBlob() throws Exception {
314317
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
315318
final AzureBlobFileSystem fs = getFileSystem();
316319
Path testPath = path(TEST_FILE_PATH);
@@ -413,10 +416,10 @@ public void testCreateAppendBlobOverBlobEndpointAppendOverDfs()
413416
conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
414417
conf.set(FS_AZURE_INGRESS_SERVICE_TYPE,
415418
String.valueOf(AbfsServiceType.DFS));
416-
try (AzureBlobFileSystem fs = spy(
419+
try (AzureBlobFileSystem fs = Mockito.spy(
417420
(AzureBlobFileSystem) FileSystem.newInstance(conf))) {
418-
AzureBlobFileSystemStore store = spy(fs.getAbfsStore());
419-
doReturn(true).when(store).isAppendBlobKey(anyString());
421+
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
422+
Mockito.doReturn(true).when(store).isAppendBlobKey(anyString());
420423

421424
// Set abfsStore as our mocked value.
422425
Field privateField = AzureBlobFileSystem.class.getDeclaredField(
@@ -458,9 +461,9 @@ public void testCreateAppendBlobOverBlobEndpointAppendOverDfs()
458461
public void testCreateAppendBlobOverDfsEndpointAppendOverBlob()
459462
throws IOException, NoSuchFieldException, IllegalAccessException {
460463
assumeHnsEnabled("FNS does not support append blob creation for DFS endpoint");
461-
final AzureBlobFileSystem fs = spy(getFileSystem());
462-
AzureBlobFileSystemStore store = spy(fs.getAbfsStore());
463-
doReturn(true).when(store).isAppendBlobKey(anyString());
464+
final AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
465+
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
466+
Mockito.doReturn(true).when(store).isAppendBlobKey(anyString());
464467

465468
// Set abfsStore as our mocked value.
466469
Field privateField = AzureBlobFileSystem.class.getDeclaredField(
@@ -500,7 +503,6 @@ public void testCreateAppendBlobOverDfsEndpointAppendOverBlob()
500503
}
501504

502505

503-
504506
/**
505507
* Tests the correct retrieval of the AzureIngressHandler based on the configured ingress service type.
506508
*
@@ -610,22 +612,6 @@ public void testRecreateAppendAndFlush() throws IOException {
610612
}
611613
}
612614

613-
@Test
614-
public void testFlush() throws IOException {
615-
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
616-
final AzureBlobFileSystem fs = getFileSystem();
617-
final Path filePath = path(TEST_FILE_PATH);
618-
fs.create(filePath);
619-
Assume.assumeTrue(getIngressServiceType() == AbfsServiceType.BLOB);
620-
FSDataOutputStream outputStream = fs.append(filePath);
621-
outputStream.write(TEN);
622-
outputStream.write(20);
623-
outputStream.hsync();
624-
outputStream.write(30);
625-
outputStream.write(40);
626-
outputStream.close();
627-
}
628-
629615
/**
630616
* Recreate directory between append and flush. Etag mismatch happens.
631617
**/
@@ -834,7 +820,7 @@ public void testEtagMismatch() throws Exception {
834820
public void testAppendWithLease() throws Exception {
835821
final Path testFilePath = new Path(path(methodName.getMethodName()),
836822
TEST_FILE_PATH);
837-
final AzureBlobFileSystem fs = spy(
823+
final AzureBlobFileSystem fs = Mockito.spy(
838824
getCustomFileSystem(testFilePath.getParent(), 1));
839825
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL,
840826
FsAction.ALL);
@@ -876,18 +862,18 @@ public void testAppendImplicitDirectoryAzcopy() throws Exception {
876862
@Test
877863
public void testIntermittentAppendFailureToBeReported() throws Exception {
878864
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
879-
try (AzureBlobFileSystem fs = spy(
865+
try (AzureBlobFileSystem fs = Mockito.spy(
880866
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
881867
assumeHnsDisabled();
882-
AzureBlobFileSystemStore store = spy(fs.getAbfsStore());
868+
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
883869
assumeBlobServiceType();
884870

885-
AbfsClientHandler clientHandler = spy(store.getClientHandler());
886-
AbfsBlobClient blobClient = spy(clientHandler.getBlobClient());
871+
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
872+
AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
887873

888-
doReturn(clientHandler).when(store).getClientHandler();
889-
doReturn(blobClient).when(clientHandler).getBlobClient();
890-
doReturn(blobClient).when(clientHandler).getIngressClient();
874+
Mockito.doReturn(clientHandler).when(store).getClientHandler();
875+
Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
876+
Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
891877

892878
Mockito.doThrow(
893879
new AbfsRestOperationException(HTTP_UNAVAILABLE, "", "", new Exception()))
@@ -956,14 +942,14 @@ public void testIntermittentAppendFailureToBeReported() throws Exception {
956942
private FSDataOutputStream createMockedOutputStream(AzureBlobFileSystem fs,
957943
Path path,
958944
AbfsClient client) throws IOException {
959-
AbfsOutputStream abfsOutputStream = spy(
945+
AbfsOutputStream abfsOutputStream = Mockito.spy(
960946
(AbfsOutputStream) fs.create(path).getWrappedStream());
961-
AzureIngressHandler ingressHandler = spy(
947+
AzureIngressHandler ingressHandler = Mockito.spy(
962948
abfsOutputStream.getIngressHandler());
963-
doReturn(ingressHandler).when(abfsOutputStream).getIngressHandler();
964-
doReturn(client).when(ingressHandler).getClient();
949+
Mockito.doReturn(ingressHandler).when(abfsOutputStream).getIngressHandler();
950+
Mockito.doReturn(client).when(ingressHandler).getClient();
965951

966-
FSDataOutputStream fsDataOutputStream = spy(
952+
FSDataOutputStream fsDataOutputStream = Mockito.spy(
967953
new FSDataOutputStream(abfsOutputStream, null));
968954
return fsDataOutputStream;
969955
}
@@ -976,22 +962,22 @@ private FSDataOutputStream createMockedOutputStream(AzureBlobFileSystem fs,
976962
@Test
977963
public void testWriteAsyncOpFailedAfterCloseCalled() throws Exception {
978964
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
979-
try (AzureBlobFileSystem fs = spy(
965+
try (AzureBlobFileSystem fs = Mockito.spy(
980966
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
981-
AzureBlobFileSystemStore store = spy(fs.getAbfsStore());
982-
AbfsClientHandler clientHandler = spy(store.getClientHandler());
983-
AbfsBlobClient blobClient = spy(clientHandler.getBlobClient());
984-
AbfsDfsClient dfsClient = spy(clientHandler.getDfsClient());
967+
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
968+
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
969+
AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
970+
AbfsDfsClient dfsClient = Mockito.spy(clientHandler.getDfsClient());
985971

986972
AbfsClient client = clientHandler.getIngressClient();
987973
if (clientHandler.getIngressClient() instanceof AbfsBlobClient) {
988-
doReturn(blobClient).when(clientHandler).getBlobClient();
989-
doReturn(blobClient).when(clientHandler).getIngressClient();
974+
Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
975+
Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
990976
} else {
991-
doReturn(dfsClient).when(clientHandler).getDfsClient();
992-
doReturn(dfsClient).when(clientHandler).getIngressClient();
977+
Mockito.doReturn(dfsClient).when(clientHandler).getDfsClient();
978+
Mockito.doReturn(dfsClient).when(clientHandler).getIngressClient();
993979
}
994-
doReturn(clientHandler).when(store).getClientHandler();
980+
Mockito.doReturn(clientHandler).when(store).getClientHandler();
995981

996982
byte[] bytes = new byte[1024 * 1024 * 8];
997983
new Random().nextBytes(bytes);
@@ -1081,21 +1067,21 @@ private String generateBlockId(AbfsOutputStream os, long position) {
10811067
public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Exception {
10821068
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
10831069
// Create a spy of AzureBlobFileSystem
1084-
try (AzureBlobFileSystem fs = spy(
1070+
try (AzureBlobFileSystem fs = Mockito.spy(
10851071
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
10861072
assumeHnsDisabled();
10871073
// Create a spy of AzureBlobFileSystemStore
1088-
AzureBlobFileSystemStore store = spy(fs.getAbfsStore());
1074+
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
10891075
assumeBlobServiceType();
10901076

10911077
// Create spies for the client handler and blob client
1092-
AbfsClientHandler clientHandler = spy(store.getClientHandler());
1093-
AbfsBlobClient blobClient = spy(clientHandler.getBlobClient());
1078+
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
1079+
AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
10941080

10951081
// Set up the spies to return the mocked objects
1096-
doReturn(clientHandler).when(store).getClientHandler();
1097-
doReturn(blobClient).when(clientHandler).getBlobClient();
1098-
doReturn(blobClient).when(clientHandler).getIngressClient();
1082+
Mockito.doReturn(clientHandler).when(store).getClientHandler();
1083+
Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
1084+
Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
10991085
AtomicInteger flushCount = new AtomicInteger(0);
11001086
FSDataOutputStream os = createMockedOutputStream(fs,
11011087
new Path("/test/file"), blobClient);
@@ -1119,10 +1105,10 @@ public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Excep
11191105

11201106
int currentCount = flushCount.incrementAndGet();
11211107
if (currentCount == 1) {
1122-
when(httpOperation.getStatusCode())
1108+
Mockito.when(httpOperation.getStatusCode())
11231109
.thenReturn(
11241110
HTTP_INTERNAL_ERROR); // Status code 500 for Internal Server Error
1125-
when(httpOperation.getStorageErrorMessage())
1111+
Mockito.when(httpOperation.getStorageErrorMessage())
11261112
.thenReturn("CONNECTION_RESET"); // Error message
11271113
throw new IOException("Connection Reset");
11281114
}
@@ -1177,22 +1163,22 @@ public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Excep
11771163
public void testFlushSuccessWithConnectionResetOnResponseInvalidMd5() throws Exception {
11781164
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
11791165
// Create a spy of AzureBlobFileSystem
1180-
try (AzureBlobFileSystem fs = spy(
1166+
try (AzureBlobFileSystem fs = Mockito.spy(
11811167
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
11821168
assumeHnsDisabled();
11831169

11841170
// Create a spy of AzureBlobFileSystemStore
1185-
AzureBlobFileSystemStore store = spy(fs.getAbfsStore());
1171+
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
11861172
assumeBlobServiceType();
11871173

11881174
// Create spies for the client handler and blob client
1189-
AbfsClientHandler clientHandler = spy(store.getClientHandler());
1190-
AbfsBlobClient blobClient = spy(clientHandler.getBlobClient());
1175+
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
1176+
AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
11911177

11921178
// Set up the spies to return the mocked objects
1193-
doReturn(clientHandler).when(store).getClientHandler();
1194-
doReturn(blobClient).when(clientHandler).getBlobClient();
1195-
doReturn(blobClient).when(clientHandler).getIngressClient();
1179+
Mockito.doReturn(clientHandler).when(store).getClientHandler();
1180+
Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
1181+
Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
11961182
AtomicInteger flushCount = new AtomicInteger(0);
11971183
FSDataOutputStream os = createMockedOutputStream(fs,
11981184
new Path("/test/file"), blobClient);
@@ -1216,16 +1202,16 @@ public void testFlushSuccessWithConnectionResetOnResponseInvalidMd5() throws Exc
12161202

12171203
int currentCount = flushCount.incrementAndGet();
12181204
if (currentCount == 1) {
1219-
when(httpOperation.getStatusCode())
1205+
Mockito.when(httpOperation.getStatusCode())
12201206
.thenReturn(
12211207
HTTP_INTERNAL_ERROR); // Status code 500 for Internal Server Error
1222-
when(httpOperation.getStorageErrorMessage())
1208+
Mockito.when(httpOperation.getStorageErrorMessage())
12231209
.thenReturn("CONNECTION_RESET"); // Error message
12241210
throw new IOException("Connection Reset");
12251211
} else if (currentCount == 2) {
1226-
when(httpOperation.getStatusCode())
1212+
Mockito.when(httpOperation.getStatusCode())
12271213
.thenReturn(HTTP_OK);
1228-
when(httpOperation.getStorageErrorMessage())
1214+
Mockito.when(httpOperation.getStorageErrorMessage())
12291215
.thenReturn("HTTP_OK");
12301216
}
12311217
return null;

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ public void testGetAclCallOnHnsConfigAbsence() throws Exception {
7979
AzureBlobFileSystem fs = ((AzureBlobFileSystem) FileSystem.newInstance(
8080
getRawConfiguration()));
8181
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
82-
AbfsClient client = Mockito.spy(fs.getAbfsClient());
83-
Mockito.doReturn(client).when(store).getClient();
82+
AbfsClient client = Mockito.spy(fs.getAbfsStore().getClient(AbfsServiceType.DFS));
83+
Mockito.doReturn(client).when(store).getClient(AbfsServiceType.DFS);
8484

8585
Mockito.doThrow(TrileanConversionException.class)
8686
.when(store)

0 commit comments

Comments
 (0)