Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 71 additions & 4 deletions score/datarouter/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,26 @@ cc_library(
## Log parser
## ---------------------------------------------------------------------------

cc_library(
name = "logparser_interface",
hdrs = [
"include/logparser/i_logparser.h",
],
features = COMPILER_WARNING_FEATURES,
strip_include_prefix = "include",
visibility = [
"//score/datarouter/file_transfer:__subpackages__",
"//score/datarouter/nonverbose_dlt:__pkg__",
"//score/datarouter/nonverbose_dlt_stub:__pkg__",
"//score/datarouter/persistent_logging:__pkg__",
"//score/datarouter/persistent_logging/persistent_logging:__pkg__",
"//score/datarouter/test:__subpackages__",
],
deps = [
"//score/mw/log/detail/data_router/shared_memory:reader",
],
)

cc_library(
name = "logparser",
srcs = [
Expand All @@ -174,6 +194,7 @@ cc_library(
":datarouter_types",
":dltprotocol",
":log",
":logparser_interface",
"//score/mw/log/detail/data_router/shared_memory:reader",
"@score_baselibs//score/static_reflection_with_serialization/serialization",
],
Expand All @@ -191,12 +212,27 @@ cc_library(
deps = [
":datarouter_types",
":dltprotocol",
":logparser_interface",
"//score/mw/log/detail/data_router/shared_memory:reader",
"@score_baselibs//score/mw/log/configuration:nvconfig",
"@score_baselibs//score/static_reflection_with_serialization/serialization",
],
)

cc_library(
name = "logparser_mock",
hdrs = [
"mocks/logparser_mock.h",
],
features = COMPILER_WARNING_FEATURES,
strip_include_prefix = "mocks",
visibility = ["//score/datarouter/test:__subpackages__"],
deps = [
":logparser_interface",
"@googletest//:gtest_main",
],
)

## ===========================================================================
## libtracing
## ---------------------------------------------------------------------------
Expand Down Expand Up @@ -664,6 +700,7 @@ cc_library(
strip_include_prefix = "include",
visibility = ["//visibility:private"],
deps = [
":datarouter_feature_config",
":datarouter_lib",
":dltserver_common",
":persistentlogconfig",
Expand Down Expand Up @@ -693,6 +730,7 @@ cc_library(
strip_include_prefix = "include",
visibility = ["//score/datarouter/test:__subpackages__"],
deps = [
":datarouter_feature_config",
":datarouter_testing",
":dltserver_common",
":persistentlogconfig",
Expand Down Expand Up @@ -737,6 +775,38 @@ cc_library(
}),
)

cc_library(
name = "datarouter_socketserver_testing",
testonly = True,
srcs = [
"src/daemon/socketserver.cpp",
],
hdrs = [
"include/daemon/socketserver.h",
],
# TODO: will be reworked in Ticket-207823
features = COMPILER_WARNING_FEATURES,
local_defines = select({
"//score/datarouter/build_configuration_flags:config_persistent_logging": ["PERSISTENT_LOGGING"],
"//conditions:default": [],
}),
strip_include_prefix = "include",
visibility = ["//score/datarouter/test:__subpackages__"],
deps = [
":datarouter_feature_config",
":datarouter_lib",
":dltserver",
":persistentlogconfig",
":socketserver_config_lib_testing",
"@score_baselibs//score/mw/log/configuration:nvconfigfactory",
] + select({
"//score/datarouter/build_configuration_flags:config_persistent_logging": [
"//score/datarouter/persistent_logging/persistent_logging_stub:sysedr_stub",
],
"//conditions:default": [],
}),
)

cc_library(
name = "datarouter_app",
srcs = [
Expand All @@ -762,10 +832,7 @@ cc_binary(
],
features = COMPILER_WARNING_FEATURES,
visibility = [
"//ecu/xyz/xyz-shared/config/common/pas/datarouter:__subpackages__",
"//platform/aas/tools/itf:__subpackages__",
"//platform/aas/tools/sctf:__subpackages__",
# "@ddad//ecu/xyz/xyz-shared/config/common/pas/datarouter:__subpackages__",
"//visibility:public",
],
deps = [
":datarouter_app",
Expand Down
4 changes: 2 additions & 2 deletions score/datarouter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ Example statistics message:
```
[APP1 : count 4074 , size 714378 B, read_time: 21000 us, transp_delay: 204000 us time_between_to_calls_: 0 us time_to_process_records_: 0 us buffer size watermark: 152 KB out of 512 KB ( 29 %) messages dropped: 0 (accumulated)]
```
This example shows source `APP1` transmitted 4074 messages with a total payload size of 714378 bytes. The datarouter spent 204000 microseconds reading the messages (since the last read cycle), with a maximum message latency of 1364 microseconds.
This example shows source `APP1` transmitted 4074 messages with a total payload size of 714378 bytes. The datarouter spent 204000 microseconds reading the messages (since the last read cycle).

The datarouter detects message gaps in the incoming flow and reports them using the source ID as the context ID.

Example gap detection message:
```
535575 2019/11/05 14:59:22.720133 244.3038 157 HFPP DR 485 0 log error verbose 5 message drop detected: messages 1073 to 1110 lost!
535575 2019/11/05 14:59:22.720133 244.3038 157 ECU1 DR 485 0 log error verbose 5 message drop detected: messages 1073 to 1110 lost!
```
The source ID corresponds to its PID. In this example, the source with PID `485` lost 37 messages because the datarouter did not read the ring buffer fast enough.

Expand Down
61 changes: 31 additions & 30 deletions score/datarouter/datarouter/data_router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ DataRouter::MessagingSessionPtr DataRouter::new_source_session(
// It shall be safe to create shared memory reader as only single process - Datarouter daemon, shall be running
// at all times in whole system.
auto reader = reader_factory->Create(fd, client_pid);
if (reader.has_value() == false)
if (reader == nullptr)
{
stats_logger_.LogError() << "Failed to create session for pid=" << client_pid << ", appid=" << name;
return nullptr;
}

return new_source_session_impl(
name, is_dlt_enabled, std::move(handle), quota, quota_enforcement_enabled, std::move(reader.value()), nvConfig);
name, is_dlt_enabled, std::move(handle), quota, quota_enforcement_enabled, std::move(reader), nvConfig);
}

void DataRouter::show_source_statistics(uint16_t series_num)
Expand All @@ -119,20 +119,21 @@ std::unique_ptr<DataRouter::SourceSession> DataRouter::new_source_session_impl(
SessionHandleVariant handle,
const double quota,
bool quota_enforcement_enabled,
score::mw::log::detail::SharedMemoryReader reader,
std::unique_ptr<score::mw::log::detail::ISharedMemoryReader> reader,
const score::mw::log::NvConfig& nvConfig)
{
std::lock_guard<std::mutex> lock(subscriber_mutex_);

auto sourceSession = std::make_unique<DataRouter::SourceSession>(*this,
std::move(reader),
name,
is_dlt_enabled,
std::move(handle),
quota,
quota_enforcement_enabled,
stats_logger_,
nvConfig);
auto sourceSession =
std::make_unique<DataRouter::SourceSession>(*this,
std::move(reader),
name,
is_dlt_enabled,
std::move(handle),
quota,
quota_enforcement_enabled,
stats_logger_,
std::make_unique<score::platform::internal::LogParser>(nvConfig));
if (sourceCallback_)
{
sourceCallback_(std::move(sourceSession->get_parser()));
Expand Down Expand Up @@ -178,9 +179,9 @@ bool DataRouter::SourceSession::tryFinalizeAcquisition(bool& needs_fast_reschedu

if (data_acquired_local.has_value())
{
if (reader_.IsBlockReleasedByWriters(data_acquired_local.value().acquired_buffer))
if (reader_->IsBlockReleasedByWriters(data_acquired_local.value().acquired_buffer))
{
std::ignore = reader_.NotifyAcquisitionSetReader(data_acquired_local.value());
std::ignore = reader_->NotifyAcquisitionSetReader(data_acquired_local.value());
command_data_.lock()->data_acquired_ = std::nullopt;

return true;
Expand Down Expand Up @@ -213,7 +214,7 @@ void DataRouter::SourceSession::processAndRouteLogMessages(uint64_t& message_cou

score::mw::log::detail::TypeRegistrationCallback on_new_type =
[this](const score::mw::log::detail::TypeRegistration& registration) noexcept {
parser_.AddIncomingType(registration);
parser_->AddIncomingType(registration);
};

score::mw::log::detail::NewRecordCallback on_new_record =
Expand All @@ -225,7 +226,7 @@ void DataRouter::SourceSession::processAndRouteLogMessages(uint64_t& message_cou
}

auto record_received_timestamp = score::mw::log::detail::TimePoint::clock::now();
parser_.Parse(record);
parser_->Parse(record);
++message_count_local;

transport_delay_local = std::max(transport_delay_local,
Expand All @@ -234,7 +235,7 @@ void DataRouter::SourceSession::processAndRouteLogMessages(uint64_t& message_cou
};

bool detach_needed = false;
const auto number_of_bytes_in_buffer_result = reader_.Read(on_new_type, on_new_record);
const auto number_of_bytes_in_buffer_result = reader_->Read(on_new_type, on_new_record);
if (number_of_bytes_in_buffer_result.has_value())
{
number_of_bytes_in_buffer = number_of_bytes_in_buffer_result.value();
Expand Down Expand Up @@ -262,7 +263,7 @@ void DataRouter::SourceSession::processAndRouteLogMessages(uint64_t& message_cou
if (cmd->block_expected_to_be_next.has_value())
{
const auto peek_bytes =
reader_.PeekNumberOfBytesAcquiredInBuffer(cmd->block_expected_to_be_next.value());
reader_->PeekNumberOfBytesAcquiredInBuffer(cmd->block_expected_to_be_next.value());

if ((peek_bytes.has_value() && peek_bytes.value() > 0) ||
(cmd->ticks_without_write > kTicksWithoutAcquireWhileNoWrites))
Expand Down Expand Up @@ -291,12 +292,12 @@ void DataRouter::SourceSession::processAndRouteLogMessages(uint64_t& message_cou

void DataRouter::SourceSession::process_detached_logs(uint64_t& number_of_bytes_in_buffer)
{
const auto number_of_bytes_in_buffer_result_detached = reader_.ReadDetached(
const auto number_of_bytes_in_buffer_result_detached = reader_->ReadDetached(
[this](const auto& registration) noexcept {
parser_.AddIncomingType(registration);
parser_->AddIncomingType(registration);
},
[this](const auto& record) noexcept {
parser_.Parse(record);
parser_->Parse(record);
});

if (number_of_bytes_in_buffer_result_detached.has_value())
Expand All @@ -316,8 +317,8 @@ void DataRouter::SourceSession::update_and_log_stats(uint64_t message_count_loca
{
auto stats = stats_data_.lock();

const auto message_count_dropped_new = reader_.GetNumberOfDropsWithBufferFull();
const auto size_dropped_new = reader_.GetSizeOfDropsWithBufferFull();
const auto message_count_dropped_new = reader_->GetNumberOfDropsWithBufferFull();
const auto size_dropped_new = reader_->GetSizeOfDropsWithBufferFull();
if (message_count_dropped_new != stats->message_count_dropped)
{
stats_logger_.LogError() << stats->name << ": message drop detected: "
Expand All @@ -327,7 +328,7 @@ void DataRouter::SourceSession::update_and_log_stats(uint64_t message_count_loca
stats->size_dropped = size_dropped_new;
}

const auto message_count_dropped_invalid_size_new = reader_.GetNumberOfDropsWithInvalidSize();
const auto message_count_dropped_invalid_size_new = reader_->GetNumberOfDropsWithInvalidSize();
if (message_count_dropped_invalid_size_new != stats->message_count_dropped_invalid_size)
{
stats_logger_.LogError() << stats->name << ": message drop detected: "
Expand All @@ -349,22 +350,22 @@ void DataRouter::SourceSession::update_and_log_stats(uint64_t message_count_loca
}

DataRouter::SourceSession::SourceSession(DataRouter& router,
score::mw::log::detail::SharedMemoryReader reader,
std::unique_ptr<score::mw::log::detail::ISharedMemoryReader> reader,
const std::string& name,
const bool is_dlt_enabled,
SessionHandleVariant handle,
const double quota,
bool quota_enforcement_enabled,
score::mw::log::Logger& stats_logger,
const score::mw::log::NvConfig& nvConfig)
std::unique_ptr<score::platform::internal::ILogParser> parser)
: UnixDomainServer::ISession{},
MessagePassingServer::ISession{},
local_subscriber_data_(LocalSubscriberData{}),
command_data_(CommandData{}),
stats_data_(StatsData{}),
router_(router),
reader_(std::move(reader)),
parser_(nvConfig),
parser_(std::move(parser)),
handle_(std::move(handle)),
stats_logger_(stats_logger)
{
Expand Down Expand Up @@ -464,7 +465,7 @@ void DataRouter::SourceSession::show_stats()
name = stats->name;
}

const auto buffer_size_kb = reader_.GetRingBufferSizeBytes() / 1024U / 2U;
const auto buffer_size_kb = reader_->GetRingBufferSizeBytes() / 1024U / 2U;
auto buffer_watermark_kb = max_bytes_in_buffer / 1024U;

if (message_count_dropped > 0)
Expand Down Expand Up @@ -533,7 +534,8 @@ void DataRouter::SourceSession::request_acquire()
},
[](score::cpp::pmr::unique_ptr<score::platform::internal::daemon::ISessionHandle>& handle) {
handle->AcquireRequest();
}),
// For the quality team argumentation, kindly, check Ticket-200702 and Ticket-229594.
}), // LCOV_EXCL_LINE : tooling issue. no code to test in this line.
handle_);
}

Expand All @@ -548,7 +550,6 @@ void DataRouter::SourceSession::on_closed_by_peer()
{
command_data_.lock()->command_detach_on_closed = true;
}
// LCOV_EXCL_STOP

} // namespace datarouter
} // namespace platform
Expand Down
Loading
Loading