Skip to content

Commit 2d92b5b

Browse files
dwfflsDaan Wijffelsct2034
committed
Remote name splitting (#525)
* Instead of splitting the hardware_id we now split the name of the diagnostic message into its parts. User still needing the hardware_id can find is as a field in influxdb. Next to this the setting of "none" for unknown data has been removed as it is not part of the influxdb spec, now the field is not set. * Small update to diagnostic_updater README, explaining the parameters that can be set * changed response_code to long instead of CURLcode as it is a HTTP return and not CURL error code. * Removed the use of send.agg, now aggregate all /diagnostics and send them on a timer * Removed the use of send.agg, now aggregate all /diagnostics and send them on a timer * Removed the use of send.agg, now aggregate all /diagnostics and send them on a timer * Updated README * Forgot a merge conflict marker * Forgot a merge conflict marker --------- Co-authored-by: Daan Wijffels <dwijffels@lely.com> Co-authored-by: Christian Henkel <christian.henkel2@de.bosch.com>
1 parent aefcf01 commit 2d92b5b

File tree

6 files changed

+146
-77
lines changed

6 files changed

+146
-77
lines changed

diagnostic_remote_logging/README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@ The `influx` node supports several parameters. Below is an example configuration
3030
bucket:
3131
organization:
3232
send:
33-
agg: true
33+
diagnostics: true
34+
period: 1.0
3435
top_level_state: true
3536
```
3637
37-
- `send.agg`: Enables or disables subscription to the `/diagnostics_agg` topic.
38+
- `send.diagnostics`: Enables or disables subscription to the `/diagnostics` topic.
39+
- `send.period`: Specifies the interval in seconds for sending diagnostic data to InfluxDB. During each period, all incoming `/diagnostics` messages are collected and transmitted as a batch to InfluxDB.
3840
- `send.top_level_state`: Enables or disables subscription to the `/diagnostics_toplevel_state` topic.
3941

4042
#### InfluxDB Configuration

diagnostic_remote_logging/include/diagnostic_remote_logging/influx_line_protocol.hpp

Lines changed: 43 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,12 @@
4040
#define DIAGNOSTIC_REMOTE_LOGGING__INFLUX_LINE_PROTOCOL_HPP_
4141

4242
#include <string>
43+
#include <tuple>
4344
#include <utility>
4445
#include <vector>
4546

46-
#include "rclcpp/rclcpp.hpp"
4747
#include "diagnostic_msgs/msg/diagnostic_array.hpp"
48+
#include "rclcpp/rclcpp.hpp"
4849

4950
std::string toInfluxTimestamp(const rclcpp::Time & time)
5051
{
@@ -105,68 +106,75 @@ std::string formatValues(const std::vector<diagnostic_msgs::msg::KeyValue> & val
105106
return formatted;
106107
}
107108

108-
std::pair<std::string, std::string> splitHardwareID(const std::string & input)
109+
std::tuple<std::string, std::string, std::string> splitName(const std::string & input)
109110
{
110-
size_t first_slash_pos = input.find('/');
111+
size_t colon_pos = input.find(':');
112+
std::string ns_node = (colon_pos != std::string::npos) ? input.substr(0, colon_pos) : input;
113+
std::string name = (colon_pos != std::string::npos) ? input.substr(colon_pos + 1) : "";
111114

112-
// If no slash is found, treat the entire input as the node_name
113-
if (first_slash_pos == std::string::npos) {
114-
return {"none", input};
115+
// Trim leading whitespace from diagnostic name
116+
if (!name.empty() && name[0] == ' ') {
117+
name = name.substr(1);
115118
}
116119

117-
size_t second_slash_pos = input.find('/', first_slash_pos + 1);
118-
119-
// If the second slash is found, extract the "ns" and "node" parts
120-
if (second_slash_pos != std::string::npos) {
121-
std::string ns = input.substr(first_slash_pos + 1, second_slash_pos - first_slash_pos - 1);
122-
std::string node = input.substr(second_slash_pos + 1);
123-
return {ns, node};
120+
// Handle use_fqn is false, only node name
121+
if (ns_node.empty() || ns_node[0] != '/') {
122+
// No leading slash - treat entire thing as node name
123+
return {"", ns_node, name};
124124
}
125125

126-
// If no second slash is found, everything after the first slash is the node
127-
std::string node = input.substr(first_slash_pos + 1);
128-
return {"none", node}; // ns is empty, node is the remaining string
126+
// Remove leading slash when we get a fully qualified name
127+
ns_node = ns_node.substr(1);
128+
size_t last_slash = ns_node.rfind('/');
129+
130+
std::string ns = ns_node.substr(0, last_slash);
131+
std::string node = ns_node.substr(last_slash + 1);
132+
133+
return {ns, node, name};
129134
}
130135

131136
void statusToInfluxLineProtocol(
132-
std::string & output,
133-
const diagnostic_msgs::msg::DiagnosticStatus & status,
137+
std::string & output, const diagnostic_msgs::msg::DiagnosticStatus & status,
134138
const std::string & timestamp_str)
135139
{
136-
// hardware_id is empty for analyzer groups, so skip them
137-
if (status.hardware_id.empty()) {
138-
return;
140+
auto [ns, node, name] = splitName(status.name);
141+
output += escapeSpace(node);
142+
if (!ns.empty()) {
143+
output += ",ns=" + escapeSpace(ns);
144+
}
145+
if (!name.empty()) {
146+
output += ",name=" + escapeSpace(name);
147+
}
148+
if (!status.hardware_id.empty()) {
149+
output += ",hardware_id=" + escapeSpace(status.hardware_id);
150+
}
151+
output += " level=" + std::to_string(status.level);
152+
153+
if (!status.message.empty()) {
154+
output += ",message=\"" + status.message + "\"";
139155
}
140156

141-
auto [ns, identifier] = splitHardwareID(status.hardware_id);
142-
output += escapeSpace(identifier) + ",ns=" + escapeSpace(ns) +
143-
" level=" + std::to_string(status.level) + ",message=\"" + status.message + "\"";
144157
auto formatted_key_values = formatValues(status.values);
145158
if (!formatted_key_values.empty()) {
146159
output += "," + formatted_key_values;
147160
}
148161
output += " " + timestamp_str + "\n";
149162
}
150163

151-
std::string diagnosticStatusToInfluxLineProtocol(
152-
const diagnostic_msgs::msg::DiagnosticStatus::SharedPtr & msg, const rclcpp::Time & time)
164+
void statusToInfluxLineProtocol(
165+
std::string & output, const diagnostic_msgs::msg::DiagnosticStatus & status,
166+
const rclcpp::Time & time)
153167
{
154-
std::string output =
155-
msg->name + " level=" + std::to_string(msg->level) + " " + toInfluxTimestamp(time) + "\n";
156-
return output;
168+
statusToInfluxLineProtocol(output, status, toInfluxTimestamp(time));
157169
}
158170

159-
std::string diagnosticArrayToInfluxLineProtocol(
160-
const diagnostic_msgs::msg::DiagnosticArray::SharedPtr & diag_msg)
171+
void diagnosticArrayToInfluxLineProtocol(
172+
std::string & output, const diagnostic_msgs::msg::DiagnosticArray::SharedPtr & diag_msg)
161173
{
162-
std::string output;
163174
std::string timestamp = toInfluxTimestamp(diag_msg->header.stamp);
164-
165175
for (auto & status : diag_msg->status) {
166176
statusToInfluxLineProtocol(output, status, timestamp);
167177
}
168-
169-
return output;
170178
}
171179

172180
#endif // DIAGNOSTIC_REMOTE_LOGGING__INFLUX_LINE_PROTOCOL_HPP_

diagnostic_remote_logging/include/diagnostic_remote_logging/influxdb.hpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@
4444
#endif
4545

4646
#include <curl/curl.h>
47+
4748
#include <string>
4849

50+
#include "diagnostic_msgs/msg/diagnostic_array.hpp"
4951
#include "diagnostic_remote_logging/influx_line_protocol.hpp"
50-
5152
#include "rclcpp/rclcpp.hpp"
52-
#include "diagnostic_msgs/msg/diagnostic_array.hpp"
5353

5454
class InfluxDB : public rclcpp::Node
5555
{
@@ -64,10 +64,15 @@ class InfluxDB : public rclcpp::Node
6464
std::string post_url_, influx_token_;
6565
CURL * curl_;
6666

67+
std::string output_string_;
68+
69+
rclcpp::TimerBase::SharedPtr diagnostics_send_timer_;
70+
6771
void setupConnection(const std::string & telegraf_url);
6872

6973
void diagnosticsCallback(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr msg);
7074
void topLevelCallback(const diagnostic_msgs::msg::DiagnosticStatus::SharedPtr msg);
75+
void sendTimerCallback();
7176

7277
bool sendToInfluxDB(const std::string & data);
7378
};

diagnostic_remote_logging/src/influxdb.cpp

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ InfluxDB::InfluxDB(const rclcpp::NodeOptions & opt)
5757
// Ensure all parameters are set
5858
if (organization.empty() || bucket.empty() || influx_token_.empty()) {
5959
throw std::runtime_error(
60-
"All parameters (connection.organization, connection.bucket, connection.token) "
61-
"must be set, or when using a proxy like Telegraf none have to be set.");
60+
"All parameters (connection.organization, connection.bucket, connection.token) "
61+
"must be set, or when using a proxy like Telegraf none have to be set.");
6262
}
6363

6464
// Construct the Telegraf URL
@@ -68,33 +68,56 @@ InfluxDB::InfluxDB(const rclcpp::NodeOptions & opt)
6868

6969
setupConnection(post_url_);
7070

71-
if (declare_parameter("send.agg", true)) {
71+
double send_period = declare_parameter<double>("send.period", 1.0);
72+
bool send_diagnostics = declare_parameter<bool>("send.diagnostics", true);
73+
74+
if (send_period <= 0.0 && send_diagnostics) {
75+
throw std::runtime_error(
76+
"Parameter send.period must be greater than 0.0 if send.diagnostics is set to true");
77+
}
78+
79+
if (send_diagnostics) {
80+
diagnostics_send_timer_ = this->create_wall_timer(
81+
std::chrono::duration<double>(send_period), std::bind(&InfluxDB::sendTimerCallback, this));
82+
7283
diag_sub_ = this->create_subscription<diagnostic_msgs::msg::DiagnosticArray>(
73-
"/diagnostics_agg", rclcpp::SensorDataQoS(),
74-
std::bind(&InfluxDB::diagnosticsCallback, this, std::placeholders::_1));
84+
"/diagnostics", rclcpp::SensorDataQoS(),
85+
std::bind(&InfluxDB::diagnosticsCallback, this, std::placeholders::_1));
86+
}
87+
88+
if (declare_parameter("send.agg", false)) {
89+
throw std::runtime_error(
90+
"The option send.agg is deprecated and will be removed in a future version. Use "
91+
"send.diagnostics and send.period instead.");
7592
}
7693

7794
if (declare_parameter<bool>("send.top_level_state", true)) {
7895
top_level_sub_ = this->create_subscription<diagnostic_msgs::msg::DiagnosticStatus>(
79-
"/diagnostics_toplevel_state", rclcpp::SensorDataQoS(),
80-
std::bind(&InfluxDB::topLevelCallback, this, std::placeholders::_1));
96+
"/diagnostics_toplevel_state", rclcpp::SensorDataQoS(),
97+
std::bind(&InfluxDB::topLevelCallback, this, std::placeholders::_1));
8198
}
8299
}
83100

84101
void InfluxDB::diagnosticsCallback(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr msg)
85102
{
86-
std::string output = diagnosticArrayToInfluxLineProtocol(msg);
103+
diagnosticArrayToInfluxLineProtocol(output_string_, msg);
104+
}
87105

88-
if (!sendToInfluxDB(output)) {
89-
RCLCPP_ERROR(this->get_logger(), "Failed to send /diagnostics_agg to telegraf");
106+
void InfluxDB::sendTimerCallback()
107+
{
108+
if (!sendToInfluxDB(output_string_)) {
109+
RCLCPP_ERROR(this->get_logger(), "Failed to send /diagnostics to telegraf");
90110
}
91111

92-
RCLCPP_DEBUG(this->get_logger(), "%s", output.c_str());
112+
RCLCPP_DEBUG(this->get_logger(), "%s", output_string_.c_str());
113+
114+
output_string_.clear();
93115
}
94116

95117
void InfluxDB::topLevelCallback(const diagnostic_msgs::msg::DiagnosticStatus::SharedPtr msg)
96118
{
97-
std::string output = diagnosticStatusToInfluxLineProtocol(msg, this->get_clock()->now());
119+
std::string output;
120+
statusToInfluxLineProtocol(output, *msg, this->get_clock()->now());
98121

99122
if (!sendToInfluxDB(output)) {
100123
RCLCPP_ERROR(this->get_logger(), "Failed to send /diagnostics_toplevel_state to telegraf");
@@ -145,12 +168,12 @@ bool InfluxDB::sendToInfluxDB(const std::string & data)
145168
RCLCPP_ERROR(this->get_logger(), "cURL error: %s", curl_easy_strerror(res));
146169
return false;
147170
}
148-
CURLcode response_code;
171+
int32_t response_code;
149172
curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &response_code);
150173

151174
if (response_code != 204) {
152-
RCLCPP_ERROR(this->get_logger(), "Error (%d) when sending to telegraf:\n%s", response_code,
153-
data.c_str());
175+
RCLCPP_ERROR(
176+
this->get_logger(), "Error (%d) when sending to telegraf:\n%s", response_code, data.c_str());
154177
return false;
155178
}
156179

diagnostic_remote_logging/test/influx_line_protocol.cpp

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,11 @@
4040

4141
#include <gtest/gtest.h>
4242

43+
#include <rclcpp/rclcpp.hpp>
44+
4345
#include "diagnostic_msgs/msg/diagnostic_array.hpp"
4446
#include "diagnostic_msgs/msg/diagnostic_status.hpp"
4547
#include "diagnostic_msgs/msg/key_value.hpp"
46-
#include <rclcpp/rclcpp.hpp>
4748

4849
diagnostic_msgs::msg::KeyValue createKeyValue(const std::string & key, const std::string & value)
4950
{
@@ -95,31 +96,40 @@ TEST(FormatValuesTests, FormatsKeyValuePairs)
9596
EXPECT_EQ(formatValues(values), expected);
9697
}
9798

98-
// Test splitHardwareID
99-
TEST(SplitHardwareIDTests, SplitsCorrectly)
99+
// Test splitName
100+
TEST(SplitNameTests, SplitsCorrectly)
100101
{
101-
EXPECT_EQ(splitHardwareID("node_name"), std::make_pair(std::string("none"), std::string("node_"
102-
"name")));
103-
EXPECT_EQ(splitHardwareID("/ns/node_name"), std::make_pair(std::string("ns"), std::string("node_"
104-
"nam"
105-
"e")));
106-
EXPECT_EQ(splitHardwareID("/ns/prefix/node_name"),
107-
std::make_pair(std::string("ns"), std::string("prefix/"
108-
"node_name")));
102+
EXPECT_EQ(splitName("diagnostic_name"), std::make_tuple("", "diagnostic_name", ""));
103+
104+
EXPECT_EQ(splitName("/ns/node_name"), std::make_tuple("ns", "node_name", ""));
105+
106+
EXPECT_EQ(splitName("/ns/prefix/node_name"), std::make_tuple("ns/prefix", "node_name", ""));
107+
108+
EXPECT_EQ(
109+
splitName("/ns/ns2/ns3/node_name: diagnostic description"),
110+
std::make_tuple("ns/ns2/ns3", "node_name", "diagnostic description"));
111+
112+
EXPECT_EQ(
113+
splitName("/simple_ns/node: some long diagnostic name here"),
114+
std::make_tuple("simple_ns", "node", "some long diagnostic name here"));
115+
116+
EXPECT_EQ(splitName("just_node: diag"), std::make_tuple("", "just_node", "diag"));
109117
}
110118

111119
// Test statusToInfluxLineProtocol
112120
TEST(StatusToInfluxLineProtocolTests, FormatsCorrectly)
113121
{
114122
diagnostic_msgs::msg::DiagnosticStatus status;
115-
status.hardware_id = "/ns/node_name";
123+
status.name = "/test/test2/diagnostic_updater_example: topic1 topic status";
124+
status.hardware_id = "Device-27-46";
116125
status.level = 2;
117126
status.message = "Test message";
118127
status.values.push_back(createKeyValue("key1", "value1"));
119128
status.values.push_back(createKeyValue("key2", "42"));
120129

121130
std::string expected =
122-
"node_name,ns=ns level=2,message=\"Test message\",key1=\"value1\",key2=42"
131+
"diagnostic_updater_example,ns=test/test2,name=topic1\\ topic\\ "
132+
"status,hardware_id=Device-27-46 level=2,message=\"Test message\",key1=\"value1\",key2=42"
123133
" 1672531200123456789\n";
124134
std::string output;
125135
statusToInfluxLineProtocol(output, status, "1672531200123456789");
@@ -134,36 +144,48 @@ TEST(DiagnosticArrayToInfluxLineProtocolTests, HandlesMultipleStatuses)
134144
diag_msg->header.stamp = rclcpp::Time(1672531200, 123456789);
135145

136146
diagnostic_msgs::msg::DiagnosticStatus status1;
137-
status1.hardware_id = "/ns1/node1";
147+
status1.name = "/ns1/node1: diagnostic description";
148+
status1.hardware_id = "Device-27-46";
138149
status1.level = 1;
139150
status1.message = "First status";
140151
status1.values.push_back(createKeyValue("keyA", "valueA"));
141152

142153
diagnostic_msgs::msg::DiagnosticStatus status2;
143-
status2.hardware_id = "node2";
154+
status2.name = "node2";
155+
status2.hardware_id = "12345";
144156
status2.level = 2;
145157
status2.message = "Second status";
146158
status2.values.push_back(createKeyValue("keyB", "42"));
147159

148-
diag_msg->status = {status1, status2};
160+
diagnostic_msgs::msg::DiagnosticStatus status3;
161+
status3.name = "node3: status";
162+
status3.level = 3;
163+
164+
diag_msg->status = {status1, status2, status3};
165+
166+
std::string output;
167+
diagnosticArrayToInfluxLineProtocol(output, diag_msg);
149168

150169
std::string expected =
151-
"node1,ns=ns1 level=1,message=\"First "
170+
"node1,ns=ns1,name=diagnostic\\ description,hardware_id=Device-27-46 level=1,message=\"First "
152171
"status\",keyA=\"valueA\" 1672531200123456789\n"
153-
"node2,ns=none level=2,message=\"Second "
154-
"status\",keyB=42 1672531200123456789\n";
172+
"node2,hardware_id=12345 level=2,message=\"Second status\",keyB=42 1672531200123456789\n"
173+
"node3,name=status level=3 1672531200123456789\n";
155174

156-
EXPECT_EQ(diagnosticArrayToInfluxLineProtocol(diag_msg), expected);
175+
EXPECT_EQ(output, expected);
157176
}
158177

159-
// Test diagnosticStatusToInfluxLineProtocol
160-
TEST(DiagnosticStatusToInfluxLineProtocol, HandlesSingleStatus)
178+
// Test topLevelStatus
179+
TEST(TopLevelStatusTests, HandlesTopLevelStatus)
161180
{
162181
auto status = std::make_shared<diagnostic_msgs::msg::DiagnosticStatus>();
163182
status->level = 1;
164183
status->name = "toplevel_state";
165184
auto time = rclcpp::Time(1672531200, 123456789);
166185

186+
std::string output;
187+
statusToInfluxLineProtocol(output, *status, time);
188+
167189
std::string expected = "toplevel_state level=1 1672531200123456789\n";
168-
EXPECT_EQ(diagnosticStatusToInfluxLineProtocol(status, time), expected);
190+
EXPECT_EQ(output, expected);
169191
}

0 commit comments

Comments
 (0)