From 7307f5a0dcdbcf08fe77354d4be175a65163d47a Mon Sep 17 00:00:00 2001 From: Barak Amar Date: Thu, 13 May 2021 10:31:34 +0300 Subject: [PATCH] hadoopfs listFiles (#1922) --- clients/hadoopfs/pom.xml | 2 +- .../src/main/java/io/lakefs/Constants.java | 8 +- .../main/java/io/lakefs/LakeFSFileSystem.java | 167 ++++++++++++++---- .../main/java/io/lakefs/ObjectLocation.java | 4 +- .../java/io/lakefs/LakeFSFileSystemTest.java | 28 ++- 5 files changed, 166 insertions(+), 43 deletions(-) diff --git a/clients/hadoopfs/pom.xml b/clients/hadoopfs/pom.xml index d9426331046..c03adde7c71 100644 --- a/clients/hadoopfs/pom.xml +++ b/clients/hadoopfs/pom.xml @@ -8,7 +8,7 @@ jar hadoopfs 0.1.0 - https://github.com/treeverse/lakeFS/tree/master/clients/java + https://github.com/treeverse/lakeFS/tree/master/clients/hadoopfs UTF-8 diff --git a/clients/hadoopfs/src/main/java/io/lakefs/Constants.java b/clients/hadoopfs/src/main/java/io/lakefs/Constants.java index f07b86d4b94..7ee0b34d438 100644 --- a/clients/hadoopfs/src/main/java/io/lakefs/Constants.java +++ b/clients/hadoopfs/src/main/java/io/lakefs/Constants.java @@ -1,5 +1,11 @@ package io.lakefs; class Constants { - public static final String SEPARATOR = "/"; + public static final String URI_SCHEME = "lakefs"; + public static final String URI_SEPARATOR = "/"; + public static final String FS_LAKEFS_ENDPOINT_KEY = "fs.lakefs.endpoint"; + public static final String FS_LAKEFS_ACCESS_KEY = "fs.lakefs.access.key"; + public static final String FS_LAKEFS_SECRET_KEY = "fs.lakefs.secret.key"; + public static final String FS_LAKEFS_LIST_AMOUNT_KEY = "fs.lakefs.list.amount"; + public static final int DEFAULT_LIST_AMOUNT = 1000; } diff --git a/clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java b/clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java index fd53b363570..e48031c6701 100644 --- a/clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java +++ b/clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java @@ -3,7 +3,11 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import com.amazonaws.ClientConfiguration; import com.amazonaws.Protocol; @@ -12,16 +16,15 @@ import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client; +import io.lakefs.clients.api.model.ObjectStatsList; +import io.lakefs.clients.api.model.Pagination; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider; import org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider; import org.apache.hadoop.util.Progressable; + import org.apache.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +39,8 @@ import javax.annotation.Nonnull; +import static io.lakefs.Constants.*; + /** * A dummy implementation of the core lakeFS Filesystem. * This class implements a {@link LakeFSFileSystem} that can be registered to Spark and support limited write and read actions. @@ -50,18 +55,15 @@ */ public class LakeFSFileSystem extends FileSystem { public static final Logger LOG = LoggerFactory.getLogger(LakeFSFileSystem.class); - public static final String SCHEME = "lakefs"; - public static final String FS_LAKEFS_ENDPOINT = "fs.lakefs.endpoint"; - public static final String FS_LAKEFS_ACCESS_KEY = "fs.lakefs.access.key"; - public static final String FS_LAKEFS_SECRET_KEY = "fs.lakefs.secret.key"; private static final String BASIC_AUTH = "basic_auth"; private Configuration conf; private URI uri; - private Path workingDirectory = new Path(Constants.SEPARATOR); + private Path workingDirectory = new Path(Constants.URI_SEPARATOR); private ApiClient apiClient; private AmazonS3 s3Client; + private int listAmount; private URI translateUri(URI uri) throws java.net.URISyntaxException { switch (uri.getScheme()) { @@ -89,12 +91,12 @@ public void initialize(URI name, Configuration conf) throws IOException { this.uri = name; // setup lakeFS api client - String endpoint = conf.get(FS_LAKEFS_ENDPOINT, "http://localhost:8000/api/v1"); - String accessKey = conf.get(FS_LAKEFS_ACCESS_KEY); + String endpoint = conf.get(Constants.FS_LAKEFS_ENDPOINT_KEY, "http://localhost:8000/api/v1"); + String accessKey = conf.get(Constants.FS_LAKEFS_ACCESS_KEY); if (accessKey == null) { throw new IOException("Missing lakeFS access key"); } - String secretKey = conf.get(FS_LAKEFS_SECRET_KEY); + String secretKey = conf.get(Constants.FS_LAKEFS_SECRET_KEY); if (secretKey == null) { throw new IOException("Missing lakeFS secret key"); } @@ -105,6 +107,8 @@ public void initialize(URI name, Configuration conf) throws IOException { basicAuth.setPassword(secretKey); s3Client = createS3ClientFromConf(conf); + + listAmount = conf.getInt(FS_LAKEFS_LIST_AMOUNT_KEY, DEFAULT_LIST_AMOUNT); } /** @@ -172,6 +176,14 @@ public FSDataInputStream open(Path path, int bufSize) throws IOException { } } + + @Override + public RemoteIterator listFiles(Path f, boolean recursive) throws FileNotFoundException, IOException { + LOG.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$ listFiles path: {}, recursive {} $$$$$$$$$$$$$$$$$$$$$$$$$$$$ ", f.toString(), recursive); + + return new ListingIterator(f, recursive, listAmount); + } + /** *{@inheritDoc} * Called on a file write Spark/Hadoop action. This method writes the content of the file in path into stdout. @@ -223,7 +235,7 @@ public boolean delete(Path path, boolean recursive) throws IOException { path, recursive); ObjectLocation objectLoc = pathToObjectLocation(path); - ObjectsApi objectsApi = new ObjectsApi(this.apiClient); + ObjectsApi objectsApi = new ObjectsApi(apiClient); try { objectsApi.deleteObject(objectLoc.getRepository(), objectLoc.getRef(), objectLoc.getPath()); } catch (ApiException e) { @@ -234,7 +246,7 @@ public boolean delete(Path path, boolean recursive) throws IOException { } throw new IOException("deleteObject", e); } - LOG.debug("Successfully deleted {}", path.toString()); + LOG.debug("Successfully deleted {}", path); return true; } @@ -264,24 +276,10 @@ public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException { public FileStatus getFileStatus(Path path) throws IOException { LOG.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$ getFileStatus, path: {} $$$$$$$$$$$$$$$$$$$$$$$$$$$$ ", path.toString()); ObjectLocation objectLoc = pathToObjectLocation(path); - if (objectLoc == null) { - throw new FileNotFoundException(path.toString()); - } try { ObjectsApi objectsApi = new ObjectsApi(this.apiClient); ObjectStats objectStat = objectsApi.statObject(objectLoc.getRepository(), objectLoc.getRef(), objectLoc.getPath()); - long length = 0; - Long sizeBytes = objectStat.getSizeBytes(); - if (sizeBytes != null) { - length = sizeBytes; - } - long modificationTime = 0; - Long mtime = objectStat.getMtime(); - if (mtime != null) { - modificationTime = TimeUnit.SECONDS.toMillis(mtime); - } - Path filePath = path.makeQualified(this.uri, this.workingDirectory); - return new FileStatus(length, false, 0, 0, modificationTime, filePath); + return convertObjectStatsToFileStatus(objectStat); } catch (ApiException e) { if (e.getCode() == HttpStatus.SC_NOT_FOUND) { throw new FileNotFoundException(path + " not found"); @@ -290,6 +288,23 @@ public FileStatus getFileStatus(Path path) throws IOException { } } + @Nonnull + static FileStatus convertObjectStatsToFileStatus(ObjectStats objectStat) { + long length = 0; + Long sizeBytes = objectStat.getSizeBytes(); + if (sizeBytes != null) { + length = sizeBytes; + } + long modificationTime = 0; + Long mtime = objectStat.getMtime(); + if (mtime != null) { + modificationTime = TimeUnit.SECONDS.toMillis(mtime); + } + Path filePath = new Path(objectStat.getPath()); + boolean isdir = objectStat.getPathType() == ObjectStats.PathTypeEnum.COMMON_PREFIX; + return new FileStatus(length, isdir, 0, 0, modificationTime, filePath); + } + /** * Return the protocol scheme for the FileSystem. * @@ -297,7 +312,7 @@ public FileStatus getFileStatus(Path path) throws IOException { */ @Override public String getScheme() { - return SCHEME; + return Constants.URI_SCHEME; } @Override @@ -317,7 +332,7 @@ public boolean exists(Path f) throws IOException { /** * Returns Location with repository, ref and path used by lakeFS based on filesystem path. - * @param path + * @param path to extract information from. * @return lakeFS Location with repository, ref and path */ @Nonnull @@ -327,18 +342,102 @@ public ObjectLocation pathToObjectLocation(Path path) { } URI uri = path.toUri(); - ObjectLocation loc = new ObjectLocation(); loc.setRepository(uri.getHost()); // extract ref and rest of the path after removing the '/' prefix String s = ObjectLocation.trimLeadingSlash(uri.getPath()); - int i = s.indexOf(Constants.SEPARATOR); + int i = s.indexOf(Constants.URI_SEPARATOR); if (i == -1) { loc.setRef(s); + loc.setPath(""); } else { loc.setRef(s.substring(0, i)); loc.setPath(s.substring(i+1)); } return loc; } + + class ListingIterator implements RemoteIterator { + private final URI uri; + private final ObjectLocation objectLocation; + private final String delimiter; + private final int amount; + private String nextOffset; + private boolean last; + private List chunk; + private int pos; + + /** + * Returns iterator for files under path. + * When recursive is set, the iterator will list all files under the target path (delimiter is ignored). + * Parameter amount controls the limit for each request for listing. + * + * @param path the location to list + * @param recursive boolean for recursive listing + * @param amount buffer size to fetch listing + */ + public ListingIterator(Path path, boolean recursive, int amount) { + this.uri = path.toUri(); + this.chunk = Collections.emptyList(); + this.objectLocation = pathToObjectLocation(path); + String locationPath = this.objectLocation.getPath(); + // we assume that 'path' is a directory by default + if (!locationPath.isEmpty() && !locationPath.endsWith(URI_SEPARATOR)) { + this.objectLocation.setPath(locationPath + URI_SEPARATOR); + } + this.delimiter = recursive ? "" : URI_SEPARATOR; + this.last = false; + this.pos = 0; + this.amount = amount == 0 ? DEFAULT_LIST_AMOUNT : amount; + this.nextOffset = ""; + } + + @Override + public boolean hasNext() throws IOException { + // read next chunk if needed + if (!this.last && this.pos >= this.chunk.size()) { + this.readNextChunk(); + } + // return if there is next item available + return this.pos < this.chunk.size(); + } + + private void readNextChunk() throws IOException { + do { + try { + ObjectsApi objectsApi = new ObjectsApi(apiClient); + ObjectStatsList resp = objectsApi.listObjects(objectLocation.getRepository(), objectLocation.getRef(), objectLocation.getPath(), nextOffset, amount, delimiter); + chunk = resp.getResults(); + pos = 0; + Pagination pagination = resp.getPagination(); + if (pagination != null) { + nextOffset = pagination.getNextOffset(); + if (!pagination.getHasMore()) { + last = true; + } + } else if (chunk.isEmpty()) { + last = true; + } + } catch (ApiException e) { + throw new IOException("listObjects", e); + } + // filter objects + chunk = chunk.stream().filter(stat -> stat.getPathType() == ObjectStats.PathTypeEnum.OBJECT).collect(Collectors.toList()); + // loop until we have something or last chunk + } while (!chunk.isEmpty() && !last); + } + + @Override + public LocatedFileStatus next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException("No more entries"); + } + ObjectStats objectStats = chunk.get(pos++); + FileStatus fileStatus = convertObjectStatsToFileStatus(objectStats); + // make path absolute + fileStatus.setPath(fileStatus.getPath().makeQualified(this.uri, workingDirectory)); + // currently do not pass locations of the file blocks - until we understand if it is required in order to work + return new LocatedFileStatus(fileStatus, null); + } + } } diff --git a/clients/hadoopfs/src/main/java/io/lakefs/ObjectLocation.java b/clients/hadoopfs/src/main/java/io/lakefs/ObjectLocation.java index e97b81a0116..262e8bebe6a 100644 --- a/clients/hadoopfs/src/main/java/io/lakefs/ObjectLocation.java +++ b/clients/hadoopfs/src/main/java/io/lakefs/ObjectLocation.java @@ -1,7 +1,5 @@ package io.lakefs; -import org.apache.hadoop.fs.Path; - class ObjectLocation { private String repository; private String ref; @@ -32,7 +30,7 @@ public void setPath(String path) { } static String trimLeadingSlash(String s) { - if (s.startsWith(Constants.SEPARATOR)) { + if (s.startsWith(Constants.URI_SEPARATOR)) { return s.substring(1); } return s; diff --git a/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java b/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java index 0f7f6586e03..943fcc78a32 100644 --- a/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java +++ b/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java @@ -18,9 +18,9 @@ public class LakeFSFileSystemTest { public void setUp() throws Exception { fs = new LakeFSFileSystem(); Configuration conf = new Configuration(false); - conf.set(LakeFSFileSystem.FS_LAKEFS_ACCESS_KEY, "key"); - conf.set(LakeFSFileSystem.FS_LAKEFS_SECRET_KEY, "secret"); - conf.set(LakeFSFileSystem.FS_LAKEFS_ENDPOINT, "http://localhost:8000/api/v1"); + conf.set(io.lakefs.Constants.FS_LAKEFS_ACCESS_KEY, ""); + conf.set(io.lakefs.Constants.FS_LAKEFS_SECRET_KEY, ""); + conf.set(io.lakefs.Constants.FS_LAKEFS_ENDPOINT_KEY, "http://localhost:8000/api/v1"); URI name = new URI("lakefs://repo/master/file.txt"); fs.initialize(name, conf); } @@ -36,9 +36,29 @@ public void getUri() throws URISyntaxException, IOException { Assert.assertNotNull(u); } + /* @Test - public void testGetFileStatus() { + public void listFiles() throws IOException, URISyntaxException { + RemoteIterator it = fs.listFiles(new Path("lakefs://example1/master"), true); + List l = new ArrayList<>(); + while (it.hasNext()) { + l.add(it.next()); + } + // expected 'l' to include all the files in branch - no directory will be listed, with or without recursive + + Configuration conf = new Configuration(false); + conf.set(org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY, ""); + conf.set(org.apache.hadoop.fs.s3a.Constants.SECRET_KEY, ""); + conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"); + FileSystem fs2 = FileSystem.get(new URI("s3a://bucket/"), conf); + RemoteIterator it2 = fs2.listFiles(new Path("s3a://bucket"), true); + List l2 = new ArrayList<>(); + while (it2.hasNext()) { + l2.add(it2.next()); + } + // expected 'l2' to include all the files in bucket - no directory will be listed, with or without recursive } + */ @Test(expected = UnsupportedOperationException.class) public void testAppend() throws IOException {