Skip to content

Commit 3612317

Browse files
authored
HADOOP-16818. ABFS: Combine append+flush calls for blockblob & appendblob
Contributed by Ishani Ahuja.
1 parent f02d5ab commit 3612317

File tree

11 files changed

+569
-33
lines changed

11 files changed

+569
-33
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,10 @@ public class AbfsConfiguration{
143143
DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
144144
private String azureAtomicDirs;
145145

146+
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APPEND_BLOB_KEY,
147+
DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
148+
private String azureAppendBlobDirs;
149+
146150
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
147151
DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
148152
private boolean createRemoteFileSystemDuringInitialization;
@@ -163,6 +167,10 @@ public class AbfsConfiguration{
163167
DefaultValue = DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH)
164168
private boolean disableOutputStreamFlush;
165169

170+
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_APPEND_WITH_FLUSH,
171+
DefaultValue = DEFAULT_ENABLE_APPEND_WITH_FLUSH)
172+
private boolean enableAppendWithFlush;
173+
166174
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_AUTOTHROTTLING,
167175
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
168176
private boolean enableAutoThrottling;
@@ -449,6 +457,10 @@ public String getAzureAtomicRenameDirs() {
449457
return this.azureAtomicDirs;
450458
}
451459

460+
public String getAppendBlobDirs() {
461+
return this.azureAppendBlobDirs;
462+
}
463+
452464
public boolean getCreateRemoteFileSystemDuringInitialization() {
453465
// we do not support creating the filesystem when AuthType is SAS
454466
return this.createRemoteFileSystemDuringInitialization
@@ -471,6 +483,10 @@ public boolean isOutputStreamFlushDisabled() {
471483
return this.disableOutputStreamFlush;
472484
}
473485

486+
public boolean isAppendWithFlushEnabled() {
487+
return this.enableAppendWithFlush;
488+
}
489+
474490
public boolean isAutoThrottlingEnabled() {
475491
return this.enableAutoThrottling;
476492
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,11 @@ public class AzureBlobFileSystemStore implements Closeable {
137137
private final IdentityTransformer identityTransformer;
138138
private final AbfsPerfTracker abfsPerfTracker;
139139

140+
/**
141+
* The set of directories where we should store files as append blobs.
142+
*/
143+
private Set<String> appendBlobDirSet;
144+
140145
public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration)
141146
throws IOException {
142147
this.uri = uri;
@@ -177,6 +182,22 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration c
177182
initializeClient(uri, fileSystemName, accountName, useHttps);
178183
this.identityTransformer = new IdentityTransformer(abfsConfiguration.getRawConfiguration());
179184
LOG.trace("IdentityTransformer init complete");
185+
// Extract the directories that should contain append blobs
186+
String appendBlobDirs = abfsConfiguration.getAppendBlobDirs();
187+
if (appendBlobDirs.trim().isEmpty()) {
188+
this.appendBlobDirSet = new HashSet<String>();
189+
} else {
190+
this.appendBlobDirSet = new HashSet<>(Arrays.asList(
191+
abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA)));
192+
}
193+
}
194+
195+
/**
196+
* Checks if the given key in Azure Storage should be stored as an append
197+
* blob instead of block blob.
198+
*/
199+
public boolean isAppendBlobKey(String key) {
200+
return isKeyForDirectorySet(key, appendBlobDirSet);
180201
}
181202

182203
/**
@@ -403,18 +424,25 @@ public OutputStream createFile(final Path path, final boolean overwrite, final F
403424
umask.toString(),
404425
isNamespaceEnabled);
405426

406-
final AbfsRestOperation op = client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite,
407-
isNamespaceEnabled ? getOctalNotation(permission) : null,
408-
isNamespaceEnabled ? getOctalNotation(umask) : null);
409-
perfInfo.registerResult(op.getResult()).registerSuccess(true);
427+
boolean appendBlob = false;
428+
if (isAppendBlobKey(path.toString())) {
429+
appendBlob = true;
430+
}
431+
432+
client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite,
433+
isNamespaceEnabled ? getOctalNotation(permission) : null,
434+
isNamespaceEnabled ? getOctalNotation(umask) : null,
435+
appendBlob);
410436

411437
return new AbfsOutputStream(
412-
client,
413-
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
414-
0,
415-
abfsConfiguration.getWriteBufferSize(),
416-
abfsConfiguration.isFlushEnabled(),
417-
abfsConfiguration.isOutputStreamFlushDisabled());
438+
client,
439+
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
440+
0,
441+
abfsConfiguration.getWriteBufferSize(),
442+
abfsConfiguration.isFlushEnabled(),
443+
abfsConfiguration.isOutputStreamFlushDisabled(),
444+
abfsConfiguration.isAppendWithFlushEnabled(),
445+
appendBlob);
418446
}
419447
}
420448

@@ -430,8 +458,8 @@ public void createDirectory(final Path path, final FsPermission permission, fina
430458
isNamespaceEnabled);
431459

432460
final AbfsRestOperation op = client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), false, true,
433-
isNamespaceEnabled ? getOctalNotation(permission) : null,
434-
isNamespaceEnabled ? getOctalNotation(umask) : null);
461+
isNamespaceEnabled ? getOctalNotation(permission) : null,
462+
isNamespaceEnabled ? getOctalNotation(umask) : null, false);
435463
perfInfo.registerResult(op.getResult()).registerSuccess(true);
436464
}
437465
}
@@ -494,13 +522,20 @@ public OutputStream openFileForWrite(final Path path, final boolean overwrite) t
494522

495523
perfInfo.registerSuccess(true);
496524

525+
boolean appendBlob = false;
526+
if (isAppendBlobKey(path.toString())) {
527+
appendBlob = true;
528+
}
529+
497530
return new AbfsOutputStream(
498-
client,
499-
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
500-
offset,
501-
abfsConfiguration.getWriteBufferSize(),
502-
abfsConfiguration.isFlushEnabled(),
503-
abfsConfiguration.isOutputStreamFlushDisabled());
531+
client,
532+
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
533+
offset,
534+
abfsConfiguration.getWriteBufferSize(),
535+
abfsConfiguration.isFlushEnabled(),
536+
abfsConfiguration.isOutputStreamFlushDisabled(),
537+
abfsConfiguration.isAppendWithFlushEnabled(),
538+
appendBlob);
504539
}
505540
}
506541

@@ -1386,4 +1421,4 @@ public String toString() {
13861421
AbfsClient getClient() {
13871422
return this.client;
13881423
}
1389-
}
1424+
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public final class AbfsHttpConstants {
4040
public static final String CHECK_ACCESS = "checkAccess";
4141
public static final String GET_STATUS = "getStatus";
4242
public static final String DEFAULT_TIMEOUT = "90";
43+
public static final String APPEND_BLOB_TYPE = "appendblob";
4344
public static final String TOKEN_VERSION = "2";
4445

4546
public static final String JAVA_VERSION = "java.version";

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public final class ConfigurationKeys {
5151
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
5252
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
5353
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
54+
public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.key";
5455
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
5556
/** Provides a config control to enable or disable ABFS Flush operations -
5657
* HFlush and HSync. Default is true. **/
@@ -61,6 +62,10 @@ public final class ConfigurationKeys {
6162
* documentation does not have such expectations of data being persisted.
6263
* Default value of this config is true. **/
6364
public static final String FS_AZURE_DISABLE_OUTPUTSTREAM_FLUSH = "fs.azure.disable.outputstream.flush";
65+
/** Provides a config control to enable OutputStream AppendWithFlush API
66+
* operations in AbfsOutputStream.
67+
* Default value of this config is true. **/
68+
public static final String FS_AZURE_ENABLE_APPEND_WITH_FLUSH = "fs.azure.enable.appendwithflush";
6469
public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix";
6570
public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";
6671
/** Provides a config to enable/disable the checkAccess API.

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,12 @@ public final class FileSystemConfigurations {
5555
public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false;
5656

5757
public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase";
58+
public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
5859

5960
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
6061
public static final boolean DEFAULT_ENABLE_FLUSH = true;
6162
public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
63+
public static final boolean DEFAULT_ENABLE_APPEND_WITH_FLUSH = true;
6264
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
6365

6466
public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ public final class HttpQueryParams {
3838
public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";
3939
public static final String QUERY_PARAM_CLOSE = "close";
4040
public static final String QUERY_PARAM_UPN = "upn";
41+
public static final String QUERY_PARAM_FLUSH = "flush";
42+
public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
4143

4244
private HttpQueryParams() {}
4345
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredent
119119
this.sasTokenProvider = sasTokenProvider;
120120
}
121121

122-
@Override
123122
public void close() throws IOException {
124123
if (tokenProvider instanceof Closeable) {
125124
IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider);
@@ -261,7 +260,8 @@ public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException
261260
}
262261

263262
public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
264-
final String permission, final String umask) throws AzureBlobFileSystemException {
263+
final String permission, final String umask,
264+
final boolean appendBlob) throws AzureBlobFileSystemException {
265265
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
266266
if (!overwrite) {
267267
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
@@ -277,6 +277,9 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin
277277

278278
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
279279
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
280+
if (appendBlob) {
281+
abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOBTYPE, APPEND_BLOB_TYPE);
282+
}
280283

281284
String operation = isFile
282285
? SASTokenProvider.CREATEFILE_OPERATION
@@ -325,7 +328,8 @@ public AbfsRestOperation renamePath(String source, final String destination, fin
325328
}
326329

327330
public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset,
328-
final int length) throws AzureBlobFileSystemException {
331+
final int length, boolean flush, boolean isClose)
332+
throws AzureBlobFileSystemException {
329333
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
330334
// JDK7 does not support PATCH, so to work around the issue we will use
331335
// PUT and specify the real method in the X-Http-Method-Override header.
@@ -335,6 +339,8 @@ public AbfsRestOperation append(final String path, final long position, final by
335339
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
336340
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
337341
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
342+
abfsUriQueryBuilder.addQuery(QUERY_PARAM_FLUSH, String.valueOf(flush));
343+
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
338344
appendSASTokenToQuery(path, SASTokenProvider.APPEND_OPERATION, abfsUriQueryBuilder);
339345

340346
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());

0 commit comments

Comments
 (0)