Skip to content

Commit

Permalink
Adds Connection Pool Destroy Delay Take 2 (#419)
Browse files Browse the repository at this point in the history
  • Loading branch information
waahm7 authored Apr 4, 2024
1 parent 0ce756e commit eb3b2bd
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 14 deletions.
12 changes: 12 additions & 0 deletions include/aws/s3/private/s3_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ struct aws_s3_client {
/* Task for trimming the buffer pool. */
struct aws_task trim_buffer_pool_task;

/* Task to cleanup endpoints */
struct aws_task endpoints_cleanup_task;

/* Number of endpoints currently allocated. Used during clean up to know how many endpoints are still in
* memory.*/
uint32_t num_endpoints_allocated;
Expand All @@ -376,6 +379,9 @@ struct aws_s3_client {
/* True if client has been flagged to finish destroying itself. Used to catch double-destroy bugs.*/
uint32_t finish_destroy : 1;

/* Whether or not endpoints cleanup task is currently scheduled. */
uint32_t endpoints_cleanup_task_scheduled : 1;

struct aws_s3_upload_part_timeout_stats upload_part_stats;
} synced_data;

Expand Down Expand Up @@ -483,6 +489,12 @@ struct aws_s3_endpoint *aws_s3_endpoint_acquire(struct aws_s3_endpoint *endpoint
* from the client's hashtable) */
void aws_s3_endpoint_release(struct aws_s3_endpoint *endpoint);

/*
* Destroys the endpoint. Before calling this function, the endpoint must be removed from the Client's hash table, and
* its ref count must be zero. You MUST NOT call this while the client's lock is held.
*/
void aws_s3_endpoint_destroy(struct aws_s3_endpoint *endpoint);

AWS_S3_API
extern const uint32_t g_min_num_connections;

Expand Down
76 changes: 74 additions & 2 deletions source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ static const uint32_t s_default_throughput_failure_interval_seconds = 30;
/* Amount of time spent idling before trimming buffer. */
static const size_t s_buffer_pool_trim_time_offset_in_s = 5;

/* Interval for scheduling endpoints cleanup task. This is to trim endpoints with a zero reference
* count. S3 closes the idle connections in ~5 seconds. */
static const uint32_t s_endpoints_cleanup_time_offset_in_s = 5;

/* Called when ref count is 0. */
static void s_s3_client_start_destroy(void *user_data);

Expand All @@ -98,6 +102,8 @@ static void s_s3_client_body_streaming_elg_shutdown(void *user_data);

static void s_s3_client_create_connection_for_request(struct aws_s3_client *client, struct aws_s3_request *request);

static void s_s3_endpoints_cleanup_task(struct aws_task *task, void *arg, enum aws_task_status task_status);

/* Callback which handles the HTTP connection retrieved by acquire_http_connection. */
static void s_s3_client_on_acquire_http_connection(
struct aws_http_connection *http_connection,
Expand Down Expand Up @@ -563,6 +569,8 @@ struct aws_s3_client *aws_s3_client_new(
aws_hash_callback_string_eq,
aws_hash_callback_string_destroy,
NULL);
aws_task_init(
&client->synced_data.endpoints_cleanup_task, s_s3_endpoints_cleanup_task, client, "s3_endpoints_cleanup_task");

/* Initialize shutdown options and tracking. */
client->shutdown_callback = client_config->shutdown_callback;
Expand Down Expand Up @@ -629,7 +637,11 @@ static void s_s3_client_start_destroy(void *user_data) {
aws_s3_client_lock_synced_data(client);

client->synced_data.active = false;

if (!client->synced_data.endpoints_cleanup_task_scheduled) {
client->synced_data.endpoints_cleanup_task_scheduled = true;
aws_event_loop_schedule_task_now(
client->process_work_event_loop, &client->synced_data.endpoints_cleanup_task);
}
/* Prevent the client from cleaning up in between the mutex unlock/re-lock below.*/
client->synced_data.start_destroy_executing = true;

Expand Down Expand Up @@ -1389,6 +1401,59 @@ static void s_s3_client_schedule_buffer_pool_trim_synced(struct aws_s3_client *c
client->threaded_data.trim_buffer_pool_task_scheduled = true;
}

static void s_s3_endpoints_cleanup_task(struct aws_task *task, void *arg, enum aws_task_status task_status) {
(void)task;
(void)task_status;

struct aws_s3_client *client = arg;
struct aws_array_list endpoints_to_release;
aws_array_list_init_dynamic(&endpoints_to_release, client->allocator, 5, sizeof(struct aws_s3_endpoint *));

/* BEGIN CRITICAL SECTION */
aws_s3_client_lock_synced_data(client);
client->synced_data.endpoints_cleanup_task_scheduled = false;

for (struct aws_hash_iter iter = aws_hash_iter_begin(&client->synced_data.endpoints); !aws_hash_iter_done(&iter);
aws_hash_iter_next(&iter)) {
struct aws_s3_endpoint *endpoint = (struct aws_s3_endpoint *)iter.element.value;
if (endpoint->client_synced_data.ref_count == 0) {
aws_array_list_push_back(&endpoints_to_release, &endpoint);
aws_hash_iter_delete(&iter, true);
}
}

/* END CRITICAL SECTION */
aws_s3_client_unlock_synced_data(client);

/* now destroy all endpoints without holding the lock */
size_t list_size = aws_array_list_length(&endpoints_to_release);
for (size_t i = 0; i < list_size; ++i) {
struct aws_s3_endpoint *endpoint;
aws_array_list_get_at(&endpoints_to_release, &endpoint, i);
aws_s3_endpoint_destroy(endpoint);
}

/* Clean up the array list */
aws_array_list_clean_up(&endpoints_to_release);

aws_s3_client_schedule_process_work(client);
}

/*
 * Schedules the endpoints cleanup task to run s_endpoints_cleanup_time_offset_in_s seconds from now on the
 * client's process-work event loop, unless it is already scheduled.
 *
 * The caller must hold the client's synced-data lock.
 */
static void s_s3_client_schedule_endpoints_cleanup_synced(struct aws_s3_client *client) {
    ASSERT_SYNCED_DATA_LOCK_HELD(client);

    if (!client->synced_data.endpoints_cleanup_task_scheduled) {
        /* Flag it first so that later calls become no-ops until the task has run. */
        client->synced_data.endpoints_cleanup_task_scheduled = true;

        uint64_t current_time_ns = 0;
        aws_event_loop_current_clock_time(client->process_work_event_loop, &current_time_ns);

        const uint64_t cleanup_delay_ns =
            aws_timestamp_convert(s_endpoints_cleanup_time_offset_in_s, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);

        aws_event_loop_schedule_task_future(
            client->process_work_event_loop,
            &client->synced_data.endpoints_cleanup_task,
            current_time_ns + cleanup_delay_ns);
    }
}

void aws_s3_client_schedule_process_work(struct aws_s3_client *client) {
AWS_PRECONDITION(client);

Expand Down Expand Up @@ -1439,7 +1504,7 @@ static void s_s3_client_process_work_default(struct aws_s3_client *client) {
aws_linked_list_init(&meta_request_work_list);

/*******************/
/* Step 1: Move relevant data into thread local memory. */
/* Step 1: Move relevant data into thread local memory and schedule cleanups */
/*******************/
AWS_LOGF_DEBUG(
AWS_LS_S3_CLIENT,
Expand All @@ -1454,6 +1519,13 @@ static void s_s3_client_process_work_default(struct aws_s3_client *client) {

if (client->synced_data.active) {
s_s3_client_schedule_buffer_pool_trim_synced(client);
s_s3_client_schedule_endpoints_cleanup_synced(client);
} else if (client->synced_data.endpoints_cleanup_task_scheduled) {
client->synced_data.endpoints_cleanup_task_scheduled = false;
/* Cancel the task to run it sync */
aws_s3_client_unlock_synced_data(client);
aws_event_loop_cancel_task(client->process_work_event_loop, &client->synced_data.endpoints_cleanup_task);
aws_s3_client_lock_synced_data(client);
}

aws_linked_list_swap_contents(&meta_request_work_list, &client->synced_data.pending_meta_request_work);
Expand Down
22 changes: 10 additions & 12 deletions source/s3_endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ static struct aws_http_connection_manager *s_s3_endpoint_create_http_connection_

static void s_s3_endpoint_http_connection_manager_shutdown_callback(void *user_data);

static void s_s3_endpoint_ref_count_zero(struct aws_s3_endpoint *endpoint);

static void s_s3_endpoint_acquire(struct aws_s3_endpoint *endpoint, bool already_holding_lock);

static void s_s3_endpoint_release(struct aws_s3_endpoint *endpoint);
Expand Down Expand Up @@ -123,8 +121,6 @@ struct aws_s3_endpoint *aws_s3_endpoint_new(

error_cleanup:

aws_string_destroy(options->host_name);

aws_mem_release(allocator, endpoint);

return NULL;
Expand Down Expand Up @@ -246,7 +242,6 @@ static void s_s3_endpoint_acquire(struct aws_s3_endpoint *endpoint, bool already
aws_s3_client_lock_synced_data(endpoint->client);
}

AWS_ASSERT(endpoint->client_synced_data.ref_count > 0);
++endpoint->client_synced_data.ref_count;

if (!already_holding_lock) {
Expand All @@ -267,29 +262,32 @@ static void s_s3_endpoint_release(struct aws_s3_endpoint *endpoint) {
/* BEGIN CRITICAL SECTION */
aws_s3_client_lock_synced_data(endpoint->client);

bool should_destroy = (endpoint->client_synced_data.ref_count == 1);
bool should_destroy = endpoint->client_synced_data.ref_count == 1 && !endpoint->client->synced_data.active;
if (should_destroy) {
aws_hash_table_remove(&endpoint->client->synced_data.endpoints, endpoint->host_name, NULL, NULL);
} else {
--endpoint->client_synced_data.ref_count;
}
--endpoint->client_synced_data.ref_count;

aws_s3_client_unlock_synced_data(endpoint->client);
/* END CRITICAL SECTION */

if (should_destroy) {
/* The endpoint may have async cleanup to do (connection manager).
/* Do a sync cleanup since client is getting destroyed to avoid any cleanup delay.
* The endpoint may have async cleanup to do (connection manager).
* When that's all done we'll invoke a completion callback.
* Since it's a crime to hold a lock while invoking a callback,
* we make sure that we've released the client's lock before proceeding... */
s_s3_endpoint_ref_count_zero(endpoint);
* we make sure that we've released the client's lock before proceeding...
*/
aws_s3_endpoint_destroy(endpoint);
}
}

static void s_s3_endpoint_ref_count_zero(struct aws_s3_endpoint *endpoint) {
void aws_s3_endpoint_destroy(struct aws_s3_endpoint *endpoint) {
AWS_PRECONDITION(endpoint);
AWS_PRECONDITION(endpoint->http_connection_manager);

AWS_FATAL_ASSERT(endpoint->client_synced_data.ref_count == 0);

struct aws_http_connection_manager *http_connection_manager = endpoint->http_connection_manager;
endpoint->http_connection_manager = NULL;

Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ add_net_test_case(test_s3_get_object_tls_default)
add_net_test_case(test_s3_get_object_less_than_part_size)
add_net_test_case(test_s3_get_object_empty_object)
add_net_test_case(test_s3_get_object_multiple)
add_net_test_case(test_s3_get_object_multiple_serial)
add_net_test_case(test_s3_get_object_sse_kms)
add_net_test_case(test_s3_get_object_sse_aes256)
add_net_test_case(test_s3_get_object_backpressure_small_increments)
Expand Down
47 changes: 47 additions & 0 deletions tests/s3_data_plane_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,53 @@ static int s_test_s3_get_object_multiple(struct aws_allocator *allocator, void *
return 0;
}

AWS_TEST_CASE(test_s3_get_object_multiple_serial, s_test_s3_get_object_multiple_serial)
/*
 * Sends four GET meta requests for the same pre-existing 10MB object through one client, then waits longer than
 * the endpoints cleanup interval and verifies that the client no longer has any endpoints allocated.
 *
 * Returns 0 on success; the ASSERT_* test macros return a failure code from the test otherwise.
 */
static int s_test_s3_get_object_multiple_serial(struct aws_allocator *allocator, void *ctx) {
    (void)ctx;

    struct aws_s3_tester tester;
    AWS_ZERO_STRUCT(tester);
    ASSERT_SUCCESS(aws_s3_tester_init(allocator, &tester));

    struct aws_s3_tester_client_options client_options = {
        .part_size = MB_TO_BYTES(5),
    };

    struct aws_s3_client *client = NULL;
    ASSERT_SUCCESS(aws_s3_tester_client_new(&tester, &client_options, &client));

    struct aws_byte_cursor object_path = aws_byte_cursor_from_c_str("/pre-existing-10MB");
    struct aws_s3_tester_meta_request_options get_options = {
        .allocator = allocator,
        .meta_request_type = AWS_S3_META_REQUEST_TYPE_GET_OBJECT,
        .validate_type = AWS_S3_TESTER_VALIDATE_TYPE_EXPECT_SUCCESS,
        .client = client,
        .get_options =
            {
                .object_path = object_path,
            },
    };

    for (size_t i = 0; i < 4; ++i) {
        ASSERT_SUCCESS(aws_s3_tester_send_meta_request_with_options(&tester, &get_options, NULL));
    }

    /* Sleep for some time to wait for the cleanup task to run */
    aws_thread_current_sleep(aws_timestamp_convert(7, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL));

    /* Snapshot the counter under the lock, and only check it after unlocking so that a failing check never
     * returns from the test while the client's lock is still held. */
    /* BEGIN CRITICAL SECTION */
    aws_s3_client_lock_synced_data(client);
    uint32_t num_endpoints_allocated = client->synced_data.num_endpoints_allocated;
    aws_s3_client_unlock_synced_data(client);
    /* END CRITICAL SECTION */

    /* Use the test harness assertion rather than AWS_ASSERT: AWS_ASSERT is a debug-only assertion and would
     * turn this check into a no-op in builds where it is compiled out. */
    ASSERT_UINT_EQUALS(0, num_endpoints_allocated);

    client = aws_s3_client_release(client);
    aws_s3_tester_clean_up(&tester);
    return 0;
}

AWS_TEST_CASE(test_s3_get_object_empty_object, s_test_s3_get_object_empty_default)
static int s_test_s3_get_object_empty_default(struct aws_allocator *allocator, void *ctx) {
(void)ctx;
Expand Down

0 comments on commit eb3b2bd

Please sign in to comment.