Skip to content

Commit

Permalink
Release participant_stateless secure builtin writer history change …
Browse files Browse the repository at this point in the history
…when authentication has finished (#5386)

* TMP: REMOVE THIS COMMIT

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #22033: BB test

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #22033: Modify secure builtins initial payload size

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #22033: Fix: release stateless msg payload pool when participant cryptography succeeds

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

---------

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
(cherry picked from commit b414621)
  • Loading branch information
Mario-DL authored and mergify[bot] committed Nov 12, 2024
1 parent ad9b4eb commit 39943e6
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 20 deletions.
30 changes: 25 additions & 5 deletions src/cpp/rtps/security/SecurityManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1065,11 +1065,11 @@ void SecurityManager::delete_participant_stateless_message_entities()
void SecurityManager::create_participant_stateless_message_pool()
{
participant_stateless_message_writer_hattr_ =
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize(), 20, 100 };
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE, 20, 100};
participant_stateless_message_reader_hattr_ =
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize(), 10, 5000 };
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE, 10, 5000};

BasicPoolConfig cfg{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize() };
BasicPoolConfig cfg{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE};
participant_stateless_message_pool_ = TopicPayloadPoolRegistry::get("DCPSParticipantStatelessMessage", cfg);

PoolConfig writer_cfg = PoolConfig::from_history_attributes(participant_stateless_message_writer_hattr_);
Expand Down Expand Up @@ -1221,7 +1221,7 @@ void SecurityManager::delete_participant_volatile_message_secure_entities()
void SecurityManager::create_participant_volatile_message_secure_pool()
{
participant_volatile_message_secure_hattr_ =
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize(), 10, 0 };
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, PARTICIPANT_VOLATILE_MESSAGE_PAYLOAD_DEFAULT_SIZE, 10, 0 };

PoolConfig pool_cfg = PoolConfig::from_history_attributes(participant_volatile_message_secure_hattr_);
participant_volatile_message_secure_pool_ =
Expand Down Expand Up @@ -1720,6 +1720,7 @@ void SecurityManager::process_participant_volatile_message_secure(
const GUID_t remote_participant_key(message.message_identity().source_guid().guidPrefix,
c_EntityId_RTPSParticipant);
std::shared_ptr<ParticipantCryptoHandle> remote_participant_crypto;
DiscoveredParticipantInfo::AuthUniquePtr remote_participant_info;

// Search remote participant crypto handle.
{
Expand All @@ -1735,6 +1736,7 @@ void SecurityManager::process_participant_volatile_message_secure(
}

remote_participant_crypto = dp_it->second->get_participant_crypto();
remote_participant_info = dp_it->second->get_auth();
}
else
{
Expand All @@ -1756,12 +1758,30 @@ void SecurityManager::process_participant_volatile_message_secure(
EPROSIMA_LOG_ERROR(SECURITY, "Cannot set remote participant crypto tokens ("
<< remote_participant_key << ") - (" << exception.what() << ")");
}
else
{
// Release the change from the participant_stateless_message_writer_pool_
// As both participants have already authorized each other

if (remote_participant_info &&
remote_participant_info->change_sequence_number_ != SequenceNumber_t::unknown())
{
participant_stateless_message_writer_history_->remove_change(
remote_participant_info->change_sequence_number_);
remote_participant_info->change_sequence_number_ = SequenceNumber_t::unknown();
}
}
}
else
{
std::lock_guard<shared_mutex> _(mutex_);
remote_participant_pending_messages_.emplace(remote_participant_key, std::move(message.message_data()));
}

if (remote_participant_info)
{
restore_discovered_participant_info(remote_participant_key, remote_participant_info);
}
}
else if (message.message_class_id().compare(GMCLASSID_SECURITY_READER_CRYPTO_TOKENS) == 0)
{
Expand Down Expand Up @@ -1917,7 +1937,7 @@ void SecurityManager::process_participant_volatile_message_secure(
}
else
{
EPROSIMA_LOG_INFO(SECURITY, "Discarted ParticipantGenericMessage with class id " << message.message_class_id());
EPROSIMA_LOG_INFO(SECURITY, "Discarded ParticipantGenericMessage with class id " << message.message_class_id());
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/cpp/rtps/security/SecurityManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ struct EndpointSecurityAttributes;
*/
class SecurityManager : private WriterListener
{
static constexpr std::size_t PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE = 8192;
static constexpr std::size_t PARTICIPANT_VOLATILE_MESSAGE_PAYLOAD_DEFAULT_SIZE = 1024;

public:

/**
Expand Down Expand Up @@ -401,14 +404,19 @@ class SecurityManager : private WriterListener
}

AuthenticationInfo(
AuthenticationInfo&& auth)
AuthenticationInfo&& auth) noexcept
: identity_handle_(std::move(auth.identity_handle_))
, handshake_handle_(std::move(auth.handshake_handle_))
, auth_status_(auth.auth_status_)
, expected_sequence_number_(auth.expected_sequence_number_)
, change_sequence_number_(std::move(auth.change_sequence_number_))
, event_(std::move(auth.event_))
{
auth.identity_handle_ = nullptr;
auth.handshake_handle_ = nullptr;
auth.auth_status_ = AUTHENTICATION_NOT_AVAILABLE;
auth.expected_sequence_number_ = 0;
auth.change_sequence_number_ = SequenceNumber_t::unknown();
}

int32_t handshake_requests_sent_;
Expand Down
31 changes: 26 additions & 5 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -834,16 +834,28 @@ class PubSubReader
}

#if HAVE_SECURITY
void waitAuthorized()
void waitAuthorized(
std::chrono::seconds timeout = std::chrono::seconds::zero(),
unsigned int expected = 1)
{
std::unique_lock<std::mutex> lock(mutexAuthentication_);

std::cout << "Reader is waiting authorization..." << std::endl;

cvAuthentication_.wait(lock, [&]() -> bool
{
return authorized_ > 0;
});
if (timeout == std::chrono::seconds::zero())
{
cvAuthentication_.wait(lock, [&]()
{
return authorized_ >= expected;
});
}
else
{
cvAuthentication_.wait_for(lock, timeout, [&]()
{
return authorized_ >= expected;
});
}

std::cout << "Reader authorization finished..." << std::endl;
}
Expand Down Expand Up @@ -1134,6 +1146,15 @@ class PubSubReader
return *this;
}

PubSubReader& participants_allocation_properties(
size_t initial,
size_t maximum)
{
participant_qos_.allocation().participants.initial = initial;
participant_qos_.allocation().participants.maximum = maximum;
return *this;
}

PubSubReader& expect_no_allocs()
{
// TODO(Mcc): Add no allocations check code when feature is completely ready
Expand Down
31 changes: 26 additions & 5 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -700,16 +700,28 @@ class PubSubWriter
}

#if HAVE_SECURITY
void waitAuthorized()
void waitAuthorized(
std::chrono::seconds timeout = std::chrono::seconds::zero(),
unsigned int expected = 1)
{
std::unique_lock<std::mutex> lock(mutexAuthentication_);

std::cout << "Writer is waiting authorization..." << std::endl;

cvAuthentication_.wait(lock, [&]() -> bool
{
return authorized_ > 0;
});
if (timeout == std::chrono::seconds::zero())
{
cvAuthentication_.wait(lock, [&]()
{
return authorized_ >= expected;
});
}
else
{
cvAuthentication_.wait_for(lock, timeout, [&]()
{
return authorized_ >= expected;
});
}

std::cout << "Writer authorization finished..." << std::endl;
}
Expand Down Expand Up @@ -1080,6 +1092,15 @@ class PubSubWriter
return *this;
}

PubSubWriter& participants_allocation_properties(
size_t initial,
size_t maximum)
{
participant_qos_.allocation().participants.initial = initial;
participant_qos_.allocation().participants.maximum = maximum;
return *this;
}

PubSubWriter& expect_no_allocs()
{
// TODO(Mcc): Add no allocations check code when feature is completely ready
Expand Down
110 changes: 106 additions & 4 deletions test/blackbox/common/BlackboxTestsSecurity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3528,11 +3528,11 @@ TEST_F(SecurityPkcs, BuiltinAuthenticationAndAccessAndCryptoPlugin_pkcs11_key)

static void CommonPermissionsConfigure(
PubSubReader<HelloWorldPubSubType>& reader,
PubSubWriter<HelloWorldPubSubType>& writer,
const std::string& governance_file,
const std::string& permissions_file)
const std::string& permissions_file,
const PropertyPolicy& extra_properties)
{
PropertyPolicy sub_property_policy;
PropertyPolicy sub_property_policy(extra_properties);
sub_property_policy.properties().emplace_back(Property("dds.sec.auth.plugin",
"builtin.PKI-DH"));
sub_property_policy.properties().emplace_back(Property("dds.sec.auth.builtin.PKI-DH.identity_ca",
Expand All @@ -3551,9 +3551,18 @@ static void CommonPermissionsConfigure(
"file://" + std::string(certs_path) + "/" + governance_file));
sub_property_policy.properties().emplace_back(Property("dds.sec.access.builtin.Access-Permissions.permissions",
"file://" + std::string(certs_path) + "/" + permissions_file));

reader.property_policy(sub_property_policy);
}

static void CommonPermissionsConfigure(
PubSubWriter<HelloWorldPubSubType>& writer,
const std::string& governance_file,
const std::string& permissions_file,
const PropertyPolicy& extra_properties)
{
PropertyPolicy pub_property_policy(extra_properties);

PropertyPolicy pub_property_policy;
pub_property_policy.properties().emplace_back(Property("dds.sec.auth.plugin",
"builtin.PKI-DH"));
pub_property_policy.properties().emplace_back(Property("dds.sec.auth.builtin.PKI-DH.identity_ca",
Expand All @@ -3572,9 +3581,21 @@ static void CommonPermissionsConfigure(
"file://" + std::string(certs_path) + "/" + governance_file));
pub_property_policy.properties().emplace_back(Property("dds.sec.access.builtin.Access-Permissions.permissions",
"file://" + std::string(certs_path) + "/" + permissions_file));

writer.property_policy(pub_property_policy);
}

static void CommonPermissionsConfigure(
PubSubReader<HelloWorldPubSubType>& reader,
PubSubWriter<HelloWorldPubSubType>& writer,
const std::string& governance_file,
const std::string& permissions_file,
const PropertyPolicy& extra_properties = PropertyPolicy())
{
CommonPermissionsConfigure(reader, governance_file, permissions_file, extra_properties);
CommonPermissionsConfigure(writer, governance_file, permissions_file, extra_properties);
}

static void BuiltinAuthenticationAndAccessAndCryptoPlugin_Permissions_validation_ok_common(
PubSubReader<HelloWorldPubSubType>& reader,
PubSubWriter<HelloWorldPubSubType>& writer,
Expand Down Expand Up @@ -5086,6 +5107,87 @@ TEST(Security, security_with_initial_peers_over_tcpv4_correctly_behaves)
tcp_server.block_for_all(std::chrono::seconds(10));
}

// Regression test for Redmine issue #22033
// Authorized participants shall remove the changes from the
// participants secure stateless msgs pool
TEST(Security, participant_stateless_secure_writer_pool_change_is_removed_upon_participant_authentication)
{
struct TestConsumer : public eprosima::fastdds::dds::LogConsumer
{
TestConsumer(
std::atomic_size_t& n_logs_ref)
: n_logs_(n_logs_ref)
{
}

void Consume(
const eprosima::fastdds::dds::Log::Entry&) override
{
++n_logs_;
}

private:

std::atomic_size_t& n_logs_;
};

// Counter for log entries
std::atomic<size_t>n_logs{};

// Prepare Log module to check that no SECURITY errors are produced
eprosima::fastdds::dds::Log::SetCategoryFilter(std::regex("SECURITY"));
eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Error);
eprosima::fastdds::dds::Log::RegisterConsumer(std::unique_ptr<eprosima::fastdds::dds::LogConsumer>(new TestConsumer(
n_logs)));

const size_t n_participants = 20;

// Create 21 secure participants
std::vector<std::shared_ptr<PubSubReader<HelloWorldPubSubType>>> participants;
participants.reserve(n_participants + 1);

for (size_t i = 0; i < n_participants + 1; ++i)
{
participants.emplace_back(std::make_shared<PubSubReader<HelloWorldPubSubType>>("HelloWorldTopic"));
// Configure security
const std::string governance_file("governance_helloworld_all_enable.smime");
const std::string permissions_file("permissions_helloworld.smime");

PropertyPolicy handshake_prop_policy;

handshake_prop_policy.properties().emplace_back(Property("dds.sec.auth.builtin.PKI-DH.max_handshake_requests",
"10000000"));
handshake_prop_policy.properties().emplace_back(Property(
"dds.sec.auth.builtin.PKI-DH.initial_handshake_resend_period",
"250"));
handshake_prop_policy.properties().emplace_back(Property(
"dds.sec.auth.builtin.PKI-DH.handshake_resend_period_gain",
"1.0"));

CommonPermissionsConfigure(*participants.back(), governance_file, permissions_file, handshake_prop_policy);

// Init all except the latest one
if (i != n_participants)
{
participants.back()->init();
ASSERT_TRUE(participants.back()->isInitialized());
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}

// Wait for the first participant to authenticate the rest
participants.front()->waitAuthorized(std::chrono::seconds::zero(), n_participants - 1);

// Init the last one
participants.back()->init();
ASSERT_TRUE(participants.back()->isInitialized());

participants.front()->waitAuthorized(std::chrono::seconds::zero(), n_participants);

// No SECURITY error logs should have been produced
eprosima::fastdds::dds::Log::Flush();
EXPECT_EQ(0u, n_logs);
}

void blackbox_security_init()
{
Expand Down

0 comments on commit 39943e6

Please sign in to comment.