Skip to content

Commit 745a6c1

Browse files
committed
Revert "HADOOP-16818. ABFS: Combine append+flush calls for blockblob & appendblob"
This reverts commit 3612317. Change-Id: Ie0d36f25de0b55a937894f4d9963c495bae0576a
1 parent 679631b commit 745a6c1

File tree

11 files changed

+33
-569
lines changed

11 files changed

+33
-569
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,6 @@ public class AbfsConfiguration{
143143
DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
144144
private String azureAtomicDirs;
145145

146-
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APPEND_BLOB_KEY,
147-
DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
148-
private String azureAppendBlobDirs;
149-
150146
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
151147
DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
152148
private boolean createRemoteFileSystemDuringInitialization;
@@ -167,10 +163,6 @@ public class AbfsConfiguration{
167163
DefaultValue = DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH)
168164
private boolean disableOutputStreamFlush;
169165

170-
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_APPEND_WITH_FLUSH,
171-
DefaultValue = DEFAULT_ENABLE_APPEND_WITH_FLUSH)
172-
private boolean enableAppendWithFlush;
173-
174166
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_AUTOTHROTTLING,
175167
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
176168
private boolean enableAutoThrottling;
@@ -457,10 +449,6 @@ public String getAzureAtomicRenameDirs() {
457449
return this.azureAtomicDirs;
458450
}
459451

460-
public String getAppendBlobDirs() {
461-
return this.azureAppendBlobDirs;
462-
}
463-
464452
public boolean getCreateRemoteFileSystemDuringInitialization() {
465453
// we do not support creating the filesystem when AuthType is SAS
466454
return this.createRemoteFileSystemDuringInitialization
@@ -483,10 +471,6 @@ public boolean isOutputStreamFlushDisabled() {
483471
return this.disableOutputStreamFlush;
484472
}
485473

486-
public boolean isAppendWithFlushEnabled() {
487-
return this.enableAppendWithFlush;
488-
}
489-
490474
public boolean isAutoThrottlingEnabled() {
491475
return this.enableAutoThrottling;
492476
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 19 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,6 @@ public class AzureBlobFileSystemStore implements Closeable {
137137
private final IdentityTransformer identityTransformer;
138138
private final AbfsPerfTracker abfsPerfTracker;
139139

140-
/**
141-
* The set of directories where we should store files as append blobs.
142-
*/
143-
private Set<String> appendBlobDirSet;
144-
145140
public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration)
146141
throws IOException {
147142
this.uri = uri;
@@ -182,22 +177,6 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration c
182177
initializeClient(uri, fileSystemName, accountName, useHttps);
183178
this.identityTransformer = new IdentityTransformer(abfsConfiguration.getRawConfiguration());
184179
LOG.trace("IdentityTransformer init complete");
185-
// Extract the directories that should contain append blobs
186-
String appendBlobDirs = abfsConfiguration.getAppendBlobDirs();
187-
if (appendBlobDirs.trim().isEmpty()) {
188-
this.appendBlobDirSet = new HashSet<String>();
189-
} else {
190-
this.appendBlobDirSet = new HashSet<>(Arrays.asList(
191-
abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA)));
192-
}
193-
}
194-
195-
/**
196-
* Checks if the given key in Azure Storage should be stored as a page
197-
* blob instead of block blob.
198-
*/
199-
public boolean isAppendBlobKey(String key) {
200-
return isKeyForDirectorySet(key, appendBlobDirSet);
201180
}
202181

203182
/**
@@ -424,25 +403,18 @@ public OutputStream createFile(final Path path, final boolean overwrite, final F
424403
umask.toString(),
425404
isNamespaceEnabled);
426405

427-
boolean appendBlob = false;
428-
if (isAppendBlobKey(path.toString())) {
429-
appendBlob = true;
430-
}
431-
432-
client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite,
433-
isNamespaceEnabled ? getOctalNotation(permission) : null,
434-
isNamespaceEnabled ? getOctalNotation(umask) : null,
435-
appendBlob);
406+
final AbfsRestOperation op = client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite,
407+
isNamespaceEnabled ? getOctalNotation(permission) : null,
408+
isNamespaceEnabled ? getOctalNotation(umask) : null);
409+
perfInfo.registerResult(op.getResult()).registerSuccess(true);
436410

437411
return new AbfsOutputStream(
438-
client,
439-
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
440-
0,
441-
abfsConfiguration.getWriteBufferSize(),
442-
abfsConfiguration.isFlushEnabled(),
443-
abfsConfiguration.isOutputStreamFlushDisabled(),
444-
abfsConfiguration.isAppendWithFlushEnabled(),
445-
appendBlob);
412+
client,
413+
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
414+
0,
415+
abfsConfiguration.getWriteBufferSize(),
416+
abfsConfiguration.isFlushEnabled(),
417+
abfsConfiguration.isOutputStreamFlushDisabled());
446418
}
447419
}
448420

@@ -458,8 +430,8 @@ public void createDirectory(final Path path, final FsPermission permission, fina
458430
isNamespaceEnabled);
459431

460432
final AbfsRestOperation op = client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), false, true,
461-
isNamespaceEnabled ? getOctalNotation(permission) : null,
462-
isNamespaceEnabled ? getOctalNotation(umask) : null, false);
433+
isNamespaceEnabled ? getOctalNotation(permission) : null,
434+
isNamespaceEnabled ? getOctalNotation(umask) : null);
463435
perfInfo.registerResult(op.getResult()).registerSuccess(true);
464436
}
465437
}
@@ -522,20 +494,13 @@ public OutputStream openFileForWrite(final Path path, final boolean overwrite) t
522494

523495
perfInfo.registerSuccess(true);
524496

525-
boolean appendBlob = false;
526-
if (isAppendBlobKey(path.toString())) {
527-
appendBlob = true;
528-
}
529-
530497
return new AbfsOutputStream(
531-
client,
532-
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
533-
offset,
534-
abfsConfiguration.getWriteBufferSize(),
535-
abfsConfiguration.isFlushEnabled(),
536-
abfsConfiguration.isOutputStreamFlushDisabled(),
537-
abfsConfiguration.isAppendWithFlushEnabled(),
538-
appendBlob);
498+
client,
499+
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
500+
offset,
501+
abfsConfiguration.getWriteBufferSize(),
502+
abfsConfiguration.isFlushEnabled(),
503+
abfsConfiguration.isOutputStreamFlushDisabled());
539504
}
540505
}
541506

@@ -1421,4 +1386,4 @@ public String toString() {
14211386
AbfsClient getClient() {
14221387
return this.client;
14231388
}
1424-
}
1389+
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ public final class AbfsHttpConstants {
4040
public static final String CHECK_ACCESS = "checkAccess";
4141
public static final String GET_STATUS = "getStatus";
4242
public static final String DEFAULT_TIMEOUT = "90";
43-
public static final String APPEND_BLOB_TYPE = "appendblob";
4443
public static final String TOKEN_VERSION = "2";
4544

4645
public static final String JAVA_VERSION = "java.version";

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ public final class ConfigurationKeys {
5151
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
5252
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
5353
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
54-
public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.key";
5554
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
5655
/** Provides a config control to enable or disable ABFS Flush operations -
5756
* HFlush and HSync. Default is true. **/
@@ -62,10 +61,6 @@ public final class ConfigurationKeys {
6261
* documentation does not have such expectations of data being persisted.
6362
* Default value of this config is true. **/
6463
public static final String FS_AZURE_DISABLE_OUTPUTSTREAM_FLUSH = "fs.azure.disable.outputstream.flush";
65-
/** Provides a config control to enable OutputStream AppendWithFlush API
66-
* operations in AbfsOutputStream.
67-
* Default value of this config is true. **/
68-
public static final String FS_AZURE_ENABLE_APPEND_WITH_FLUSH = "fs.azure.enable.appendwithflush";
6964
public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix";
7065
public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";
7166
/** Provides a config to enable/disable the checkAccess API.

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,10 @@ public final class FileSystemConfigurations {
5555
public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false;
5656

5757
public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase";
58-
public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
5958

6059
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
6160
public static final boolean DEFAULT_ENABLE_FLUSH = true;
6261
public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
63-
public static final boolean DEFAULT_ENABLE_APPEND_WITH_FLUSH = true;
6462
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
6563

6664
public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@ public final class HttpQueryParams {
3838
public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";
3939
public static final String QUERY_PARAM_CLOSE = "close";
4040
public static final String QUERY_PARAM_UPN = "upn";
41-
public static final String QUERY_PARAM_FLUSH = "flush";
42-
public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
4341

4442
private HttpQueryParams() {}
4543
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredent
119119
this.sasTokenProvider = sasTokenProvider;
120120
}
121121

122+
@Override
122123
public void close() throws IOException {
123124
if (tokenProvider instanceof Closeable) {
124125
IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider);
@@ -260,8 +261,7 @@ public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException
260261
}
261262

262263
public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
263-
final String permission, final String umask,
264-
final boolean appendBlob) throws AzureBlobFileSystemException {
264+
final String permission, final String umask) throws AzureBlobFileSystemException {
265265
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
266266
if (!overwrite) {
267267
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
@@ -277,9 +277,6 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin
277277

278278
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
279279
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
280-
if (appendBlob) {
281-
abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOBTYPE, APPEND_BLOB_TYPE);
282-
}
283280

284281
String operation = isFile
285282
? SASTokenProvider.CREATEFILE_OPERATION
@@ -328,8 +325,7 @@ public AbfsRestOperation renamePath(String source, final String destination, fin
328325
}
329326

330327
public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset,
331-
final int length, boolean flush, boolean isClose)
332-
throws AzureBlobFileSystemException {
328+
final int length) throws AzureBlobFileSystemException {
333329
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
334330
// JDK7 does not support PATCH, so to workaround the issue we will use
335331
// PUT and specify the real method in the X-Http-Method-Override header.
@@ -339,8 +335,6 @@ public AbfsRestOperation append(final String path, final long position, final by
339335
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
340336
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
341337
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
342-
abfsUriQueryBuilder.addQuery(QUERY_PARAM_FLUSH, String.valueOf(flush));
343-
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
344338
appendSASTokenToQuery(path, SASTokenProvider.APPEND_OPERATION, abfsUriQueryBuilder);
345339

346340
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());

0 commit comments

Comments
 (0)