Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++] Fix Wireshark dissector decode send command metadata behavior #13471

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pulsar-client-cpp/wireshark/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

set(CMAKE_CXX_FLAGS "-O3 -g ${CMAKE_CXX_FLAGS}")

MESSAGE(STATUS "Use WIRESHARK_BUILD_TYPE: ${CMAKE_BUILD_TYPE}")

if(CMAKE_BUILD_TYPE STREQUAL "Debug")
add_definitions("-DDEBUG")
endif()

# Wireshark dependency's
find_library(WIRESHARK_LIB wireshark)
find_library(WIRESHARK_UTIL_LIB wsutil)
Expand Down
237 changes: 209 additions & 28 deletions pulsar-client-cpp/wireshark/pulsarDissector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,41 @@ static int hf_pulsar_consumer_name = -1;
static int hf_pulsar_producer_name = -1;

static int hf_pulsar_publish_time = -1;
static int hf_pulsar_replicated_from = -1;
static int hf_pulsar_deliver_at_time = -1;
static int hf_pulsar_event_time = -1;
static int hf_pulsar_deliver_after_time = -1;

static int hf_pulsar_chunk_id = -1;
static int hf_pulsar_num_chunks_from_msg = -1;
static int hf_pulsar_uuid = -1;
static int hf_pulsar_compression_type = -1;
static int hf_pulsar_uncompressed_size = -1;
static int hf_pulsar_partition_key = -1;
static int hf_pulsar_ordering_key = -1;
static int hf_pulsar_encryption_algo = -1;
static int hf_pulsar_encryption_param = -1;
static int hf_pulsar_encryption_keys = -1;
static int hf_pulsar_replicated_from = -1;
static int hf_pulsar_replicate_to = -1;
static int hf_pulsar_property = -1;

static int hf_pulsar_txnid_least_bits = -1;
static int hf_pulsar_txnid_most_bits = -1;

static int hf_pulsar_request_in = -1;
static int hf_pulsar_response_in = -1;
static int hf_pulsar_publish_latency = -1;

static int hf_pulsar_sequence_id = -1;
static int hf_pulsar_highest_sequence_id = -1;
static int hf_pulsar_message_id = -1;
static int hf_pulsar_message_permits = -1;

static int ett_pulsar = -1;

const static int FRAME_SIZE_LEN = 4;
const static guint16 MAGIC_BROKER_ENTRY_METADATA = 0x0e02;
const static guint16 MAGIC_CRC32C = 0x0e01;

static pulsar::proto::BaseCommand command;

Expand Down Expand Up @@ -193,6 +212,11 @@ static const value_string sub_type_names_vs[] = {
{CommandSubscribe::Key_Shared, "Key_Shared"},
};

static const value_string compression_type_name_vs[] = {
{CompressionType::NONE, "NONE"}, {CompressionType::LZ4, "LZ4"}, {CompressionType::ZLIB, "ZLIB"},
{CompressionType::ZSTD, "ZSTD"}, {CompressionType::SNAPPY, "SNAPPY"},
};

static const char* to_str(int value, const value_string* values) {
return val_to_str(value, values, "Unknown (%d)");
}
Expand Down Expand Up @@ -246,13 +270,36 @@ struct ConnectionState {
};

static void dissect_message_metadata(proto_tree* frame_tree, tvbuff_t* tvb, int offset, int maxOffset) {
if (tvb_get_ntohs(tvb, offset) == MAGIC_BROKER_ENTRY_METADATA) {
offset += 2;
auto brokerEntryMetadataSize = (int)tvb_get_ntohl(tvb, offset);
offset += brokerEntryMetadataSize + 2;
#ifdef DEBUG
proto_tree_add_debug_text(frame_tree, "[DEBUG] MAGIC_BROKER_ENTRY_METADATA %d",
brokerEntryMetadataSize);
#endif
}
if (tvb_get_ntohs(tvb, offset) == MAGIC_CRC32C) {
#ifdef DEBUG
auto checksum = tvb_get_ntohl(tvb, offset);
proto_tree_add_debug_text(frame_tree, "[DEBUG] CRC32C %d", checksum);
#endif
// Skip CRC32C Magic (2) and CRC32C checksum (4)
offset += 2;
offset += 4;
}
// Decode message metadata
auto metadataSize = tvb_get_ntohl(tvb, offset);
offset += 4;
#ifdef DEBUG
proto_tree_add_debug_text(frame_tree, "[DEBUG] MetadataSize %d, maxOffset %d", metadataSize, maxOffset);
#endif

if (offset + metadataSize > maxOffset) {
// Not enough data to dissect metadata
proto_tree_add_debug_text(frame_tree, "[Not enough data to dissect message metadata]");
// Not enough data to dissect metadata
#ifdef DEBUG
proto_tree_add_debug_text(frame_tree, "[DEBUG] Not enough data to dissect message metadata");
#endif
return;
}

Expand All @@ -265,22 +312,103 @@ static void dissect_message_metadata(proto_tree* frame_tree, tvbuff_t* tvb, int
return;
}

proto_item* md_tree = proto_tree_add_subtree_format(
frame_tree, tvb, offset, metadataSize, ett_pulsar, nullptr, "Message / %s / %" G_GINT64_MODIFIER "u",
msgMetadata.producer_name().c_str(), msgMetadata.sequence_id());
#ifdef DEBUG
proto_tree_add_debug_text(frame_tree, "[DEBUG] MessageMetadata Utf8DebugString : %s",
msgMetadata.Utf8DebugString().c_str());
proto_tree_add_debug_text(frame_tree, "[DEBUG] MessageMetadata SerializeAsString : %s",
msgMetadata.SerializeAsString().c_str());
#endif

proto_item* md_tree =
proto_tree_add_subtree_format(frame_tree, tvb, offset, metadataSize, ett_pulsar, nullptr,
"MessageMetadata / %s / %" G_GINT64_MODIFIER "u",
msgMetadata.producer_name().c_str(), msgMetadata.sequence_id());
proto_tree_add_string(md_tree, hf_pulsar_producer_name, tvb, offset, metadataSize,
msgMetadata.producer_name().c_str());

// IDs
proto_tree_add_uint64(md_tree, hf_pulsar_sequence_id, tvb, offset, metadataSize,
msgMetadata.sequence_id());
if (msgMetadata.has_highest_sequence_id()) {
proto_tree_add_uint64(md_tree, hf_pulsar_highest_sequence_id, tvb, offset, metadataSize,
msgMetadata.highest_sequence_id());
}

if (msgMetadata.has_chunk_id()) {
proto_tree_add_uint(md_tree, hf_pulsar_chunk_id, tvb, offset, metadataSize, msgMetadata.chunk_id());
if (msgMetadata.has_num_chunks_from_msg()) {
proto_tree_add_uint(md_tree, hf_pulsar_num_chunks_from_msg, tvb, offset, metadataSize,
msgMetadata.num_chunks_from_msg());
}
}

if (msgMetadata.has_uuid()) {
proto_tree_add_string(md_tree, hf_pulsar_uuid, tvb, offset, metadataSize, msgMetadata.uuid().c_str());
}

// Times
proto_tree_add_uint64(md_tree, hf_pulsar_publish_time, tvb, offset, metadataSize,
msgMetadata.publish_time());
if (msgMetadata.has_replicated_from()) {
proto_tree_add_string(md_tree, hf_pulsar_replicated_from, tvb, offset, metadataSize,
msgMetadata.replicated_from().c_str());

if (msgMetadata.has_deliver_at_time()) {
proto_tree_add_uint64(md_tree, hf_pulsar_deliver_at_time, tvb, offset, metadataSize,
msgMetadata.deliver_at_time());
proto_tree_add_uint64(md_tree, hf_pulsar_deliver_after_time, tvb, offset, metadataSize,
msgMetadata.deliver_at_time() - msgMetadata.publish_time());
}

if (msgMetadata.has_event_time()) {
proto_tree_add_uint64(md_tree, hf_pulsar_event_time, tvb, offset, metadataSize,
msgMetadata.event_time());
}

// Compression
if (msgMetadata.has_compression()) {
proto_tree_add_string(md_tree, hf_pulsar_compression_type, tvb, offset, metadataSize,
to_str(msgMetadata.compression(), compression_type_name_vs));
}

if (msgMetadata.has_uncompressed_size()) {
proto_tree_add_uint(md_tree, hf_pulsar_uncompressed_size, tvb, offset, metadataSize,
msgMetadata.uncompressed_size());
}

// Keys
if (msgMetadata.has_partition_key()) {
proto_tree_add_string(md_tree, hf_pulsar_partition_key, tvb, offset, metadataSize,
msgMetadata.partition_key().c_str());
}

if (msgMetadata.has_ordering_key()) {
proto_tree_add_string(md_tree, hf_pulsar_ordering_key, tvb, offset, metadataSize,
msgMetadata.ordering_key().c_str());
}

// Encryption
if (msgMetadata.has_encryption_algo()) {
proto_tree_add_string(md_tree, hf_pulsar_encryption_algo, tvb, offset, metadataSize,
msgMetadata.encryption_algo().c_str());
}
if (msgMetadata.has_encryption_param()) {
proto_tree_add_string(md_tree, hf_pulsar_encryption_param, tvb, offset, metadataSize,
msgMetadata.encryption_param().c_str());
}
if (msgMetadata.encryption_keys_size() > 0) {
proto_item* encryption_keys_tree = proto_tree_add_subtree_format(
md_tree, tvb, offset, msgMetadata.encryption_param().size(), ett_pulsar, nullptr,
"EncryptionParam / %s", msgMetadata.encryption_algo().c_str());
for (int i = 0; i < msgMetadata.encryption_keys().size(); i++) {
const auto& encryption_key = msgMetadata.encryption_keys(i);
proto_tree_add_string_format(
encryption_keys_tree, hf_pulsar_encryption_keys, tvb, offset, metadataSize, "", "%s : %s",
encryption_key.has_key() ? encryption_key.key().c_str() : "<none>",
encryption_key.has_value() ? encryption_key.value().c_str() : "<none>");
}
}

// Properties
if (msgMetadata.properties_size() > 0) {
proto_item* properties_tree = proto_tree_add_subtree_format(frame_tree, tvb, offset, metadataSize,
proto_item* properties_tree = proto_tree_add_subtree_format(md_tree, tvb, offset, metadataSize,
ett_pulsar, nullptr, "Properties");
for (int i = 0; i < msgMetadata.properties_size(); i++) {
const KeyValue& kv = msgMetadata.properties(i);
Expand All @@ -289,20 +417,33 @@ static void dissect_message_metadata(proto_tree* frame_tree, tvbuff_t* tvb, int
}
}

// Replication
if (msgMetadata.has_replicated_from()) {
proto_tree_add_string(md_tree, hf_pulsar_replicated_from, tvb, offset, metadataSize,
msgMetadata.replicated_from().c_str());
}

if (msgMetadata.replicate_to_size() > 0) {
proto_item* replicate_tree = proto_tree_add_subtree_format(frame_tree, tvb, offset, metadataSize,
proto_item* replicate_tree = proto_tree_add_subtree_format(md_tree, tvb, offset, metadataSize,
ett_pulsar, nullptr, "Replicate to");
for (int i = 0; i < msgMetadata.replicate_to_size(); i++) {
proto_tree_add_string_format(replicate_tree, hf_pulsar_replicated_from, tvb, offset, metadataSize,
"", "%s", msgMetadata.replicate_to(i).c_str());
}
}

if (msgMetadata.has_partition_key()) {
proto_tree_add_string(md_tree, hf_pulsar_partition_key, tvb, offset, metadataSize,
msgMetadata.partition_key().c_str());
// Transaction
if (msgMetadata.has_txnid_least_bits()) {
proto_tree_add_uint64(md_tree, hf_pulsar_txnid_least_bits, tvb, offset, metadataSize,
msgMetadata.txnid_least_bits());
}

if (msgMetadata.has_txnid_most_bits()) {
proto_tree_add_uint64(md_tree, hf_pulsar_txnid_most_bits, tvb, offset, metadataSize,
msgMetadata.txnid_most_bits());
}

// Payloads
offset += metadataSize;
uint32_t payloadSize = maxOffset - offset;
proto_tree_add_subtree_format(md_tree, tvb, offset, payloadSize, ett_pulsar, nullptr, "Payload / size=%u",
Expand Down Expand Up @@ -341,27 +482,28 @@ void link_to_response_frame(proto_tree* cmd_tree, tvbuff_t* tvb, int offset, int

/* This method dissects fully reassembled messages */
static int dissect_pulsar_message(tvbuff_t* tvb, packet_info* pinfo, proto_tree* tree, void* data _U_) {
col_set_str(pinfo->cinfo, COL_PROTOCOL, "Pulsar");

conversation_t* conversation = find_or_create_conversation(pinfo);
auto state = (ConnectionState*)conversation_get_proto_data(conversation, proto_pulsar);
if (state == nullptr) {
state = new ConnectionState();
conversation_add_proto_data(conversation, proto_pulsar, state);
}

uint32_t offset = FRAME_SIZE_LEN;
int maxOffset = tvb_captured_length(tvb);

auto cmdSize = (uint32_t)tvb_get_ntohl(tvb, offset);
offset += 4;

if (offset + cmdSize > maxOffset) {
// Not enough data to dissect
proto_tree_add_debug_text(tree, "[Not enough data to dissect command]");
// Not enough data to dissect
#ifdef DEBUG
proto_tree_add_debug_text(tree, "[Debug] Not enough data to dissect command");
#endif
return maxOffset;
}

col_set_str(pinfo->cinfo, COL_PROTOCOL, "Pulsar");

conversation_t* conversation = find_or_create_conversation(pinfo);
auto state = (ConnectionState*)conversation_get_proto_data(conversation, proto_pulsar);
if (state == nullptr) {
state = new ConnectionState();
conversation_add_proto_data(conversation, proto_pulsar, state);
}

auto ptr = (uint8_t*)tvb_get_ptr(tvb, offset, cmdSize);
if (!command.ParseFromArray(ptr, cmdSize)) {
proto_tree_add_boolean_format(tree, hf_pulsar_error, tvb, offset, cmdSize, true,
Expand Down Expand Up @@ -976,22 +1118,61 @@ static hf_register_info hf[] = {

{&hf_pulsar_sequence_id,
{"Sequence Id", "apache.pulsar.sequence_id", FT_UINT64, BASE_DEC, NULL, 0x0, NULL, HFILL}}, //
{&hf_pulsar_highest_sequence_id,
{"Highest Sequence Id", "apache.pulsar.highest_sequence_id", FT_UINT64, BASE_DEC, NULL, 0x0, NULL,
HFILL}}, //
{&hf_pulsar_uuid, {"UUID", "apache.pulsar.uuid", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}}, //

{&hf_pulsar_message_id,
{"Message Id", "apache.pulsar.message_id", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}}, //
{&hf_pulsar_message_permits,
{"Message Permits", "apache.pulsar.message_permits", FT_UINT32, BASE_DEC, NULL, 0x0, NULL, HFILL}}, //

{&hf_pulsar_publish_time,
{"Publish time", "apache.pulsar.publish_time", FT_UINT64, BASE_DEC, NULL, 0x0, NULL, HFILL}}, //
{"Publish Time", "apache.pulsar.publish_time", FT_UINT64, BASE_DEC, NULL, 0x0, NULL, HFILL}}, //
{&hf_pulsar_deliver_at_time,
{"Deliver At Time", "apache.pulsar.deliver_at_time", FT_UINT64, BASE_DEC, NULL, 0x0, NULL, HFILL}}, //
{&hf_pulsar_event_time,
{"Event Time", "apache.pulsar.event_time", FT_UINT64, BASE_DEC, NULL, 0x0, NULL, HFILL}}, //
{&hf_pulsar_deliver_after_time,
{"Deliver After Time", "apache.pulsar.deliver_after_time", FT_UINT64, BASE_DEC, NULL, 0x0, NULL,
HFILL}}, //

{&hf_pulsar_chunk_id,
{"Chunk Id", "apache.pulsar.chunk_id", FT_UINT32, BASE_DEC, NULL, 0x0, NULL, HFILL}}, //
{&hf_pulsar_num_chunks_from_msg,
{"Num Chunks From Msg", "apache.pulsar.num_chunks_from_msg", FT_UINT32, BASE_DEC, NULL, 0x0, NULL,
HFILL}}, //
{&hf_pulsar_compression_type,
{"Compression Type", "apache.pulsar.compression_type", FT_STRING, BASE_NONE, NULL, 0x0, NULL,
HFILL}}, //
{&hf_pulsar_uncompressed_size,
{"UnCompression Size", "apache.pulsar.uncompressed_size", FT_UINT32, BASE_DEC, NULL, 0x0, NULL,
HFILL}}, //

{&hf_pulsar_replicated_from,
{"Replicated from", "apache.pulsar.replicated_from", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}}, //
{&hf_pulsar_partition_key,
{"Partition key", "apache.pulsar.partition_key", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}}, //
{"Partition Key", "apache.pulsar.partition_key", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}}, //
{&hf_pulsar_ordering_key,
{"Ordering Key", "apache.pulsar.ordering_key", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}}, //
{&hf_pulsar_encryption_algo,
{"Encryption Algo", "apache.pulsar.encryption_algo", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}}, //
{&hf_pulsar_encryption_param,
{"Encryption Param", "apache.pulsar.encryption_param", FT_STRING, BASE_NONE, NULL, 0x0, NULL,
HFILL}}, //

{&hf_pulsar_replicate_to,
{"Replicate to", "apache.pulsar.replicate_to", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}}, //
{&hf_pulsar_property,
{"Property", "apache.pulsar.property", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}}, //
{&hf_pulsar_encryption_keys,
{"Encryption Keys", "apache.pulsar.encryption_keys", FT_STRING, BASE_NONE, NULL, 0x0, NULL, HFILL}}, //

{&hf_pulsar_txnid_least_bits,
{"TxnId Least Bits", "apache.pulsar.txnid_least_bits", FT_UINT64, BASE_DEC, NULL, 0x0, NULL, HFILL}}, //
{&hf_pulsar_txnid_most_bits,
{"TxnId Most Bits", "apache.pulsar.txnid_most_bits", FT_UINT64, BASE_DEC, NULL, 0x0, NULL, HFILL}}, //

{&hf_pulsar_request_in,
{"Request in frame", "apache.pulsar.request_in", FT_FRAMENUM, BASE_NONE, NULL, 0,
Expand Down