Skip to content

Commit

Permalink
Use recv path (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK authored Sep 5, 2024
1 parent a7af556 commit e780f10
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 74 deletions.
32 changes: 1 addition & 31 deletions runners/s3-benchrunner-c/CRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,6 @@ class Task
promise<void> donePromise;
future<void> doneFuture;

FILE *downloadFile = NULL;

static int onDownloadData(
struct aws_s3_meta_request *meta_request,
const struct aws_byte_cursor *body,
uint64_t range_start,
void *user_data);

static void onFinished(
struct aws_s3_meta_request *meta_request,
const struct aws_s3_meta_request_result *meta_request_result,
Expand Down Expand Up @@ -295,10 +287,7 @@ Task::Task(CRunner &runner, size_t taskI)

if (runner.config.filesOnDisk)
{
downloadFile = fopen(config.key.c_str(), "wb");
AWS_FATAL_ASSERT(downloadFile != NULL);

options.body_callback = Task::onDownloadData;
options.recv_filepath = toCursor(config.key);
}
}
else
Expand Down Expand Up @@ -367,29 +356,10 @@ void Task::onFinished(
}

// clean up task
if (task->downloadFile != NULL)
fclose(task->downloadFile);
aws_s3_meta_request_release(task->metaRequest);
task->donePromise.set_value();
}

int Task::onDownloadData(
struct aws_s3_meta_request *meta_request,
const struct aws_byte_cursor *body,
uint64_t range_start,
void *user_data)
{
auto *task = static_cast<Task *>(user_data);

size_t written = fwrite(body->ptr, 1, body->len, task->downloadFile);
AWS_FATAL_ASSERT(written == body->len);

// Increment read window so data will continue downloading
aws_s3_meta_request_increment_read_window(meta_request, body->len);

return AWS_OP_SUCCESS;
}

int main(int argc, char *argv[])
{
return benchmarkRunnerMain(
Expand Down
2 changes: 1 addition & 1 deletion runners/s3-benchrunner-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<properties>
<!-- build with -Dawscrt.version=1.0.0-SNAPSHOT to use the locally installed dev version) -->
<awscrt.version>[0.30,)</awscrt.version>
<awscrt.version>[0.30.10,)</awscrt.version>
<aws.sdk.version>[2.27,)</aws.sdk.version>

<maven.compiler.release>17</maven.compiler.release>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ public class Main {
/////////////// BEGIN ARBITRARY HARDCODED VALUES ///////////////

// 256MiB is Java Transfer Mgr v2 default.
// TODO: Investigate. At time of writing, this noticeably impacts performance.
public static final int BACKPRESSURE_INITIAL_READ_WINDOW_MiB = 256;
// This benchmark can turn off backpressure and rely solely on the
// memory-limiter.
public static final int BACKPRESSURE_INITIAL_READ_WINDOW_MiB = 0;

/////////////// END ARBITRARY HARD-CODED VALUES ///////////////

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public CRTJavaBenchmarkRunner(BenchmarkConfig config, String bucket, String regi
// If writing data to disk, enable backpressure.
// This prevents us from running out of memory due to downloading
// data faster than we can write it to disk.
if (config.filesOnDisk) {
if (config.filesOnDisk && Main.BACKPRESSURE_INITIAL_READ_WINDOW_MiB != 0) {
s3ClientOpts.withReadBackpressureEnabled(true);
s3ClientOpts.withInitialReadWindowSize(Util.bytesFromMiB(Main.BACKPRESSURE_INITIAL_READ_WINDOW_MiB));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,9 @@
import software.amazon.awssdk.crt.http.HttpRequestBodyStream;
import software.amazon.awssdk.crt.s3.*;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -27,8 +22,6 @@ class CRTJavaTask implements S3MetaRequestResponseHandler {
TaskConfig config;
S3MetaRequest metaRequest;
CompletableFuture<Void> doneFuture;
ReadableByteChannel uploadFileChannel;
WritableByteChannel downloadFileChannel;

CRTJavaTask(CRTJavaBenchmarkRunner runner, int taskI) {
this.runner = runner;
Expand Down Expand Up @@ -66,12 +59,7 @@ class CRTJavaTask implements S3MetaRequestResponseHandler {
headers.add(new HttpHeader("Content-Length", "0"));

if (runner.config.filesOnDisk) {
try {
downloadFileChannel = FileChannel.open(Path.of(config.key),
StandardOpenOption.CREATE, StandardOpenOption.WRITE);
} catch (IOException e) {
throw new RuntimeException(e);
}
options.withResponseFilePath(Path.of(config.key));
}
} else {
throw new RuntimeException("Unknown task action: " + config.action);
Expand Down Expand Up @@ -102,21 +90,6 @@ void waitUntilDone() {
}
}

@Override
public int onResponseBody(ByteBuffer bodyBytesIn, long objectRangeStart, long objectRangeEnd) {
int amountReceived = bodyBytesIn.remaining();

if (downloadFileChannel != null) {
try {
downloadFileChannel.write(bodyBytesIn);
} catch (IOException e) {
Util.exitWithError("Failed writing to file: " + e.toString());
}
}

return amountReceived;
}

@Override
public void onFinished(S3FinishedResponseContext context) {
if (context.getErrorCode() != 0) {
Expand All @@ -135,17 +108,6 @@ public void onFinished(S3FinishedResponseContext context) {
Util.exitWithError("S3MetaRequest failed");
} else {
// CRTJavaTask succeeded. Clean up...
try {
if (downloadFileChannel != null) {
downloadFileChannel.close();
}
if (uploadFileChannel != null) {
uploadFileChannel.close();
}
} catch (IOException e) {
Util.exitWithError("Failed closing file: " + e.toString());
}

// work around API-gotcha where callbacks can fire on other threads
// before makeMetaRequest() has returned
synchronized (this) {
Expand Down

0 comments on commit e780f10

Please sign in to comment.