hadoopfs listFiles (#1922)

nopcoder authored May 13, 2021
1 parent be53f58 commit 7307f5a
Showing 5 changed files with 166 additions and 43 deletions.
2 changes: 1 addition & 1 deletion clients/hadoopfs/pom.xml
@@ -8,7 +8,7 @@
<packaging>jar</packaging>
<name>hadoopfs</name>
<version>0.1.0</version>
<url>https://github.com/treeverse/lakeFS/tree/master/clients/java</url>
<url>https://github.com/treeverse/lakeFS/tree/master/clients/hadoopfs</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
8 changes: 7 additions & 1 deletion clients/hadoopfs/src/main/java/io/lakefs/Constants.java
@@ -1,5 +1,11 @@
package io.lakefs;

class Constants {
public static final String SEPARATOR = "/";
public static final String URI_SCHEME = "lakefs";
public static final String URI_SEPARATOR = "/";
public static final String FS_LAKEFS_ENDPOINT_KEY = "fs.lakefs.endpoint";
public static final String FS_LAKEFS_ACCESS_KEY = "fs.lakefs.access.key";
public static final String FS_LAKEFS_SECRET_KEY = "fs.lakefs.secret.key";
public static final String FS_LAKEFS_LIST_AMOUNT_KEY = "fs.lakefs.list.amount";
public static final int DEFAULT_LIST_AMOUNT = 1000;
}
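
For context, initialize() in LakeFSFileSystem below reads these keys from the job's Hadoop Configuration. A minimal sketch of wiring them up — the endpoint and credential values are placeholders, the literal key strings mirror the constants above (Constants is package-private), and fs.lakefs.impl is the standard Hadoop scheme-to-class mapping rather than part of this change:

```java
import org.apache.hadoop.conf.Configuration;

class LakeFSConfSketch {
    static Configuration lakeFSConf() {
        Configuration conf = new Configuration();
        // Standard Hadoop mapping of the lakefs:// scheme to its FileSystem implementation.
        conf.set("fs.lakefs.impl", "io.lakefs.LakeFSFileSystem");
        // Keys defined in Constants above; the values here are placeholders.
        conf.set("fs.lakefs.endpoint", "http://localhost:8000/api/v1");
        conf.set("fs.lakefs.access.key", "<lakefs key>");
        conf.set("fs.lakefs.secret.key", "<lakefs secret>");
        // Optional page size for listFiles; when unset, DEFAULT_LIST_AMOUNT (1000) is used.
        conf.setInt("fs.lakefs.list.amount", 500);
        return conf;
    }
}
```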
167 changes: 133 additions & 34 deletions clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java
@@ -3,7 +3,11 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
@@ -12,16 +16,15 @@
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

import io.lakefs.clients.api.model.ObjectStatsList;
import io.lakefs.clients.api.model.Pagination;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider;
import org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider;
import org.apache.hadoop.util.Progressable;

import org.apache.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +39,8 @@

import javax.annotation.Nonnull;

import static io.lakefs.Constants.*;

/**
* A dummy implementation of the core lakeFS Filesystem.
* This class implements a {@link LakeFSFileSystem} that can be registered to Spark and supports limited write and read actions.
@@ -50,18 +55,15 @@
*/
public class LakeFSFileSystem extends FileSystem {
public static final Logger LOG = LoggerFactory.getLogger(LakeFSFileSystem.class);
public static final String SCHEME = "lakefs";
public static final String FS_LAKEFS_ENDPOINT = "fs.lakefs.endpoint";
public static final String FS_LAKEFS_ACCESS_KEY = "fs.lakefs.access.key";
public static final String FS_LAKEFS_SECRET_KEY = "fs.lakefs.secret.key";

private static final String BASIC_AUTH = "basic_auth";

private Configuration conf;
private URI uri;
private Path workingDirectory = new Path(Constants.SEPARATOR);
private Path workingDirectory = new Path(Constants.URI_SEPARATOR);
private ApiClient apiClient;
private AmazonS3 s3Client;
private int listAmount;

private URI translateUri(URI uri) throws java.net.URISyntaxException {
switch (uri.getScheme()) {
@@ -89,12 +91,12 @@ public void initialize(URI name, Configuration conf) throws IOException {
this.uri = name;

// setup lakeFS api client
String endpoint = conf.get(FS_LAKEFS_ENDPOINT, "http://localhost:8000/api/v1");
String accessKey = conf.get(FS_LAKEFS_ACCESS_KEY);
String endpoint = conf.get(Constants.FS_LAKEFS_ENDPOINT_KEY, "http://localhost:8000/api/v1");
String accessKey = conf.get(Constants.FS_LAKEFS_ACCESS_KEY);
if (accessKey == null) {
throw new IOException("Missing lakeFS access key");
}
String secretKey = conf.get(FS_LAKEFS_SECRET_KEY);
String secretKey = conf.get(Constants.FS_LAKEFS_SECRET_KEY);
if (secretKey == null) {
throw new IOException("Missing lakeFS secret key");
}
@@ -105,6 +107,8 @@ public void initialize(URI name, Configuration conf) throws IOException {
basicAuth.setPassword(secretKey);

s3Client = createS3ClientFromConf(conf);

listAmount = conf.getInt(FS_LAKEFS_LIST_AMOUNT_KEY, DEFAULT_LIST_AMOUNT);
}

/**
@@ -172,6 +176,14 @@ public FSDataInputStream open(Path path, int bufSize) throws IOException {
}
}


@Override
public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws FileNotFoundException, IOException {
LOG.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$ listFiles path: {}, recursive {} $$$$$$$$$$$$$$$$$$$$$$$$$$$$ ", f.toString(), recursive);

return new ListingIterator(f, recursive, listAmount);
}
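
A sketch of driving the new iterator through the public FileSystem API — the repository and branch names are hypothetical:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

class ListFilesSketch {
    static void printAll(Configuration conf) throws IOException {
        Path root = new Path("lakefs://example-repo/main/"); // hypothetical repo and branch
        FileSystem fs = root.getFileSystem(conf);
        // recursive=true enumerates every object under the prefix; no directory entries are returned.
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(root, true);
        while (it.hasNext()) {
            LocatedFileStatus status = it.next();
            System.out.println(status.getPath() + "\t" + status.getLen());
        }
    }
}
```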

/**
* {@inheritDoc}
* Called on a file write Spark/Hadoop action. This method writes the content of the file in path into stdout.
@@ -223,7 +235,7 @@ public boolean delete(Path path, boolean recursive) throws IOException {
path, recursive);

ObjectLocation objectLoc = pathToObjectLocation(path);
ObjectsApi objectsApi = new ObjectsApi(this.apiClient);
ObjectsApi objectsApi = new ObjectsApi(apiClient);
try {
objectsApi.deleteObject(objectLoc.getRepository(), objectLoc.getRef(), objectLoc.getPath());
} catch (ApiException e) {
@@ -234,7 +246,7 @@ public boolean delete(Path path, boolean recursive) throws IOException {
}
throw new IOException("deleteObject", e);
}
LOG.debug("Successfully deleted {}", path.toString());
LOG.debug("Successfully deleted {}", path);
return true;
}

@@ -264,24 +276,10 @@ public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
public FileStatus getFileStatus(Path path) throws IOException {
LOG.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$ getFileStatus, path: {} $$$$$$$$$$$$$$$$$$$$$$$$$$$$ ", path.toString());
ObjectLocation objectLoc = pathToObjectLocation(path);
if (objectLoc == null) {
throw new FileNotFoundException(path.toString());
}
try {
ObjectsApi objectsApi = new ObjectsApi(this.apiClient);
ObjectStats objectStat = objectsApi.statObject(objectLoc.getRepository(), objectLoc.getRef(), objectLoc.getPath());
long length = 0;
Long sizeBytes = objectStat.getSizeBytes();
if (sizeBytes != null) {
length = sizeBytes;
}
long modificationTime = 0;
Long mtime = objectStat.getMtime();
if (mtime != null) {
modificationTime = TimeUnit.SECONDS.toMillis(mtime);
}
Path filePath = path.makeQualified(this.uri, this.workingDirectory);
return new FileStatus(length, false, 0, 0, modificationTime, filePath);
return convertObjectStatsToFileStatus(objectStat);
} catch (ApiException e) {
if (e.getCode() == HttpStatus.SC_NOT_FOUND) {
throw new FileNotFoundException(path + " not found");
@@ -290,14 +288,31 @@ public FileStatus getFileStatus(Path path) throws IOException {
}
}

@Nonnull
static FileStatus convertObjectStatsToFileStatus(ObjectStats objectStat) {
long length = 0;
Long sizeBytes = objectStat.getSizeBytes();
if (sizeBytes != null) {
length = sizeBytes;
}
long modificationTime = 0;
Long mtime = objectStat.getMtime();
if (mtime != null) {
modificationTime = TimeUnit.SECONDS.toMillis(mtime);
}
Path filePath = new Path(objectStat.getPath());
boolean isdir = objectStat.getPathType() == ObjectStats.PathTypeEnum.COMMON_PREFIX;
return new FileStatus(length, isdir, 0, 0, modificationTime, filePath);
}
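
Two details above are easy to miss: lakeFS reports mtime in epoch seconds while Hadoop's FileStatus expects milliseconds, hence the TimeUnit.SECONDS.toMillis conversion; and block size and replication are passed as 0, presumably because lakeFS does not expose block-level layout.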

/**
* Return the protocol scheme for the FileSystem.
*
* @return lakefs scheme
*/
@Override
public String getScheme() {
return SCHEME;
return Constants.URI_SCHEME;
}

@Override
@@ -317,7 +332,7 @@ public boolean exists(Path f) throws IOException {

/**
* Returns Location with repository, ref and path used by lakeFS based on filesystem path.
* @param path
* @param path to extract information from.
* @return lakeFS Location with repository, ref and path
*/
@Nonnull
@@ -327,18 +342,102 @@ public ObjectLocation pathToObjectLocation(Path path) {
}

URI uri = path.toUri();

ObjectLocation loc = new ObjectLocation();
loc.setRepository(uri.getHost());
// extract ref and rest of the path after removing the '/' prefix
String s = ObjectLocation.trimLeadingSlash(uri.getPath());
int i = s.indexOf(Constants.SEPARATOR);
int i = s.indexOf(Constants.URI_SEPARATOR);
if (i == -1) {
loc.setRef(s);
loc.setPath("");
} else {
loc.setRef(s.substring(0, i));
loc.setPath(s.substring(i+1));
}
return loc;
}
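
As a worked example (repository and branch names hypothetical): lakefs://example-repo/main/data/file.txt yields repository "example-repo" (the URI host), ref "main" (the first path segment), and path "data/file.txt" (the remainder), while lakefs://example-repo/main yields ref "main" with an empty path.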

class ListingIterator implements RemoteIterator<LocatedFileStatus> {
private final URI uri;
private final ObjectLocation objectLocation;
private final String delimiter;
private final int amount;
private String nextOffset;
private boolean last;
private List<ObjectStats> chunk;
private int pos;

/**
* Returns iterator for files under path.
* When recursive is set, the iterator will list all files under the target path (delimiter is ignored).
* Parameter amount controls the limit for each request for listing.
*
* @param path the location to list
* @param recursive boolean for recursive listing
* @param amount buffer size to fetch listing
*/
public ListingIterator(Path path, boolean recursive, int amount) {
this.uri = path.toUri();
this.chunk = Collections.emptyList();
this.objectLocation = pathToObjectLocation(path);
String locationPath = this.objectLocation.getPath();
// we assume that 'path' is a directory by default
if (!locationPath.isEmpty() && !locationPath.endsWith(URI_SEPARATOR)) {
this.objectLocation.setPath(locationPath + URI_SEPARATOR);
}
this.delimiter = recursive ? "" : URI_SEPARATOR;
this.last = false;
this.pos = 0;
this.amount = amount == 0 ? DEFAULT_LIST_AMOUNT : amount;
this.nextOffset = "";
}

@Override
public boolean hasNext() throws IOException {
// read next chunk if needed
if (!this.last && this.pos >= this.chunk.size()) {
this.readNextChunk();
}
// return if there is next item available
return this.pos < this.chunk.size();
}

private void readNextChunk() throws IOException {
do {
try {
ObjectsApi objectsApi = new ObjectsApi(apiClient);
ObjectStatsList resp = objectsApi.listObjects(objectLocation.getRepository(), objectLocation.getRef(), objectLocation.getPath(), nextOffset, amount, delimiter);
chunk = resp.getResults();
pos = 0;
Pagination pagination = resp.getPagination();
if (pagination != null) {
nextOffset = pagination.getNextOffset();
if (!pagination.getHasMore()) {
last = true;
}
} else if (chunk.isEmpty()) {
last = true;
}
} catch (ApiException e) {
throw new IOException("listObjects", e);
}
// filter objects
chunk = chunk.stream().filter(stat -> stat.getPathType() == ObjectStats.PathTypeEnum.OBJECT).collect(Collectors.toList());
// loop until we have something or last chunk
} while (chunk.isEmpty() && !last);
}

@Override
public LocatedFileStatus next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException("No more entries");
}
ObjectStats objectStats = chunk.get(pos++);
FileStatus fileStatus = convertObjectStatsToFileStatus(objectStats);
// make path absolute
fileStatus.setPath(fileStatus.getPath().makeQualified(this.uri, workingDirectory));
// currently do not pass locations of the file blocks - until we understand if it is required in order to work
return new LocatedFileStatus(fileStatus, null);
}
}
}
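
Note the delimiter choice in ListingIterator: a recursive listing passes an empty delimiter, so listObjects returns one flat enumeration of every object under the prefix, while a non-recursive listing passes "/" and the resulting COMMON_PREFIX (directory) entries are filtered out, leaving only direct child objects. Either way the iterator buffers at most one page of amount results and fetches the next page lazily from hasNext().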
4 changes: 1 addition & 3 deletions clients/hadoopfs/src/main/java/io/lakefs/ObjectLocation.java
@@ -1,7 +1,5 @@
package io.lakefs;

import org.apache.hadoop.fs.Path;

class ObjectLocation {
private String repository;
private String ref;
@@ -32,7 +30,7 @@ public void setPath(String path) {
}

static String trimLeadingSlash(String s) {
if (s.startsWith(Constants.SEPARATOR)) {
if (s.startsWith(Constants.URI_SEPARATOR)) {
return s.substring(1);
}
return s;
28 changes: 24 additions & 4 deletions clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java
@@ -18,9 +18,9 @@ public class LakeFSFileSystemTest {
public void setUp() throws Exception {
fs = new LakeFSFileSystem();
Configuration conf = new Configuration(false);
conf.set(LakeFSFileSystem.FS_LAKEFS_ACCESS_KEY, "key");
conf.set(LakeFSFileSystem.FS_LAKEFS_SECRET_KEY, "secret");
conf.set(LakeFSFileSystem.FS_LAKEFS_ENDPOINT, "http://localhost:8000/api/v1");
conf.set(io.lakefs.Constants.FS_LAKEFS_ACCESS_KEY, "<lakefs key>");
conf.set(io.lakefs.Constants.FS_LAKEFS_SECRET_KEY, "<lakefs secret>");
conf.set(io.lakefs.Constants.FS_LAKEFS_ENDPOINT_KEY, "http://localhost:8000/api/v1");
URI name = new URI("lakefs://repo/master/file.txt");
fs.initialize(name, conf);
}
@@ -36,9 +36,29 @@ public void getUri() throws URISyntaxException, IOException {
Assert.assertNotNull(u);
}

/*
@Test
public void testGetFileStatus() {
public void listFiles() throws IOException, URISyntaxException {
RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path("lakefs://example1/master"), true);
List<LocatedFileStatus> l = new ArrayList<>();
while (it.hasNext()) {
l.add(it.next());
}
// expected 'l' to include all the files in branch - no directory will be listed, with or without recursive
Configuration conf = new Configuration(false);
conf.set(org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY, "<s3a key>");
conf.set(org.apache.hadoop.fs.s3a.Constants.SECRET_KEY, "<s3a secret>");
conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
FileSystem fs2 = FileSystem.get(new URI("s3a://bucket/"), conf);
RemoteIterator<LocatedFileStatus> it2 = fs2.listFiles(new Path("s3a://bucket"), true);
List<LocatedFileStatus> l2 = new ArrayList<>();
while (it2.hasNext()) {
l2.add(it2.next());
}
// expected 'l2' to include all the files in bucket - no directory will be listed, with or without recursive
}
*/

@Test(expected = UnsupportedOperationException.class)
public void testAppend() throws IOException {