diff --git a/src/stirling/source_connectors/socket_tracer/conn_tracker.h b/src/stirling/source_connectors/socket_tracer/conn_tracker.h index 2d854982764..50bd81dce97 100644 --- a/src/stirling/source_connectors/socket_tracer/conn_tracker.h +++ b/src/stirling/source_connectors/socket_tracer/conn_tracker.h @@ -260,13 +260,14 @@ class ConnTracker : NotCopyMoveable { using TRecordType = typename TProtocolTraits::record_type; using TFrameType = typename TProtocolTraits::frame_type; using TStateType = typename TProtocolTraits::state_type; + using TKey = typename TProtocolTraits::key_type; InitProtocolState(); - DataStreamsToFrames(); + DataStreamsToFrames(); - auto& req_frames = req_data()->Frames(); - auto& resp_frames = resp_data()->Frames(); + auto& req_frames = req_data()->Frames(); + auto& resp_frames = resp_data()->Frames(); auto state_ptr = protocol_state(); CONN_TRACE(2) << absl::Substitute("req_frames=$0 resp_frames=$1", req_frames.size(), @@ -279,39 +280,11 @@ class ConnTracker : NotCopyMoveable { // TODO(@benkilimnik): Eventually, we should migrate all of the protocols to use the map. if constexpr (TProtocolTraits::stream_support == protocols::BaseProtocolTraits::UseStream) { - using TKey = typename TProtocolTraits::key_type; - // TODO(@benkilimnik): For now, we populate the map using the parsed req and resp deques. - // In a future PR, we should parse the map earlier in the event parser. - absl::flat_hash_map> requests; - absl::flat_hash_map> responses; - for (auto& frame : req_frames) { - // GetStreamID returns 0 by default if not specialized in protocol. - auto key = protocols::GetStreamID(&frame); - requests[key].push_back(std::move(frame)); - } - for (auto& frame : resp_frames) { - auto key = protocols::GetStreamID(&frame); - responses[key].push_back(std::move(frame)); - } result = protocols::StitchFrames( - &requests, &responses, state_ptr); - // TODO(@benkilimnik): Update req and resp frame deques to match maps for now. Populate maps - // during parsing in a future PR. - req_frames.clear(); - for (auto& [_, frames] : requests) { - for (auto& frame : frames) { - req_frames.push_back(std::move(frame)); - } - } - resp_frames.clear(); - for (auto& [_, frames] : responses) { - for (auto& frame : frames) { - resp_frames.push_back(std::move(frame)); - } - } + &req_frames, &resp_frames, state_ptr); } else { result = protocols::StitchFrames( - &req_frames, &resp_frames, state_ptr); + &req_frames[0], &resp_frames[0], state_ptr); } CONN_TRACE(2) << absl::Substitute("records=$0", result.records.size()); @@ -325,15 +298,15 @@ class ConnTracker : NotCopyMoveable { * Returns reference to current set of unconsumed requests. * Note: A call to ProcessBytesToFrames() is required to parse new requests. */ - template - std::deque& req_frames() { - return req_data()->Frames(); + template + absl::flat_hash_map>& req_frames() { + return req_data()->Frames(); } // TODO(yzhao): req_data() requires role_ to be set. But HTTP2 uprobe tracing does // not set that. So send_data() is created. Investigate more unified approach. - template - const std::deque& send_frames() const { - return send_data_.Frames(); + template + const absl::flat_hash_map>& send_frames() const { + return send_data_.Frames(); } size_t http2_client_streams_size() const { return http2_client_streams_.streams().size(); } @@ -343,13 +316,13 @@ class ConnTracker : NotCopyMoveable { * Returns reference to current set of unconsumed responses. * Note: A call to ProcessBytesToFrames() is required to parse new responses. */ - template - std::deque& resp_frames() { - return resp_data()->Frames(); + template + absl::flat_hash_map>& resp_frames() { + return resp_data()->Frames(); } - template - const std::deque& recv_frames() const { - return recv_data_.Frames(); + template + const absl::flat_hash_map>& recv_frames() const { + return recv_data_.Frames(); } const conn_id_t& conn_id() const { return conn_id_; } @@ -572,13 +545,14 @@ class ConnTracker : NotCopyMoveable { std::chrono::time_point buffer_expiry_timestamp) { using TFrameType = typename TProtocolTraits::frame_type; using TStateType = typename TProtocolTraits::state_type; + using TKey = typename TProtocolTraits::key_type; if constexpr (std::is_same_v) { http2_client_streams_.Cleanup(frame_size_limit_bytes, frame_expiry_timestamp); http2_server_streams_.Cleanup(frame_size_limit_bytes, frame_expiry_timestamp); } else { - send_data_.CleanupFrames(frame_size_limit_bytes, frame_expiry_timestamp); - recv_data_.CleanupFrames(frame_size_limit_bytes, frame_expiry_timestamp); + send_data_.CleanupFrames(frame_size_limit_bytes, frame_expiry_timestamp); + recv_data_.CleanupFrames(frame_size_limit_bytes, frame_expiry_timestamp); } auto* state = protocol_state(); @@ -617,11 +591,11 @@ class ConnTracker : NotCopyMoveable { std::string ToString() const; - template + template void InitFrames() { if constexpr (!std::is_same_v) { - send_data_.InitFrames(); - recv_data_.InitFrames(); + send_data_.InitFrames(); + recv_data_.InitFrames(); } } @@ -631,6 +605,7 @@ class ConnTracker : NotCopyMoveable { template size_t MemUsage() const { using TFrameType = typename TProtocolTraits::frame_type; + using TKey = typename TProtocolTraits::key_type; size_t data_buffer_total = 0; data_buffer_total += send_data().data_buffer().capacity(); @@ -642,8 +617,8 @@ class ConnTracker : NotCopyMoveable { http2_events_total += http2_client_streams_.StreamsSize(); http2_events_total += http2_server_streams_.StreamsSize(); } else { - parsed_msg_total += send_data().FramesSize(); - parsed_msg_total += recv_data().FramesSize(); + parsed_msg_total += send_data().FramesSize(); + parsed_msg_total += recv_data().FramesSize(); } return data_buffer_total + http2_events_total + parsed_msg_total; @@ -687,19 +662,19 @@ class ConnTracker : NotCopyMoveable { void UpdateDataStats(const SocketDataEvent& event); - template + template void DataStreamsToFrames() { auto state_ptr = protocol_state(); DataStream* req_data_ptr = req_data(); DCHECK_NE(req_data_ptr, nullptr); - req_data_ptr->template ProcessBytesToFrames(message_type_t::kRequest, - state_ptr); + req_data_ptr->template ProcessBytesToFrames( + message_type_t::kRequest, state_ptr); DataStream* resp_data_ptr = resp_data(); DCHECK_NE(resp_data_ptr, nullptr); - resp_data_ptr->template ProcessBytesToFrames(message_type_t::kResponse, - state_ptr); + resp_data_ptr->template ProcessBytesToFrames( + message_type_t::kResponse, state_ptr); } template @@ -817,6 +792,7 @@ ConnTracker::ProcessToRecords(); template std::string DebugString(const ConnTracker& c, std::string_view prefix) { using TFrameType = typename TProtocolTraits::frame_type; + using TKey = typename TProtocolTraits::key_type; std::string info; info += absl::Substitute("$0conn_id=$1\n", prefix, ToString(c.conn_id())); @@ -829,9 +805,9 @@ std::string DebugString(const ConnTracker& c, std::string_view prefix) { info += c.http2_server_streams_.DebugString(absl::StrCat(prefix, " ")); } else { info += absl::Substitute("$0recv queue\n", prefix); - info += DebugString(c.recv_data(), absl::StrCat(prefix, " ")); + info += DebugString(c.recv_data(), absl::StrCat(prefix, " ")); info += absl::Substitute("$0send queue\n", prefix); - info += DebugString(c.send_data(), absl::StrCat(prefix, " ")); + info += DebugString(c.send_data(), absl::StrCat(prefix, " ")); } return info; diff --git a/src/stirling/source_connectors/socket_tracer/conn_tracker_test.cc b/src/stirling/source_connectors/socket_tracer/conn_tracker_test.cc index bc562cee64c..04e66d4b260 100644 --- a/src/stirling/source_connectors/socket_tracer/conn_tracker_test.cc +++ b/src/stirling/source_connectors/socket_tracer/conn_tracker_test.cc @@ -375,7 +375,7 @@ TEST_F(ConnTrackerTest, MemUsage) { auto frame1 = event_gen_.InitSendEvent(kHTTPResp0); ConnTracker tracker; - tracker.InitFrames(); + tracker.InitFrames(); // Initial memory use is not 0, because the DataStreamBuffer has a small initial capacity. size_t mem_usage = tracker.MemUsage(); @@ -442,7 +442,7 @@ TEST_F(ConnTrackerTest, BufferClearedAfterExpiration) { tracker.ProcessToRecords(); tracker.Cleanup(frame_size_limit_bytes, buffer_size_limit_bytes, frame_expiry_timestamp, buffer_expiry_timestamp); - EXPECT_EQ(tracker.req_data()->Frames().size(), 1); + EXPECT_EQ((tracker.req_data()->Frames()[0].size()), 1); } TEST_F(ConnTrackerTest, BufferTruncatedBeyondSizeLimit) { @@ -460,7 +460,7 @@ TEST_F(ConnTrackerTest, BufferTruncatedBeyondSizeLimit) { tracker.Cleanup(frame_size_limit_bytes, buffer_size_limit_bytes, frame_expiry_timestamp, buffer_expiry_timestamp); EXPECT_EQ(tracker.req_data()->data_buffer().size(), buffer_size_limit_bytes); - EXPECT_THAT(tracker.req_frames(), IsEmpty()); + EXPECT_THAT((tracker.req_frames()[0]), IsEmpty()); } TEST_F(ConnTrackerTest, MessagesErasedAfterExpiration) { @@ -480,13 +480,13 @@ TEST_F(ConnTrackerTest, MessagesErasedAfterExpiration) { tracker.ProcessToRecords(); tracker.Cleanup(frame_size_limit_bytes, buffer_size_limit_bytes, frame_expiry_timestamp, buffer_expiry_timestamp); - EXPECT_THAT(tracker.req_frames(), SizeIs(1)); + EXPECT_THAT((tracker.req_frames()[0]), SizeIs(1)); frame_expiry_timestamp = now(); tracker.ProcessToRecords(); tracker.Cleanup(frame_size_limit_bytes, buffer_size_limit_bytes, frame_expiry_timestamp, buffer_expiry_timestamp); - EXPECT_THAT(tracker.req_frames(), IsEmpty()); + EXPECT_THAT((tracker.req_frames()[0]), IsEmpty()); } // Tests that tracker state is kDisabled if the remote address is in the cluster's CIDR range. diff --git a/src/stirling/source_connectors/socket_tracer/data_stream.cc b/src/stirling/source_connectors/socket_tracer/data_stream.cc index 6383535b911..73d1bd3932a 100644 --- a/src/stirling/source_connectors/socket_tracer/data_stream.cc +++ b/src/stirling/source_connectors/socket_tracer/data_stream.cc @@ -69,9 +69,9 @@ void DataStream::AddData(std::unique_ptr event) { // To be robust to lost events, which are not necessarily aligned to parseable entity boundaries, // ProcessBytesToFrames() will invoke a call to ParseFrames() with a stream recovery argument when // necessary. -template +template void DataStream::ProcessBytesToFrames(message_type_t type, TStateType* state) { - auto& typed_messages = Frames(); + auto& typed_messages = Frames(); // TODO(oazizi): Convert to ECHECK once we have more confidence. LOG_IF(WARNING, IsEOS()) << "DataStream reaches EOS, no more data to process."; @@ -180,30 +180,36 @@ void DataStream::ProcessBytesToFrames(message_type_t type, TStateType* state) { } // PROTOCOL_LIST: Requires update on new protocols. -template void -DataStream::ProcessBytesToFrames( +template void DataStream::ProcessBytesToFrames< + protocols::http::stream_id_t, protocols::http::Message, protocols::http::StateWrapper>( message_type_t type, protocols::http::StateWrapper* state); -template void DataStream::ProcessBytesToFrames( - message_type_t type, protocols::NoState* state); -template void -DataStream::ProcessBytesToFrames( +template void DataStream::ProcessBytesToFrames(message_type_t type, + protocols::NoState* state); +template void DataStream::ProcessBytesToFrames< + protocols::mysql::connection_id_t, protocols::mysql::Packet, protocols::mysql::StateWrapper>( message_type_t type, protocols::mysql::StateWrapper* state); -template void DataStream::ProcessBytesToFrames( - message_type_t type, protocols::NoState* state); -template void -DataStream::ProcessBytesToFrames( - message_type_t type, protocols::pgsql::StateWrapper* state); -template void DataStream::ProcessBytesToFrames( +template void DataStream::ProcessBytesToFrames(message_type_t type, + protocols::NoState* state); +template void DataStream::ProcessBytesToFrames< + protocols::pgsql::connection_id_t, protocols::pgsql::RegularMessage, + protocols::pgsql::StateWrapper>(message_type_t type, protocols::pgsql::StateWrapper* state); +template void DataStream::ProcessBytesToFrames(message_type_t type, + protocols::NoState* state); +template void DataStream::ProcessBytesToFrames( message_type_t type, protocols::NoState* state); -template void DataStream::ProcessBytesToFrames( - message_type_t type, protocols::NoState* state); -template void -DataStream::ProcessBytesToFrames( +template void DataStream::ProcessBytesToFrames< + protocols::kafka::correlation_id_t, protocols::kafka::Packet, protocols::kafka::StateWrapper>( message_type_t type, protocols::kafka::StateWrapper* state); -template void DataStream::ProcessBytesToFrames( - message_type_t type, protocols::NoState* state); -template void DataStream::ProcessBytesToFrames( +template void DataStream::ProcessBytesToFrames( message_type_t type, protocols::NoState* state); +template void DataStream::ProcessBytesToFrames(message_type_t type, + protocols::NoState* state); void DataStream::Reset() { data_buffer_.Reset(); has_new_events_ = false; diff --git a/src/stirling/source_connectors/socket_tracer/data_stream.h b/src/stirling/source_connectors/socket_tracer/data_stream.h index 70c6728cc76..44336c2da6e 100644 --- a/src/stirling/source_connectors/socket_tracer/data_stream.h +++ b/src/stirling/source_connectors/socket_tracer/data_stream.h @@ -18,6 +18,7 @@ #pragma once +#include #include #include #include @@ -66,23 +67,24 @@ class DataStream : NotCopyMoveable { * @param type whether to parse as requests, responses or mixed traffic. * @return deque of parsed messages. */ - template + template void ProcessBytesToFrames(message_type_t type, TStateType* state); /** * Initialize the frames to the requested frame type. */ - template + template void InitFrames() { - DCHECK(std::holds_alternative(frames_) || - std::holds_alternative>(frames_)) - << absl::Substitute( - "Must hold the default std::monostate, or the same type as requested. " - "I.e., ConnTracker cannot change the type it holds during runtime. $0 -> $1", - frames_.index(), typeid(TFrameType).name()); + bool check_condition = + std::holds_alternative(frames_) || + std::holds_alternative>>(frames_); + DCHECK(check_condition) << absl::Substitute( + "Must hold the default std::monostate, or the same type as requested. " + "I.e., ConnTracker cannot change the type it holds during runtime. $0 -> $1", + frames_.index(), typeid(TFrameType).name()); if (std::holds_alternative(frames_)) { // Reset the type to the expected type. - frames_ = std::deque(); + frames_ = absl::flat_hash_map>(); LOG_IF(ERROR, frames_.valueless_by_exception()) << absl::Substitute("valueless_by_exception() triggered by initializing to type: $0", typeid(TFrameType).name()); @@ -94,33 +96,36 @@ class DataStream : NotCopyMoveable { * @tparam TFrameType The parsed frame type within the deque. * @return deque of frames. */ - template - std::deque& Frames() { + template + absl::flat_hash_map>& Frames() { // As a safety net, make sure the frames have been initialized. - InitFrames(); + InitFrames(); LOG_IF(ERROR, frames_.valueless_by_exception()) << absl::Substitute( "valueless_by_exception() triggered by type: $0", typeid(TFrameType).name()); - return std::get>(frames_); + return std::get>>(frames_); } - template - const std::deque& Frames() const { - DCHECK(std::holds_alternative>(frames_)) << absl::Substitute( - "Must hold the same type as requested. " - "I.e., ConnTracker cannot change the type it holds during runtime. $0 -> $1", - frames_.index(), typeid(TFrameType).name()); - return std::get>(frames_); + template + const absl::flat_hash_map>& Frames() const { + DCHECK((std::holds_alternative>>(frames_))) + << absl::Substitute( + "Must hold the same type as requested. " + "I.e., ConnTracker cannot change the type it holds during runtime. $0 -> $1", + frames_.index(), typeid(TFrameType).name()); + return std::get>>(frames_); } /** * Approximate size of the parsed frames in the DataStream. */ - template + template size_t FramesSize() const { size_t size = 0; - for (const auto& msg : Frames()) { - size += msg.ByteSize(); + for (const auto& [_, frames] : Frames()) { + for (const auto& frame : frames) { + size += frame.ByteSize(); + } } return size; } @@ -134,10 +139,22 @@ class DataStream : NotCopyMoveable { * Checks if the DataStream is empty of both raw events and parsed messages. * @return true if empty of all data. */ - template + template bool Empty() const { - return data_buffer_.empty() && (std::holds_alternative(frames_) || - std::get>(frames_).empty()); + bool data_buffer_empty = data_buffer_.empty(); + bool monostate = std::holds_alternative(frames_); + if (data_buffer_empty || monostate) { + return true; + } + bool all_deques_empty = true; + for (const auto& [_, frames] : + std::get>>(frames_)) { + if (!frames.empty()) { + all_deques_empty = false; + break; + } + } + return all_deques_empty; } /** @@ -215,16 +232,18 @@ class DataStream : NotCopyMoveable { /** * Cleanup frames that are parsed from the BPF events, when the condition is right. */ - template + template void CleanupFrames(size_t size_limit_bytes, std::chrono::time_point expiry_timestamp) { - size_t size = FramesSize(); + size_t size = FramesSize(); if (size > size_limit_bytes) { VLOG(1) << absl::Substitute("Messages cleared due to size limit ($0 > $1).", size, size_limit_bytes); - Frames().clear(); + for (auto& [_, frame_deque] : Frames()) { + frame_deque.clear(); + } } - EraseExpiredFrames(expiry_timestamp, &Frames()); + EraseExpiredFrames(expiry_timestamp, &Frames()); } /** @@ -254,22 +273,24 @@ class DataStream : NotCopyMoveable { protocols::DataStreamBuffer& data_buffer() { return data_buffer_; } private: - template + template static void EraseExpiredFrames( std::chrono::time_point expiry_timestamp, - std::deque* frames) { - auto iter = frames->begin(); - for (; iter != frames->end(); ++iter) { - auto frame_timestamp = std::chrono::time_point( - std::chrono::nanoseconds(iter->timestamp_ns)); - // As messages are put into the list with monotonically increasing creation time stamp, - // we can just stop at the first frame that is younger than the expiration duration. - // TODO(yzhao): Benchmark with binary search and pick the faster one. - if (expiry_timestamp < frame_timestamp) { - break; + absl::flat_hash_map>* frames) { + for (auto& [key, deque] : *frames) { + auto iter = deque.begin(); + for (; iter != deque.end(); ++iter) { + auto frame_timestamp = std::chrono::time_point( + std::chrono::nanoseconds(iter->timestamp_ns)); + // As messages are put into the list with monotonically increasing creation time stamp, + // we can just stop at the first frame that is younger than the expiration duration. + // TODO(yzhao): Benchmark with binary search and pick the faster one. + if (expiry_timestamp < frame_timestamp) { + break; + } } + deque.erase(deque.begin(), iter); } - frames->erase(frames->begin(), iter); } // Raw data events from BPF. @@ -320,19 +341,24 @@ class DataStream : NotCopyMoveable { // Keep track of the protocol for this DataStream so that data loss can be reported per protocol. traffic_protocol_t protocol_ = traffic_protocol_t::kProtocolUnknown; - template + template friend std::string DebugString(const DataStream& d, std::string_view prefix); }; // Note: can't make DebugString a class member because of GCC restrictions. -template +template inline std::string DebugString(const DataStream& d, std::string_view prefix) { std::string info; info += absl::Substitute("$0raw event bytes=$1\n", prefix, d.data_buffer_.size()); int frames_size; - if (std::holds_alternative>(d.frames_)) { - frames_size = std::get>(d.frames_).size(); + if (std::holds_alternative>>(d.frames_)) { + const auto& frames_map = std::get>>(d.frames_); + // Loop through the map to sum the sizes of all the deques + frames_size = 0; + for (const auto& [key, frame_deque] : frames_map) { + frames_size += frame_deque.size(); + } } else if (std::holds_alternative(d.frames_)) { frames_size = 0; } else { diff --git a/src/stirling/source_connectors/socket_tracer/data_stream_test.cc b/src/stirling/source_connectors/socket_tracer/data_stream_test.cc index 0014598e833..693496df826 100644 --- a/src/stirling/source_connectors/socket_tracer/data_stream_test.cc +++ b/src/stirling/source_connectors/socket_tracer/data_stream_test.cc @@ -75,25 +75,29 @@ TEST_F(DataStreamTest, LostEvent) { // Start off with no lost events. stream.AddData(std::move(req0)); - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); - EXPECT_THAT(stream.Frames(), SizeIs(1)); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + auto frames = stream.Frames(); + EXPECT_THAT(frames[0], SizeIs(1)); // Now add some lost events - should get skipped over. PX_UNUSED(req1); // Lost event. stream.AddData(std::move(req2)); - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); - EXPECT_THAT(stream.Frames(), SizeIs(2)); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + frames = stream.Frames(); + EXPECT_THAT(frames[0], SizeIs(2)); // Some more requests, and another lost request (this time undetectable). stream.AddData(std::move(req3)); PX_UNUSED(req4); - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); - EXPECT_THAT(stream.Frames(), SizeIs(3)); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + frames = stream.Frames(); + EXPECT_THAT(frames[0], SizeIs(3)); // Now the lost event should be detected. stream.AddData(std::move(req5)); - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); - EXPECT_THAT(stream.Frames(), SizeIs(4)); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + frames = stream.Frames(); + EXPECT_THAT(frames[0], SizeIs(4)); EXPECT_EQ( req1->msg.size() + req4->msg.size(), @@ -115,16 +119,17 @@ TEST_F(DataStreamTest, StuckTemporarily) { stream.set_protocol(kProtocolHTTP); stream.AddData(std::move(req0a)); - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); - EXPECT_THAT(stream.Frames(), IsEmpty()); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + auto frames = stream.Frames(); + EXPECT_THAT(frames[0], IsEmpty()); // Remaining data arrives in time, so stuck count never gets high enough to flush events. stream.AddData(std::move(req0b)); stream.AddData(std::move(req1)); stream.AddData(std::move(req2)); - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); - const auto& requests = stream.Frames(); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + const auto& requests = stream.Frames()[0]; ASSERT_THAT(requests, SizeIs(3)); EXPECT_EQ(requests[0].req_path, "/index.html"); EXPECT_EQ(requests[1].req_path, "/foo.html"); @@ -150,8 +155,9 @@ TEST_F(DataStreamTest, StuckTooLong) { stream.set_current_time(now()); stream.AddData(std::move(req0a)); - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); - EXPECT_THAT(stream.Frames(), IsEmpty()); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + auto frames = stream.Frames(); + EXPECT_THAT(frames[0], IsEmpty()); stream.set_current_time(now() + std::chrono::seconds(FLAGS_buffer_expiration_duration_secs)); @@ -161,8 +167,8 @@ TEST_F(DataStreamTest, StuckTooLong) { stream.AddData(std::move(req1)); stream.AddData(std::move(req2)); - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); - const auto& requests = stream.Frames(); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + const auto& requests = stream.Frames()[0]; ASSERT_THAT(requests, SizeIs(2)); EXPECT_EQ(requests[0].req_path, "/foo.html"); EXPECT_EQ(requests[1].req_path, "/bar.html"); @@ -190,8 +196,8 @@ TEST_F(DataStreamTest, PartialMessageRecovery) { PX_UNUSED(req1b); // Missing event. stream.AddData(std::move(req2)); - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); - const auto& requests = stream.Frames(); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + const auto& requests = stream.Frames()[0]; ASSERT_THAT(requests, SizeIs(2)); EXPECT_EQ(requests[0].req_path, "/index.html"); EXPECT_EQ(requests[1].req_path, "/bar.html"); @@ -228,8 +234,8 @@ TEST_F(DataStreamTest, HeadAndMiddleMissing) { // The presence of a missing event should trigger the stream to make forward progress. - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); - const auto& requests = stream.Frames(); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + const auto& requests = stream.Frames()[0]; ASSERT_THAT(requests, SizeIs(1)); EXPECT_EQ(requests[0].req_path, "/bar.html"); @@ -266,15 +272,17 @@ TEST_F(DataStreamTest, LateArrivalPlusMissingEvents) { DataStream stream; stream.set_protocol(kProtocolHTTP); stream.AddData(std::move(req0a)); - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); - ASSERT_THAT(stream.Frames(), IsEmpty()); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + auto frames = stream.Frames(); + ASSERT_THAT(frames[0], IsEmpty()); // Setting buffer_expiry_timestamp to now to simulate a large delay. int buffer_size_limit = 10000; auto buffer_expiry_timestamp = now(); stream.CleanupEvents(buffer_size_limit, buffer_expiry_timestamp); - EXPECT_TRUE(stream.Empty()); + auto empty = stream.Empty(); + EXPECT_TRUE(empty); stream.AddData(std::move(req0b)); stream.AddData(std::move(req1a)); @@ -286,8 +294,8 @@ TEST_F(DataStreamTest, LateArrivalPlusMissingEvents) { stream.AddData(std::move(req4a)); stream.AddData(std::move(req4b)); - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); - const auto& requests = stream.Frames(); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + const auto& requests = stream.Frames()[0]; ASSERT_THAT(requests, SizeIs(3)); EXPECT_EQ(requests[0].req_path, "/foo.html"); EXPECT_EQ(requests[1].req_path, "/index.html"); @@ -325,8 +333,9 @@ TEST_F(DataStreamTest, Stats) { EXPECT_EQ(stream.stat_invalid_frames(), 0); EXPECT_EQ(stream.stat_valid_frames(), 0); - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); - EXPECT_EQ(stream.Frames().size(), 2); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + auto frames = stream.Frames(); + EXPECT_EQ(frames[0].size(), 2); EXPECT_EQ(stream.stat_raw_data_gaps(), 0); EXPECT_EQ(stream.stat_invalid_frames(), 1); EXPECT_EQ(stream.stat_valid_frames(), 2); @@ -337,8 +346,9 @@ TEST_F(DataStreamTest, Stats) { stream.AddData(std::move(req6bad)); stream.AddData(std::move(req7)); - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); - EXPECT_EQ(stream.Frames().size(), 5); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + frames = stream.Frames(); + EXPECT_EQ(frames[0].size(), 5); EXPECT_EQ(stream.stat_raw_data_gaps(), 1); EXPECT_EQ(stream.stat_invalid_frames(), 2); EXPECT_EQ(stream.stat_valid_frames(), 5); @@ -394,7 +404,7 @@ TEST_F(DataStreamTest, Stress) { } // Process the events. Here we are looking for any DCHECKS that may fire. - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); } } @@ -403,15 +413,18 @@ TEST_F(DataStreamTest, CannotSwitchType) { DataStream stream; stream.set_protocol(kProtocolHTTP); - stream.ProcessBytesToFrames(message_type_t::kRequest, &http_state); + stream.ProcessBytesToFrames(message_type_t::kRequest, + &http_state); #if DCHECK_IS_ON() protocols::mysql::StateWrapper mysql_state{}; - EXPECT_DEATH(stream.ProcessBytesToFrames(message_type_t::kRequest, &mysql_state), + EXPECT_DEATH((stream.ProcessBytesToFrames( + message_type_t::kRequest, &mysql_state)), "ConnTracker cannot change the type it holds during runtime"); #else protocols::mysql::StateWrapper mysql_state{}; - EXPECT_THROW(stream.ProcessBytesToFrames(message_type_t::kRequest, &mysql_state), + EXPECT_THROW((stream.ProcessBytesToFrames( + message_type_t::kRequest, &mysql_state)), std::exception); #endif } @@ -433,13 +446,14 @@ TEST_F(DataStreamTest, SpikeCapacityWithLargeDataChunk) { stream.AddData(std::move(resp2)); protocols::http::StateWrapper state{}; - stream.ProcessBytesToFrames(message_type_t::kResponse, &state); + stream.ProcessBytesToFrames(message_type_t::kResponse, &state); stream.CleanupEvents(retention_capacity_bytes, buffer_expiry_timestamp); - EXPECT_THAT(stream.Frames(), SizeIs(2)); + auto frames = stream.Frames(); + EXPECT_THAT(frames[0], SizeIs(2)); EXPECT_EQ(stream.data_buffer().size(), 16); // Run ProcessBytesToFrames again to propagate data loss stats. - stream.ProcessBytesToFrames(message_type_t::kResponse, &state); + stream.ProcessBytesToFrames(message_type_t::kResponse, &state); EXPECT_EQ( kHTTPIncompleteResp.length() - retention_capacity_bytes, SocketTracerMetrics::GetProtocolMetrics(kProtocolHTTP, kSSLNone).data_loss_bytes.Value()); @@ -465,13 +479,14 @@ TEST_F(DataStreamTest, SpikeCapacityWithLargeDataChunkAndSSLEnabled) { stream.AddData(std::move(resp2)); protocols::http::StateWrapper state{}; - stream.ProcessBytesToFrames(message_type_t::kResponse, &state); + stream.ProcessBytesToFrames(message_type_t::kResponse, &state); stream.CleanupEvents(retention_capacity_bytes, buffer_expiry_timestamp); - EXPECT_THAT(stream.Frames(), SizeIs(2)); + auto frames = stream.Frames(); + EXPECT_THAT(frames[0], SizeIs(2)); EXPECT_EQ(stream.data_buffer().size(), 16); // Run ProcessBytesToFrames again to propagate data loss stats. - stream.ProcessBytesToFrames(message_type_t::kResponse, &state); + stream.ProcessBytesToFrames(message_type_t::kResponse, &state); EXPECT_EQ(kHTTPIncompleteResp.length() - retention_capacity_bytes, SocketTracerMetrics::GetProtocolMetrics(kProtocolHTTP, kSSLUnspecified) .data_loss_bytes.Value()); @@ -497,8 +512,9 @@ TEST_F(DataStreamTest, ResyncCausesDuplicateEventBug) { stream.set_current_time(now()); stream.AddData(std::move(req0a)); - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); - EXPECT_THAT(stream.Frames(), IsEmpty()); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + auto frames = stream.Frames(); + EXPECT_THAT(frames[0], IsEmpty()); stream.set_current_time(now() + std::chrono::seconds(FLAGS_buffer_expiration_duration_secs)); @@ -508,12 +524,12 @@ TEST_F(DataStreamTest, ResyncCausesDuplicateEventBug) { stream.AddData(std::move(req1)); stream.AddData(std::move(req2)); - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); stream.AddData(std::move(req3)); - stream.ProcessBytesToFrames(message_type_t::kRequest, &state); + stream.ProcessBytesToFrames(message_type_t::kRequest, &state); - const auto& requests = stream.Frames(); + const auto& requests = stream.Frames()[0]; ASSERT_THAT(requests, SizeIs(3)); EXPECT_EQ(requests[0].req_path, "/foo.html"); EXPECT_EQ(requests[1].req_path, "/bar.html"); diff --git a/src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h b/src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h index 452e23ce702..b475b80de93 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/amqp/types_gen.h @@ -207,10 +207,12 @@ struct Record { } }; +using channel_id = uint16_t; struct ProtocolTraits : public BaseProtocolTraits { using frame_type = Frame; using record_type = Record; using state_type = NoState; + using key_type = channel_id; }; } // namespace amqp diff --git a/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser.h b/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser.h index 81c4e91c42b..387effd54f1 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser.h @@ -26,6 +26,7 @@ #include #include +#include #include "src/common/base/base.h" #include "src/stirling/source_connectors/socket_tracer/bcc_bpf_intf/common.h" @@ -96,10 +97,10 @@ struct ParseResult { * * @return ParseResult with locations where parseable frames were found in the source buffer. */ -template +template ParseResult ParseFrames(message_type_t type, DataStreamBuffer* data_stream_buffer, - std::deque* frames, bool resync = false, - TStateType* state = nullptr) { + absl::flat_hash_map>* frames, + bool resync = false, TStateType* state = nullptr) { std::string_view buf = data_stream_buffer->Head(); size_t start_pos = 0; @@ -120,13 +121,11 @@ ParseResult ParseFrames(message_type_t type, DataStreamBuffer* data_stream_buffe buf.remove_prefix(start_pos); } - // Grab size before we start, so we know where the new parsed frames are. - const size_t prev_size = frames->size(); - // Parse and append new frames to the frames vector. - ParseResult result = ParseFramesLoop(type, buf, frames, state); + std::deque new_frames = std::deque(); + ParseResult result = ParseFramesLoop(type, buf, &new_frames, state); - VLOG(1) << absl::Substitute("Parsed $0 new frames", frames->size() - prev_size); + VLOG(1) << absl::Substitute("Parsed $0 new frames", new_frames.size()); // Match timestamps with the parsed frames. for (size_t i = 0; i < result.frame_positions.size(); ++i) { @@ -134,7 +133,7 @@ ParseResult ParseFrames(message_type_t type, DataStreamBuffer* data_stream_buffe f.start += start_pos; f.end += start_pos; - auto& msg = (*frames)[prev_size + i]; + auto& msg = new_frames[i]; StatusOr timestamp_ns_status = data_stream_buffer->GetTimestamp(data_stream_buffer->position() + f.end); LOG_IF(ERROR, !timestamp_ns_status.ok()) << timestamp_ns_status.ToString(); @@ -142,6 +141,12 @@ ParseResult ParseFrames(message_type_t type, DataStreamBuffer* data_stream_buffe } result.end_position += start_pos; + // Parse frames into map + for (auto& frame : new_frames) { + // GetStreamID returns 0 by default if not implemented in protocol. + TKey key = GetStreamID(&frame); + (*frames)[key].push_back(std::move(frame)); + } return result; } diff --git a/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser_test.cc b/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser_test.cc index 0fcf4d35b04..aba11c62691 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser_test.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/common/event_parser_test.cc @@ -39,6 +39,7 @@ using ::testing::Pair; // This test parser is a simple comma-separated value splitter. +using stream_id_t = uint16_t; struct TestFrame : public FrameBase { std::string msg; @@ -69,7 +70,7 @@ class EventParserTest : public DataStreamBufferTestWrapper, public ::testing::Te // Use test protocol to test basics of EventParser. TEST_F(EventParserTest, BasicProtocolParsing) { - std::deque word_frames; + absl::flat_hash_map> word_frames; // clang-format off std::vector event_messages = { @@ -93,8 +94,10 @@ TEST_F(EventParserTest, BasicProtocolParsing) { EXPECT_EQ(res.end_position, 44); std::vector timestamps; - for (const auto& frame : word_frames) { - timestamps.push_back(frame.timestamp_ns); + for (const auto& stream : word_frames) { + for (const auto& frame : stream.second) { + timestamps.push_back(frame.timestamp_ns); + } } EXPECT_THAT(timestamps, ElementsAre(0, 1, 1, 2, 3, 4)); } diff --git a/src/stirling/source_connectors/socket_tracer/protocols/cql/stitcher.cc b/src/stirling/source_connectors/socket_tracer/protocols/cql/stitcher.cc index 4c6f13f1295..e435926e02d 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/cql/stitcher.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/cql/stitcher.cc @@ -18,6 +18,7 @@ #include "src/stirling/source_connectors/socket_tracer/protocols/cql/stitcher.h" +#include #include #include #include diff --git a/src/stirling/source_connectors/socket_tracer/protocols/cql/stitcher_test.cc b/src/stirling/source_connectors/socket_tracer/protocols/cql/stitcher_test.cc index 63f6d4d7a6e..787baf99cfa 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/cql/stitcher_test.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/cql/stitcher_test.cc @@ -16,6 +16,7 @@ * SPDX-License-Identifier: Apache-2.0 */ +#include #include #include @@ -162,8 +163,7 @@ constexpr uint8_t kSupportedResp[] = { 0x6e, 0x61, 0x70, 0x70, 0x79, 0x00, 0x03, 0x6c, 0x7a, 0x34, 0x00, 0x0b, 0x43, 0x51, 0x4c, 0x5f, 0x56, 0x45, 0x52, 0x53, 0x49, 0x4f, 0x4e, 0x00, 0x01, 0x00, 0x05, 0x33, 0x2e, 0x34, 0x2e, 0x34}; -// Asynchronous EVENT response from server. -// Content: SCHEMA_CHANGE DROPPED TABLE tutorialspoint emp +// Asynchronous EVENT response from server. Content: SCHEMA_CHANGE DROPPED TABLE tutorialspoint emp constexpr uint8_t kEventResp[] = {0x00, 0x0d, 0x53, 0x43, 0x48, 0x45, 0x4d, 0x41, 0x5f, 0x43, 0x48, 0x41, 0x4e, 0x47, 0x45, 0x00, 0x07, 0x44, 0x52, 0x4f, 0x50, 0x50, 0x45, 0x44, 0x00, 0x05, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x00, 0x0e, diff --git a/src/stirling/source_connectors/socket_tracer/protocols/dns/types.h b/src/stirling/source_connectors/socket_tracer/protocols/dns/types.h index 471b54c2b9b..23a99d14ed7 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/dns/types.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/dns/types.h @@ -158,10 +158,12 @@ struct Record { } }; +using stream_id_t = uint16_t; struct ProtocolTraits : public BaseProtocolTraits { using frame_type = Frame; using record_type = Record; using state_type = NoState; + using key_type = stream_id_t; }; } // namespace dns diff --git a/src/stirling/source_connectors/socket_tracer/protocols/http/parse_test.cc b/src/stirling/source_connectors/socket_tracer/protocols/http/parse_test.cc index 4621bd27c7a..20211a84195 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/http/parse_test.cc +++ b/src/stirling/source_connectors/socket_tracer/protocols/http/parse_test.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -791,13 +792,13 @@ TEST_P(HTTPParserTest, ParseHTTPRequestsRepeatedly) { AddEvent(events[1]); AddEvent(events[2]); - std::deque parsed_messages; + absl::flat_hash_map> parsed_messages; ParseResult result = ParseFrames(message_type_t::kRequest, &data_buffer_, &parsed_messages, /* resync */ false, &state); data_buffer_.RemovePrefix(result.end_position); ASSERT_EQ(ParseState::kSuccess, result.state); - ASSERT_THAT(parsed_messages, + ASSERT_THAT(parsed_messages[0], ElementsAre(HTTPGetReq0ExpectedMessage(), HTTPPostReq0ExpectedMessage())); } } @@ -828,14 +829,15 @@ TEST_P(HTTPParserTest, ParseHTTPResponsesRepeatedly) { AddEvent(events[1]); AddEvent(events[2]); - std::deque parsed_messages; + absl::flat_hash_map> parsed_messages; ParseResult result = ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, /* resync */ false, &state); data_buffer_.RemovePrefix(result.end_position); ASSERT_EQ(ParseState::kSuccess, result.state); - ASSERT_THAT(parsed_messages, ElementsAre(HTTPResp0ExpectedMessage(), HTTPResp1ExpectedMessage(), - HTTPResp2ExpectedMessage())); + ASSERT_THAT(parsed_messages[0], + ElementsAre(HTTPResp0ExpectedMessage(), HTTPResp1ExpectedMessage(), + HTTPResp2ExpectedMessage())); } } @@ -860,12 +862,13 @@ TEST_F(HTTPParserTest, ParseHTTPResponsesWithLeftover) { AddEvent(events[1]); // Don't append last split, yet. - std::deque parsed_messages; + absl::flat_hash_map> parsed_messages; ParseResult result = ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, /* resync */ false, &state); ASSERT_EQ(ParseState::kNeedsMoreData, result.state); - ASSERT_THAT(parsed_messages, ElementsAre(HTTPResp0ExpectedMessage(), HTTPResp1ExpectedMessage())); + ASSERT_THAT(parsed_messages[0], + ElementsAre(HTTPResp0ExpectedMessage(), HTTPResp1ExpectedMessage())); data_buffer_.RemovePrefix(result.end_position); @@ -876,8 +879,9 @@ TEST_F(HTTPParserTest, ParseHTTPResponsesWithLeftover) { /* resync */ false, &state); ASSERT_EQ(ParseState::kSuccess, result.state); - ASSERT_THAT(parsed_messages, ElementsAre(HTTPResp0ExpectedMessage(), HTTPResp1ExpectedMessage(), - HTTPResp2ExpectedMessage())); + ASSERT_THAT(parsed_messages[0], + ElementsAre(HTTPResp0ExpectedMessage(), HTTPResp1ExpectedMessage(), + HTTPResp2ExpectedMessage())); } // Like ParseHTTPResponsesWithLeftover, but repeats test many times, @@ -907,7 +911,7 @@ TEST_P(HTTPParserTest, ParseHTTPResponsesWithLeftoverRepeatedly) { AddEvent(events[0]); AddEvent(events[1]); - std::deque parsed_messages; + absl::flat_hash_map> parsed_messages; ParseResult result1 = ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, /* resync */ false, &state); @@ -919,7 +923,7 @@ TEST_P(HTTPParserTest, ParseHTTPResponsesWithLeftoverRepeatedly) { /* resync */ false, &state); ASSERT_EQ(ParseState::kSuccess, result2.state); - ASSERT_THAT(parsed_messages, + ASSERT_THAT(parsed_messages[0], ElementsAre(HTTPResp0ExpectedMessage(), HTTPResp1ExpectedMessage(), HTTPResp2ExpectedMessage(), HTTPResp1ExpectedMessage())); } @@ -1051,7 +1055,7 @@ TEST_F(HTTPParserTest, ParseReqWithPartialFirstMessage) { CreateEvents({partial_http_get_req0, kHTTPPostReq0, kHTTPGetReq1}); AddEvents(events); - std::deque parsed_messages; + absl::flat_hash_map> parsed_messages; ParseResult result = ParseFrames(message_type_t::kRequest, &data_buffer_, &parsed_messages, /* resync */ true, &state); @@ -1061,7 +1065,7 @@ TEST_F(HTTPParserTest, ParseReqWithPartialFirstMessage) { data_buffer_.Reset(); EXPECT_EQ(ParseState::kSuccess, result.state); - ASSERT_THAT(parsed_messages, + ASSERT_THAT(parsed_messages[0], ElementsAre(HTTPPostReq0ExpectedMessage(), HTTPGetReq1ExpectedMessage())); } } @@ -1075,7 +1079,7 @@ TEST_F(HTTPParserTest, ParseRespWithPartialFirstMessage) { CreateEvents({partial_http_resp0, kHTTPResp1, kHTTPResp2}); AddEvents(events); - std::deque parsed_messages; + absl::flat_hash_map> parsed_messages; ParseResult result = ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, /* resync */ true, &state); @@ -1085,7 +1089,7 @@ TEST_F(HTTPParserTest, ParseRespWithPartialFirstMessage) { data_buffer_.Reset(); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, + EXPECT_THAT(parsed_messages[0], ElementsAre(HTTPResp1ExpectedMessage(), HTTPResp2ExpectedMessage())); } } @@ -1101,12 +1105,12 @@ TEST_F(HTTPParserTest, ParseReqWithPartialFirstMessageNoSync) { CreateEvents({partial_http_get_req0, kHTTPPostReq0, kHTTPGetReq1}); AddEvents(events); - std::deque parsed_messages; + absl::flat_hash_map> parsed_messages; ParseResult result = ParseFrames(message_type_t::kRequest, &data_buffer_, &parsed_messages, /* resync */ false, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, + EXPECT_THAT(parsed_messages[0], ElementsAre(HTTPPostReq0ExpectedMessage(), HTTPGetReq1ExpectedMessage())); } @@ -1118,12 +1122,13 @@ TEST_F(HTTPParserTest, ParseRespWithPartialFirstMessageNoSync) { CreateEvents({partial_http_resp0, kHTTPResp1, kHTTPResp2}); AddEvents(events); - std::deque parsed_messages; + absl::flat_hash_map> parsed_messages; ParseResult result = ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, /* resync */ false, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(HTTPResp1ExpectedMessage(), HTTPResp2ExpectedMessage())); + EXPECT_THAT(parsed_messages[0], + ElementsAre(HTTPResp1ExpectedMessage(), HTTPResp2ExpectedMessage())); } // The two tests below introduce a large, but incompletely traced request that @@ -1144,7 +1149,7 @@ TEST_F(HTTPParserTest, ParseReqWithPartialFirstMessageWithSync) { CreateEvents({kStuckInducingReq, kHTTPPostReq0, kHTTPGetReq1}); AddEvents(events); - std::deque parsed_messages; + absl::flat_hash_map> parsed_messages; ParseResult result; result = ParseFrames(message_type_t::kRequest, &data_buffer_, &parsed_messages, @@ -1157,7 +1162,7 @@ TEST_F(HTTPParserTest, ParseReqWithPartialFirstMessageWithSync) { result = ParseFrames(message_type_t::kRequest, &data_buffer_, &parsed_messages, /* resync */ true, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, + EXPECT_THAT(parsed_messages[0], ElementsAre(HTTPPostReq0ExpectedMessage(), HTTPGetReq1ExpectedMessage())); } @@ -1174,7 +1179,7 @@ TEST_F(HTTPParserTest, ParseRespWithPartialFirstMessageWithSync) { CreateEvents({kStuckInducingResp, kHTTPResp1, kHTTPResp2}); AddEvents(events); - std::deque parsed_messages; + absl::flat_hash_map> parsed_messages; ParseResult result; result = ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, @@ -1187,7 +1192,8 @@ TEST_F(HTTPParserTest, ParseRespWithPartialFirstMessageWithSync) { result = ParseFrames(message_type_t::kResponse, &data_buffer_, &parsed_messages, /* resync */ true, &state); EXPECT_EQ(ParseState::kSuccess, result.state); - EXPECT_THAT(parsed_messages, ElementsAre(HTTPResp1ExpectedMessage(), HTTPResp2ExpectedMessage())); + EXPECT_THAT(parsed_messages[0], + ElementsAre(HTTPResp1ExpectedMessage(), HTTPResp2ExpectedMessage())); } } // namespace http diff --git a/src/stirling/source_connectors/socket_tracer/protocols/http/types.h b/src/stirling/source_connectors/socket_tracer/protocols/http/types.h index a4d239aaa38..b7345329e3d 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/http/types.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/http/types.h @@ -72,10 +72,10 @@ struct Message : public FrameBase { std::string ToString() const override { return absl::Substitute( "[type=$0 minor_version=$1 headers=[$2] req_method=$3 " - "req_path=$4 resp_status=$5 resp_message=$6 body=$7]", + "req_path=$4 resp_status=$5 resp_message=$6 body=$7 timestamp=$8]", magic_enum::enum_name(type), minor_version, absl::StrJoin(headers, ",", absl::PairFormatter(":")), req_method, req_path, resp_status, - resp_message, body); + resp_message, body, timestamp_ns); } }; @@ -110,10 +110,12 @@ struct StateWrapper { std::monostate recv; }; +using stream_id_t = uint16_t; struct ProtocolTraits : public BaseProtocolTraits { using frame_type = Message; using record_type = Record; using state_type = StateWrapper; + using key_type = stream_id_t; }; } // namespace http diff --git a/src/stirling/source_connectors/socket_tracer/protocols/http2/types.h b/src/stirling/source_connectors/socket_tracer/protocols/http2/types.h index 55e9ceaa8b3..9545c0f5995 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/http2/types.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/http2/types.h @@ -215,11 +215,13 @@ struct Stream { }; using Record = Stream; +using stream_id_t = uint16_t; struct ProtocolTraits : public BaseProtocolTraits { using frame_type = Stream; using record_type = Record; using state_type = NoState; + using key_type = stream_id_t; static void ConvertTimestamps(record_type* record, ConvertTimestampsFuncType func) { record->send.timestamp_ns = func(record->send.timestamp_ns); diff --git a/src/stirling/source_connectors/socket_tracer/protocols/kafka/common/types.h b/src/stirling/source_connectors/socket_tracer/protocols/kafka/common/types.h index e3c33f1541e..b7ade9ad7d2 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/kafka/common/types.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/kafka/common/types.h @@ -393,10 +393,12 @@ struct StateWrapper { std::monostate recv; }; +using correlation_id_t = uint16_t; struct ProtocolTraits : public BaseProtocolTraits { using frame_type = Packet; using record_type = Record; using state_type = StateWrapper; + using key_type = correlation_id_t; }; } // namespace kafka diff --git a/src/stirling/source_connectors/socket_tracer/protocols/mux/types.h b/src/stirling/source_connectors/socket_tracer/protocols/mux/types.h index 3ad01c4b210..c3c7be2be6a 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/mux/types.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/mux/types.h @@ -18,6 +18,7 @@ #pragma once +#include #include #include @@ -196,11 +197,13 @@ struct Record { } }; +using stream_id_t = uint16_t; struct ProtocolTraits : public BaseProtocolTraits { using frame_type = Frame; using record_type = Record; // TODO(ddelnano): mux does have state but assume no state for now using state_type = NoState; + using key_type = stream_id_t; }; } // namespace mux diff --git a/src/stirling/source_connectors/socket_tracer/protocols/mysql/types.h b/src/stirling/source_connectors/socket_tracer/protocols/mysql/types.h index 46da209d2f2..949b589e2b3 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/mysql/types.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/mysql/types.h @@ -421,10 +421,12 @@ struct Record { } }; +using connection_id_t = uint16_t; struct ProtocolTraits : public BaseProtocolTraits { using frame_type = Packet; using record_type = Record; using state_type = StateWrapper; + using key_type = connection_id_t; }; } // namespace mysql diff --git a/src/stirling/source_connectors/socket_tracer/protocols/nats/types.h b/src/stirling/source_connectors/socket_tracer/protocols/nats/types.h index 1dc9a1e802f..94ce2904164 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/nats/types.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/nats/types.h @@ -60,11 +60,13 @@ struct Record { } }; +using stream_id_t = uint16_t; // Required by event parser interface. struct ProtocolTraits : public BaseProtocolTraits { using frame_type = Message; using record_type = Record; using state_type = NoState; + using key_type = stream_id_t; }; constexpr std::string_view kInfo = "INFO"; diff --git a/src/stirling/source_connectors/socket_tracer/protocols/pgsql/types.h b/src/stirling/source_connectors/socket_tracer/protocols/pgsql/types.h index 6c90891fd69..45a2717e7b3 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/pgsql/types.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/pgsql/types.h @@ -564,10 +564,12 @@ struct StateWrapper { std::monostate recv; }; +using connection_id_t = uint16_t; struct ProtocolTraits : public BaseProtocolTraits { using frame_type = RegularMessage; using record_type = Record; using state_type = StateWrapper; + using key_type = connection_id_t; }; using MsgDeqIter = std::deque::iterator; diff --git a/src/stirling/source_connectors/socket_tracer/protocols/redis/types.h b/src/stirling/source_connectors/socket_tracer/protocols/redis/types.h index affdad63ddc..762402758a5 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/redis/types.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/redis/types.h @@ -60,11 +60,13 @@ struct Record { } }; +using stream_id_t = uint16_t; // Required by event parser interface. struct ProtocolTraits : public BaseProtocolTraits { using frame_type = Message; using record_type = Record; using state_type = NoState; + using key_type = stream_id_t; }; } // namespace redis diff --git a/src/stirling/source_connectors/socket_tracer/protocols/types.h b/src/stirling/source_connectors/socket_tracer/protocols/types.h index e9be726dd69..a8a93f041e7 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/types.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/types.h @@ -18,6 +18,7 @@ #pragma once +#include #include #include @@ -39,17 +40,18 @@ namespace protocols { // clang-format off // PROTOCOL_LIST: Requires update on new protocols. +// Note: stream_id is set to 0 for protocols that use a single stream / have no notion of streams. using FrameDequeVariant = std::variant, - std::deque, - std::deque, - std::deque, - std::deque, - std::deque, - std::deque, - std::deque, - std::deque, - std::deque>; + absl::flat_hash_map>, + absl::flat_hash_map>, + absl::flat_hash_map>, + absl::flat_hash_map>, + absl::flat_hash_map>, + absl::flat_hash_map>, + absl::flat_hash_map>, + absl::flat_hash_map>, + absl::flat_hash_map>, + absl::flat_hash_map>>; // clang-format off } // namespace protocols diff --git a/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc b/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc index 489922b884d..e24f221102f 100644 --- a/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc +++ b/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc @@ -721,6 +721,9 @@ void SocketTraceConnector::CheckTracerState() { } } +using stream_id_t = protocols::http::stream_id_t; +using message_t = protocols::http::Message; + void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) { set_iteration_time(now_fn_()); @@ -786,9 +789,9 @@ void SocketTraceConnector::TransferDataImpl(ConnectorContext* ctx) { } else { // If there's no transfer function, then the tracker should not be holding any data. // http::ProtocolTraits is used as a placeholder; the frames deque is expected to be - // std::monotstate. - ECHECK(conn_tracker->send_data().Empty()); - ECHECK(conn_tracker->recv_data().Empty()); + // std::monostate. + DCHECK((conn_tracker->send_data().Empty())); + DCHECK((conn_tracker->recv_data().Empty())); } conn_tracker->IterationPostTick(); @@ -1629,12 +1632,13 @@ template void SocketTraceConnector::TransferStream(ConnectorContext* ctx, ConnTracker* tracker, DataTable* data_table) { using TFrameType = typename TProtocolTraits::frame_type; + using TKey = typename TProtocolTraits::key_type; VLOG(3) << absl::StrCat("Connection\n", DebugString(*tracker, "")); // Make sure the tracker's frames containers have been properly initialized. // This is a nop if the containers are already of the right type. - tracker->InitFrames(); + tracker->InitFrames(); if (data_table != nullptr && tracker->state() == ConnTracker::State::kTransferring) { // ProcessToRecords() parses raw events and produces messages in format that are expected by diff --git a/src/stirling/source_connectors/socket_tracer/socket_trace_connector_test.cc b/src/stirling/source_connectors/socket_tracer/socket_trace_connector_test.cc index a52d11d2cc1..20b74350ccd 100644 --- a/src/stirling/source_connectors/socket_tracer/socket_trace_connector_test.cc +++ b/src/stirling/source_connectors/socket_tracer/socket_trace_connector_test.cc @@ -856,8 +856,8 @@ TEST_F(SocketTraceConnectorTest, ConnectionCleanupInactiveAlive) { // Events should have been flushed. ASSERT_OK_AND_ASSIGN(const ConnTracker* tracker, source_->GetConnTracker(real_pid, real_fd)); - EXPECT_TRUE(tracker->recv_data().Empty()); - EXPECT_TRUE(tracker->send_data().Empty()); + EXPECT_TRUE((tracker->recv_data().Empty())); + EXPECT_TRUE((tracker->send_data().Empty())); } TEST_F(SocketTraceConnectorTest, TrackedUPIDTransfersData) {