Skip to content

Commit ce57458

Browse files
committed
HADOOP-14546. Azure: Concurrent I/O does not work when secure.mode is enabled. Contributed by Thomas
(cherry picked from commit 7e031c2)
1 parent 1b1065a commit ce57458

File tree

5 files changed

+69
-87
lines changed

5 files changed

+69
-87
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -852,7 +852,6 @@ private void connectToAzureStorageInSecureMode(String accountName,
852852
rootDirectory = container.getDirectoryReference("");
853853

854854
canCreateOrModifyContainer = true;
855-
tolerateOobAppends = false;
856855
}
857856

858857
/**
@@ -1911,8 +1910,7 @@ private OperationContext getInstrumentedContext(boolean bindConcurrentOOBIo) {
19111910
// If reads concurrent to OOB writes are allowed, the interception will reset
19121911
// the conditional header on all Azure blob storage read requests.
19131912
if (bindConcurrentOOBIo) {
1914-
SendRequestIntercept.bind(storageInteractionLayer.getCredentials(),
1915-
operationContext, true);
1913+
SendRequestIntercept.bind(operationContext);
19161914
}
19171915

19181916
if (testHookOperationContext != null) {

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SendRequestIntercept.java

Lines changed: 9 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -35,78 +35,30 @@
3535

3636
/**
3737
* Manages the lifetime of binding on the operation contexts to intercept send
38-
* request events to Azure storage.
38+
* request events to Azure storage and allow concurrent OOB I/Os.
3939
*/
4040
@InterfaceAudience.Private
4141
public final class SendRequestIntercept extends StorageEvent<SendingRequestEvent> {
4242

4343
public static final Log LOG = LogFactory.getLog(SendRequestIntercept.class);
4444

4545
private static final String ALLOW_ALL_REQUEST_PRECONDITIONS = "*";
46-
private final StorageCredentials storageCreds;
47-
private final boolean allowConcurrentOOBIo;
48-
private final OperationContext opContext;
4946

5047
/**
51-
* Getter returning the storage account credentials.
52-
*
53-
* @return storageCreds - account storage credentials.
54-
*/
55-
private StorageCredentials getCredentials() {
56-
return storageCreds;
57-
}
58-
59-
/**
60-
* Query if out-of-band I/Os are allowed.
61-
*
62-
* return allowConcurrentOOBIo - true if OOB I/O is allowed, and false
63-
* otherwise.
48+
* Hidden default constructor for SendRequestIntercept.
6449
*/
65-
private boolean isOutOfBandIoAllowed() {
66-
return allowConcurrentOOBIo;
67-
}
68-
69-
/**
70-
* Getter returning the operation context.
71-
*
72-
* @return storageCreds - account storage credentials.
73-
*/
74-
private OperationContext getOperationContext() {
75-
return opContext;
76-
}
77-
78-
/**
79-
* Constructor for SendRequestThrottle.
80-
*
81-
* @param storageCreds
82-
* - storage account credentials for signing packets.
83-
*
84-
*/
85-
private SendRequestIntercept(StorageCredentials storageCreds,
86-
boolean allowConcurrentOOBIo, OperationContext opContext) {
87-
// Capture the send delay callback interface.
88-
this.storageCreds = storageCreds;
89-
this.allowConcurrentOOBIo = allowConcurrentOOBIo;
90-
this.opContext = opContext;
50+
private SendRequestIntercept() {
9151
}
9252

9353
/**
9454
* Binds a new lister to the operation context so the WASB file system can
95-
* appropriately intercept sends. By allowing concurrent OOB I/Os, we bypass
96-
* the blob immutability check when reading streams.
55+
* appropriately intercept sends and allow concurrent OOB I/Os. This
56+
* by-passes the blob immutability check when reading streams.
9757
*
98-
* @param storageCreds The credential of blob storage.
99-
* @param opContext
100-
* The operation context to bind to listener.
101-
*
102-
* @param allowConcurrentOOBIo
103-
* True if reads are allowed with concurrent OOB writes.
58+
* @param opContext the operation context assocated with this request.
10459
*/
105-
public static void bind(StorageCredentials storageCreds,
106-
OperationContext opContext, boolean allowConcurrentOOBIo) {
107-
SendRequestIntercept sendListener = new SendRequestIntercept(storageCreds,
108-
allowConcurrentOOBIo, opContext);
109-
opContext.getSendingRequestEventHandler().addListener(sendListener);
60+
public static void bind(OperationContext opContext) {
61+
opContext.getSendingRequestEventHandler().addListener(new SendRequestIntercept());
11062
}
11163

11264
/**
@@ -134,36 +86,11 @@ public void eventOccurred(SendingRequestEvent sendEvent) {
13486
// Determine whether this is a download request by checking that the request
13587
// method
13688
// is a "GET" operation.
137-
if (urlConnection.getRequestMethod().equalsIgnoreCase("GET")
138-
&& isOutOfBandIoAllowed()) {
89+
if (urlConnection.getRequestMethod().equalsIgnoreCase("GET")) {
13990
// If concurrent reads on OOB writes are allowed, reset the if-match
14091
// condition on the conditional header.
14192
urlConnection.setRequestProperty(HeaderConstants.IF_MATCH,
14293
ALLOW_ALL_REQUEST_PRECONDITIONS);
143-
144-
// In the Java AzureSDK the packet is signed before firing the
145-
// SendRequest. Setting
146-
// the conditional packet header property changes the contents of the
147-
// packet, therefore the packet has to be re-signed.
148-
try {
149-
// Sign the request. GET's have no payload so the content length is
150-
// zero.
151-
StorageCredentialsHelper.signBlobQueueAndFileRequest(getCredentials(),
152-
urlConnection, -1L, getOperationContext());
153-
} catch (InvalidKeyException e) {
154-
// Log invalid key exception to track signing error before the send
155-
// fails.
156-
String errString = String.format(
157-
"Received invalid key exception when attempting sign packet."
158-
+ " Cause: %s", e.getCause().toString());
159-
LOG.error(errString);
160-
} catch (StorageException e) {
161-
// Log storage exception to track signing error before the call fails.
162-
String errString = String.format(
163-
"Received storage exception when attempting to sign packet."
164-
+ " Cause: %s", e.getCause().toString());
165-
LOG.error(errString);
166-
}
16794
}
16895
}
16996
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.DEFAULT_STORAGE_EMULATOR_ACCOUNT_NAME;
4444
import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_LOCAL_SAS_KEY_MODE;
45+
import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_SECURE_MODE;
4546

4647
/**
4748
* Helper class to create WASB file systems backed by either a mock in-memory
@@ -335,6 +336,11 @@ public static AzureBlobStorageTestAccount createForEmulator()
335336

336337
public static AzureBlobStorageTestAccount createOutOfBandStore(
337338
int uploadBlockSize, int downloadBlockSize) throws Exception {
339+
return createOutOfBandStore(uploadBlockSize, downloadBlockSize, false);
340+
}
341+
342+
public static AzureBlobStorageTestAccount createOutOfBandStore(
343+
int uploadBlockSize, int downloadBlockSize, boolean enableSecureMode) throws Exception {
338344

339345
saveMetricsConfigFile();
340346

@@ -359,6 +365,7 @@ public static AzureBlobStorageTestAccount createOutOfBandStore(
359365
// out-of-band appends.
360366
conf.setBoolean(KEY_DISABLE_THROTTLING, true);
361367
conf.setBoolean(KEY_READ_TOLERATE_CONCURRENT_APPEND, true);
368+
conf.setBoolean(KEY_USE_SECURE_MODE, enableSecureMode);
362369
configureSecureModeTestSettings(conf);
363370

364371
// Set account URI and initialize Azure file system.

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ public class TestAzureConcurrentOutOfBandIo {
4040
static final int BLOB_SIZE = 32 * 1024 * 1024;
4141

4242
// Number of blocks to be written before flush.
43-
private static final int NUMBER_OF_BLOCKS = 2;
43+
static final int NUMBER_OF_BLOCKS = 2;
4444

45-
private AzureBlobStorageTestAccount testAccount;
45+
protected AzureBlobStorageTestAccount testAccount;
4646

4747
// Overridden TestCase methods.
4848
@Before
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azure;
20+
21+
import org.apache.hadoop.fs.permission.FsPermission;
22+
import org.apache.hadoop.fs.permission.PermissionStatus;
23+
import org.junit.After;
24+
import org.junit.Before;
25+
import org.junit.Test;
26+
27+
import java.io.DataInputStream;
28+
import java.io.IOException;
29+
import java.io.OutputStream;
30+
import java.util.Arrays;
31+
32+
import static org.junit.Assert.assertEquals;
33+
import static org.junit.Assert.fail;
34+
import static org.junit.Assume.assumeNotNull;
35+
36+
/**
37+
* Extends TestAzureConcurrentOutOfBandIo in order to run testReadOOBWrites with secure mode
38+
* (fs.azure.secure.mode) both enabled and disabled.
39+
*/
40+
public class TestAzureConcurrentOutOfBandIoWithSecureMode extends TestAzureConcurrentOutOfBandIo {
41+
42+
// Overridden TestCase methods.
43+
@Before
44+
@Override
45+
public void setUp() throws Exception {
46+
testAccount = AzureBlobStorageTestAccount.createOutOfBandStore(
47+
UPLOAD_BLOCK_SIZE, DOWNLOAD_BLOCK_SIZE, true);
48+
assumeNotNull(testAccount);
49+
}
50+
}

0 commit comments

Comments
 (0)