hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -110,6 +110,7 @@
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;

import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
@@ -120,6 +121,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_ON_ROOT;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
@@ -324,6 +326,12 @@ public FSDataOutputStream create(final Path f,

statIncrement(CALL_CREATE);
trailingPeriodCheck(f);
if (f.isRoot()) {
throw new AbfsRestOperationException(HTTP_CONFLICT,
AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
ERR_CREATE_ON_ROOT,
null);
}

Path qualifiedPath = makeQualified(f);

@@ -348,6 +356,12 @@ public FSDataOutputStream createNonRecursive(final Path f, final FsPermission pe
final Progressable progress) throws IOException {

statIncrement(CALL_CREATE_NON_RECURSIVE);
if (f.isRoot()) {
throw new AbfsRestOperationException(HTTP_CONFLICT,
AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
ERR_CREATE_ON_ROOT,
null);
}
final Path parent = f.getParent();
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_NON_RECURSIVE, tracingHeaderFormat,
@@ -965,15 +979,25 @@ public void setXAttr(final Path path,
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.SET_ATTR, true, tracingHeaderFormat,
listener);
Hashtable<String, String> properties = abfsStore
.getPathStatus(qualifiedPath, tracingContext);
Hashtable<String, String> properties;
String xAttrName = ensureValidAttributeName(name);

if (path.isRoot()) {
properties = abfsStore.getFilesystemProperties(tracingContext);
} else {
properties = abfsStore.getPathStatus(qualifiedPath, tracingContext);
}

boolean xAttrExists = properties.containsKey(xAttrName);
XAttrSetFlag.validate(name, xAttrExists, flag);

String xAttrValue = abfsStore.decodeAttribute(value);
properties.put(xAttrName, xAttrValue);
abfsStore.setPathProperties(qualifiedPath, properties, tracingContext);
if (path.isRoot()) {
abfsStore.setFilesystemProperties(properties, tracingContext);
} else {
abfsStore.setPathProperties(qualifiedPath, properties, tracingContext);
}
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
}
@@ -1005,9 +1029,15 @@ public byte[] getXAttr(final Path path, final String name)
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.GET_ATTR, true, tracingHeaderFormat,
listener);
Hashtable<String, String> properties = abfsStore
.getPathStatus(qualifiedPath, tracingContext);
Hashtable<String, String> properties;
String xAttrName = ensureValidAttributeName(name);

if (path.isRoot()) {
properties = abfsStore.getFilesystemProperties(tracingContext);
} else {
properties = abfsStore.getPathStatus(qualifiedPath, tracingContext);
}

if (properties.containsKey(xAttrName)) {
String xAttrValue = properties.get(xAttrName);
value = abfsStore.encodeAttribute(xAttrValue);
@@ -1488,7 +1518,7 @@ public static void checkException(final Path path,
case HttpURLConnection.HTTP_NOT_FOUND:
throw (IOException) new FileNotFoundException(message)
.initCause(exception);
case HttpURLConnection.HTTP_CONFLICT:
case HTTP_CONFLICT:
throw (IOException) new FileAlreadyExistsException(message)
.initCause(exception);
case HttpURLConnection.HTTP_FORBIDDEN:
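
For context (not part of the diff): a minimal sketch of how the new root-path handling above surfaces to callers, assuming an already configured ABFS account; the URI, attribute name, and class name below are placeholders, not code from this change.

import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;

public class RootPathBehaviourSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder URI; a real run needs valid account credentials in conf.
    FileSystem fs = FileSystem.get(
        new URI("abfss://container@account.dfs.core.windows.net/"), conf);
    Path root = new Path("/");

    // create() and createNonRecursive() on the root now fail fast with HTTP 409.
    try {
      fs.create(root, true);
    } catch (AbfsRestOperationException e) {
      System.out.println(e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT); // true
    }

    // setXAttr()/getXAttr() on the root are now served from the container
    // (filesystem) properties instead of a path-status call, so they work on "/".
    fs.setXAttr(root, "user.owner", "data-team".getBytes(StandardCharsets.UTF_8));
    byte[] value = fs.getXAttr(root, "user.owner");
    System.out.println(new String(value, StandardCharsets.UTF_8)); // data-team
  }
}
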
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -494,7 +494,7 @@ public void setPathProperties(final Path path,
final Hashtable<String, String> properties, TracingContext tracingContext)
throws AzureBlobFileSystemException {
try (AbfsPerfInfo perfInfo = startTracking("setPathProperties", "setPathProperties")){
LOG.debug("setFilesystemProperties for filesystem: {} path: {} with properties: {}",
LOG.debug("setPathProperties for filesystem: {} path: {} with properties: {}",
client.getFileSystem(),
path,
properties);
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java
@@ -48,6 +48,6 @@ public final class AbfsErrors {
+ "operation";
public static final String ERR_NO_LEASE_THREADS = "Lease desired but no lease threads "
+ "configured, set " + FS_AZURE_LEASE_THREADS;

public static final String ERR_CREATE_ON_ROOT = "Cannot create file over root path";
private AbfsErrors() {}
}
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -193,7 +193,7 @@ String getSasToken() {
/**
* Execute an AbfsRestOperation. Track the duration of a request if
* abfsCounters isn't null.
* @param tracingContext TracingContext instance to track correlation IDs
* @param tracingContext TracingContext instance to track correlation IDs.
*/
public void execute(TracingContext tracingContext)
throws AzureBlobFileSystemException {
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAttributes.java
@@ -21,7 +21,7 @@
import java.io.IOException;
import java.util.EnumSet;

import org.junit.Assume;
import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.fs.Path;
@@ -46,45 +46,24 @@ public ITestAzureBlobFileSystemAttributes() throws Exception {
public void testSetGetXAttr() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
Assume.assumeTrue(getIsNamespaceEnabled(fs));

byte[] attributeValue1 = fs.getAbfsStore().encodeAttribute("hi");
byte[] attributeValue2 = fs.getAbfsStore().encodeAttribute("你好");
String attributeName1 = "user.asciiAttribute";
String attributeName2 = "user.unicodeAttribute";
Path testFile = path("setGetXAttr");

// after creating a file, the xAttr should not be present
touch(testFile);
assertNull(fs.getXAttr(testFile, attributeName1));

// after setting the xAttr on the file, the value should be retrievable
fs.registerListener(
new TracingHeaderValidator(conf.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.SET_ATTR, true, 0));
fs.setXAttr(testFile, attributeName1, attributeValue1);
fs.setListenerOperation(FSOperationType.GET_ATTR);
assertArrayEquals(attributeValue1, fs.getXAttr(testFile, attributeName1));
fs.registerListener(null);

// after setting a second xAttr on the file, the first xAttr values should not be overwritten
fs.setXAttr(testFile, attributeName2, attributeValue2);
assertArrayEquals(attributeValue1, fs.getXAttr(testFile, attributeName1));
assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName2));
final Path testPath = path("setGetXAttr");
fs.create(testPath);
testGetSetXAttrHelper(fs, testPath);
}

@Test
public void testSetGetXAttrCreateReplace() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
Assume.assumeTrue(getIsNamespaceEnabled(fs));
byte[] attributeValue = fs.getAbfsStore().encodeAttribute("one");
String attributeName = "user.someAttribute";
Path testFile = path("createReplaceXAttr");

// after creating a file, it must be possible to create a new xAttr
touch(testFile);
fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG);
assertArrayEquals(attributeValue, fs.getXAttr(testFile, attributeName));
Assertions.assertThat(fs.getXAttr(testFile, attributeName))
.describedAs("Retrieved Attribute Value is Not as Expected")
.containsExactly(attributeValue);

// however after the xAttr is created, creating it again must fail
intercept(IOException.class, () -> fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG));
@@ -93,7 +72,6 @@ public void testSetGetXAttrCreateReplace() throws Exception {
@Test
public void testSetGetXAttrReplace() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
Assume.assumeTrue(getIsNamespaceEnabled(fs));
byte[] attributeValue1 = fs.getAbfsStore().encodeAttribute("one");
byte[] attributeValue2 = fs.getAbfsStore().encodeAttribute("two");
String attributeName = "user.someAttribute";
@@ -108,6 +86,72 @@ public void testSetGetXAttrReplace() throws Exception {
// however after the xAttr is created, replacing it must succeed
fs.setXAttr(testFile, attributeName, attributeValue1, CREATE_FLAG);
fs.setXAttr(testFile, attributeName, attributeValue2, REPLACE_FLAG);
assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName));
Assertions.assertThat(fs.getXAttr(testFile, attributeName))
.describedAs("Retrieved Attribute Value is Not as Expected")
.containsExactly(attributeValue2);
}

@Test
public void testGetSetXAttrOnRoot() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
final Path testPath = new Path("/");
testGetSetXAttrHelper(fs, testPath);
}

private void testGetSetXAttrHelper(final AzureBlobFileSystem fs,
final Path testPath) throws Exception {

String attributeName1 = "user.attribute1";
String attributeName2 = "user.attribute2";
String decodedAttributeValue1 = "hi";
String decodedAttributeValue2 = "hello";
byte[] attributeValue1 = fs.getAbfsStore().encodeAttribute(decodedAttributeValue1);
byte[] attributeValue2 = fs.getAbfsStore().encodeAttribute(decodedAttributeValue2);

// Attribute not present initially
Assertions.assertThat(fs.getXAttr(testPath, attributeName1))
.describedAs("Cannot get attribute before setting it")
.isNull();
Assertions.assertThat(fs.getXAttr(testPath, attributeName2))
.describedAs("Cannot get attribute before setting it")
.isNull();

// Set the Attributes
fs.registerListener(
new TracingHeaderValidator(fs.getAbfsStore().getAbfsConfiguration()
.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.SET_ATTR, true, 0));
fs.setXAttr(testPath, attributeName1, attributeValue1);

// Check if the attribute is retrievable
fs.setListenerOperation(FSOperationType.GET_ATTR);
byte[] rv = fs.getXAttr(testPath, attributeName1);
Assertions.assertThat(rv)
.describedAs("Retrieved Attribute Does not Matches in Encoded Form")
.containsExactly(attributeValue1);
Assertions.assertThat(fs.getAbfsStore().decodeAttribute(rv))
.describedAs("Retrieved Attribute Does not Matches in Decoded Form")
.isEqualTo(decodedAttributeValue1);
fs.registerListener(null);

// Set the second Attribute
fs.setXAttr(testPath, attributeName2, attributeValue2);

// Check all attributes are present and the previous attribute is not overridden
rv = fs.getXAttr(testPath, attributeName1);
Assertions.assertThat(rv)
.describedAs("Retrieved Attribute Does not Matches in Encoded Form")
.containsExactly(attributeValue1);
Assertions.assertThat(fs.getAbfsStore().decodeAttribute(rv))
.describedAs("Retrieved Attribute Does not Matches in Decoded Form")
.isEqualTo(decodedAttributeValue1);

rv = fs.getXAttr(testPath, attributeName2);
Assertions.assertThat(rv)
.describedAs("Retrieved Attribute Does not Matches in Encoded Form")
.containsExactly(attributeValue2);
Assertions.assertThat(fs.getAbfsStore().decodeAttribute(rv))
.describedAs("Retrieved Attribute Does not Matches in Decoded Form")
.isEqualTo(decodedAttributeValue2);
}
}
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
@@ -25,6 +25,7 @@
import java.util.EnumSet;
import java.util.UUID;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
@@ -33,6 +34,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.test.GenericTestUtils;
@@ -146,6 +148,26 @@ public void testCreateNonRecursive2() throws Exception {
assertIsFile(fs, testFile);
}

@Test
public void testCreateOnRoot() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
Path testFile = path(AbfsHttpConstants.ROOT_PATH);
AbfsRestOperationException ex = intercept(AbfsRestOperationException.class, () ->
fs.create(testFile, true));
if (ex.getStatusCode() != HTTP_CONFLICT) {
// Request should fail with 409.
throw ex;
}

ex = intercept(AbfsRestOperationException.class, () ->
fs.createNonRecursive(testFile, FsPermission.getDefault(),
false, 1024, (short) 1, 1024, null));
if (ex.getStatusCode() != HTTP_CONFLICT) {
// Request should fail with 409.
throw ex;
}
}

/**
* Attempts to use to the ABFS stream after it is closed.
*/
@@ -190,7 +212,8 @@ public void testTryWithResources() throws Throwable {
// the exception raised in close() must be in the caught exception's
// suppressed list
Throwable[] suppressed = fnfe.getSuppressed();
assertEquals("suppressed count", 1, suppressed.length);
Assertions.assertThat(suppressed.length)
.describedAs("suppressed count should be 1").isEqualTo(1);
Throwable inner = suppressed[0];
if (!(inner instanceof IOException)) {
throw inner;
2 changes: 1 addition & 1 deletion hadoop-tools/hadoop-azure/src/test/resources/abfs.xml
@@ -19,7 +19,7 @@
<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
<property>
<name>fs.contract.test.root-tests-enabled</name>
<value>false</value>
<value>true</value>
</property>

<property>
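
A note on the abfs.xml change above: fs.contract.test.root-tests-enabled gates the filesystem-contract root-directory test cases, so flipping it to true lets those suites run against ABFS now that root-path operations behave correctly. As an illustration only (the guard below is an assumption about the contract test framework, not part of this diff), a contract test typically skips itself when the flag is false, roughly like:

import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractOptions;

// Sketch: abstract so it need not supply a concrete contract.
public abstract class RootTestsGuardSketch extends AbstractFSContractTestBase {
  @Override
  public void setup() throws Exception {
    super.setup();
    // Skips the test case unless fs.contract.test.root-tests-enabled is true
    // in the filesystem's bundled contract XML (here, abfs.xml).
    skipIfUnsupported(ContractOptions.TEST_ROOT_TESTS_ENABLED);
  }
}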