Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Seunghyun Lee committed Mar 17, 2020
1 parent 67feb10 commit a9ec33c
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@

/**
* Azure Data Lake Storage Gen2 implementation for the PinotFS interface.
*
* TODO: add the unit test
*/
public class AzureGen2PinotFS extends PinotFS {
private static final Logger LOGGER = LoggerFactory.getLogger(AzureGen2PinotFS.class);
Expand Down Expand Up @@ -115,7 +117,7 @@ public void init(Configuration config) {
_blobServiceClient =
new BlobServiceClientBuilder().credential(sharedKeyCredential).endpoint(blobServiceEndpointUrl).buildClient();
_fileSystemClient = serviceClient.getFileSystemClient(fileSystemName);
LOGGER.error("AzureGen2PinotFS is initialized (accountName={}, fileSystemName={}, dfsServiceEndpointUrl={}, "
LOGGER.info("AzureGen2PinotFS is initialized (accountName={}, fileSystemName={}, dfsServiceEndpointUrl={}, "
+ "blobServiceEndpointUrl={}, enableChecksum={})", accountName, fileSystemName, dfsServiceEndpointUrl,
blobServiceEndpointUrl, _enableChecksum);
}
Expand All @@ -135,7 +137,7 @@ public boolean mkdir(URI uri) throws IOException {
if (e.getStatusCode() == ALREADY_EXISTS_STATUS_CODE && e.getErrorCode().equals(PATH_ALREADY_EXISTS_ERROR_CODE)) {
return true;
}
LOGGER.error("Exception thrown while calling mkdir (uri = {})", uri, e);
LOGGER.error("Exception thrown while calling mkdir (uri={}, errorStatus ={})", uri, e.getStatusCode(), e);
throw new IOException(e);
}
}
Expand Down Expand Up @@ -197,7 +199,7 @@ public boolean copy(URI srcUri, URI dstUri) throws IOException {
// In case we are copying a directory, we need to recursively look into the directory and copy all the files and
// directories accordingly
try {
boolean copySucceeded = false;
boolean copySucceeded = true;
Path srcPath = Paths.get(srcUri.getPath());
for (String path : listFiles(srcUri, true)) {
// Compute the src path for the given path
Expand All @@ -213,7 +215,7 @@ public boolean copy(URI srcUri, URI dstUri) throws IOException {
mkdir(newDst);
} else {
// If src is a file, we need to copy.
copySucceeded |= copySrcToDst(currentSrc, newDst);
copySucceeded &= copySrcToDst(currentSrc, newDst);
}
}
return copySucceeded;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,13 @@ public boolean copy(URI srcUri, URI dstUri) throws IOException {
ImmutableList.Builder<URI> builder = ImmutableList.builder();
Path srcPath = Paths.get(srcUri.getPath());
try {
boolean copySucceeded = false;
boolean copySucceeded = true;
for (String directoryEntry : listFiles(srcUri, true)) {
URI src = new URI(srcUri.getScheme(), srcUri.getHost(), directoryEntry, null);
String relativeSrcPath = srcPath.relativize(Paths.get(directoryEntry)).toString();
String dstPath = dstUri.resolve(relativeSrcPath).getPath();
URI dst = new URI(dstUri.getScheme(), dstUri.getHost(), dstPath, null);
copySucceeded |= copyFile(src, dst);
copySucceeded &= copyFile(src, dst);
}
return copySucceeded;
} catch (URISyntaxException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ public void testFS()
Assert.assertTrue(localPinotFS.exists(new File(newAbsoluteTempDirPath3, "testDir").toURI()));
Assert.assertTrue(localPinotFS.exists(new File(new File(newAbsoluteTempDirPath3, "testDir"), "testFile").toURI()));

// Check if using a different scheme on URI still works
URI uri = URI.create("hdfs://localhost:9999" + newAbsoluteTempDirPath.getPath());
localPinotFS.move(newAbsoluteTempDirPath3.toURI(), uri, true);
Assert.assertFalse(localPinotFS.exists(newAbsoluteTempDirPath3.toURI()));
Assert.assertTrue(localPinotFS.exists(newAbsoluteTempDirPath.toURI()));
Assert.assertTrue(localPinotFS.exists(new File(newAbsoluteTempDirPath, "testDir").toURI()));
Assert.assertTrue(localPinotFS.exists(new File(new File(newAbsoluteTempDirPath, "testDir"), "testFile").toURI()));

// Check file copy to location where something already exists still works
localPinotFS.copy(testFileUri, thirdTestFile.toURI());
// Check length of file
Expand Down

0 comments on commit a9ec33c

Please sign in to comment.