diff --git a/clients/hadoopfs/pom.xml b/clients/hadoopfs/pom.xml
index d9426331046..c03adde7c71 100644
--- a/clients/hadoopfs/pom.xml
+++ b/clients/hadoopfs/pom.xml
@@ -8,7 +8,7 @@
jar
hadoopfs
0.1.0
- https://github.com/treeverse/lakeFS/tree/master/clients/java
+ https://github.com/treeverse/lakeFS/tree/master/clients/hadoopfs
UTF-8
diff --git a/clients/hadoopfs/src/main/java/io/lakefs/Constants.java b/clients/hadoopfs/src/main/java/io/lakefs/Constants.java
index f07b86d4b94..7ee0b34d438 100644
--- a/clients/hadoopfs/src/main/java/io/lakefs/Constants.java
+++ b/clients/hadoopfs/src/main/java/io/lakefs/Constants.java
@@ -1,5 +1,11 @@
package io.lakefs;
class Constants {
- public static final String SEPARATOR = "/";
+ public static final String URI_SCHEME = "lakefs";
+ public static final String URI_SEPARATOR = "/";
+ public static final String FS_LAKEFS_ENDPOINT_KEY = "fs.lakefs.endpoint";
+ public static final String FS_LAKEFS_ACCESS_KEY = "fs.lakefs.access.key";
+ public static final String FS_LAKEFS_SECRET_KEY = "fs.lakefs.secret.key";
+ public static final String FS_LAKEFS_LIST_AMOUNT_KEY = "fs.lakefs.list.amount";
+ public static final int DEFAULT_LIST_AMOUNT = 1000;
}
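
For context, these new keys are ordinary Hadoop configuration properties. Below is a minimal sketch of how a client might wire them up before opening the filesystem; the `fs.lakefs.impl` mapping is Hadoop's generic scheme-to-class binding, and the repository/branch names are hypothetical:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class LakeFSConfExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // fs.<scheme>.impl is Hadoop's generic way to bind a URI scheme to a FileSystem class
        conf.set("fs.lakefs.impl", "io.lakefs.LakeFSFileSystem");
        conf.set("fs.lakefs.endpoint", "http://localhost:8000/api/v1");
        conf.set("fs.lakefs.access.key", "<access-key-id>");      // placeholder credentials
        conf.set("fs.lakefs.secret.key", "<secret-access-key>");
        conf.setInt("fs.lakefs.list.amount", 1000);               // optional; DEFAULT_LIST_AMOUNT otherwise
        // "example-repo" and "main" are hypothetical repository/branch names
        FileSystem fs = FileSystem.get(URI.create("lakefs://example-repo/main/"), conf);
        System.out.println(fs.getScheme()); // prints "lakefs"
    }
}
```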
diff --git a/clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java b/clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java
index fd53b363570..e48031c6701 100644
--- a/clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java
+++ b/clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java
@@ -3,7 +3,11 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
@@ -12,16 +16,15 @@
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
+import io.lakefs.clients.api.model.ObjectStatsList;
+import io.lakefs.clients.api.model.Pagination;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider;
import org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider;
import org.apache.hadoop.util.Progressable;
+
import org.apache.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +39,8 @@
import javax.annotation.Nonnull;
+import static io.lakefs.Constants.*;
+
/**
* A dummy implementation of the core lakeFS Filesystem.
* This class implements a {@link LakeFSFileSystem} that can be registered to Spark and support limited write and read actions.
@@ -50,18 +55,15 @@
*/
public class LakeFSFileSystem extends FileSystem {
public static final Logger LOG = LoggerFactory.getLogger(LakeFSFileSystem.class);
- public static final String SCHEME = "lakefs";
- public static final String FS_LAKEFS_ENDPOINT = "fs.lakefs.endpoint";
- public static final String FS_LAKEFS_ACCESS_KEY = "fs.lakefs.access.key";
- public static final String FS_LAKEFS_SECRET_KEY = "fs.lakefs.secret.key";
private static final String BASIC_AUTH = "basic_auth";
private Configuration conf;
private URI uri;
- private Path workingDirectory = new Path(Constants.SEPARATOR);
+ private Path workingDirectory = new Path(Constants.URI_SEPARATOR);
private ApiClient apiClient;
private AmazonS3 s3Client;
+ private int listAmount;
private URI translateUri(URI uri) throws java.net.URISyntaxException {
switch (uri.getScheme()) {
@@ -89,12 +91,12 @@ public void initialize(URI name, Configuration conf) throws IOException {
this.uri = name;
// setup lakeFS api client
- String endpoint = conf.get(FS_LAKEFS_ENDPOINT, "http://localhost:8000/api/v1");
- String accessKey = conf.get(FS_LAKEFS_ACCESS_KEY);
+ String endpoint = conf.get(Constants.FS_LAKEFS_ENDPOINT_KEY, "http://localhost:8000/api/v1");
+ String accessKey = conf.get(Constants.FS_LAKEFS_ACCESS_KEY);
if (accessKey == null) {
throw new IOException("Missing lakeFS access key");
}
- String secretKey = conf.get(FS_LAKEFS_SECRET_KEY);
+ String secretKey = conf.get(Constants.FS_LAKEFS_SECRET_KEY);
if (secretKey == null) {
throw new IOException("Missing lakeFS secret key");
}
@@ -105,6 +107,8 @@ public void initialize(URI name, Configuration conf) throws IOException {
basicAuth.setPassword(secretKey);
s3Client = createS3ClientFromConf(conf);
+
+ listAmount = conf.getInt(FS_LAKEFS_LIST_AMOUNT_KEY, DEFAULT_LIST_AMOUNT);
}
/**
@@ -172,6 +176,14 @@ public FSDataInputStream open(Path path, int bufSize) throws IOException {
}
}
+
+ @Override
+ public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws FileNotFoundException, IOException {
+ LOG.debug("listFiles path: {}, recursive: {}", f, recursive);
+
+ return new ListingIterator(f, recursive, listAmount);
+ }
+
/**
*{@inheritDoc}
* Called on a file write Spark/Hadoop action. This method writes the content of the file in path into stdout.
@@ -223,7 +235,7 @@ public boolean delete(Path path, boolean recursive) throws IOException {
path, recursive);
ObjectLocation objectLoc = pathToObjectLocation(path);
- ObjectsApi objectsApi = new ObjectsApi(this.apiClient);
+ ObjectsApi objectsApi = new ObjectsApi(apiClient);
try {
objectsApi.deleteObject(objectLoc.getRepository(), objectLoc.getRef(), objectLoc.getPath());
} catch (ApiException e) {
@@ -234,7 +246,7 @@ public boolean delete(Path path, boolean recursive) throws IOException {
}
throw new IOException("deleteObject", e);
}
- LOG.debug("Successfully deleted {}", path.toString());
+ LOG.debug("Successfully deleted {}", path);
return true;
}
@@ -264,24 +276,10 @@ public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
public FileStatus getFileStatus(Path path) throws IOException {
LOG.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$ getFileStatus, path: {} $$$$$$$$$$$$$$$$$$$$$$$$$$$$ ", path.toString());
ObjectLocation objectLoc = pathToObjectLocation(path);
- if (objectLoc == null) {
- throw new FileNotFoundException(path.toString());
- }
try {
ObjectsApi objectsApi = new ObjectsApi(this.apiClient);
ObjectStats objectStat = objectsApi.statObject(objectLoc.getRepository(), objectLoc.getRef(), objectLoc.getPath());
- long length = 0;
- Long sizeBytes = objectStat.getSizeBytes();
- if (sizeBytes != null) {
- length = sizeBytes;
- }
- long modificationTime = 0;
- Long mtime = objectStat.getMtime();
- if (mtime != null) {
- modificationTime = TimeUnit.SECONDS.toMillis(mtime);
- }
- Path filePath = path.makeQualified(this.uri, this.workingDirectory);
- return new FileStatus(length, false, 0, 0, modificationTime, filePath);
+ return convertObjectStatsToFileStatus(objectStat);
} catch (ApiException e) {
if (e.getCode() == HttpStatus.SC_NOT_FOUND) {
throw new FileNotFoundException(path + " not found");
@@ -290,6 +288,23 @@ public FileStatus getFileStatus(Path path) throws IOException {
}
}
+ @Nonnull
+ static FileStatus convertObjectStatsToFileStatus(ObjectStats objectStat) {
+ long length = 0;
+ Long sizeBytes = objectStat.getSizeBytes();
+ if (sizeBytes != null) {
+ length = sizeBytes;
+ }
+ long modificationTime = 0;
+ Long mtime = objectStat.getMtime();
+ if (mtime != null) {
+ modificationTime = TimeUnit.SECONDS.toMillis(mtime);
+ }
+ Path filePath = new Path(objectStat.getPath());
+ boolean isdir = objectStat.getPathType() == ObjectStats.PathTypeEnum.COMMON_PREFIX;
+ return new FileStatus(length, isdir, 0, 0, modificationTime, filePath);
+ }
+
/**
* Return the protocol scheme for the FileSystem.
*
@@ -297,7 +312,7 @@ public FileStatus getFileStatus(Path path) throws IOException {
*/
@Override
public String getScheme() {
- return SCHEME;
+ return Constants.URI_SCHEME;
}
@Override
@@ -317,7 +332,7 @@ public boolean exists(Path f) throws IOException {
/**
* Returns Location with repository, ref and path used by lakeFS based on filesystem path.
- * @param path
+ * @param path the path to extract the lakeFS location from.
* @return lakeFS Location with repository, ref and path
*/
@Nonnull
@@ -327,18 +342,102 @@ public ObjectLocation pathToObjectLocation(Path path) {
}
URI uri = path.toUri();
-
ObjectLocation loc = new ObjectLocation();
loc.setRepository(uri.getHost());
// extract ref and rest of the path after removing the '/' prefix
String s = ObjectLocation.trimLeadingSlash(uri.getPath());
- int i = s.indexOf(Constants.SEPARATOR);
+ int i = s.indexOf(Constants.URI_SEPARATOR);
if (i == -1) {
loc.setRef(s);
+ loc.setPath("");
} else {
loc.setRef(s.substring(0, i));
loc.setPath(s.substring(i+1));
}
return loc;
}
+
+ class ListingIterator implements RemoteIterator<LocatedFileStatus> {
+ private final URI uri;
+ private final ObjectLocation objectLocation;
+ private final String delimiter;
+ private final int amount;
+ private String nextOffset;
+ private boolean last;
+ private List<ObjectStats> chunk;
+ private int pos;
+
+ /**
+ * Constructs an iterator over the files under path.
+ * When recursive is set, the iterator lists all files under the target path (the delimiter is dropped).
+ * The amount parameter controls the page size used for each listing request.
+ *
+ * @param path the location to list
+ * @param recursive whether to list files under all sub-paths of the location
+ * @param amount number of entries to fetch per listing request
+ */
+ public ListingIterator(Path path, boolean recursive, int amount) {
+ this.uri = path.toUri();
+ this.chunk = Collections.emptyList();
+ this.objectLocation = pathToObjectLocation(path);
+ String locationPath = this.objectLocation.getPath();
+ // we assume that 'path' is a directory by default
+ if (!locationPath.isEmpty() && !locationPath.endsWith(URI_SEPARATOR)) {
+ this.objectLocation.setPath(locationPath + URI_SEPARATOR);
+ }
+ this.delimiter = recursive ? "" : URI_SEPARATOR;
+ this.last = false;
+ this.pos = 0;
+ this.amount = amount == 0 ? DEFAULT_LIST_AMOUNT : amount;
+ this.nextOffset = "";
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ // read next chunk if needed
+ if (!this.last && this.pos >= this.chunk.size()) {
+ this.readNextChunk();
+ }
+ // return if there is next item available
+ return this.pos < this.chunk.size();
+ }
+
+ private void readNextChunk() throws IOException {
+ do {
+ try {
+ ObjectsApi objectsApi = new ObjectsApi(apiClient);
+ ObjectStatsList resp = objectsApi.listObjects(objectLocation.getRepository(), objectLocation.getRef(), objectLocation.getPath(), nextOffset, amount, delimiter);
+ chunk = resp.getResults();
+ pos = 0;
+ Pagination pagination = resp.getPagination();
+ if (pagination != null) {
+ nextOffset = pagination.getNextOffset();
+ if (!pagination.getHasMore()) {
+ last = true;
+ }
+ } else if (chunk.isEmpty()) {
+ last = true;
+ }
+ } catch (ApiException e) {
+ throw new IOException("listObjects", e);
+ }
+ // filter objects
+ chunk = chunk.stream().filter(stat -> stat.getPathType() == ObjectStats.PathTypeEnum.OBJECT).collect(Collectors.toList());
+ // loop until we have something to return or this was the last chunk
+ } while (chunk.isEmpty() && !last);
+ }
+
+ @Override
+ public LocatedFileStatus next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more entries");
+ }
+ ObjectStats objectStats = chunk.get(pos++);
+ FileStatus fileStatus = convertObjectStatsToFileStatus(objectStats);
+ // make path absolute
+ fileStatus.setPath(fileStatus.getPath().makeQualified(this.uri, workingDirectory));
+ // file block locations are not passed for now - until we know whether they are required
+ return new LocatedFileStatus(fileStatus, null);
+ }
+ }
}
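
From the caller's side, the new listFiles behaves like any Hadoop RemoteIterator: hasNext() lazily pulls the next page of listObjects results (amount entries per request), and only OBJECT entries survive the filter, so no directories are returned. A minimal consumption sketch, with a hypothetical repository and branch:

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class ListFilesExample {
    // Sums the size of every file under the branch; with recursive=true the
    // delimiter is dropped, so the listing is flat regardless of "directories".
    static long totalBytes(FileSystem fs) throws IOException {
        RemoteIterator<LocatedFileStatus> it =
                fs.listFiles(new Path("lakefs://example-repo/main/"), true);
        long total = 0;
        while (it.hasNext()) {        // hasNext() fetches the next chunk on demand
            LocatedFileStatus status = it.next();
            total += status.getLen();
        }
        return total;
    }
}
```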
diff --git a/clients/hadoopfs/src/main/java/io/lakefs/ObjectLocation.java b/clients/hadoopfs/src/main/java/io/lakefs/ObjectLocation.java
index e97b81a0116..262e8bebe6a 100644
--- a/clients/hadoopfs/src/main/java/io/lakefs/ObjectLocation.java
+++ b/clients/hadoopfs/src/main/java/io/lakefs/ObjectLocation.java
@@ -1,7 +1,5 @@
package io.lakefs;
-import org.apache.hadoop.fs.Path;
-
class ObjectLocation {
private String repository;
private String ref;
@@ -32,7 +30,7 @@ public void setPath(String path) {
}
static String trimLeadingSlash(String s) {
- if (s.startsWith(Constants.SEPARATOR)) {
+ if (s.startsWith(Constants.URI_SEPARATOR)) {
return s.substring(1);
}
return s;
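
As a worked example of the mapping that pathToObjectLocation implements (including the new empty-path default for ref-only URIs), here is a sketch that would live in the io.lakefs package, since ObjectLocation is package-private; all names are hypothetical:

```java
package io.lakefs;

import org.apache.hadoop.fs.Path;

class ObjectLocationExample {
    // fs is an initialized LakeFSFileSystem
    static void demo(LakeFSFileSystem fs) {
        ObjectLocation loc = fs.pathToObjectLocation(new Path("lakefs://example-repo/main/data/part-0001"));
        // loc.getRepository() -> "example-repo"    (the URI host)
        // loc.getRef()        -> "main"            (the segment before the first URI_SEPARATOR)
        // loc.getPath()       -> "data/part-0001"  (everything after it)

        ObjectLocation refOnly = fs.pathToObjectLocation(new Path("lakefs://example-repo/main"));
        // refOnly.getPath()   -> "" (previously left null; the new branch sets the empty string)
    }
}
```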
diff --git a/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java b/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java
index 0f7f6586e03..943fcc78a32 100644
--- a/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java
+++ b/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java
@@ -18,9 +18,9 @@ public class LakeFSFileSystemTest {
public void setUp() throws Exception {
fs = new LakeFSFileSystem();
Configuration conf = new Configuration(false);
- conf.set(LakeFSFileSystem.FS_LAKEFS_ACCESS_KEY, "key");
- conf.set(LakeFSFileSystem.FS_LAKEFS_SECRET_KEY, "secret");
- conf.set(LakeFSFileSystem.FS_LAKEFS_ENDPOINT, "http://localhost:8000/api/v1");
+ conf.set(io.lakefs.Constants.FS_LAKEFS_ACCESS_KEY, "");
+ conf.set(io.lakefs.Constants.FS_LAKEFS_SECRET_KEY, "");
+ conf.set(io.lakefs.Constants.FS_LAKEFS_ENDPOINT_KEY, "http://localhost:8000/api/v1");
URI name = new URI("lakefs://repo/master/file.txt");
fs.initialize(name, conf);
}
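
The extracted convertObjectStatsToFileStatus helper is now easy to cover directly. A test along these lines could be added here (assuming the usual org.apache.hadoop.fs and java.util.concurrent.TimeUnit imports); the fluent setters are an assumption about the OpenAPI-generated model:

```java
@Test
public void testConvertObjectStatsToFileStatus() {
    // fluent setters are assumed from the OpenAPI-generated ObjectStats model
    ObjectStats stats = new ObjectStats()
            .path("data/part-0001")
            .pathType(ObjectStats.PathTypeEnum.OBJECT)
            .sizeBytes(1024L)
            .mtime(1600000000L); // seconds since the epoch
    FileStatus fileStatus = LakeFSFileSystem.convertObjectStatsToFileStatus(stats);
    Assert.assertEquals(1024L, fileStatus.getLen());
    Assert.assertEquals(TimeUnit.SECONDS.toMillis(1600000000L), fileStatus.getModificationTime());
    Assert.assertFalse(fileStatus.isDirectory());
    Assert.assertEquals(new Path("data/part-0001"), fileStatus.getPath());
}
```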
@@ -36,9 +36,29 @@ public void getUri() throws URISyntaxException, IOException {
Assert.assertNotNull(u);
}
+ /*
@Test
- public void testGetFileStatus() {
+ public void listFiles() throws IOException, URISyntaxException {
+ RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path("lakefs://example1/master"), true);
+ List<LocatedFileStatus> l = new ArrayList<>();
+ while (it.hasNext()) {
+ l.add(it.next());
+ }
+ // 'l' is expected to include all files in the branch - no directories are listed, with or without recursive
+
+ Configuration conf = new Configuration(false);
+ conf.set(org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY, "");
+ conf.set(org.apache.hadoop.fs.s3a.Constants.SECRET_KEY, "");
+ conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+ FileSystem fs2 = FileSystem.get(new URI("s3a://bucket/"), conf);
+ RemoteIterator<LocatedFileStatus> it2 = fs2.listFiles(new Path("s3a://bucket"), true);
+ List<LocatedFileStatus> l2 = new ArrayList<>();
+ while (it2.hasNext()) {
+ l2.add(it2.next());
+ }
+ // 'l2' is expected to include all files in the bucket - no directories are listed, with or without recursive
}
+ */
@Test(expected = UnsupportedOperationException.class)
public void testAppend() throws IOException {