
Commit e0d739a

HADOOP-19444: ABFS: [FnsOverBlob][Tests] Add Tests For Negative Scenarios Identified for Ingress Operations (#7424)
Contributed by Anmol Asrani. Reviewed by Manish Bhatt and Manika Joshi. Signed off by Anuj Modi <anujmodi@apache.org>.
1 parent 2b32e46 commit e0d739a

13 files changed: +228, -34 lines


hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 5 additions & 0 deletions
@@ -78,6 +78,7 @@
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.INCORRECT_INGRESS_TYPE;
 
 /**
  * Configuration for Azure Blob FileSystem.
@@ -560,6 +561,10 @@ public void validateConfiguredServiceType(boolean isHNSEnabled)
     } else if (isHNSEnabled && fsConfiguredServiceType == AbfsServiceType.BLOB) {
       throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY,
           "Blob Endpoint Url Cannot be used to initialize filesystem for HNS Account");
+    } else if (getFsConfiguredServiceType() == AbfsServiceType.BLOB
+        && getIngressServiceType() == AbfsServiceType.DFS) {
+      throw new InvalidConfigurationValueException(
+          FS_AZURE_INGRESS_SERVICE_TYPE, INCORRECT_INGRESS_TYPE);
     }
   }
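
For context, the new branch rejects a filesystem initialized against the Blob endpoint whose ingress service type is forced to DFS. A minimal sketch of a configuration that would now fail this validation, assuming FS_AZURE_INGRESS_SERVICE_TYPE maps to the key "fs.azure.ingress.service.type" (the key string, class name, and account URL below are illustrative, not taken from this diff):

import org.apache.hadoop.conf.Configuration;

public class IngressTypeMisconfigSketch {
  public static Configuration blobEndpointWithDfsIngress() {
    Configuration conf = new Configuration();
    // Filesystem initialized against the Blob endpoint (FNS-over-Blob path).
    conf.set("fs.defaultFS", "abfs://container@account.blob.core.windows.net");
    // Ingress explicitly forced to DFS; validateConfiguredServiceType(...) would now throw
    // InvalidConfigurationValueException(FS_AZURE_INGRESS_SERVICE_TYPE, INCORRECT_INGRESS_TYPE).
    conf.set("fs.azure.ingress.service.type", "DFS");
    return conf;
  }
}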

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

Lines changed: 12 additions & 6 deletions
@@ -2011,18 +2011,24 @@ private boolean isEmptyListResults(AbfsHttpOperation result) {
   }
 
   /**
-   * Generates an XML string representing the block list.
+   * Generate the XML block list using a comma-separated string of block IDs.
    *
-   * @param blockIds the set of block IDs
-   * @return the generated XML string
+   * @param blockIdString The comma-separated block IDs.
+   * @return the XML representation of the block list.
    */
-  public static String generateBlockListXml(List<String> blockIds) {
+  public static String generateBlockListXml(String blockIdString) {
     StringBuilder stringBuilder = new StringBuilder();
     stringBuilder.append(String.format(XML_VERSION));
     stringBuilder.append(String.format(BLOCK_LIST_START_TAG));
-    for (String blockId : blockIds) {
-      stringBuilder.append(String.format(LATEST_BLOCK_FORMAT, blockId));
+
+    // Split the block ID string by commas and generate XML for each block ID
+    if (!blockIdString.isEmpty()) {
+      String[] blockIds = blockIdString.split(",");
+      for (String blockId : blockIds) {
+        stringBuilder.append(String.format(LATEST_BLOCK_FORMAT, blockId));
+      }
     }
+
     stringBuilder.append(String.format(BLOCK_LIST_END_TAG));
     return stringBuilder.toString();
   }
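
For illustration, a sketch of how the reworked helper is expected to behave. The block IDs are invented, and the exact output depends on the XML_VERSION, BLOCK_LIST_START_TAG, LATEST_BLOCK_FORMAT and BLOCK_LIST_END_TAG constants, which this diff does not show; the shape below assumes they expand to the standard Azure Put Block List body:

// Illustrative only: block IDs are made up, and the rendered XML assumes the
// formatting constants produce the standard Put Block List body.
String oneBlock = AbfsBlobClient.generateBlockListXml("QmxvY2stMDAwMQ==");
String twoBlocks = AbfsBlobClient.generateBlockListXml("QmxvY2stMDAwMQ==,QmxvY2stMDAwMg==");
String noBlocks = AbfsBlobClient.generateBlockListXml("");
// Expected shape for twoBlocks: one <Latest> element per comma-separated ID,
// and just the opening/closing <BlockList> tags when the input string is empty.
// <?xml version="1.0" encoding="utf-8"?>
// <BlockList>
//   <Latest>QmxvY2stMDAwMQ==</Latest>
//   <Latest>QmxvY2stMDAwMg==</Latest>
// </BlockList>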

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java

Lines changed: 1 addition & 0 deletions
@@ -73,5 +73,6 @@ public final class AbfsErrors {
       "Error while recovering from create failure.";
   public static final String ERR_RENAME_RECOVERY =
       "Error while recovering from rename failure.";
+  public static final String INCORRECT_INGRESS_TYPE = "Ingress Type Cannot be DFS for Blob endpoint configured filesystem.";
   private AbfsErrors() {}
 }

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

Lines changed: 1 addition & 3 deletions
@@ -708,9 +708,7 @@ public synchronized void close() throws IOException {
       bufferIndex = 0;
       closed = true;
       writeOperations.clear();
-      if (getBlockManager().hasActiveBlock()) {
-        getBlockManager().clearActiveBlock();
-      }
+      getBlockManager().close();
     }
     LOG.debug("Closing AbfsOutputStream : {}", this);
   }

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java

Lines changed: 23 additions & 8 deletions
@@ -30,6 +30,8 @@
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.store.DataBlocks;
 
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMA;
+
 /**
  * Manages Azure Blob blocks for append operations.
  */
@@ -38,9 +40,8 @@ public class AzureBlobBlockManager extends AzureBlockManager {
   private static final Logger LOG = LoggerFactory.getLogger(
       AbfsOutputStream.class);
 
-
-  /** The list of already committed blocks is stored in this list. */
-  private List<String> committedBlockEntries = new ArrayList<>();
+  /** Cached list of committed block IDs */
+  private final StringBuilder committedBlockEntries = new StringBuilder();
 
   /** The list to store blockId, position, and status. */
   private final LinkedList<BlockEntry> blockEntryList = new LinkedList<>();
@@ -60,7 +61,10 @@ public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream,
       throws AzureBlobFileSystemException {
     super(abfsOutputStream, blockFactory, bufferSize);
     if (abfsOutputStream.getPosition() > 0 && !abfsOutputStream.isAppendBlob()) {
-      this.committedBlockEntries = getBlockList(abfsOutputStream.getTracingContext());
+      List<String> committedBlocks = getBlockList(abfsOutputStream.getTracingContext());
+      if (!committedBlocks.isEmpty()) {
+        committedBlockEntries.append(String.join(COMMA, committedBlocks));
+      }
     }
     LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance {} for path {}",
         abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
@@ -146,11 +150,12 @@ protected synchronized void updateEntry(AbfsBlock block) {
    * @return whether we have some data to commit or not.
    * @throws IOException if an I/O error occurs
    */
-  protected synchronized boolean hasListToCommit() throws IOException {
+  protected synchronized boolean hasBlocksToCommit() throws IOException {
     // Adds all the committed blocks if available to the list of blocks to be added in putBlockList.
     if (blockEntryList.isEmpty()) {
       return false; // No entries to commit
     }
+
     while (!blockEntryList.isEmpty()) {
       BlockEntry current = blockEntryList.poll();
       if (current.getStatus() != AbfsBlockStatus.SUCCESS) {
@@ -177,7 +182,11 @@ protected synchronized boolean hasListToCommit() throws IOException {
           throw new IOException(errorMessage);
         }
       }
-      committedBlockEntries.add(current.getBlockId());
+      // Append the current block's ID to the committedBlockBuilder
+      if (committedBlockEntries.length() > 0) {
+        committedBlockEntries.append(COMMA);
+      }
+      committedBlockEntries.append(current.getBlockId());
       LOG.debug("Block {} added to committed entries.", current.getBlockId());
     }
     return true;
@@ -188,7 +197,13 @@ protected synchronized boolean hasListToCommit() throws IOException {
    *
    * @return the block ID list
    */
-  protected List<String> getBlockIdList() {
-    return committedBlockEntries;
+  protected String getBlockIdToCommit() {
+    return committedBlockEntries.toString();
+  }
+
+  @Override
+  public void close(){
+    super.close();
+    committedBlockEntries.setLength(0);
   }
 }
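
Tying the manager back to the flush path: a simplified sketch of how the comma-joined block IDs are consumed at commit time (the real call sits in AzureBlobIngressHandler.remoteFlush, shown in the next file, and carries tracing, lease, and request parameters omitted here):

// Simplified, illustrative commit path; error handling and request parameters omitted.
if (blobBlockManager.hasBlocksToCommit()) {
  // Comma-separated IDs of every successfully staged block, in commit order.
  String blockIds = blobBlockManager.getBlockIdToCommit();
  // One PutBlockList body, with one <Latest> entry per ID.
  String blockListXml = AbfsBlobClient.generateBlockListXml(blockIds);
  // ... the ingress handler then issues the flush (PutBlockList) request with blockListXml ...
}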

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java

Lines changed: 4 additions & 3 deletions
@@ -168,13 +168,13 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset,
     if (getAbfsOutputStream().isAppendBlob()) {
       return null;
     }
-    if (!blobBlockManager.hasListToCommit()) {
+    if (!blobBlockManager.hasBlocksToCommit()) {
       return null;
     }
     try {
       // Generate the xml with the list of blockId's to generate putBlockList call.
       String blockListXml = generateBlockListXml(
-          blobBlockManager.getBlockIdList());
+          blobBlockManager.getBlockIdToCommit());
       TracingContext tracingContextFlush = new TracingContext(tracingContext);
       tracingContextFlush.setIngressHandler(BLOB_FLUSH);
       tracingContextFlush.setPosition(String.valueOf(offset));
@@ -297,7 +297,8 @@ protected void writeAppendBlobCurrentBufferToService() throws IOException {
           activeBlock, reqParams,
           new TracingContext(getAbfsOutputStream().getTracingContext()));
     } catch (InvalidIngressServiceException ex) {
-      LOG.debug("InvalidIngressServiceException caught for path: {}, switching handler and retrying remoteAppendBlobWrite.", getAbfsOutputStream().getPath());
+      LOG.debug("InvalidIngressServiceException caught for path: {}, switching handler and retrying remoteAppendBlobWrite.",
+          getAbfsOutputStream().getPath());
       getAbfsOutputStream().switchHandler();
       op = getAbfsOutputStream().getIngressHandler()
           .remoteAppendBlobWrite(getAbfsOutputStream().getPath(), uploadData,

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlockManager.java

Lines changed: 8 additions & 0 deletions
@@ -167,4 +167,12 @@ void clearActiveBlock() {
       activeBlock = null;
     }
   }
+
+  // Used to clear any resources used by the block manager.
+  void close() {
+    if (hasActiveBlock()) {
+      clearActiveBlock();
+    }
+    LOG.debug("AzureBlockManager closed.");
+  }
 }

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSBlockManager.java

Lines changed: 6 additions & 0 deletions
@@ -86,4 +86,10 @@ protected synchronized AbfsBlock getActiveBlock() {
   protected synchronized boolean hasActiveBlock() {
     return super.hasActiveBlock();
   }
+
+  @Override
+  public void close() {
+    super.close();
+    LOG.debug("AzureDFSBlockManager closed.");
+  }
 }

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java

Lines changed: 1 addition & 1 deletion
@@ -147,7 +147,7 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset,
       final String leaseId,
       TracingContext tracingContext) throws IOException {
     AbfsRestOperation op;
-    if (!blobBlockManager.hasListToCommit()) {
+    if (!blobBlockManager.hasBlocksToCommit()) {
       return null;
     }
     try {

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java

Lines changed: 1 addition & 5 deletions
@@ -21,9 +21,6 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Random;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -221,8 +218,7 @@ void createRenamePendingJson(Path path, byte[] bytes)
     abfsClient.append(path.toUri().getPath(), bytes,
         appendRequestParameters, null, null, tracingContext);
 
-    List<String> blockIdList = new ArrayList<>(Collections.singleton(blockId));
-    String blockList = generateBlockListXml(blockIdList);
+    String blockList = generateBlockListXml(blockId);
     // PutBlockList on the path.
     abfsClient.flush(blockList.getBytes(StandardCharsets.UTF_8),
         path.toUri().getPath(), true, null, null, eTag, null, tracingContext);
