Skip to content

Commit 140f047

Browse files
authored
HADOOP-19474: [ABFS][FnsOverBlob] Listing Optimizations to avoid multiple iteration over list response. (#7421)
Contributed by Anuj Modi Reviewed by: Anmol Asrani, Manish Bhatt, Manika Joshi Signed off by: Anuj Modi<anujmodi@apache.org>
1 parent 3b8942f commit 140f047

22 files changed

+875
-417
lines changed

hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsClient.java"/>
4949
<suppress checks="ParameterNumber"
5050
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsBlobClient.java"/>
51+
<suppress checks="ParameterNumber|MagicNumber"
52+
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]VersionedFileStatus.java"/>
5153
<suppress checks="ParameterNumber"
5254
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]contracts[\\/]services[\\/]AppendRequestParameters.java"/>
5355
<suppress checks="ParameterNumber|MagicNumber"

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 16 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.hadoop.fs.azurebfs;
1919

2020
import java.io.Closeable;
21-
import java.io.File;
2221
import java.io.IOException;
2322
import java.io.OutputStream;
2423
import java.io.UnsupportedEncodingException;
@@ -59,7 +58,6 @@
5958
import org.apache.hadoop.classification.InterfaceStability;
6059
import org.apache.hadoop.classification.VisibleForTesting;
6160
import org.apache.hadoop.conf.Configuration;
62-
import org.apache.hadoop.fs.EtagSource;
6361
import org.apache.hadoop.fs.FileStatus;
6462
import org.apache.hadoop.fs.FileSystem;
6563
import org.apache.hadoop.fs.Path;
@@ -78,8 +76,7 @@
7876
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
7977
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
8078
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
81-
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
82-
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
79+
import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
8380
import org.apache.hadoop.fs.azurebfs.enums.Trilean;
8481
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
8582
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
@@ -115,6 +112,7 @@
115112
import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
116113
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
117114
import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy;
115+
import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
118116
import org.apache.hadoop.fs.azurebfs.utils.Base64;
119117
import org.apache.hadoop.fs.azurebfs.utils.CRC64;
120118
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
@@ -1266,7 +1264,7 @@ public String listStatus(final Path path, final String startFrom,
12661264
if (startFrom != null && !startFrom.isEmpty()) {
12671265
/*
12681266
* Blob Endpoint Does not support startFrom yet. Fallback to DFS Client.
1269-
* startFrom remains null for all HDFS APIs. This is only for internal use.
1267+
* startFrom remains null for all HDFS APIs. This is used only for tests.
12701268
*/
12711269
listingClient = getClient(AbfsServiceType.DFS);
12721270
continuation = getIsNamespaceEnabled(tracingContext)
@@ -1277,58 +1275,16 @@ public String listStatus(final Path path, final String startFrom,
12771275

12781276
do {
12791277
try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
1280-
AbfsRestOperation op = listingClient.listPath(relativePath, false,
1281-
abfsConfiguration.getListMaxResults(), continuation,
1282-
tracingContext);
1278+
ListResponseData listResponseData = listingClient.listPath(relativePath,
1279+
false, abfsConfiguration.getListMaxResults(), continuation,
1280+
tracingContext, this.uri);
1281+
AbfsRestOperation op = listResponseData.getOp();
12831282
perfInfo.registerResult(op.getResult());
1284-
continuation = listingClient.getContinuationFromResponse(op.getResult());
1285-
ListResultSchema retrievedSchema = op.getResult().getListResultSchema();
1286-
if (retrievedSchema == null) {
1287-
throw new AbfsRestOperationException(
1288-
AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
1289-
AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
1290-
"listStatusAsync path not found",
1291-
null, op.getResult());
1292-
}
1293-
1294-
long blockSize = abfsConfiguration.getAzureBlockSize();
1295-
1296-
for (ListResultEntrySchema entry : retrievedSchema.paths()) {
1297-
final String owner = identityTransformer.transformIdentityForGetRequest(entry.owner(), true, userName);
1298-
final String group = identityTransformer.transformIdentityForGetRequest(entry.group(), false, primaryUserGroup);
1299-
final String encryptionContext = entry.getXMsEncryptionContext();
1300-
final FsPermission fsPermission = entry.permissions() == null
1301-
? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)
1302-
: AbfsPermission.valueOf(entry.permissions());
1303-
final boolean hasAcl = AbfsPermission.isExtendedAcl(entry.permissions());
1304-
1305-
long lastModifiedMillis = 0;
1306-
long contentLength = entry.contentLength() == null ? 0 : entry.contentLength();
1307-
boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory();
1308-
if (entry.lastModified() != null && !entry.lastModified().isEmpty()) {
1309-
lastModifiedMillis = DateTimeUtils.parseLastModifiedTime(
1310-
entry.lastModified());
1311-
}
1312-
1313-
Path entryPath = new Path(File.separator + entry.name());
1314-
entryPath = entryPath.makeQualified(this.uri, entryPath);
1315-
1316-
fileStatuses.add(
1317-
new VersionedFileStatus(
1318-
owner,
1319-
group,
1320-
fsPermission,
1321-
hasAcl,
1322-
contentLength,
1323-
isDirectory,
1324-
1,
1325-
blockSize,
1326-
lastModifiedMillis,
1327-
entryPath,
1328-
entry.eTag(),
1329-
encryptionContext));
1283+
continuation = listResponseData.getContinuationToken();
1284+
List<FileStatus> fileStatusListInCurrItr = listResponseData.getFileStatusList();
1285+
if (fileStatusListInCurrItr != null && !fileStatusListInCurrItr.isEmpty()) {
1286+
fileStatuses.addAll(fileStatusListInCurrItr);
13301287
}
1331-
13321288
perfInfo.registerSuccess(true);
13331289
countAggregate++;
13341290
shouldContinue =
@@ -1931,110 +1887,6 @@ private AbfsPerfInfo startTracking(String callerName, String calleeName) {
19311887
return new AbfsPerfInfo(abfsPerfTracker, callerName, calleeName);
19321888
}
19331889

1934-
/**
1935-
* A File status with version info extracted from the etag value returned
1936-
* in a LIST or HEAD request.
1937-
* The etag is included in the java serialization.
1938-
*/
1939-
static final class VersionedFileStatus extends FileStatus
1940-
implements EtagSource {
1941-
1942-
/**
1943-
* The superclass is declared serializable; this subclass can also
1944-
* be serialized.
1945-
*/
1946-
private static final long serialVersionUID = -2009013240419749458L;
1947-
1948-
/**
1949-
* The etag of an object.
1950-
* Not-final so that serialization via reflection will preserve the value.
1951-
*/
1952-
private String version;
1953-
1954-
private String encryptionContext;
1955-
1956-
private VersionedFileStatus(
1957-
final String owner, final String group, final FsPermission fsPermission, final boolean hasAcl,
1958-
final long length, final boolean isdir, final int blockReplication,
1959-
final long blocksize, final long modificationTime, final Path path,
1960-
final String version, final String encryptionContext) {
1961-
super(length, isdir, blockReplication, blocksize, modificationTime, 0,
1962-
fsPermission,
1963-
owner,
1964-
group,
1965-
null,
1966-
path,
1967-
hasAcl, false, false);
1968-
1969-
this.version = version;
1970-
this.encryptionContext = encryptionContext;
1971-
}
1972-
1973-
/** Compare if this object is equal to another object.
1974-
* @param obj the object to be compared.
1975-
* @return true if two file status has the same path name; false if not.
1976-
*/
1977-
@Override
1978-
public boolean equals(Object obj) {
1979-
if (!(obj instanceof FileStatus)) {
1980-
return false;
1981-
}
1982-
1983-
FileStatus other = (FileStatus) obj;
1984-
1985-
if (!this.getPath().equals(other.getPath())) {// compare the path
1986-
return false;
1987-
}
1988-
1989-
if (other instanceof VersionedFileStatus) {
1990-
return this.version.equals(((VersionedFileStatus) other).version);
1991-
}
1992-
1993-
return true;
1994-
}
1995-
1996-
/**
1997-
* Returns a hash code value for the object, which is defined as
1998-
* the hash code of the path name.
1999-
*
2000-
* @return a hash code value for the path name and version
2001-
*/
2002-
@Override
2003-
public int hashCode() {
2004-
int hash = getPath().hashCode();
2005-
hash = 89 * hash + (this.version != null ? this.version.hashCode() : 0);
2006-
return hash;
2007-
}
2008-
2009-
/**
2010-
* Returns the version of this FileStatus
2011-
*
2012-
* @return a string value for the FileStatus version
2013-
*/
2014-
public String getVersion() {
2015-
return this.version;
2016-
}
2017-
2018-
@Override
2019-
public String getEtag() {
2020-
return getVersion();
2021-
}
2022-
2023-
public String getEncryptionContext() {
2024-
return encryptionContext;
2025-
}
2026-
2027-
@Override
2028-
public String toString() {
2029-
final StringBuilder sb = new StringBuilder(
2030-
"VersionedFileStatus{");
2031-
sb.append(super.toString());
2032-
sb.append("; version='").append(version).append('\'');
2033-
sb.append('}');
2034-
return sb.toString();
2035-
}
2036-
}
2037-
20381890
/**
20391891
* Permissions class contain provided permission and umask in octalNotation.
20401892
* If the object is created for namespace-disabled account, the permission and
@@ -2176,6 +2028,11 @@ void setNamespaceEnabled(Trilean isNamespaceEnabled){
21762028
this.isNamespaceEnabled = isNamespaceEnabled;
21772029
}
21782030

2031+
@VisibleForTesting
2032+
public URI getUri() {
2033+
return this.uri;
2034+
}
2035+
21792036
private void updateInfiniteLeaseDirs() {
21802037
this.azureInfiniteLeaseDirSet = new HashSet<>(Arrays.asList(
21812038
abfsConfiguration.getAzureInfiniteLeaseDirs().split(AbfsHttpConstants.COMMA)));

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.apache.http.Header;
3939
import org.apache.http.HttpEntity;
4040
import org.apache.http.HttpResponse;
41-
import org.apache.http.client.methods.CloseableHttpResponse;
4241
import org.apache.http.client.methods.HttpDelete;
4342
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
4443
import org.apache.http.client.methods.HttpGet;
@@ -48,7 +47,6 @@
4847
import org.apache.http.client.methods.HttpPut;
4948
import org.apache.http.client.methods.HttpRequestBase;
5049
import org.apache.http.entity.ByteArrayEntity;
51-
import org.apache.http.util.EntityUtils;
5250

5351
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
5452
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
@@ -194,26 +192,14 @@ String getConnResponseMessage() throws IOException {
194192
public void processResponse(final byte[] buffer,
195193
final int offset,
196194
final int length) throws IOException {
197-
try {
198-
if (!isPayloadRequest) {
199-
prepareRequest();
200-
LOG.debug("Sending request: {}", httpRequestBase);
201-
httpResponse = executeRequest();
202-
LOG.debug("Request sent: {}; response {}", httpRequestBase,
203-
httpResponse);
204-
}
205-
parseResponseHeaderAndBody(buffer, offset, length);
206-
} finally {
207-
if (httpResponse != null) {
208-
try {
209-
EntityUtils.consume(httpResponse.getEntity());
210-
} finally {
211-
if (httpResponse instanceof CloseableHttpResponse) {
212-
((CloseableHttpResponse) httpResponse).close();
213-
}
214-
}
215-
}
195+
if (!isPayloadRequest) {
196+
prepareRequest();
197+
LOG.debug("Sending request: {}", httpRequestBase);
198+
httpResponse = executeRequest();
199+
LOG.debug("Request sent: {}; response {}", httpRequestBase,
200+
httpResponse);
216201
}
202+
parseResponseHeaderAndBody(buffer, offset, length);
217203
}
218204

219205
/**

0 commit comments

Comments
 (0)