Skip to content

Commit

Permalink
Prevent sanitiser errors when shutting down subscription and client i…
Browse files Browse the repository at this point in the history
…n quick succession (#1549)

* [C/C++] Use the C client close handler to ensure that the client conductor thread drains before destructing the Aeron resources in the C++ wrapper.

* [C] Spelling.
  • Loading branch information
mikeb01 authored Feb 25, 2024
1 parent 588c2a7 commit 30d3200
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 8 deletions.
23 changes: 15 additions & 8 deletions aeron-client/src/main/c/aeron_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -861,12 +861,19 @@ int aeron_exclusive_publication_async_remove_destination(
async, &client->conductor, publication, uri);
}

int aeron_client_handler_cmd_await_processed(aeron_client_handler_cmd_t *cmd)
int aeron_client_handler_cmd_await_processed(aeron_client_handler_cmd_t *cmd, uint64_t timeout_ms)
{
bool processed = cmd->processed;

int64_t deadline_ms = aeron_epoch_clock() + timeout_ms;

while (!processed)
{
if (deadline_ms <= aeron_epoch_clock())
{
AERON_SET_ERR(ETIMEDOUT, "%s", "time out waiting for client conductor thread to process message");
return -1;
}

sched_yield();
AERON_GET_VOLATILE(processed, cmd->processed);
}
Expand Down Expand Up @@ -895,7 +902,7 @@ int aeron_add_available_counter_handler(aeron_t *client, aeron_on_available_coun
return -1;
}

return aeron_client_handler_cmd_await_processed(&cmd);
return aeron_client_handler_cmd_await_processed(&cmd, aeron_context_get_driver_timeout_ms(client->context));
}

int aeron_remove_available_counter_handler(aeron_t *client, aeron_on_available_counter_pair_t *pair)
Expand All @@ -919,7 +926,7 @@ int aeron_remove_available_counter_handler(aeron_t *client, aeron_on_available_c
return -1;
}

return aeron_client_handler_cmd_await_processed(&cmd);
return aeron_client_handler_cmd_await_processed(&cmd, aeron_context_get_driver_timeout_ms(client->context));
}

int aeron_add_unavailable_counter_handler(aeron_t *client, aeron_on_unavailable_counter_pair_t *pair)
Expand All @@ -943,7 +950,7 @@ int aeron_add_unavailable_counter_handler(aeron_t *client, aeron_on_unavailable_
return -1;
}

return aeron_client_handler_cmd_await_processed(&cmd);
return aeron_client_handler_cmd_await_processed(&cmd, aeron_context_get_driver_timeout_ms(client->context));
}

int aeron_remove_unavailable_counter_handler(aeron_t *client, aeron_on_unavailable_counter_pair_t *pair)
Expand All @@ -967,7 +974,7 @@ int aeron_remove_unavailable_counter_handler(aeron_t *client, aeron_on_unavailab
return -1;
}

return aeron_client_handler_cmd_await_processed(&cmd);
return aeron_client_handler_cmd_await_processed(&cmd, aeron_context_get_driver_timeout_ms(client->context));
}

int aeron_add_close_handler(aeron_t *client, aeron_on_close_client_pair_t *pair)
Expand All @@ -991,7 +998,7 @@ int aeron_add_close_handler(aeron_t *client, aeron_on_close_client_pair_t *pair)
return -1;
}

return aeron_client_handler_cmd_await_processed(&cmd);
return aeron_client_handler_cmd_await_processed(&cmd, aeron_context_get_driver_timeout_ms(client->context));
}

int aeron_remove_close_handler(aeron_t *client, aeron_on_close_client_pair_t *pair)
Expand All @@ -1015,5 +1022,5 @@ int aeron_remove_close_handler(aeron_t *client, aeron_on_close_client_pair_t *pa
return -1;
}

return aeron_client_handler_cmd_await_processed(&cmd);
return aeron_client_handler_cmd_await_processed(&cmd, aeron_context_get_driver_timeout_ms(client->context));
}
6 changes: 6 additions & 0 deletions aeron-client/src/main/cpp_wrapper/Aeron.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class Aeron

~Aeron()
{
aeron_on_close_client_pair_t closePair = {emptyCallback, nullptr};
aeron_add_close_handler(m_aeron, &closePair);
aeron_close(m_aeron);
aeron_context_close(m_context.m_context);

Expand Down Expand Up @@ -985,6 +987,10 @@ class Aeron
on_close_client_t &callback = *reinterpret_cast<on_close_client_t *>(clientd);
callback();
}

static void emptyCallback(void *clientd)
{
}
};
}

Expand Down
41 changes: 41 additions & 0 deletions aeron-client/src/test/cpp_wrapper/SystemTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,44 @@ TEST_F(SystemTest, shouldAddRemoveCloseHandler)
EXPECT_EQ(1, closeCount1);
EXPECT_EQ(0, closeCount2);
}

//
// These tests will fail with the sanitizer if not implemented correctly.
//

TEST_F(SystemTest, shouldFreeSubscriptionDataCorrectly)
{
{
Context ctx;
ctx.useConductorAgentInvoker(false);

std::shared_ptr<Aeron> aeron = Aeron::connect(ctx);
int64_t i = aeron->addSubscription("aeron:ipc", 1000);
std::shared_ptr<Subscription> subscription;
do
{
subscription = aeron->findSubscription(i);
}
while (nullptr == subscription);
}
}

TEST_F(SystemTest, shouldFreeSubscriptionDataCorrectlyWithInvoker)
{
{
Context ctx;
ctx.useConductorAgentInvoker(true);
std::shared_ptr<Aeron> aeron = Aeron::connect(ctx);
AgentInvoker<ClientConductor> &invoker = aeron->conductorAgentInvoker();
invoker.start();

int64_t i = aeron->addSubscription("aeron:ipc", 1000);
std::shared_ptr<Subscription> subscription;
do
{
invoker.invoke();
subscription = aeron->findSubscription(i);
}
while (nullptr == subscription);
}
}

0 comments on commit 30d3200

Please sign in to comment.