Skip to content

Commit

Permalink
Filesystem: non-atomic files rename (#1972)
Browse files Browse the repository at this point in the history
  • Loading branch information
talSofer authored May 23, 2021
1 parent 4711cb0 commit 4c9b404
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 7 deletions.
83 changes: 76 additions & 7 deletions clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

import io.lakefs.clients.api.model.ObjectStatsList;
import io.lakefs.clients.api.model.Pagination;
import io.lakefs.clients.api.model.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
Expand All @@ -32,8 +31,6 @@
import io.lakefs.clients.api.ApiException;
import io.lakefs.clients.api.ObjectsApi;
import io.lakefs.clients.api.StagingApi;
import io.lakefs.clients.api.model.ObjectStats;
import io.lakefs.clients.api.model.StagingLocation;

import javax.annotation.Nonnull;

Expand Down Expand Up @@ -243,10 +240,82 @@ public FSDataOutputStream append(Path path, int i, Progressable progressable) th
throw new UnsupportedOperationException("Append is not supported by LakeFSFileSystem");
}

/**
* This method is implemented under the following assumptions:
* 1. rename is only supported for uncommitted data on the same branch.
* 2. file rename operation is supported, directories rename is unsupported.
* 3. the rename dst path can be an uncommitted file, that will be overridden as a result of the rename operation.
* 4. On rename operation a new mtime is generated, therefore we don't preserve the mtime of the src object.
*
* @throws IOException
*/
@Override
public boolean rename(Path path, Path path1) throws IOException {
LOG.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$ rename $$$$$$$$$$$$$$$$$$$$$$$$$$$$ ");
return false;
public boolean rename(Path src, Path dst) throws IOException {
LOG.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$ Rename path {} to {} $$$$$$$$$$$$$$$$$$$$$$$$$$$$", src, dst);

ObjectLocation srcObjectLoc = pathToObjectLocation(src);
ObjectLocation dstObjectLoc = pathToObjectLocation(dst);
if (srcObjectLoc.equals(dstObjectLoc)) {
LOG.debug("rename: src and dst refer to the same lakefs object location: {}", dst);
return true;
}

if (!srcObjectLoc.onSameBranch(dstObjectLoc)) {
LOG.error("rename: src {} and dst {} are not on the same branch. rename outside this scope is unsupported "
+ "by lakefs.", src, dst);
return false;
}

ObjectStats srcStat;
ObjectsApi objects = lfsClient.getObjects();
try {
// Stat src file to get its metadata
srcStat = objects.statObject(srcObjectLoc.getRepository(), srcObjectLoc.getRef(),
srcObjectLoc.getPath());
} catch (ApiException e) {
LOG.error("rename: could not get src object stats. src:{}", src, e);
return false;
}

return renameObject(srcStat, srcObjectLoc, dstObjectLoc);
}

/**
 * Non-atomic rename operation: stage the object metadata at the destination,
 * then delete it from the source location.
 *
 * @param srcStat stats of the object being renamed (checksum, size, physical address)
 * @param srcObjectLoc lakeFS location of the source object
 * @param dstObjectLoc lakeFS location of the destination object
 * @return true if rename succeeded, false otherwise
 * @throws IOException on an unexpected failure while deleting the source object
 */
private boolean renameObject(ObjectStats srcStat, ObjectLocation srcObjectLoc, ObjectLocation dstObjectLoc)
        throws IOException {
    ObjectsApi objectsApi = lfsClient.getObjects();

    //TODO (Tals): Can we add metadata? we currently don't have an API to get the metadata of an object.
    ObjectStageCreation stageRequest = new ObjectStageCreation()
            .checksum(srcStat.getChecksum())
            .sizeBytes(srcStat.getSizeBytes())
            .physicalAddress(srcStat.getPhysicalAddress());

    // Stage the existing physical object under its destination path.
    try {
        objectsApi.stageObject(dstObjectLoc.getRepository(), dstObjectLoc.getRef(), dstObjectLoc.getPath(),
                stageRequest);
    } catch (ApiException e) {
        LOG.error("rename: Could not stage object on dst:{}", dstObjectLoc.getPath(), e);
        return false;
    }

    // Remove the object from its source path.
    try {
        objectsApi.deleteObject(srcObjectLoc.getRepository(), srcObjectLoc.getRef(), srcObjectLoc.getPath());
    } catch (ApiException e) {
        // This condition mimics s3a behaviour in https://github.com/apache/hadoop/blob/2960d83c255a00a549f8809882cd3b73a6266b6d/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L2741
        if (e.getCode() != HttpStatus.SC_NOT_FOUND) {
            throw new IOException("deleteObject", e);
        }
        LOG.error("Could not delete: {}, reason: {}", srcObjectLoc.getPath(), e.getResponseBody());
        return false;
    }

    LOG.debug("rename: successfully renamed {} to {}", srcObjectLoc.getPath(), dstObjectLoc.getPath());
    return true;
}

@Override
Expand Down
24 changes: 24 additions & 0 deletions clients/hadoopfs/src/main/java/io/lakefs/ObjectLocation.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,28 @@ static String trimLeadingSlash(String s) {
}
return s;
}

/**
 * Two ObjectLocations are equal when their repository, ref and path all match.
 *
 * @param obj the object to compare against
 * @return true if obj is an ObjectLocation with the same repository, ref and path
 */
@Override
public boolean equals(Object obj) {
    if (obj == this) {
        return true;
    }
    if (!(obj instanceof ObjectLocation)) {
        return false;
    }

    ObjectLocation objLoc = (ObjectLocation) obj;
    return this.repository.equals(objLoc.getRepository()) &&
            this.ref.equals(objLoc.getRef()) && this.path.equals(objLoc.getPath());
}

/**
 * hashCode consistent with {@link #equals(Object)}: overriding equals without
 * hashCode violates the Object contract and breaks hash-based collections.
 *
 * @return hash combining repository, ref and path
 */
@Override
public int hashCode() {
    int result = repository.hashCode();
    result = 31 * result + ref.hashCode();
    result = 31 * result + path.hashCode();
    return result;
}

/**
 * Checks whether another ObjectLocation resides on the same branch, i.e.
 * shares both this location's repository and its ref.
 *
 * @param otherObjLoc the objectLocation to compare
 * @return true if the object location is on same branch, false otherwise
 */
public boolean onSameBranch(ObjectLocation otherObjLoc) {
    boolean sameRepository = this.repository.equals(otherObjLoc.getRepository());
    boolean sameRef = this.ref.equals(otherObjLoc.getRef());
    return sameRepository && sameRef;
}
}
38 changes: 38 additions & 0 deletions clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,42 @@ public void listFiles() throws IOException, URISyntaxException {
public void testAppend() throws IOException {
// append is unsupported by LakeFSFileSystem; this call is expected to throw
// UnsupportedOperationException (presumably asserted by a @Test(expected=...)
// annotation above this chunk -- not visible here; TODO confirm).
fs.append(null, 0, null);
}

/*
@Test
public void testRename() throws URISyntaxException, IOException {
Configuration conf = new Configuration(true);
conf.set(Constants.FS_LAKEFS_ACCESS_KEY, "<access_key>");
conf.set(Constants.FS_LAKEFS_SECRET_KEY, "<secret_key>");
conf.set(Constants.FS_LAKEFS_ENDPOINT_KEY, "http://localhost:8000/api/v1");
conf.set("fs.lakefs.impl", "io.lakefs.LakeFSFileSystem");
conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
// With lakefsFS the user does not need to point to the s3 gateway
conf.set("fs.s3a.access.key", "<aws_access_key>");
conf.set("fs.s3a.secret.key", "<aws_secret_key>");
LakeFSFileSystem lfs = (LakeFSFileSystem)FileSystem.get(new URI("lakefs://aws-repo/main/nothere.txt"), conf);
// Uncommitted -
// rename existing src file to non-existing dst
Path src = new Path("lakefs://aws-repo/main/peopleLakefs.parquet/_temporary/0/_temporary/attempt_202105191158068718340739981962409_0001_m_000000_1/part-00000-10b8c14f-51c0-4604-b7b5-45bf009bd3b0-c000.snappy.parquet");
Path dst = new Path("lakefs://aws-repo/main/peopleLakefs.parquet/new-name.parquet");
lfs.rename(src, dst);
// rename non-existing src file - src not found, return false.
Path src = new Path("lakefs://aws-repo/main/peopleLakefs.parquet/_temporary/0/_temporary/attempt_202105161150342255421072959703851_0001_m_000000_1/part-00000-c72e1fa6-9d86-4032-a2b1-f8dd1334e52e-c000.snappy.parquet");
Path dst = new Path("lakefs://aws-repo/main/peopleLakefs.parquet/dst2.parquet");
lfs.rename(src, dst);
// rename existing src file to existing dst - no failure, src is renamed, dst file is overridden with the renamed file.
Path src = new Path("lakefs://aws-repo/main/peopleLakefs.parquet/_SUCCESS");
Path dst = new Path("lakefs://aws-repo/main/peopleLakefs.parquet/new-name.parquet");
lfs.rename(src, dst);
// rename dir (a common prefix?), currently not working. for path type = common prefix I can't stat the object.
Path src = new Path("lakefs://aws-repo/main/peopleLakefs.parquet/_temporary");
Path dst = new Path("lakefs://aws-repo/main/peopleLakefs.parquet");
lfs.rename(src, dst);
}
*/
}

0 comments on commit 4c9b404

Please sign in to comment.