Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent sanitiser errors when shutting down subscription and client in quick succession #1549

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions aeron-client/src/main/c/aeron_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -861,12 +861,19 @@ int aeron_exclusive_publication_async_remove_destination(
async, &client->conductor, publication, uri);
}

int aeron_client_handler_cmd_await_processed(aeron_client_handler_cmd_t *cmd)
int aeron_client_handler_cmd_await_processed(aeron_client_handler_cmd_t *cmd, uint64_t timeout_ms)
{
bool processed = cmd->processed;

int64_t deadline_ms = aeron_epoch_clock() + timeout_ms;

while (!processed)
{
if (deadline_ms <= aeron_epoch_clock())
{
AERON_SET_ERR(ETIMEDOUT, "%s", "time out waiting for client conductor thread to process message");
return -1;
}

sched_yield();
AERON_GET_VOLATILE(processed, cmd->processed);
}
Expand Down Expand Up @@ -895,7 +902,7 @@ int aeron_add_available_counter_handler(aeron_t *client, aeron_on_available_coun
return -1;
}

return aeron_client_handler_cmd_await_processed(&cmd);
return aeron_client_handler_cmd_await_processed(&cmd, aeron_context_get_driver_timeout_ms(client->context));
}

int aeron_remove_available_counter_handler(aeron_t *client, aeron_on_available_counter_pair_t *pair)
Expand All @@ -919,7 +926,7 @@ int aeron_remove_available_counter_handler(aeron_t *client, aeron_on_available_c
return -1;
}

return aeron_client_handler_cmd_await_processed(&cmd);
return aeron_client_handler_cmd_await_processed(&cmd, aeron_context_get_driver_timeout_ms(client->context));
}

int aeron_add_unavailable_counter_handler(aeron_t *client, aeron_on_unavailable_counter_pair_t *pair)
Expand All @@ -943,7 +950,7 @@ int aeron_add_unavailable_counter_handler(aeron_t *client, aeron_on_unavailable_
return -1;
}

return aeron_client_handler_cmd_await_processed(&cmd);
return aeron_client_handler_cmd_await_processed(&cmd, aeron_context_get_driver_timeout_ms(client->context));
}

int aeron_remove_unavailable_counter_handler(aeron_t *client, aeron_on_unavailable_counter_pair_t *pair)
Expand All @@ -967,7 +974,7 @@ int aeron_remove_unavailable_counter_handler(aeron_t *client, aeron_on_unavailab
return -1;
}

return aeron_client_handler_cmd_await_processed(&cmd);
return aeron_client_handler_cmd_await_processed(&cmd, aeron_context_get_driver_timeout_ms(client->context));
}

int aeron_add_close_handler(aeron_t *client, aeron_on_close_client_pair_t *pair)
Expand All @@ -991,7 +998,7 @@ int aeron_add_close_handler(aeron_t *client, aeron_on_close_client_pair_t *pair)
return -1;
}

return aeron_client_handler_cmd_await_processed(&cmd);
return aeron_client_handler_cmd_await_processed(&cmd, aeron_context_get_driver_timeout_ms(client->context));
}

int aeron_remove_close_handler(aeron_t *client, aeron_on_close_client_pair_t *pair)
Expand All @@ -1015,5 +1022,5 @@ int aeron_remove_close_handler(aeron_t *client, aeron_on_close_client_pair_t *pa
return -1;
}

return aeron_client_handler_cmd_await_processed(&cmd);
return aeron_client_handler_cmd_await_processed(&cmd, aeron_context_get_driver_timeout_ms(client->context));
}
6 changes: 6 additions & 0 deletions aeron-client/src/main/cpp_wrapper/Aeron.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class Aeron

~Aeron()
{
aeron_on_close_client_pair_t closePair = {emptyCallback, nullptr};
aeron_add_close_handler(m_aeron, &closePair);
aeron_close(m_aeron);
aeron_context_close(m_context.m_context);

Expand Down Expand Up @@ -985,6 +987,10 @@ class Aeron
on_close_client_t &callback = *reinterpret_cast<on_close_client_t *>(clientd);
callback();
}

static void emptyCallback(void *clientd)
{
}
};
}

Expand Down
41 changes: 41 additions & 0 deletions aeron-client/src/test/cpp_wrapper/SystemTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,44 @@ TEST_F(SystemTest, shouldAddRemoveCloseHandler)
EXPECT_EQ(1, closeCount1);
EXPECT_EQ(0, closeCount2);
}

//
// These tests will fail with the sanitizer if not implemented correctly.
//

TEST_F(SystemTest, shouldFreeSubscritionDataCorrectly)
mikeb01 marked this conversation as resolved.
Show resolved Hide resolved
{
{
Context ctx;
ctx.useConductorAgentInvoker(false);

std::shared_ptr<Aeron> aeron = Aeron::connect(ctx);
int64_t i = aeron->addSubscription("aeron:ipc", 1000);
std::shared_ptr<Subscription> subscription;
do
{
subscription = aeron->findSubscription(i);
}
while (nullptr == subscription);
}
}

TEST_F(SystemTest, shouldFreeSubscritionDataCorrectlyWithInvoker)
{
{
Context ctx;
ctx.useConductorAgentInvoker(true);
std::shared_ptr<Aeron> aeron = Aeron::connect(ctx);
AgentInvoker<ClientConductor> &invoker = aeron->conductorAgentInvoker();
invoker.start();

int64_t i = aeron->addSubscription("aeron:ipc", 1000);
std::shared_ptr<Subscription> subscription;
do
{
invoker.invoke();
subscription = aeron->findSubscription(i);
}
while (nullptr == subscription);
}
}
Loading