Commit
Add async write() function - fixes "stalled" uploads deadlocking S3 Client (#418)
graebm authored Apr 8, 2024
1 parent eb3b2bd commit 3334843
Showing 10 changed files with 1,065 additions and 192 deletions.
40 changes: 32 additions & 8 deletions include/aws/s3/private/s3_meta_request_impl.h
@@ -143,14 +143,11 @@ struct aws_s3_meta_request {
     /* The meta request's outgoing body comes from one of these:
      * 1) request_body_async_stream: if set, then async stream 1 part at a time
      * 2) request_body_parallel_stream: if set, then stream multiple parts in parallel
-     * 3) initial_request_message's body_stream: else synchronously stream parts */
+     * 3) request_body_using_async_writes: if set, then synchronously copy async_write data, 1 part at a time
+     * 4) initial_request_message's body_stream: else synchronously stream parts */
     struct aws_async_input_stream *request_body_async_stream;
     struct aws_parallel_input_stream *request_body_parallel_stream;
-
-    /* Whether to let this meta-request exceed the regular limits on num-requests-being-prepared.
-     * This lets as many async-stream reads be pending as possible, reducing the chance of deadlock
-     * when the user can't control when data arrives. */
-    bool maximize_async_stream_reads;
+    bool request_body_using_async_writes;
 
     /* Part size to use for uploads and downloads. Passed down by the creating client. */
     const size_t part_size;
@@ -234,6 +231,35 @@ struct aws_s3_meta_request {
         /* To track aws_s3_requests with cancellable HTTP streams */
         struct aws_linked_list cancellable_http_streams_list;
 
+        /* Data for async-writes.
+         * Currently, for a given meta request, only 1 async-write is allowed at a time.
+         *
+         * When the user calls write(), they may not provide enough data for us to send an UploadPart.
+         * In that case, we copy the data to a buffer and immediately mark the write complete,
+         * so the user can write more data, until we finally have enough to send. */
+        struct {
+            /* The future for whatever async-write is pending.
+             * If this is NULL, there isn't enough data to send another part.
+             *
+             * If this is non-NULL, 1+ part requests can be sent.
+             * When all the data has been processed, this future is completed
+             * and cleared, and we can accept another write() call. */
+            struct aws_future_void *future;
+
+            /* True once the user passes `eof` to their final write() call */
+            bool eof;
+
+            /* Holds buffered data we can't immediately send.
+             * The length will always be less than part-size */
+            struct aws_byte_buf buffered_data;
+
+            /* Cursor/pointer to data from the most-recent write() call, which
+             * provides enough data (combined with any buffered_data) to send 1+ parts.
+             * If there's data leftover in unbuffered_cursor after these parts are sent,
+             * it's copied into buffered_data, and we wait for more writes... */
+            struct aws_byte_cursor unbuffered_cursor;
+        } async_write;
+
     } synced_data;
 
     /* Anything in this structure should only ever be accessed by the client on its process work event loop task. */
@@ -390,8 +416,6 @@ struct aws_future_bool *aws_s3_meta_request_read_body(
     uint64_t offset,
     struct aws_byte_buf *buffer);
 
-bool aws_s3_meta_request_body_has_no_more_data(const struct aws_s3_meta_request *meta_request);
-
 /* Set the meta request finish result as failed. This is meant to be called sometime before aws_s3_meta_request_finish.
  * Subsequent calls to this function or to aws_s3_meta_request_set_success_synced will not overwrite the end result of
  * the meta request. */
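To make the buffering rules above concrete, here is a simplified, standalone model of how one write() call's bytes are split across part boundaries. This is an illustration with assumed names (model_state, model_write, PART_SIZE), not the library's code; the real implementation also sends parts directly from unbuffered_cursor where it can, rather than copying every byte through the buffer.

#include <stddef.h>
#include <string.h>

#define PART_SIZE 8 /* toy value; the real client uses meta_request->part_size */

/* Stands in for synced_data.async_write: leftover bytes carried between writes. */
struct model_state {
    unsigned char buffered_data[PART_SIZE];
    size_t buffered_len; /* invariant: always < PART_SIZE between writes */
};

/* Model of one write() call: emit as many complete parts as possible,
 * then buffer the remainder. Returns the number of parts "sent". */
static size_t model_write(struct model_state *s, const unsigned char *data, size_t len) {
    size_t parts_sent = 0;
    while (s->buffered_len + len >= PART_SIZE) {
        size_t take = PART_SIZE - s->buffered_len; /* bytes needed to complete a part */
        memcpy(s->buffered_data + s->buffered_len, data, take);
        /* ...the real client would kick off an UploadPart request here... */
        data += take;
        len -= take;
        s->buffered_len = 0;
        ++parts_sent;
    }
    memcpy(s->buffered_data + s->buffered_len, data, len); /* remainder waits for more writes */
    s->buffered_len += len;
    return parts_sent;
}

If a write is too small to complete any part, the real code completes the write future immediately after buffering; otherwise the future stays pending until the part data has been consumed.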
1 change: 1 addition & 0 deletions include/aws/s3/s3.h
@@ -45,6 +45,7 @@ enum aws_s3_errors {
     AWS_ERROR_S3_INVALID_MEMORY_LIMIT_CONFIG,
     AWS_ERROR_S3EXPRESS_CREATE_SESSION_FAILED,
     AWS_ERROR_S3_INTERNAL_PART_SIZE_MISMATCH_RETRYING_WITH_RANGE,
+    AWS_ERROR_S3_REQUEST_HAS_COMPLETED,
 
     AWS_ERROR_S3_END_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_S3_PACKAGE_ID)
 };
69 changes: 59 additions & 10 deletions include/aws/s3/s3_client.h
@@ -566,7 +566,8 @@ struct aws_s3_checksum_config {
  * There are several ways to pass the request's body data:
  * 1) If the data is already in memory, set the body-stream on `message`.
  * 2) If the data is on disk, set `send_filepath` for best performance.
- * 3) If the data will be produced in asynchronous chunks, set `send_async_stream`.
+ * 3) If the data is available, but copying each chunk is asynchronous, set `send_async_stream`.
+ * 4) If you're not sure when each chunk of data will be available, use `send_using_async_writes`.
  */
 struct aws_s3_meta_request_options {
     /* The type of meta request we will be trying to accelerate. */
@@ -615,23 +616,27 @@
      * Optional - EXPERIMENTAL/UNSTABLE
      * If set, the request body comes from this async stream.
      * Use this when outgoing data will be produced in asynchronous chunks.
+     * The S3 client will read from the stream whenever it's ready to upload another chunk.
+     *
+     * WARNING: The S3 client can deadlock if many async streams are "stalled",
+     * never completing their async read. If you're not sure when (if ever)
+     * data will be ready, use `send_using_async_writes` instead.
+     *
      * Do not set if the body is being passed by other means (see note above).
      */
     struct aws_async_input_stream *send_async_stream;
 
     /**
-     * NOT FOR PUBLIC USE
      * Optional - EXPERIMENTAL/UNSTABLE
+     * Set this to send request body data using the async aws_s3_meta_request_write() function.
+     * Use this when outgoing data will be produced in asynchronous chunks,
+     * and you're not sure when (if ever) each chunk will be ready.
      *
-     * The S3 client can currently deadlock if too many uploads using
-     * `send_async_stream` are "stalled" and failing to provide data.
-     * Set this true to raise the number of "stalled" meta-requests the S3 client
-     * can tolerate before it deadlocks. The downside of setting this is that
-     * the S3 client will use as much memory as it is allowed
-     * (see `aws_s3_client_config.memory_limit_in_bytes`).
+     * This only works with AWS_S3_META_REQUEST_TYPE_PUT_OBJECT.
      *
-     * This setting will be removed when a better solution is developed.
+     * Do not set if the body is being passed by other means (see note above).
      */
-    bool maximize_async_stream_reads_internal_use_only;
+    bool send_using_async_writes;

     /**
      * Optional.
@@ -829,6 +834,50 @@ struct aws_s3_meta_request *aws_s3_client_make_meta_request(
     struct aws_s3_client *client,
     const struct aws_s3_meta_request_options *options);
 
+/**
+ * Write the next chunk of data.
+ *
+ * You must set `aws_s3_meta_request_options.send_using_async_writes` to use this function.
+ *
+ * This function is asynchronous, and returns a future (see <aws/io/future.h>).
+ * You may not call write() again until the future completes.
+ *
+ * If the future completes with an error code, then write() did not succeed
+ * and you should not call it again. If the future contains any error code,
+ * the meta request is guaranteed to finish soon (you don't need to worry about
+ * canceling the meta request yourself after a failed write).
+ * A common error code is AWS_ERROR_S3_REQUEST_HAS_COMPLETED, indicating
+ * the meta request completed for reasons unrelated to the write() call
+ * (e.g. CreateMultipartUpload received a 403 Forbidden response).
+ * AWS_ERROR_INVALID_STATE usually indicates that you're calling write()
+ * incorrectly (e.g. not waiting for the previous write to complete).
+ *
+ * You MUST keep the data in memory until the future completes.
+ * If you need to free the memory early, call aws_s3_meta_request_cancel().
+ * cancel() will synchronously complete the future of any pending write with
+ * error code AWS_ERROR_S3_REQUEST_HAS_COMPLETED.
+ *
+ * You can wait any length of time between calls to write().
+ * If there's not enough data to upload a part, the data will be copied
+ * to a buffer and the future will immediately complete.
+ *
+ * @param meta_request Meta request
+ *
+ * @param data The data to send. The data can be any size.
+ *
+ * @param eof Pass true to signal EOF (end of file).
+ *            Do not call write() again after passing true.
+ *
+ * This function never returns NULL.
+ *
+ * WARNING: This feature is experimental.
+ */
+AWS_S3_API
+struct aws_future_void *aws_s3_meta_request_write(
+    struct aws_s3_meta_request *meta_request,
+    struct aws_byte_cursor data,
+    bool eof);
 
 /**
  * Increment the flow-control window, so that response data continues downloading.
  *
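For orientation, here is a minimal calling sketch for the new function (not part of this commit). The helper name upload_chunk is an assumption, and it blocks on the future for brevity; production code would register a completion callback with aws_future_void_register_callback() instead.

#include <aws/io/future.h>
#include <aws/s3/s3_client.h>
#include <stdint.h>

/* Hypothetical helper: feed one chunk to a meta request that was created
 * with send_using_async_writes = true. Returns 0 on success. */
static int upload_chunk(
    struct aws_s3_meta_request *meta_request,
    struct aws_byte_cursor chunk,
    bool eof) {

    struct aws_future_void *write_future = aws_s3_meta_request_write(meta_request, chunk, eof);

    /* `chunk`'s memory must stay valid until the future completes. */
    aws_future_void_wait(write_future, UINT64_MAX /*timeout_ns*/);
    int error_code = aws_future_void_get_error(write_future);
    aws_future_void_release(write_future);

    if (error_code != 0) {
        /* e.g. AWS_ERROR_S3_REQUEST_HAS_COMPLETED: the meta request already
         * finished for unrelated reasons; do not call write() again. */
        return error_code;
    }
    return 0;
}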
2 changes: 1 addition & 1 deletion source/s3.c
@@ -46,7 +46,7 @@ static struct aws_error_info s_errors[] = {
"Memory limit should be at least 1GiB. Part size and max part size should be smaller than memory limit."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3EXPRESS_CREATE_SESSION_FAILED, "CreateSession call failed when signing with S3 Express."),
AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_INTERNAL_PART_SIZE_MISMATCH_RETRYING_WITH_RANGE, "part_size mismatch, possibly due to wrong object_size_hint. Retrying with Range instead of partNumber."),

AWS_DEFINE_ERROR_INFO_S3(AWS_ERROR_S3_REQUEST_HAS_COMPLETED, "Request has already completed, action cannot be performed."),
};
/* clang-format on */

Expand Down
7 changes: 7 additions & 0 deletions source/s3_auto_ranged_put.c
@@ -418,6 +418,13 @@ static bool s_should_skip_scheduling_more_parts_based_on_flags(
         return auto_ranged_put->synced_data.num_parts_pending_read > 0;
     }
 
+    /* If doing async-writes, only allow a new part if there's a pending write-future,
+     * and no pending-reads yet to copy that data. */
+    if (auto_ranged_put->base.request_body_using_async_writes == true) {
+        return (auto_ranged_put->base.synced_data.async_write.future == NULL) ||
+               (auto_ranged_put->synced_data.num_parts_pending_read > 0);
+    }
+
     /* If this is the conservative pass, only allow 1 pending-read.
      * Reads are serial anyway, so queuing up a whole bunch isn't necessarily a speedup. */
     if ((flags & AWS_S3_META_REQUEST_UPDATE_FLAG_CONSERVATIVE) != 0) {
32 changes: 23 additions & 9 deletions source/s3_client.c
@@ -1140,14 +1140,35 @@ static struct aws_s3_meta_request *s_s3_client_meta_request_factory_default(
     if (options->send_filepath.len > 0) {
         ++body_source_count;
     }
+    if (options->send_using_async_writes == true) {
+        if (options->type != AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) {
+            /* TODO: we could support async-writes for DEFAULT type too, just takes work & testing */
+            AWS_LOGF_ERROR(
+                AWS_LS_S3_META_REQUEST,
+                "Could not create meta request."
+                " send-using-data-writes can only be used with auto-ranged-put.");
+            aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
+            return NULL;
+        }
+        if (content_length_found) {
+            /* TODO: we could support async-writes with content-length, just takes work & testing */
+            AWS_LOGF_ERROR(
+                AWS_LS_S3_META_REQUEST,
+                "Could not create meta request."
+                " send-using-data-writes can only be used when Content-Length is unknown.");
+            aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
+            return NULL;
+        }
+        ++body_source_count;
+    }
     if (options->send_async_stream != NULL) {
         ++body_source_count;
     }
     if (body_source_count > 1) {
         AWS_LOGF_ERROR(
             AWS_LS_S3_META_REQUEST,
-            "Could not create auto-ranged-put meta request."
-            " More than one data source is set (filepath, async stream, body stream).");
+            "Could not create meta request."
+            " More than one data source is set (filepath, async stream, body stream, data writes).");
         aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
         return NULL;
     }
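Taken together, these checks mean an async-writes upload is created roughly like this (a sketch, not code from this commit; `client` and `message` are assumed to already exist, with `message` a PutObject request carrying headers only — no Content-Length header and no body stream):

struct aws_s3_meta_request_options options = {
    .type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT, /* DEFAULT type is rejected above */
    .message = message,                          /* body arrives later via write() */
    .send_using_async_writes = true,             /* must be the only body source */
};
struct aws_s3_meta_request *meta_request = aws_s3_client_make_meta_request(client, &options);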
@@ -1716,13 +1737,6 @@ static bool s_s3_client_should_update_meta_request(
         }
     }
 
-    /* If maximize_async_stream_reads, let this meta-request ignore max_requests_prepare & max_requests_in_flight.
-     * We need to maximize the number of async-streams being read from, because the user has no idea
-     * when data will arrive to any of them. */
-    if (meta_request->request_body_async_stream != NULL && meta_request->maximize_async_stream_reads) {
-        return true;
-    }
-
     /**
      * If number of being-prepared + already-prepared-and-queued requests is more than the max that can
      * be in the preparation stage.