Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[lakeFSFS] Use bulk deletes in recursive FileSystem delete #4204

Merged
2 changes: 1 addition & 1 deletion clients/hadoopfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ To export to S3:
<dependency>
<groupId>io.lakefs</groupId>
<artifactId>api-client</artifactId>
<version>0.56.0</version>
<version>0.81.0</version>
</dependency>

<dependency>
Expand Down
150 changes: 150 additions & 0 deletions clients/hadoopfs/src/main/java/io/lakefs/BulkDeleter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package io.lakefs;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import io.lakefs.clients.api.ApiException;
import io.lakefs.clients.api.model.ObjectErrorList;
import io.lakefs.clients.api.model.PathList;

class BulkDeleter implements Closeable {
private static final int defaultBulkSize = 1000;

private final ExecutorService executor;
private final Callback callback;
private final String repository;
private final String branch;
private final int bulkSize;

private PathList pathList;
// TODO(ariels): Configure this!
private final int concurrency = 1;
private Queue<Future<ObjectErrorList>> deletions = new ArrayDeque<>();

public static interface Callback {
Copy link
Contributor

@Jonathan-Rosenberg Jonathan-Rosenberg Sep 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why static?
Also, this looks like a pretty generic interface. It could be used with other types of bulk operations.
You might consider renaming it and declaring it in its own file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's static because it is not inner AFAIK. I realize that Java probably ends up building the class of an interface as non-inner even without this keyword... but that is even more confusing to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think it's needed, but it doesn't matter too much...

ObjectErrorList apply(String repository, String branch, PathList pathList) throws ApiException;
}

public static class DeleteFailuresException extends IOException {
public DeleteFailuresException(ObjectErrorList errorList) {
super("failed to delete: " + errorList.toString());
}
}

/**
* Construct a BulkDeleter to bulk-delete objects on branch in repository,
* using callback on executor.
*/
BulkDeleter(ExecutorService executor, Callback callback, String repository, String branch, int bulkSize) {
this.executor = executor;
this.callback = callback;
this.repository = repository;
this.branch = branch;
if (bulkSize <= 0) {
bulkSize = defaultBulkSize;
}
this.bulkSize = bulkSize;
}

BulkDeleter(ExecutorService executor, Callback callback, String repository, String branch) {
this(executor, callback, repository, branch, defaultBulkSize);
}

/**
* Add another key to be deleted. If a bulk is ready, delete it. Any
* errors thrown may be related to previously-added keys.
*/
public synchronized void add(String key) throws IOException, DeleteFailuresException {
if (pathList == null) {
pathList = new PathList();
}
pathList.addPathsItem(key);
if (pathList.getPaths().size() >= bulkSize) {
startDeletingUnlocked();
}
}

/**
* Close this BulkDeleter, possibly performing one last deletion.
*
* @throws DeleteFailuresException if last deletion did not (entirely) succeed.
*/
@Override
public synchronized void close() throws IOException, DeleteFailuresException {
if (pathList != null && !pathList.getPaths().isEmpty()) {
startDeletingUnlocked();
}
drainDeletionsUnlocked();
}

/**
* Start deleting everything in pathList and empty it. Must call locked.
*/
private void startDeletingUnlocked() throws IOException, DeleteFailuresException {
maybeWaitForDeletionUnlocked();
PathList toDelete = pathList;
pathList = null;
deletions.add(executor.submit(new Callable() {
@Override
public ObjectErrorList call() throws ApiException, InterruptedException, DeleteFailuresException {
ObjectErrorList ret = callback.apply(repository, branch, toDelete);
return ret;
}
}));
}

/**
* Wait for deletion callbacks to end until deletions has space. Must
* call locked.
*
* @throws DeleteFailuresException if deletion did not (entirely) succeed.
*/
private void maybeWaitForDeletionUnlocked() throws DeleteFailuresException, IOException {
while (deletions.size() >= concurrency) {
waitForOneDeletionUnlocked();
}
}

/**
* Wait for deletion callbacks to end until deletions has space. Must
* call locked.
*
* @throws DeleteFailuresException if deletion did not (entirely) succeed.
*/
private void drainDeletionsUnlocked() throws DeleteFailuresException, IOException {
while (!deletions.isEmpty()) {
waitForOneDeletionUnlocked();
}
}

private void waitForOneDeletionUnlocked() throws DeleteFailuresException, IOException {
try {
Future<ObjectErrorList> deletion = deletions.poll();
if (deletion == null) return;

ObjectErrorList errors = deletion.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case there are multiple threads that add deletions to the list, and there are more than concurrency deletions in the answers queue, won't this line make all threads halt until there is at least one answer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. But: what would they be doing if they were not blocked? They would still need to block on startDeletingUnlocked :-)

if (errors != null && errors.getErrors() != null && !errors.getErrors().isEmpty()) {
throw new DeleteFailuresException(errors);
}
} catch (ExecutionException e) {
// Unwrap and re-throw e (usually)
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException)cause;
} else if (cause instanceof Error) {
// Don't wrap serious errors.
throw (Error)cause;
} else {
throw new IOException("failed to wait for bulk delete", cause);
}
} catch (InterruptedException ie) {
throw new IOException("wait for deletion", ie);
}
}
}
63 changes: 51 additions & 12 deletions clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.net.URISyntaxException;
import java.nio.file.AccessDeniedException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand All @@ -40,14 +43,26 @@
public class LakeFSFileSystem extends FileSystem {
public static final Logger LOG = LoggerFactory.getLogger(LakeFSFileSystem.class);
public static final Logger OPERATIONS_LOG = LoggerFactory.getLogger(LakeFSFileSystem.class + "[OPERATION]");
public static final String LAKEFS_DELETE_BULK_SIZE = "fs.lakefs.delete.bulk_size";

private Configuration conf;
private URI uri;
private Path workingDirectory = new Path(Constants.SEPARATOR);
private ClientFactory clientFactory;
private LakeFSClient lfsClient;
private int listAmount;
private FileSystem fsForConfig;

// Currently bulk deletes *must* receive a single-threaded executor!
private ExecutorService deleteExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});

private URI translateUri(URI uri) throws java.net.URISyntaxException {
switch (uri.getScheme()) {
case "s3":
Expand All @@ -62,16 +77,23 @@ public URI getUri() {
return uri;
}

public interface ClientFactory {
LakeFSClient newClient() throws IOException;
}

@Override
public void initialize(URI name, Configuration conf) throws IOException {
initializeWithClient(name, conf, new LakeFSClient(name.getScheme(), conf));
initializeWithClientFactory(name, conf, new ClientFactory() {
public LakeFSClient newClient() throws IOException { return new LakeFSClient(name.getScheme(), conf); }
});
}

void initializeWithClient(URI name, Configuration conf, LakeFSClient lfsClient) throws IOException {
void initializeWithClientFactory(URI name, Configuration conf, ClientFactory clientFactory) throws IOException {
super.initialize(name, conf);
this.uri = name;
this.conf = conf;
this.lfsClient = lfsClient;
this.clientFactory = clientFactory;
this.lfsClient = clientFactory.newClient();

String host = name.getHost();
if (host == null) {
Expand Down Expand Up @@ -218,7 +240,7 @@ private FSDataOutputStream createDataOutputStream(BiFunctionWithIOException<File
OutputStream physicalOut = createStream.apply(physicalFs, physicalPath);
MetadataClient metadataClient = new MetadataClient(physicalFs);
LinkOnCloseOutputStream out = new LinkOnCloseOutputStream(this,
stagingLoc, objectLoc, physicalUri, metadataClient, physicalOut);
stagingLoc, objectLoc, physicalUri, metadataClient, physicalOut);
// TODO(ariels): add fs.FileSystem.Statistics here to keep track.
return new FSDataOutputStream(out, null);
}
Expand Down Expand Up @@ -476,15 +498,21 @@ public boolean delete(Path path, boolean recursive) throws IOException {
loc = loc.toDirectory();
deleted = deleteHelper(loc);
} else {
ListingIterator iterator = new ListingIterator(path, true, listAmount);
iterator.setRemoveDirectory(false);
while (iterator.hasNext()) {
LakeFSFileStatus fileStatus = iterator.next();
ObjectLocation fileLoc = pathToObjectLocation(fileStatus.getPath());
if (fileStatus.isDirectory()) {
fileLoc = fileLoc.toDirectory();
ObjectLocation location = pathToObjectLocation(path);
try (BulkDeleter deleter = newDeleter(location.getRepository(), location.getRef())) {
ListingIterator iterator = new ListingIterator(path, true, listAmount);
iterator.setRemoveDirectory(false);
while (iterator.hasNext()) {
LakeFSFileStatus fileStatus = iterator.next();
ObjectLocation fileLoc = pathToObjectLocation(fileStatus.getPath());
if (fileStatus.isDirectory()) {
fileLoc = fileLoc.toDirectory();
}
deleter.add(fileLoc.getPath());
}
deleteHelper(fileLoc);
} catch (BulkDeleter.DeleteFailuresException e) {
LOG.error("delete(%s, %b): %s", path, recursive, e.toString());
deleted = false;
}
}
} else {
Expand All @@ -495,6 +523,17 @@ public boolean delete(Path path, boolean recursive) throws IOException {
return deleted;
}

private BulkDeleter newDeleter(String repository, String branch) throws IOException {
// Use a different client -- a different thread waits for its calls,
// *late*.
ObjectsApi objectsApi = clientFactory.newClient().getObjects();
return new BulkDeleter(deleteExecutor, new BulkDeleter.Callback() {
public ObjectErrorList apply(String repository, String branch, PathList pathList) throws ApiException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider using a lambda for readability

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did, I <3 functional programming. But... the test is easier to write with a Callback that has state. So I stayed all Java 7-y.

return objectsApi.deleteObjects(repository, branch, pathList);
}
}, repository, branch, conf.getInt(LAKEFS_DELETE_BULK_SIZE, 0));
}

private boolean deleteHelper(ObjectLocation loc) throws IOException {
try {
ObjectsApi objectsApi = lfsClient.getObjects();
Expand Down
115 changes: 115 additions & 0 deletions clients/hadoopfs/src/test/java/io/lakefs/BulkDeleterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package io.lakefs;

import java.io.IOException;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.hamcrest.CustomTypeSafeMatcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

import io.lakefs.clients.api.ApiException;
import io.lakefs.clients.api.model.ObjectErrorList;
import io.lakefs.clients.api.model.PathList;

public class BulkDeleterTest {
protected ExecutorService executorService = Executors.newFixedThreadPool(3);

@After
public void shutdownExecutor() {
executorService.shutdown();
}

@Test
public void nothing() throws IOException {
BulkDeleter deleter = new BulkDeleter(executorService, new BulkDeleter.Callback() {
public ObjectErrorList apply(String repository, String branch, PathList pathList) throws ApiException {
throw new ApiException("failed for testing");
}
},
"repo", "branch", 50);
Comment on lines +28 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
BulkDeleter deleter = new BulkDeleter(executorService, new BulkDeleter.Callback() {
public ObjectErrorList apply(String repository, String branch, PathList pathList) throws ApiException {
throw new ApiException("failed for testing");
}
},
"repo", "branch", 50);
BulkDeleter deleter = new BulkDeleter(executorService, (repo, branch, pathList) -> {
throw new ApiException("failed for testing");
}, "repo", "branch", 50);

Any reason for us not to support Lambdas at this point?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Callback on l. 34 is harder to write. Even more so with its new improved form that checks concurrency.

deleter.close();
}

class Callback implements BulkDeleter.Callback {
private int bulkSize;
private int numPaths;
private Set<String> expected;

Callback(int bulkSize, int numPaths, Set<String> expected) {
this.bulkSize = bulkSize;
this.numPaths = numPaths;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nuked.

this.expected = new HashSet(expected);
}

public ObjectErrorList apply(String repository, String branch, PathList pathList) throws ApiException {
Assert.assertNotNull(pathList);
Assert.assertThat(pathList.getPaths().size(),
new CustomTypeSafeMatcher<Integer>(String.format("has at most %d elements", bulkSize)) {
public boolean matchesSafely(Integer size) {
return size <= bulkSize;
}
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Assert.assertThat(pathList.getPaths().size(),
new CustomTypeSafeMatcher<Integer>(String.format("has at most %d elements", bulkSize)) {
public boolean matchesSafely(Integer size) {
return size <= bulkSize;
}
});
Assert.assertTrue(pathList.getPaths().size() <= bulkSize);

Why not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't that give a horrible message on failure? I want to know the actual numbers!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arielshaqed, then:

Suggested change
Assert.assertThat(pathList.getPaths().size(),
new CustomTypeSafeMatcher<Integer>(String.format("has at most %d elements", bulkSize)) {
public boolean matchesSafely(Integer size) {
return size <= bulkSize;
}
});
Assert.assertTrue(String.format("expected at most %d paths", bulkSize), pathList.getPaths().size() <= bulkSize);

Copy link
Contributor Author

@arielshaqed arielshaqed Sep 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, will add the complete sentence (efficiency doesn't matter here, and I guess it's shorter than the full Matcher boilerplate...).
Thanks!

synchronized(expected) {
for(String p: pathList.getPaths()) {
Assert.assertTrue(expected.remove(p));
}
}
return new ObjectErrorList();
}

public void verify() {
Assert.assertEquals(java.util.Collections.emptySet(), expected);
}
}

protected void goodBulkCase(int bulkSize, int numPaths) throws IOException {
Set<String> toDelete = new HashSet<>();
for (int i = 0; i < numPaths; i++) {
toDelete.add(String.format("%d", i));
}
Callback callback = new Callback(bulkSize, numPaths, toDelete);

BulkDeleter deleter = new BulkDeleter(executorService, callback, "repo", "branch", 50);
for (int i = 0; i < numPaths; i++) {
deleter.add(String.format("%d", i));
}

deleter.close();
callback.verify();
}

@Test
public void exactGoodBatches() throws IOException {
goodBulkCase(50, 100);
}

@Test
public void inexactGoodBatches() throws IOException {
goodBulkCase(50, 103);
}

@Test
public void exactGoodSingleBatch() throws IOException {
goodBulkCase(50, 50);
}

@Test
public void inexactGoodSingleBatch() throws IOException {
goodBulkCase(50, 47);
}

@Test
public void exactGoodManyBatches() throws IOException {
goodBulkCase(50, 500);
}

@Test
public void inexactGoodManyBatches() throws IOException {
goodBulkCase(50, 493);
}
}
Loading