-
Notifications
You must be signed in to change notification settings - Fork 430
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable support for MQTT Parser in stirling #1756
Enable support for MQTT Parser in stirling #1756
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ChinmayaSharma-hue really appreciate all your hard work on this and very excited to have MQTT support within Pixie! I still haven't made it the whole way through this since I'm new to MQTT. However, I wanted to post the feedback I have so far.
@@ -82,6 +82,7 @@ pl_cc_test( | |||
"ENABLE_NATS_TRACING=true", | |||
"ENABLE_MONGO_TRACING=true", | |||
"ENABLE_AMQP_TRACING=true", | |||
"ENABLE_MQTT_TRACING=true", | |||
], | |||
deps = [ | |||
"//src/stirling/bpf_tools/bcc_bpf:headers", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should separate out any changes that aren't within the parsing code. This will be easier to review if we only have the following in this PR: protocols/mqtt/{types.h,parse.h,parser.cc}
and any related build changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, Noted! Will update in the next commit.
"//src/common/json:cc_library", | ||
"//src/common/zlib:cc_library", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these dependencies needed? I don't see any reference to json parsing or zlib
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, they are not, I just forgot to remove them when I copied the BUILD.bazel file from a different protocol. Will update in the next commit.
struct State { | ||
bool conn_closed = false; | ||
}; | ||
|
||
struct StateWrapper { | ||
State global; | ||
std::monostate send; | ||
std::monostate recv; | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you share more details on how this state structure will be used? Below NoState
is indicated, so I wasn't sure if you see value in using state or if this was accidental.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did this in the beginning when I wasn't sure if this was needed, will remove this.
@@ -112,6 +112,9 @@ DEFINE_int32(stirling_enable_mux_tracing, | |||
DEFINE_int32(stirling_enable_amqp_tracing, | |||
gflags::Int32FromEnv("PX_STIRLING_ENABLE_AMQP_TRACING", px::stirling::TraceMode::On), | |||
"If true, stirling will trace and process AMQP messages."); | |||
DEFINE_int32(stirling_enable_mqtt_tracing, | |||
gflags::Int32FromEnv("PX_STIRLING_ENABLE_MQTT_TRACING", px::stirling::TraceMode::On), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should split this out to a later PR, but this should be px::stirling::TraceMode::OnForNewerKernels
. Our 4.14 kernel build is close to the max BPF program instruction count and so new protocols can't be enabled wholesale.
ParseState ParseFrame(message_type_t type, std::string_view* buf, | ||
Message* result) { | ||
CTX_DCHECK(type == message_type_t::kRequest || type == message_type_t::kResponse); | ||
if (buf->size() < 2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume this is 2 because we could have a 1 byte control header and 1 byte packet length? It might be easier to make this < 5
so that our remaining code can assume that the entire packet length will be accessible. Otherwise, we need to handle ExtractUVarInt
errors differently depending on the context:
- Buffer size: 3 bytes, Expected Packet length: 4 bytes --
binary_decoder->ExtraceUVarInt
will fail and we need to returnkNeedsMoreData
- Buffer size: 5 bytes, Packet length payload: bogus UVarInt encoding --
binary_decoder->ExtraceUVarInt
will fail and we need to returnkInvalid
If we change the logic as I described above, any binary_decoder->ExtraceUVarInt
error means that the UVarInt was larger than 4 bytes and is malformed or not MQTT data, so it can always be treated as kInvalid
. We still need to check that the decoded value is within kMaxVarint28
that we discussed before, but it should simplify discerning the cases mentioned above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it not work to return kNeedMoreData for all cases of ExtractUVarInt
errors? If buffer size is 3 bytes and expected packet length is 4 then ExtractUVarInt
would return an error and based on this kNeedsMoreData would be returned. I have added another function that checks whether or not the number returned by ExtractUVarInt
is over 4 bytes, which would cause the function to return kInvalid. So both the cases would be taken care of in this way right?
Also, I am not sure why it would be easier to change <2
to <5
. Wouldn't this eliminate cases where the buffer size is 2? (PINGREQ and PINRESP) are only 2 bytes, with remaining length set to 0.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you talking about the case when ExtractUVarInt would return insufficient number of bytes error which would happen as it goes over 4 bytes to parse (as it does not know that the limit is 4 bytes), which would prompt my code to return kNeedsMoreData when in fact it is kInvalid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it not work to return kNeedMoreData for all cases of ExtractUVarInt errors?
I don't think it will work because if only 2 bytes are available a UVarInt decode could return a complete value or it could return an insufficient number of bytes error. Since UVarInt's of 2-4 bytes in length are valid in MQTT, we can't use that error as an indicator unless we know that a 4 byte UVarInt is guaranteed to parse.
Also, I am not sure why it would be easier to change <2 to <5. Wouldn't this eliminate cases where the buffer size is 2? (PINGREQ and PINRESP) are only 2 bytes, with remaining length set to 0.)
Good call. Since PINGREQ
and PINGRESP
are a maximum of 2 bytes, that would be a problem and is another case we need to handle.
Are you talking about the case when ExtractUVarInt would return insufficient number of bytes error which would happen as it goes over 4 bytes to parse
Correct, that would be similar to case 1 from my original comment. If we check for 5 bytes, that would allow us to treat any UVarInt decoding as kInvalid
since it would guarantee that it would be 5+ bytes in size.
Could we handle the PINGREQ
and PINGRESP
cases once the control_packet_code_flags
variable is populated? Then we can check to see if the buffer contains 5 bytes? I'm thinking something like the following:
if (buf->size() < 2) {
return ParseState::kNeedsMoreData;
}
PX_ASSIGN_OR_RETURN_ERROR(uint8_t control_packet_code_flags, decoder.ExtractBEInt<uint8_t>());
uint8_t control_packet_code = control_packet_code_flags >> 4;
uint8_t control_packet_flags = control_packet_code_flags & 0x0F;
MqttControlPacketType control_packet_type = GetControlPacketType(control_packet_code);
result->control_packet_type = ControlPacketTypeStrings[control_packet_type];
if (control_packet_type == MqttControlPacketType::PINGREQ ||
control_packet_type == MqttControlPacketType::PINGRESP) {
// Decode UVarInt, check it's 1 byte in length and return success
// if decoding fails or it is > 1 byte, return kInvalid
}
// With the control messages less than 5 bytes in size handled, we can do the following validation.
// This would then allow us to treat any insufficient byte errors from UVarInt decoding as kInvalid since
// there shouldn't be a UVarInt with more than 4 bytes returned.
if (buf->size() < 5) {
return ParseState::kNeedsMoreData;
}
PX_ASSIGN_OR(size_t remaining_length, decoder.ExtractUVarInt(), return ParseState::kInvalid)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, now I seem to have understood your point. Just to confirm, I have drawn a diagram to find out if we are on the same page.
You're talking about the circled out cases in the figure where ExtractUVarInt returns insufficient number of bytes error even when the buffer is complete (meaning that the full packet is present) due to incorrect encoding leading ExtractUVarInt to think there is more variable encoded data than there is. (I guess this would cause a repeated return of kNeedsMoreData to keep filling the buffer with the wrong packet data as it is invalid)
Also buf->size() < 5
returning kNeedsMoreData
would work if UVarInt is more than one byte because remaining length being 2 byte or more would just mean that the buffer would definitely be bigger than 5.
So what you would be doing is just eliminate all cases of kNeedsMoreData before parsing UVarInt so that all the remaining cases of insufficient number of bytes error could be kInvalid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This leaves all the cases where remaining length is one byte and value of remaining length is less than 4. Like for PUBACK
remaining length is 3, and the remaining length itself takes one byte, so it is less than 5 bytes which would cause the code to return kNeedsMoreData
.
One way to resolve this is after checking if buffer size is less than 5, we can extract the UVarInt remaining length and check if it is one byte and then proceed.
- If
ExtractUVarInt
returns an integer more than one byte (but less than 4 bytes), that can be one of two things,- Encoding is wrong,
kInvalid
needs to be returned. - Buffer is incomplete,
kNeedsMoreData
needs to be returned.
- Encoding is wrong,
- If
ExtractUVarInt
returns insufficient number of bytes error then it is definitelykInvalid
as it is trying to consume more than 4 bytes for UVarInt.
So in the first point there needs to be a differentiation for the two cases. At this point I am at a loss as to how to do this. If the remaining length bytes itself are wrong (encoding is wrong) then there is no way to validate whether or not the buffer is incomplete or the remaining length bytes are wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So currently I have decided to do this,
// Decoding the variable encoding of remaining length field
size_t remaining_length;
if (control_packet_type == MqttControlPacketType::PINGREQ ||
control_packet_type == MqttControlPacketType::PINGRESP) {
PX_ASSIGN_OR_RETURN_INVALID(remaining_length, decoder.ExtractUVarInt());
if (remaining_length > 0) {
return ParseState::kInvalid;
}
}
// Eliminating cases where kNeedsMoreData needs to be returned
// If buffer size is less tan 4, there are chances that the remaining length is not present in its entirety
if (decoder.BufSize() < 4) {
// Checking if buffer is complete
PX_ASSIGN_OR_RETURN_NEEDS_MORE_DATA(remaining_length, decoder.ExtractUVarInt());
// if remaining length is greater than 3 (4 if remaining length is included), then incomplete buffer, otherwise buffer is complete
if (remaining_length > 3) {
return ParseState::kNeedsMoreData;
}
} else {
PX_ASSIGN_OR_RETURN_INVALID(remaining_length, decoder.ExtractUVarInt());
if (!VariableEncodingNumBytes(remaining_length).ok()) {
return ParseState::kInvalid;
}
}
// Making sure buffer is complete according to remaining length
if (decoder.BufSize() < remaining_length) {
return ParseState::kNeedsMoreData;
}
This is not perfect since it can cause problems in this section,
// if remaining length is greater than 3 (4 if remaining length is included), then incomplete buffer, otherwise buffer is complete
if (remaining_length > 3) {
return ParseState::kNeedsMoreData;
}
where I am deciding that buffer is incomplete if remaining length is greater than 3, when it could be that remaining length is greater than 3 simply because of an encoding error and the full data is still present in the buffer. But this works for most of the cases discussed.
Also <5
is replaced with <4
because the first byte is already extracted and I am using decoder's buffer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Thanks for the detailed explanation on those cases.
If we can special case any control codes that are known to be short in length (like we did with PINGREQ
and PINGRESP
, it may reduce the cases of treating some of these incorrectly. For now I'm fine with keeping the implementation as is unless you see an opportunity for special casing any of them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Special casing only works for PINGREQ and PINGRESP as we can guarantee their sizes to be below 5. With all the other control packets there are chances that properties are present in variable header that can make them bigger.
if (result->header_fields["password_flag"]) { | ||
PX_ASSIGN_OR_RETURN_ERROR(size_t password_length, decoder->ExtractBEInt<uint16_t>()); | ||
PX_ASSIGN_OR_RETURN_ERROR(std::string_view password, decoder->ExtractString(password_length)); | ||
result->payload["password"] = std::string(password); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think this field is important to capture? Since we are capable of capturing everything within a connection, I think it's best to avoid sensitive information when it's easily detected.
I think we should either replace it with a string of the same length "XXXXX" or just skip over the field after advancing the buffer.
result->header_fields["dup"] = (control_packet_flags >> 3) != 0; | ||
result->header_fields["retain"] = (control_packet_flags & 0x1) != 0; | ||
result->header_fields["qos"] = (control_packet_flags >> 1) & 0x3; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since these are expected to be on every message, I think these would be better as individual bool
s on the Message
struct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Except for QOS right? Because it would be helpful to know the exact QOS (0,1 or 2) instead of just knowing whether or not the qos is 0 or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, I glossed over that the qos field wasn't a bool. The data type should fit whatever we need to store for each field.
inline RecordsWithErrorCount<mqtt::Record> StitchFrames(std::deque<mqtt::Message>* req_messages, | ||
std::deque<mqtt::Message>* resp_messages, | ||
NoState* /*state*/) { | ||
return StitchMessagesWithTimestampOrder<mqtt::Record>(req_messages, resp_messages); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The stitcher should be the next PR after the parser changes, so I don't want to dive into this too much now. However, it seems that message ordering is only guaranteed within a given QoS level. I think using StitchMessagesWithTimestampOrder
will result in invalid protocol traces if a client used multiple QoS levels.
In the next PR we will add tests for the Stitcher and we can model that situation. We will likely need to perform a similar process as StitchMessagesWithTimestampOrder
, making sure that we only match frames within the same QoS since that will guarantee its assumptions hold. This is why it doesn't work with HTTP pipelining.
We can either use the QoS field as the map key to our new stitcher interface. This hasn't been documented yet as it's in the process of being merged (#1716) or we can leverage the protocol state to make sure we have the ordering correct. My initial thinking is that the former would be best, but we will have to see.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. I had initially used StitchMessagesWithTimestampOrder
as a placeholder as I was not sure what would work. I will focus more on the stitcher in the stitcher PR. Thanks for the additional context, it's very helpful.
ParseState result_state; | ||
std::string_view frame_view; | ||
|
||
uint8_t payload_format_indicator_publish[] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did these payloads come from real packet captures or are they handcrafted?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They came from real packet captures. I used mosquitto to generate packets with different properties and payloads.
EXPECT_EQ(frame.header_fields["remaining_length"], (size_t) 16); | ||
EXPECT_EQ(frame.header_fields["username_flag"], (unsigned long) 0); | ||
EXPECT_EQ(frame.header_fields["password_flag"], (unsigned long) 0); | ||
EXPECT_EQ(frame.header_fields["will_retain"], (unsigned long) 0); | ||
EXPECT_EQ(frame.header_fields["will_qos"], (unsigned long) 0); | ||
EXPECT_EQ(frame.header_fields["will_flag"], (unsigned long) 0); | ||
EXPECT_EQ(frame.header_fields["clean_start"], (unsigned long) 1); | ||
EXPECT_EQ(frame.header_fields["keep_alive"], (unsigned long) 60); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use UL
to form the appropriate literals or remove the casting entirely. The tests seem to pass without these, so my preference is the latter. This applies to the other casting done in this file.
@@ -62,7 +62,7 @@ DEFINE_string(trace, "", | |||
"Dynamic trace to deploy. Either (1) the path to a file containing PxL or IR trace " | |||
"spec, or (2) <path to object file>:<symbol_name> for full-function tracing."); | |||
DEFINE_string(print_record_batches, | |||
"http_events,mysql_events,pgsql_events,redis_events,cql_events,dns_events", | |||
"http_events,mysql_events,pgsql_events,redis_events,cql_events,dns_events,mqtt_events", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should hold off on updating this until the MQTT changes are released (one of the final changes).
@@ -45,5 +45,6 @@ pl_cc_library( | |||
"//src/stirling/source_connectors/socket_tracer/protocols/nats:cc_library", | |||
"//src/stirling/source_connectors/socket_tracer/protocols/pgsql:cc_library", | |||
"//src/stirling/source_connectors/socket_tracer/protocols/redis:cc_library", | |||
"//src/stirling/source_connectors/socket_tracer/protocols/mqtt:cc_library", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be saved for a later change. Most likely once we introduce the "mqtt trace bpf test"
@@ -65,6 +65,7 @@ DECLARE_int32(stirling_enable_nats_tracing); | |||
DECLARE_int32(stirling_enable_kafka_tracing); | |||
DECLARE_int32(stirling_enable_mux_tracing); | |||
DECLARE_int32(stirling_enable_amqp_tracing); | |||
DECLARE_int32(stirling_enable_mqtt_tracing); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes to this file should also come in a later PR (when the MQTT trace bpf test is added).
@@ -49,7 +50,8 @@ using FrameDequeVariant = std::variant<std::monostate, | |||
std::deque<redis::Message>, | |||
std::deque<kafka::Packet>, | |||
std::deque<nats::Message>, | |||
std::deque<amqp::Frame>>; | |||
std::deque<amqp::Frame>, | |||
std::deque<mqtt::Message>>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes to this file should also come in a later PR (when the MQTT trace bpf test is added).
//----------------------------------------------------------------------------- | ||
|
||
/** | ||
* Record is the primary output of the http stitcher. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Record is the primary output of the http stitcher. | |
* Record is the primary output of the MQTT stitcher. |
@@ -30,3 +30,4 @@ | |||
#include "src/stirling/source_connectors/socket_tracer/protocols/nats/stitcher.h" // IWYU pragma: export | |||
#include "src/stirling/source_connectors/socket_tracer/protocols/pgsql/stitcher.h" // IWYU pragma: export | |||
#include "src/stirling/source_connectors/socket_tracer/protocols/redis/stitcher.h" // IWYU pragma: export | |||
#include "src/stirling/source_connectors/socket_tracer/protocols/mqtt/stitcher.h" // IWYU pragma: export |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should save this for the stitcher PR.
namespace protocols { | ||
|
||
/** | ||
* Parses a single HTTP message from the input string. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Parses a single HTTP message from the input string. | |
* Parses a single MQTT message from the input string. |
#include "src/stirling/utils/binary_decoder.h" | ||
#include "src/stirling/source_connectors/socket_tracer/protocols/mqtt/types.h" | ||
|
||
#define PX_ASSIGN_OR_RETURN_ERROR(expr, val_or) \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rename this to PX_ASSIGN_OR_RETURN_NEEDS_MORE_DATA
? Imo kNeedsMoreData
isn't an error because it's an indicator that we should continue to retry. We also have the same macro defined in the pgsql parser and this would ensure our naming is consistent (source).
In addition to this, I believe we are returning kNeedsMoreData
in places that we shouldn't. Ideally we would validate that the parser has the entire payload as early as possible and return kInvalid
for any subsequent decoding. These later decodings shouldn't fail because we've validated that the buffer contains enough bytes, but imo it's the correct ParseState
to return. The MQTT case is a little complex because we have multiple "payload" lengths -- remaining_length
and variable_header_length
.
My understanding is that remaining length will include the size of the entire MQTT frame. Assuming that's correct, we should only consider returning kNeedsMoreData
until we can validate that the buffer is greater than or equal to remaining length. After that point, any decoding errors should be kInvalid
.
It appears the variable length header can contain optional fields. We only decode these fields when we know that they should be present in order to maintain the assumption I mentioned in the previous paragraph. It seems we are already accomplishing that (as seen in the PUBCOMP case), so that shouldn't require any changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So what that would mean is only in the main parsing function before parsing the variable header or the payload, checks are done to make sure remaining length is equal to the buffer size and then for any subsequent bit extractions, kInvalid
should be returned?
Right now for every extraction error kNeedsMoreData
is being returned. I realise that extraction error happening is unlikely (unless remaining length field is incorrect) because we have already validated that the remaining length is equal to the buffer size so the full packet is already in the buffer. Would you suggest having PX_ASSIGN_OR_RETURN_INVALID to return kInvalid
in variable header parsing and payload parsing? Because an extraction error in variable header/payload parsing, after we have already validated remaining length to be equal to buffer size, means that the remaining length field was wrong, which would make it an invalid MQTT packet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your latest changes are consistent with what I described above, so this lgtm.
struct Message: public FrameBase { | ||
message_type_t type = message_type_t::kUnknown; | ||
|
||
std::string control_packet_type = "UNKNOWN"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should try to avoid storing strings in the data table unless it's necessary. Since this field can be modeled as an 8 bit int we should use that instead (MqttControlPacketType
/ uint8_t
).
We typically add a pxl function to map this int back to a string for queries (docs). This allows us to minimize the storage needed for the data type while still allowing the human readable name to be used for visualizations.
template<typename KeyType, typename ValueType> | ||
static std::string MapToString(const std::map<KeyType, ValueType>& inputMap) { | ||
std::string result = "{"; | ||
for (const auto& entry : inputMap) { | ||
result += entry.first + ": "; | ||
if constexpr (std::is_same_v<ValueType, uint32_t>) { | ||
result += std::to_string(entry.second); | ||
} else if constexpr (std::is_same_v<ValueType, std::string>) { | ||
result += entry.second; | ||
} | ||
result += ", "; | ||
} | ||
if (!inputMap.empty()) { | ||
result = result.substr(0, result.size() - 2); // Remove the trailing ", " | ||
} | ||
result += "}"; | ||
return result; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this be replaced with the ToJSONString function? It seems like this is similar to json encoding and we should use rapidjson
rather than building it by hand.
std::string header_fields_str = "{"; | ||
for (const auto& entry : properties) { | ||
header_fields_str += entry.first + ": " + std::string(entry.second) + ", "; | ||
} | ||
header_fields_str += "}"; | ||
|
||
std::string properties_str = "{"; | ||
for (const auto& entry : properties) { | ||
properties_str += entry.first + ": " + entry.second + ", "; | ||
} | ||
properties_str += "}"; | ||
|
||
std::string payload_str = "{"; | ||
for (const auto& entry : properties) { | ||
payload_str += entry.first + ": " + entry.second + ", "; | ||
} | ||
payload_str += "}"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question regarding json encoding here.
@ChinmayaSharma-hue just posted my second round of feedback and this is shaping up very nicely! Thanks again for all the hard work on this! In addition to the comments, can you please update the PR description to match our GitHub template? This would have been surfaced once the GitHub actions for this PR are permitted to run (I don't have that permission, but I can get that triggered today). |
ed808de
to
3216ad8
Compare
d3583eb
to
73132dd
Compare
73132dd
to
56033f3
Compare
I haven't gotten to making all the required changes as of yet including the linter error fixes, I will let you know as soon as all the changes are made. There were some instances where I made silly errors which should have been fixed without alerting on your part. I hope to make less of these in the future! Thanks a lot for the guidance and support. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One comment regarding one of the changes from the previous round of feedback and two suggestions for fixing the runtime/int
linter warnings.
PX_ASSIGN_OR_RETURN_INVALID(uint16_t topic_alias, decoder->ExtractBEInt<uint8_t>()); | ||
result->properties["maximum_qos"] = std::to_string(topic_alias); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PX_ASSIGN_OR_RETURN_INVALID(uint16_t topic_alias, decoder->ExtractBEInt<uint8_t>()); | |
result->properties["maximum_qos"] = std::to_string(topic_alias); | |
PX_ASSIGN_OR_RETURN_INVALID(uint8_t max_qos, decoder->ExtractBEInt<uint8_t>()); | |
result->properties["maximum_qos"] = std::to_string(max_qos); |
constexpr int kMaxVarInt24 = 2097152; | ||
constexpr int kMaxVarInt32 = 268435456; | ||
|
||
static inline StatusOr<size_t> VariableEncodingNumBytes(unsigned long integer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static inline StatusOr<size_t> VariableEncodingNumBytes(unsigned long integer) { | |
static inline StatusOr<size_t> VariableEncodingNumBytes(uint64_t integer) { |
break; | ||
} | ||
case PropertyCode::SubscriptionIdentifier: { | ||
PX_ASSIGN_OR_RETURN_INVALID(unsigned long subscription_id, decoder->ExtractUVarInt()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PX_ASSIGN_OR_RETURN_INVALID(unsigned long subscription_id, decoder->ExtractUVarInt()); | |
PX_ASSIGN_OR_RETURN_INVALID(uint64_t subscription_id, decoder->ExtractUVarInt()); |
I have fixed the linting errors (I ran arc lint locally), and I have fixed the other issues with the code. Sorry for the delay. |
@pixie-io/maintainers can we kick off the github actions for this PR? This is ready for its final review and I will be approving once the build passes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks so much for this contribution @ChinmayaSharma-hue and looking forward to working together on the upcoming ones!
Thanks for the contribution @ChinmayaSharma-hue. We require all of our commits to be GPG signed. Could you please follow this guide and sign your commits: https://docs.github.com/en/authentication/managing-commit-signature-verification/signing-commits |
Signed-off-by: Chinmay <chinmaysharma1020@gmail.com>
…e tests Signed-off-by: Chinmay <chinmaysharma1020@gmail.com>
Signed-off-by: Chinmay <chinmaysharma1020@gmail.com>
Signed-off-by: Chinmay <chinmaysharma1020@gmail.com>
Signed-off-by: Chinmay <chinmaysharma1020@gmail.com>
Signed-off-by: Chinmay <chinmaysharma1020@gmail.com>
Signed-off-by: Chinmay <chinmaysharma1020@gmail.com>
Signed-off-by: Chinmay <chinmaysharma1020@gmail.com>
Signed-off-by: Chinmay <chinmaysharma1020@gmail.com>
Signed-off-by: Chinmay <chinmaysharma1020@gmail.com>
Signed-off-by: Chinmay <chinmaysharma1020@gmail.com>
Signed-off-by: Chinmay <chinmaysharma1020@gmail.com>
Signed-off-by: Chinmay <chinmaysharma1020@gmail.com>
1bd3d50
to
5fc5413
Compare
The commits are now gpg signed and verified. |
Summary: This PR adds the parser component of MQTT (v5), a newly added protocol.
Related issues: #341
Type of change: /kind feature
Test Plan: Added tests