From 4c9b4046f713ec0e491006321cf243d965c5a4b4 Mon Sep 17 00:00:00 2001
From: talSofer
Date: Sun, 23 May 2021 10:32:35 +0300
Subject: [PATCH] Filesystem: non-atomic files rename (#1972)

ObjectLocation now overrides hashCode() together with equals(), so that
equal locations hash equally.

---
Review note: the post-image blob id on the ObjectLocation.java "index" line
below was not regenerated after hashCode() was added to that hunk.

 .../main/java/io/lakefs/LakeFSFileSystem.java | 83 +++++++++++++++++--
 .../main/java/io/lakefs/ObjectLocation.java   | 33 ++++++++
 .../java/io/lakefs/LakeFSFileSystemTest.java  | 38 +++++++++
 3 files changed, 147 insertions(+), 7 deletions(-)

diff --git a/clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java b/clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java
index ce890a79e32..dad1ae0597f 100644
--- a/clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java
+++ b/clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java
@@ -16,8 +16,7 @@
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3Client;
 
-import io.lakefs.clients.api.model.ObjectStatsList;
-import io.lakefs.clients.api.model.Pagination;
+import io.lakefs.clients.api.model.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -32,8 +31,6 @@
 import io.lakefs.clients.api.ApiException;
 import io.lakefs.clients.api.ObjectsApi;
 import io.lakefs.clients.api.StagingApi;
-import io.lakefs.clients.api.model.ObjectStats;
-import io.lakefs.clients.api.model.StagingLocation;
 
 import javax.annotation.Nonnull;
 
@@ -243,10 +240,82 @@ public FSDataOutputStream append(Path path, int i, Progressable progressable) th
         throw new UnsupportedOperationException("Append is not supported by LakeFSFileSystem");
     }
 
+    /**
+     * This method is implemented under the following assumptions:
+     * 1. rename is only supported for uncommitted data on the same branch.
+     * 2. file rename operation is supported, directories rename is unsupported.
+     * 3. the rename dst path can be an uncommitted file, that will be overridden as a result of the rename operation.
+     * 4. On rename operation a new mtime is generated, therefore we don't preserve the mtime of the src object.
+     *
+     * @throws IOException if, after dst was staged, deleting the src object fails for a reason other than not-found
+     */
     @Override
-    public boolean rename(Path path, Path path1) throws IOException {
-        LOG.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$ rename $$$$$$$$$$$$$$$$$$$$$$$$$$$$ ");
-        return false;
+    public boolean rename(Path src, Path dst) throws IOException {
+        LOG.debug("$$$$$$$$$$$$$$$$$$$$$$$$$$$$ Rename path {} to {} $$$$$$$$$$$$$$$$$$$$$$$$$$$$", src, dst);
+
+        ObjectLocation srcObjectLoc = pathToObjectLocation(src);
+        ObjectLocation dstObjectLoc = pathToObjectLocation(dst);
+        if (srcObjectLoc.equals(dstObjectLoc)) {
+            LOG.debug("rename: src and dst refer to the same lakefs object location: {}", dst);
+            return true;
+        }
+
+        if (!srcObjectLoc.onSameBranch(dstObjectLoc)) {
+            LOG.error("rename: src {} and dst {} are not on the same branch. rename outside this scope is unsupported " +
+                    "by lakefs.", src, dst);
+            return false;
+        }
+
+        ObjectStats srcStat;
+        ObjectsApi objects = lfsClient.getObjects();
+        try {
+            // Stat src file to get its metadata
+            srcStat = objects.statObject(srcObjectLoc.getRepository(), srcObjectLoc.getRef(),
+                    srcObjectLoc.getPath());
+        } catch (ApiException e) {
+            LOG.error("rename: could not get src object stats. src:{}", src, e);
+            return false;
+        }
+
+        return renameObject(srcStat, srcObjectLoc, dstObjectLoc);
+    }
+
+    /**
+     * Non-atomic rename operation.
+     * @return true if rename succeeded, false otherwise
+     */
+    private boolean renameObject(ObjectStats srcStat, ObjectLocation srcObjectLoc, ObjectLocation dstObjectLoc)
+            throws IOException {
+        ObjectsApi objects = lfsClient.getObjects();
+
+        //TODO (Tals): Can we add metadata? we currently don't have an API to get the metadata of an object.
+        ObjectStageCreation creationReq = new ObjectStageCreation()
+                .checksum(srcStat.getChecksum())
+                .sizeBytes(srcStat.getSizeBytes())
+                .physicalAddress(srcStat.getPhysicalAddress());
+
+        try {
+            objects.stageObject(dstObjectLoc.getRepository(), dstObjectLoc.getRef(), dstObjectLoc.getPath(),
+                    creationReq);
+        } catch (ApiException e) {
+            LOG.error("rename: Could not stage object on dst:{}", dstObjectLoc.getPath(), e);
+            return false;
+        }
+
+        // delete src path
+        try {
+            objects.deleteObject(srcObjectLoc.getRepository(), srcObjectLoc.getRef(), srcObjectLoc.getPath());
+        } catch (ApiException e) {
+            // This condition mimics s3a behaviour in https://github.com/apache/hadoop/blob/2960d83c255a00a549f8809882cd3b73a6266b6d/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java#L2741
+            if (e.getCode() == HttpStatus.SC_NOT_FOUND) {
+                LOG.error("Could not delete: {}, reason: {}", srcObjectLoc.getPath(), e.getResponseBody());
+                return false;
+            }
+            throw new IOException("deleteObject", e);
+        }
+
+        LOG.debug("rename: successfully renamed {} to {}", srcObjectLoc.getPath(), dstObjectLoc.getPath());
+        return true;
     }
 
     @Override
diff --git a/clients/hadoopfs/src/main/java/io/lakefs/ObjectLocation.java b/clients/hadoopfs/src/main/java/io/lakefs/ObjectLocation.java
index 262e8bebe6a..19fc1fbbebd 100644
--- a/clients/hadoopfs/src/main/java/io/lakefs/ObjectLocation.java
+++ b/clients/hadoopfs/src/main/java/io/lakefs/ObjectLocation.java
@@ -35,4 +35,37 @@ static String trimLeadingSlash(String s) {
         }
         return s;
     }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof ObjectLocation)) {
+            return false;
+        }
+
+        ObjectLocation objLoc = (ObjectLocation) obj;
+        return this.repository.equals(objLoc.getRepository()) &&
+                this.ref.equals(objLoc.getRef()) && this.path.equals(objLoc.getPath());
+    }
+
+    /**
+     * Hashes the same three fields that {@link #equals(Object)} compares, so that equal locations
+     * produce equal hash codes.
+     */
+    @Override
+    public int hashCode() {
+        return java.util.Objects.hash(this.repository, this.ref, this.path);
+    }
+
+    /**
+     * Checks if an ObjectLocation is on the same branch.
+     *
+     * @param otherObjLoc the objectLocation to compare
+     * @return true if the object location is on same branch, false otherwise
+     */
+    public boolean onSameBranch(ObjectLocation otherObjLoc) {
+        return this.repository.equals(otherObjLoc.getRepository()) && this.ref.equals(otherObjLoc.getRef());
+    }
 }
diff --git a/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java b/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java
index 943fcc78a32..df1edf32592 100644
--- a/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java
+++ b/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java
@@ -64,4 +64,42 @@ public void listFiles() throws IOException, URISyntaxException {
     public void testAppend() throws IOException {
         fs.append(null, 0, null);
     }
+
+    /*
+    @Test
+    public void testRename() throws URISyntaxException, IOException {
+        Configuration conf = new Configuration(true);
+        conf.set(Constants.FS_LAKEFS_ACCESS_KEY, "");
+        conf.set(Constants.FS_LAKEFS_SECRET_KEY, "");
+        conf.set(Constants.FS_LAKEFS_ENDPOINT_KEY, "http://localhost:8000/api/v1");
+        conf.set("fs.lakefs.impl", "io.lakefs.LakeFSFileSystem");
+        conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+        // With lakefsFS the user does not need to point to the s3 gateway
+        conf.set("fs.s3a.access.key", "");
+        conf.set("fs.s3a.secret.key", "");
+
+        LakeFSFileSystem lfs = (LakeFSFileSystem)FileSystem.get(new URI("lakefs://aws-repo/main/nothere.txt"), conf);
+
+        // Uncommitted -
+        // rename existing src file to non-existing dst
+        Path src = new Path("lakefs://aws-repo/main/peopleLakefs.parquet/_temporary/0/_temporary/attempt_202105191158068718340739981962409_0001_m_000000_1/part-00000-10b8c14f-51c0-4604-b7b5-45bf009bd3b0-c000.snappy.parquet");
+        Path dst = new Path("lakefs://aws-repo/main/peopleLakefs.parquet/new-name.parquet");
+        lfs.rename(src, dst);
+
+        // rename non-existing src file - src not found, return false.
+        Path src = new Path("lakefs://aws-repo/main/peopleLakefs.parquet/_temporary/0/_temporary/attempt_202105161150342255421072959703851_0001_m_000000_1/part-00000-c72e1fa6-9d86-4032-a2b1-f8dd1334e52e-c000.snappy.parquet");
+        Path dst = new Path("lakefs://aws-repo/main/peopleLakefs.parquet/dst2.parquet");
+        lfs.rename(src, dst);
+
+        // rename existing src file to existing dst - no failure, src is renamed, dst file is overridden with the renamed file.
+        Path src = new Path("lakefs://aws-repo/main/peopleLakefs.parquet/_SUCCESS");
+        Path dst = new Path("lakefs://aws-repo/main/peopleLakefs.parquet/new-name.parquet");
+        lfs.rename(src, dst);
+
+        // rename dir (a common prefix?), currently not working. for path type = common prefix I can't stat the object.
+        Path src = new Path("lakefs://aws-repo/main/peopleLakefs.parquet/_temporary");
+        Path dst = new Path("lakefs://aws-repo/main/peopleLakefs.parquet");
+        lfs.rename(src, dst);
+    }
+    */
 }