Skip to content

Commit

Permalink
improve stream-to-file handling
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewazores committed Oct 29, 2021
1 parent 3cd1f37 commit 71e18e2
Showing 1 changed file with 29 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.EnumSet;
import java.util.Map;
import java.util.Objects;
Expand All @@ -51,8 +50,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -78,6 +75,7 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
Expand All @@ -90,15 +88,13 @@ class RecordingsPostHandler implements RequestHandler {

private static final Pattern RECORDING_FILENAME_PATTERN =
Pattern.compile("([A-Za-z\\d-]*)_([A-Za-z\\d-_]*)_([\\d]*T[\\d]*Z)(\\.[\\d]+)?");
private static final Duration READ_TIMEOUT = Duration.ofSeconds(10);

private final AuthManager auth;
private final FileSystem fs;
private final Path savedRecordingsPath;
private final Gson gson;
private final Logger logger;
private final NotificationFactory notificationFactory;
private final AtomicLong lastReadTimestamp = new AtomicLong(0);

private static final String NOTIFICATION_CATEGORY = "RecordingSaved";

Expand Down Expand Up @@ -179,44 +175,47 @@ public void handle(RoutingContext ctx) {
.resolve(UUID.randomUUID().toString())
.toString();
CompletableFuture<String> fileUploadPath = new CompletableFuture<>();
long timerId =
ctx.vertx()
.setPeriodic(
READ_TIMEOUT.toMillis(),
id -> {
if (System.nanoTime() - lastReadTimestamp.get()
> READ_TIMEOUT.toNanos()) {
fileUploadPath.completeExceptionally(
new TimeoutException());
}
});
ctx.vertx()
.fileSystem()
.open(
destinationFile,
new OpenOptions().setAppend(true).setCreateNew(true),
new OpenOptions().setCreateNew(true).setWrite(true).setAppend(true),
openFile -> {
if (openFile.failed()) {
ctx.fail(new HttpStatusException(500, openFile.cause()));
ctx.fail(openFile.cause());
return;
}
AsyncFile as = openFile.result();
ctx.request()
.pipeTo(
as,
pipe -> {
if (pipe.failed()) {
ctx.fail(pipe.cause());
return;
}
});
ctx.request()
.handler(
buffer -> {
lastReadTimestamp.set(System.nanoTime());
openFile.result().write(buffer);
})
.exceptionHandler(fileUploadPath::completeExceptionally)
.endHandler(
v -> {
ctx.vertx().cancelTimer(timerId);
openFile.result().close();
fileUploadPath.complete(destinationFile);
as.flush(
flush -> {
if (flush.failed()) {
ctx.fail(flush.cause());
return;
}
as.close(
close -> {
if (close.failed()) {
ctx.fail(close.cause());
return;
}
fileUploadPath.complete(
destinationFile);
});
});
});
ctx.addEndHandler(
ar -> {
ctx.vertx().cancelTimer(timerId);
});
ctx.request().resume();
});

Expand Down Expand Up @@ -332,7 +331,6 @@ private void validateRecording(
}
event.complete();
} catch (CouldNotLoadRecordingException | IOException e) {
// FIXME need to reject the request and clean up the file here
event.fail(e);
}
},
Expand Down

0 comments on commit 71e18e2

Please sign in to comment.