Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Thinkit] Add thread safe PacketIO handler. #805

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions tests/forwarding/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,38 @@ cc_library(
],
)

cc_library(
name = "l3_admit_test",
testonly = True,
srcs = ["l3_admit_test.cc"],
hdrs = ["l3_admit_test.h"],
deps = [
":util",
"//gutil:proto",
"//gutil:status_matchers",
"//lib/gnmi:gnmi_helper",
"//p4_pdpi:ir",
"//p4_pdpi:ir_cc_proto",
"//p4_pdpi:p4_runtime_session",
"//p4_pdpi/packetlib",
"//p4_pdpi/packetlib:packetlib_cc_proto",
"//sai_p4/instantiations/google:instantiations",
"//sai_p4/instantiations/google:sai_p4info_cc",
"//tests/lib:p4rt_fixed_table_programming_helper",
"//tests/lib:packet_in_helper",
"//thinkit:mirror_testbed_fixture",
"@com_github_google_glog//:glog",
"@com_github_p4lang_p4runtime//:p4info_cc_proto",
"@com_github_p4lang_p4runtime//:p4runtime_cc_proto",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/time",
"@com_google_googletest//:gtest",
],
alwayslink = True,
)

cc_library(
name = "fuzzer_tests",
testonly = True,
Expand Down
98 changes: 98 additions & 0 deletions tests/forwarding/l3_admit_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "tests/forwarding/l3_admit_test.h"

#include <memory>
#include <optional>
#include <utility>

#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/strings/substitute.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "glog/logging.h"
#include "gmock/gmock.h"
#include "gutil/proto.h"
#include "gutil/status_matchers.h"
#include "lib/gnmi/gnmi_helper.h"
#include "p4/v1/p4runtime.pb.h"
#include "p4_pdpi/ir.h"
#include "p4_pdpi/ir.pb.h"
#include "p4_pdpi/p4_runtime_session.h"
#include "p4_pdpi/packetlib/packetlib.h"
#include "p4_pdpi/packetlib/packetlib.pb.h"
#include "tests/forwarding/util.h"
#include "tests/lib/p4rt_fixed_table_programming_helper.h"
#include "tests/lib/packet_in_helper.h"
#include "thinkit/mirror_testbed_fixture.h"

namespace pins {
namespace {

absl::Status AddAndSetDefaultVrf(pdpi::P4RuntimeSession& session,
const pdpi::IrP4Info& ir_p4info,
const std::string& vrf_id) {
pdpi::IrWriteRequest ir_write_request;
RETURN_IF_ERROR(gutil::ReadProtoFromString(
absl::Substitute(R"pb(
updates {
type: INSERT
table_entry {
table_name: "vrf_table"
matches {
name: "vrf_id"
exact { str: "$0" }
}
action { name: "no_action" }
}
}
updates {
type: INSERT
table_entry {
table_name: "acl_pre_ingress_table"
priority: 2000
action {
name: "set_vrf"
params {
name: "vrf_id"
value { str: "$0" }
}
}
}
}
)pb",
vrf_id),
&ir_write_request));
ASSIGN_OR_RETURN(p4::v1::WriteRequest pi_write_request,
pdpi::IrWriteRequestToPi(ir_p4info, ir_write_request));
return pdpi::SetMetadataAndSendPiWriteRequest(&session, pi_write_request);
}
} // namespace

TEST_P(L3AdmitTestFixture, L3PacketsAreRoutedWhenMacAddressIsInMyStation) {
LOG(INFO) << "Starting test.";

// PacketIO handlers for both the SUT and control switch.
std::unique_ptr<PacketInHelper> packetio_sut =
std::make_unique<PacketInHelper>(p4rt_sut_switch_session_.get(),
PacketInHelper::NoFilter);
std::unique_ptr<PacketInHelper> packetio_control =
std::make_unique<PacketInHelper>(p4rt_control_switch_session_.get(),
PacketInHelper::NoFilter);
}

} // namespace pins
40 changes: 40 additions & 0 deletions tests/forwarding/l3_admit_test.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef PINS_TESTS_FORWARDING_L3_ADMIT_TEST_H_
#define PINS_TESTS_FORWARDING_L3_ADMIT_TEST_H_

#include <memory>

#include "p4/config/v1/p4info.pb.h"
#include "p4_pdpi/ir.pb.h"
#include "p4_pdpi/p4_runtime_session.h"
#include "sai_p4/instantiations/google/instantiations.h"
#include "sai_p4/instantiations/google/sai_p4info.h"
#include "tests/lib/packet_in_helper.h"
#include "thinkit/mirror_testbed_fixture.h"

namespace pins {

class L3AdmitTestFixture : public thinkit::MirrorTestbedFixture {
protected:

// This test runs on a mirror testbed setup so we open a P4RT connection to
// both switches.
std::unique_ptr<pdpi::P4RuntimeSession> p4rt_sut_switch_session_;
std::unique_ptr<pdpi::P4RuntimeSession> p4rt_control_switch_session_;
};

} // namespace pins

#endif // PINS_TESTS_FORWARDING_L3_ADMIT_TEST_H_
14 changes: 14 additions & 0 deletions tests/lib/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,17 @@ cmd_diff_test(
":switch_test_setup_helpers_golden_test_runner",
],
)

cc_library(
name = "packet_in_helper",
srcs = ["packet_in_helper.cc"],
hdrs = ["packet_in_helper.h"],
deps = [
"//p4_pdpi:p4_runtime_session",
"@com_github_google_glog//:glog",
"@com_github_p4lang_p4runtime//:p4runtime_cc_proto",
"@com_google_absl//absl/status",
"@com_google_absl//absl/status:statusor",
"@com_google_absl//absl/synchronization",
],
)
89 changes: 89 additions & 0 deletions tests/lib/packet_in_helper.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "tests/lib/packet_in_helper.h"

#include "absl/status/status.h"
#include "absl/synchronization/mutex.h"
#include "glog/logging.h"
#include "p4/v1/p4runtime.pb.h"
#include "p4_pdpi/p4_runtime_session.h"

namespace pins {

PacketInHelper::PacketInHelper(
pdpi::P4RuntimeSession* p4rt_session,
std::function<bool(const p4::v1::StreamMessageResponse&)>
packet_in_message_filter)
: p4rt_session_(*p4rt_session),
packet_in_message_filter_(std::move(packet_in_message_filter)) {
CHECK(p4rt_session != nullptr); // Crash OK in ctor.

packet_in_thread_ = std::thread([&]() {
LOG(INFO) << "Start monitoring PacketIO events.";
p4::v1::StreamMessageResponse response;
while (p4rt_session_.StreamChannelRead(response)) {
if (packet_in_message_filter_(response)) {
PushBackPacketInMessage(response);
}
}
});
}

PacketInHelper::~PacketInHelper() {
// If the thread was never started then there isn't any other cleanup needed.
if (!packet_in_thread_.joinable()) return;

// Otherwise we try to stop the P4RT session, and join back the thread.
absl::Status stop_session = p4rt_session_.Finish();
if (!stop_session.ok()) {
LOG(ERROR) << "Problem stopping the P4RT session: " << stop_session;
return;
}

packet_in_thread_.join();
LOG(INFO) << "Stopped monitoring PacketIO events.";
}

bool PacketInHelper::NoFilter(const p4::v1::StreamMessageResponse& response) {
return true;
}

bool PacketInHelper::HasPacketInMessage() const {
absl::MutexLock l(&packet_in_lock_);
return !packet_in_messages_.empty();
}

absl::StatusOr<p4::v1::StreamMessageResponse>
PacketInHelper::GetNextPacketInMessage() {
absl::MutexLock l(&packet_in_lock_);

// If the queue is empty then we return an error.
if (packet_in_messages_.empty()) {
return absl::Status(absl::StatusCode::kOutOfRange,
"The PacketIn queue is empty.");
}

// Otherwise, we return the next packet in the queue.
p4::v1::StreamMessageResponse message = packet_in_messages_.front();
packet_in_messages_.pop();
return message;
}

void PacketInHelper::PushBackPacketInMessage(
const p4::v1::StreamMessageResponse& response) {
absl::MutexLock l(&packet_in_lock_);
packet_in_messages_.push(response);
}

} // namespace pins
89 changes: 89 additions & 0 deletions tests/lib/packet_in_helper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef PINS_TESTS_LIB_PACKET_IO_HELPER_H_
#define PINS_TESTS_LIB_PACKET_IO_HELPER_H_

#include <functional>
#include <queue>
#include <thread> // NOLINT: third_party code.

#include "absl/status/statusor.h"
#include "absl/synchronization/mutex.h"
#include "p4/v1/p4runtime.pb.h"
#include "p4_pdpi/p4_runtime_session.h"

namespace pins {

// A helper class for managing P4RT PacketIO requests. On construction it will
// spawn a thread to asynchronously collect PacketIn messages from the switch.
//
// NOTE: This class will not take ownership of a P4RT session, but on
// destruction it will close the gRPC stream channel to the switch.
class PacketInHelper final {
public:
// Spawns a thread to collect PacketIn messages sent from the switch.
//
// A filter can be used to limit the type of messages collected. The filter
// should return true for any packets the test wants to collect, and false for
// any packets the test wants to ignore.
explicit PacketInHelper(
pdpi::P4RuntimeSession* p4rt_session,
std::function<bool(const p4::v1::StreamMessageResponse&)>
packet_in_message_filter);

// Closes the P4RuntimeSession's stream, and joins the PacketIn thread.
~PacketInHelper();

// Always returns true so no packet gets filtered out.
static bool NoFilter(const p4::v1::StreamMessageResponse& response);

// Returns true if the PacketIn queue has packets. Otherwise it returns false.
bool HasPacketInMessage() const ABSL_LOCKS_EXCLUDED(packet_in_lock_);

// Returns the next packet in the queue. If no packet exists in the queue it
// will return an OUT_OF_BOUNDS error.
absl::StatusOr<p4::v1::StreamMessageResponse> GetNextPacketInMessage()
ABSL_LOCKS_EXCLUDED(packet_in_lock_);

private:
// Helper method used by the PacketIn thread to update the PacketIn messages.
void PushBackPacketInMessage(const p4::v1::StreamMessageResponse& response)
ABSL_LOCKS_EXCLUDED(packet_in_lock_);

pdpi::P4RuntimeSession& p4rt_session_;

// Thread is spawned in ctor and joined in dtor. It will wait for a PacketIn
// message then update the PacketIn message queue.
std::thread packet_in_thread_;

// Accessing the packet-in queue is lock protected because the P4RT server is
// sending new packets while the tests can be reading them back.
mutable absl::Mutex packet_in_lock_;

// Hold all PacketIn messages until the test reads them out.
std::queue<p4::v1::StreamMessageResponse> packet_in_messages_
ABSL_GUARDED_BY(packet_in_lock_);

// A filter to restrict which packets are actually collected by the
// PacketInHelper class.
//
// return true to collect the packet.
// return false to ignore the packet.
std::function<bool(const p4::v1::StreamMessageResponse&)>
packet_in_message_filter_;
};

} // namespace pins

#endif // PINS_TESTS_LIB_PACKET_IO_HELPER_H_
Loading