Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
import org.apache.hadoop.fs.azurebfs.enums.Trilean;
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
Expand Down Expand Up @@ -118,7 +117,6 @@
import org.apache.hadoop.fs.azurebfs.utils.CRC64;
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
import org.apache.hadoop.fs.azurebfs.utils.ListUtils;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
import org.apache.hadoop.fs.impl.BackReference;
Expand Down Expand Up @@ -1298,13 +1296,8 @@ public String listStatus(final Path path, final String startFrom,
}
} while (shouldContinue);

if (listingClient instanceof AbfsBlobClient) {
fileStatuses.addAll(ListUtils.getUniqueListResult(fileStatusList));
LOG.debug("ListBlob API returned a total of {} elements including duplicates."
+ "Number of unique Elements are {}", fileStatusList.size(), fileStatuses.size());
} else {
fileStatuses.addAll(fileStatusList);
}
fileStatuses.addAll(listingClient.postListProcessing(
relativePath, fileStatusList, tracingContext, uri));

return continuation;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.Path;
Expand All @@ -77,6 +78,7 @@
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.utils.ListUtils;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

import static java.net.HttpURLConnection.HTTP_CONFLICT;
Expand Down Expand Up @@ -348,13 +350,9 @@ public AbfsRestOperation deleteFilesystem(TracingContext tracingContext)
*/
@Override
public ListResponseData listPath(final String relativePath, final boolean recursive,
final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri) throws IOException {
return listPath(relativePath, recursive, listMaxResults, continuation, tracingContext, uri, true);
}

public ListResponseData listPath(final String relativePath, final boolean recursive,
final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri, boolean is404CheckRequired)
final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri)
throws AzureBlobFileSystemException {

final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();

AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
Expand Down Expand Up @@ -400,37 +398,46 @@ public ListResponseData listPath(final String relativePath, final boolean recurs
listResponseData.setOp(retryListOp);
}
}
return listResponseData;
}

if (isEmptyListResults(listResponseData) && is404CheckRequired) {
/**
* Post-processing of the list operation on Blob endpoint.
* There are two client handing to be done on list output.
* 1. Empty List returned on server could potentially mean path is a file.
* 2. There can be duplicates returned from the server for explicit non-empty directory.
* @param relativePath relative path to be listed.
* @param fileStatuses list of file statuses returned from the server.
* @param tracingContext tracing context to trace server calls.
* @param uri URI to be used for path conversion.
* @return rectified list of file statuses.
* @throws AzureBlobFileSystemException if any failure occurs.
*/
@Override
public List<FileStatus> postListProcessing(String relativePath, List<FileStatus> fileStatuses,
TracingContext tracingContext, URI uri) throws AzureBlobFileSystemException {
List<FileStatus> rectifiedFileStatuses = new ArrayList<>();
if (fileStatuses.isEmpty() && !ROOT_PATH.equals(relativePath)) {
// If the list operation returns no paths, we need to check if the path is a file.
// If it is a file, we need to return the file in the list.
// If it is a directory or root path, we need to return an empty list.
// If it is a non-existing path, we need to throw a FileNotFoundException.
if (relativePath.equals(ROOT_PATH)) {
// Root Always exists as directory. It can be an empty listing.
return listResponseData;
}
AbfsRestOperation pathStatus = this.getPathStatus(relativePath, tracingContext, null, false);
BlobListResultSchema listResultSchema = getListResultSchemaFromPathStatus(relativePath, pathStatus);
LOG.debug("ListBlob attempted on a file path. Returning file status.");
List<VersionedFileStatus> fileStatusList = new ArrayList<>();
LOG.debug("ListStatus attempted on a file path {}. Returning file status.", relativePath);
for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
fileStatusList.add(getVersionedFileStatusFromEntry(entry, uri));
rectifiedFileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
}
AbfsRestOperation listOp = getAbfsRestOperation(
AbfsRestOperationType.ListBlobs,
HTTP_METHOD_GET,
url,
requestHeaders);
listOp.hardSetGetListStatusResult(HTTP_OK, listResultSchema);
listResponseData.setFileStatusList(fileStatusList);
listResponseData.setContinuationToken(null);
listResponseData.setRenamePendingJsonPaths(null);
listResponseData.setOp(listOp);
return listResponseData;
} else {
// Remove duplicates from the non-empty list output only.
rectifiedFileStatuses.addAll(ListUtils.getUniqueListResult(fileStatuses));
LOG.debug(
"ListBlob API returned a total of {} elements including duplicates."
+ "Number of unique Elements are {}", fileStatuses.size(),
rectifiedFileStatuses.size());
}
return listResponseData;
return rectifiedFileStatuses;
}

/**
* Filter the paths for which no rename redo operation is performed.
* Update BlobListResultSchema path with filtered entries.
Expand Down Expand Up @@ -2013,6 +2020,8 @@ private static String decodeMetadataAttribute(String encoded)

/**
* Checks if the listing of the specified path is non-empty.
* Since listing is incomplete as long as continuation token is returned by server,
* we need to iterate until either we get one entry or continuation token becomes null.
*
* @param path The path to be listed.
* @param tracingContext The tracing context for tracking the operation.
Expand All @@ -2024,26 +2033,15 @@ public boolean isNonEmptyDirectory(String path,
TracingContext tracingContext) throws AzureBlobFileSystemException {
// This method is only called internally to determine state of a path
// and hence don't need identity transformation to happen.
ListResponseData listResponseData = listPath(path, false, 1, null, tracingContext, null, false);
return !isEmptyListResults(listResponseData);
}

/**
* Check if the list call returned empty results without any continuation token.
* @param listResponseData The response of listing API from the server.
* @return True if empty results without continuation token.
*/
private boolean isEmptyListResults(ListResponseData listResponseData) {
AbfsHttpOperation result = listResponseData.getOp().getResult();
boolean isEmptyList = result != null && result.getStatusCode() == HTTP_OK && // List Call was successful
result.getListResultSchema() != null && // Parsing of list response was successful
listResponseData.getFileStatusList().isEmpty() && listResponseData.getRenamePendingJsonPaths().isEmpty() &&// No paths were returned
StringUtils.isEmpty(listResponseData.getContinuationToken()); // No continuation token was returned
if (isEmptyList) {
LOG.debug("List call returned empty results without any continuation token.");
return true;
}
return false;
String continuationToken = null;
List<FileStatus> fileStatusList = new ArrayList<>();
// We need to loop on continuation token until we get an entry or continuation token becomes null.
do {
ListResponseData listResponseData = listPath(path, false, 1, null, tracingContext, null);
fileStatusList.addAll(listResponseData.getFileStatusList());
continuationToken = listResponseData.getContinuationToken();
} while (StringUtils.isNotEmpty(continuationToken) && fileStatusList.isEmpty());
return !fileStatusList.isEmpty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
Expand Down Expand Up @@ -164,6 +165,8 @@ public abstract class AbfsClient implements Closeable {
private String clientProvidedEncryptionKey = null;
private String clientProvidedEncryptionKeySHA = null;
private final IdentityTransformerInterface identityTransformer;
private final String userName;
private String primaryUserGroup;

private final String accountName;
private final AuthType authType;
Expand Down Expand Up @@ -305,6 +308,22 @@ private AbfsClient(final URL baseUrl,
throw new IOException(e);
}
LOG.trace("IdentityTransformer init complete");

UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
this.userName = userGroupInformation.getShortUserName();
LOG.trace("UGI init complete");
if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
try {
this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
} catch (IOException ex) {
LOG.error("Failed to get primary group for {}, using user name as primary group name", userName);
this.primaryUserGroup = userName;
}
} else {
//Provide a default group name
this.primaryUserGroup = userName;
}
LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);
}

public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
Expand Down Expand Up @@ -528,6 +547,18 @@ public abstract AbfsRestOperation setFilesystemProperties(Hashtable<String, Stri
public abstract ListResponseData listPath(String relativePath, boolean recursive,
int listMaxResults, String continuation, TracingContext tracingContext, URI uri) throws IOException;

/**
* Post-processing of the list operation.
* @param relativePath which is used to list the blobs.
* @param fileStatuses list of file statuses to be processed.
* @param tracingContext for tracing the server calls.
* @param uri to be used for the path conversion.
* @return list of file statuses to be returned.
* @throws AzureBlobFileSystemException if rest operation fails.
*/
public abstract List<FileStatus> postListProcessing(String relativePath,
List<FileStatus> fileStatuses, TracingContext tracingContext, URI uri) throws AzureBlobFileSystemException;

/**
* Retrieves user-defined metadata on filesystem.
* @param tracingContext for tracing the server calls.
Expand Down Expand Up @@ -1776,37 +1807,6 @@ protected AbfsRestOperation getSuccessOp(final AbfsRestOperationType operationTy
return successOp;
}

/**
* Get the primary user group name.
* @return primary user group name
* @throws AzureBlobFileSystemException if unable to get the primary user group
*/
private String getPrimaryUserGroup() throws AzureBlobFileSystemException {
if (!getAbfsConfiguration().getSkipUserGroupMetadataDuringInitialization()) {
try {
return UserGroupInformation.getCurrentUser().getPrimaryGroupName();
} catch (IOException ex) {
LOG.error("Failed to get primary group for {}, using user name as primary group name",
getPrimaryUser());
}
}
//Provide a default group name
return getPrimaryUser();
}

/**
* Get the primary username.
* @return primary username
* @throws AzureBlobFileSystemException if unable to get the primary user
*/
private String getPrimaryUser() throws AzureBlobFileSystemException {
try {
return UserGroupInformation.getCurrentUser().getUserName();
} catch (IOException ex) {
throw new AbfsDriverException(ex);
}
}

/**
* Creates a VersionedFileStatus object from the ListResultEntrySchema.
* @param entry ListResultEntrySchema object.
Expand All @@ -1820,10 +1820,10 @@ protected VersionedFileStatus getVersionedFileStatusFromEntry(
String owner = null, group = null;
try{
if (identityTransformer != null) {
owner = identityTransformer.transformIdentityForGetRequest(
entry.owner(), true, getPrimaryUser());
group = identityTransformer.transformIdentityForGetRequest(
entry.group(), false, getPrimaryUserGroup());
owner = identityTransformer.transformIdentityForGetRequest(entry.owner(),
true, userName);
group = identityTransformer.transformIdentityForGetRequest(entry.group(),
false, primaryUserGroup);
}
} catch (IOException ex) {
LOG.error("Failed to get owner/group for path {}", entry.name(), ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileAlreadyExistsException;
Expand Down Expand Up @@ -348,6 +349,21 @@ public ListResponseData listPath(final String relativePath,
return listResponseData;
}

/**
* Non-functional implementation.
* Client side handling to remove duplicates not needed in DFSClient.
* @param relativePath on which listing was attempted.
* @param fileStatuses result of listing operation.
* @param tracingContext for tracing the server calls.
* @param uri to be used for path conversion.
* @return fileStatuses as it is without any processing.
*/
@Override
public List<FileStatus> postListProcessing(String relativePath,
List<FileStatus> fileStatuses, TracingContext tracingContext, URI uri){
return fileStatuses;
}

/**
* Get Rest Operation for API
* <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,8 @@ public void testSetFileSystemProperties() throws Exception {
//Test list and delete operation on implicit paths
public void testListAndDeleteImplicitPaths() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
AbfsBlobClient client = ((AbfsBlobClient) getFileSystem().getAbfsClient());
assumeBlobServiceType();
AbfsBlobClient client = ((AbfsBlobClient) getFileSystem().getAbfsClient());

Path file1 = new Path("/testDir/dir1/file1");
Path file2 = new Path("/testDir/dir1/file2");
Expand All @@ -458,7 +458,7 @@ public void testListAndDeleteImplicitPaths() throws Exception {

AbfsRestOperation op = client.listPath(
implicitDir.toString(), false, 2, null,
getTestTracingContext(getFileSystem(), false), null, false).getOp();
getTestTracingContext(getFileSystem(), false), null).getOp();
List<? extends ListResultEntrySchema> list = op.getResult()
.getListResultSchema()
.paths();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
doCallRealMethod().when((AbfsBlobClient) mockClient)
.listPath(Mockito.nullable(String.class), Mockito.anyBoolean(),
Mockito.anyInt(), Mockito.nullable(String.class),
Mockito.nullable(TracingContext.class), Mockito.nullable(URI.class), Mockito.anyBoolean());
Mockito.nullable(TracingContext.class), Mockito.nullable(URI.class));
doCallRealMethod().when((AbfsBlobClient) mockClient)
.getPathStatus(Mockito.nullable(String.class), Mockito.nullable(TracingContext.class),
Mockito.nullable(ContextEncryptionAdapter.class), Mockito.anyBoolean());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public void testListStatusIsCalledForImplicitPathOnBlobEndpoint() throws Excepti
fs.getFileStatus(implicitPath);

Mockito.verify(abfsClient, Mockito.times(1)).getPathStatus(any(), eq(false), any(), any());
Mockito.verify(abfsClient, Mockito.times(1)).listPath(any(), eq(false), eq(1), any(), any(), any(), eq(false));
Mockito.verify(abfsClient, Mockito.times(1)).listPath(any(), eq(false), eq(1), any(), any(), any());
}

/**
Expand Down
Loading