diff --git a/src/v/cluster/self_test_backend.cc b/src/v/cluster/self_test_backend.cc index 22f6731827098..dbb181f04ef4d 100644 --- a/src/v/cluster/self_test_backend.cc +++ b/src/v/cluster/self_test_backend.cc @@ -14,6 +14,7 @@ #include "base/seastarx.h" #include "base/vlog.h" #include "cluster/logger.h" +#include "json/document.h" #include "ssx/future-util.h" #include @@ -51,14 +52,18 @@ ss::future<> self_test_backend::stop() { co_await std::move(f); } -ss::future> self_test_backend::do_start_test( - std::vector dtos, - std::vector ntos, - std::vector ctos, - std::vector unknown_checks) { +ss::future> +self_test_backend::do_start_test(start_test_request r) { auto gate_holder = _gate.hold(); std::vector results; + parse_self_test_checks(r); + + auto dtos = std::move(r.dtos); + auto ntos = std::move(r.ntos); + auto ctos = std::move(r.ctos); + auto unparsed_checks = std::move(r.unparsed_checks); + _stage = self_test_stage::disk; for (auto& dto : dtos) { try { @@ -151,13 +156,13 @@ ss::future> self_test_backend::do_start_test( } } - for (const auto& unknown_check : unknown_checks) { + for (const auto& unparsed_check : unparsed_checks) { results.push_back(self_test_result{ .name = "Unknown", - .test_type = unknown_check.test_type, + .test_type = unparsed_check.test_type, .error = fmt::format( "Unknown test type {} requested on node {}", - unknown_check.test_type, + unparsed_check.test_type, _self)}); } @@ -173,21 +178,16 @@ get_status_response self_test_backend::start_test(start_test_request req) { clusterlog.debug, "Request to start self-tests with id: {}", req.id); ssx::background = ssx::spawn_with_gate_then(_gate, [this, req = std::move(req)]() { - return do_start_test( - std::move(req.dtos), - std::move(req.ntos), - std::move(req.ctos), - std::move(req.unknown_checks)) - .then([this, id = req.id](auto results) { - for (auto& r : results) { - r.test_id = id; - } - _prev_run = get_status_response{ - .id = id, - .status = self_test_status::idle, - .stage = _stage, - .results = std::move(results)}; - }); + return do_start_test(std::move(req)).then([this](auto results) { + for (auto& r : results) { + r.test_id = _id; + } + _prev_run = get_status_response{ + .id = _id, + .status = self_test_status::idle, + .results = std::move(results), + .stage = _stage}; + }); }).finally([units = std::move(units)] {}); } else { vlog( diff --git a/src/v/cluster/self_test_backend.h b/src/v/cluster/self_test_backend.h index 07028d24876fe..9f8a2278e7903 100644 --- a/src/v/cluster/self_test_backend.h +++ b/src/v/cluster/self_test_backend.h @@ -76,11 +76,8 @@ class self_test_backend { ss::future netcheck(model::node_id, iobuf&&); private: - ss::future> do_start_test( - std::vector dtos, - std::vector ntos, - std::vector ctos, - std::vector unknown_checks); + ss::future> + do_start_test(start_test_request r); struct previous_netcheck_entity { static const inline model::node_id unassigned{-1}; diff --git a/src/v/cluster/self_test_frontend.cc b/src/v/cluster/self_test_frontend.cc index ceb5e47ef4277..c2c0659f1bc29 100644 --- a/src/v/cluster/self_test_frontend.cc +++ b/src/v/cluster/self_test_frontend.cc @@ -142,7 +142,7 @@ ss::future self_test_frontend::start_test( } if ( req.dtos.empty() && req.ntos.empty() && req.ctos.empty() - && req.unknown_checks.empty()) { + && req.unparsed_checks.empty()) { throw self_test_exception("No tests specified to run"); } /// Validate input @@ -197,8 +197,9 @@ ss::future self_test_frontend::start_test( .id = test_id, .dtos = std::move(req.dtos), .ntos = std::move(new_ntos), + .unparsed_checks = std::move(req.unparsed_checks), .ctos = std::move(req.ctos), - .unknown_checks = std::move(req.unknown_checks)}); + }); }); co_return test_id; } diff --git a/src/v/cluster/self_test_rpc_types.cc b/src/v/cluster/self_test_rpc_types.cc index f4cb1e5048621..055cde93331e3 100644 --- a/src/v/cluster/self_test_rpc_types.cc +++ b/src/v/cluster/self_test_rpc_types.cc @@ -75,4 +75,32 @@ make_netcheck_request(model::node_id src, size_t sz) { co_return cluster::netcheck_request{.source = src, .buf = std::move(iob)}; } +void parse_self_test_checks(start_test_request& r) { + static constexpr auto known_checks = std::to_array( + {"disk", "network", "cloud"}); + for (auto it = r.unparsed_checks.begin(); it != r.unparsed_checks.end();) { + const auto& test_type = it->test_type; + if ( + std::find(known_checks.begin(), known_checks.end(), test_type) + != known_checks.end()) { + json::Document doc; + if (doc.Parse(it->test_json.c_str()).HasParseError()) { + ++it; + continue; + } + const auto& obj = doc.GetObject(); + if (test_type == "disk") { + r.dtos.push_back(cluster::diskcheck_opts::from_json(obj)); + } else if (test_type == "network") { + r.ntos.push_back(cluster::netcheck_opts::from_json(obj)); + } else if (test_type == "cloud") { + r.ctos.push_back(cluster::cloudcheck_opts::from_json(obj)); + } + it = r.unparsed_checks.erase(it); + } else { + ++it; + } + } +} + } // namespace cluster diff --git a/src/v/cluster/self_test_rpc_types.h b/src/v/cluster/self_test_rpc_types.h index 2dcafe6fa276b..e2841763e9ec7 100644 --- a/src/v/cluster/self_test_rpc_types.h +++ b/src/v/cluster/self_test_rpc_types.h @@ -239,22 +239,21 @@ struct cloudcheck_opts } }; -// Captures unknown test types passed to self test frontend for logging -// purposes. -struct unknown_check +// Captures unparsed test types passed to self test backend. +struct unparsed_check : serde:: - envelope, serde::compat_version<0>> { + envelope, serde::compat_version<0>> { ss::sstring test_type; ss::sstring test_json; auto serde_fields() { return std::tie(test_type, test_json); } friend std::ostream& - operator<<(std::ostream& o, const unknown_check& unknown_check) { + operator<<(std::ostream& o, const unparsed_check& unparsed_check) { fmt::print( o, "{{test_type: {}, test_json: {}}}", - unknown_check.test_type, - unknown_check.test_json); + unparsed_check.test_type, + unparsed_check.test_json); return o; } }; @@ -341,15 +340,19 @@ struct empty_request struct start_test_request : serde::envelope< start_test_request, - serde::version<1>, + serde::version<2>, serde::compat_version<0>> { using rpc_adl_exempt = std::true_type; uuid_t id; std::vector dtos; std::vector ntos; + std::vector unparsed_checks; std::vector ctos; - std::vector unknown_checks; + + auto serde_fields() { + return std::tie(id, dtos, ntos, unparsed_checks, ctos); + } friend std::ostream& operator<<(std::ostream& o, const start_test_request& r) { @@ -363,8 +366,8 @@ struct start_test_request for (const auto& v : r.ctos) { fmt::print(ss, "cloudcheck_opts: {}", v); } - for (const auto& v : r.unknown_checks) { - fmt::print(ss, "unknown_check: {}", v); + for (const auto& v : r.unparsed_checks) { + fmt::print(ss, "unparsed_check: {}", v); } fmt::print(o, "{{id: {} {}}}", r.id, ss.str()); return o; @@ -374,14 +377,14 @@ struct start_test_request struct get_status_response : serde::envelope< get_status_response, - serde::version<0>, + serde::version<1>, serde::compat_version<0>> { using rpc_adl_exempt = std::true_type; uuid_t id{}; self_test_status status{}; - self_test_stage stage{}; std::vector results; + self_test_stage stage{}; friend std::ostream& operator<<(std::ostream& o, const get_status_response& r) { @@ -426,4 +429,13 @@ struct netcheck_response ss::future make_netcheck_request(model::node_id src, size_t sz); +// Parses the raw json out of the start_test_request::unparsed_checks vector +// into self-test options for the various tests, utilizing `opt_t::from_json`. +// In the case that the controller node is of a redpanda version lower than +// the current node, some self-test checks may have been left in +// the "unparsed_checks" vector in the request when the server first processes +// the self test request. We will attempt to parse the test json in the +// self_test_backend of the follower node instead, if we recognize it. +void parse_self_test_checks(start_test_request& r); + } // namespace cluster diff --git a/src/v/redpanda/admin/server.cc b/src/v/redpanda/admin/server.cc index 28c83f9d5ff74..a4958d2178a6a 100644 --- a/src/v/redpanda/admin/server.cc +++ b/src/v/redpanda/admin/server.cc @@ -2821,22 +2821,15 @@ admin_server::self_test_start_handler(std::unique_ptr req) { for (const auto& element : params) { const auto& obj = element.GetObject(); const ss::sstring test_type(obj["type"].GetString()); - if (test_type == "disk") { - r.dtos.push_back(cluster::diskcheck_opts::from_json(obj)); - } else if (test_type == "network") { - r.ntos.push_back(cluster::netcheck_opts::from_json(obj)); - } else if (test_type == "cloud") { - r.ctos.push_back(cluster::cloudcheck_opts::from_json(obj)); - } else { - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - element.Accept(writer); - r.unknown_checks.push_back(cluster::unknown_check{ - .test_type = test_type, - .test_json = ss::sstring{ - buffer.GetString(), buffer.GetSize()}}); - } + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + element.Accept(writer); + r.unparsed_checks.push_back(cluster::unparsed_check{ + .test_type = test_type, + .test_json = ss::sstring{ + buffer.GetString(), buffer.GetSize()}}); } + cluster::parse_self_test_checks(r); } else { /// Default test run is to start 1 disk and 1 network test with /// default arguments diff --git a/tests/rptest/tests/self_test_test.py b/tests/rptest/tests/self_test_test.py index 2106c73225034..5ca13feeda9bd 100644 --- a/tests/rptest/tests/self_test_test.py +++ b/tests/rptest/tests/self_test_test.py @@ -9,12 +9,15 @@ import re import time +from collections import defaultdict from rptest.services.cluster import cluster from rptest.tests.end_to_end import EndToEndTest from rptest.tests.redpanda_test import RedpandaTest from rptest.clients.rpk import RpkTool from rptest.services.admin import Admin from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, SISettings +from rptest.services.redpanda_installer import RedpandaVersionLine +from rptest.services.redpanda_installer import InstallOptions from ducktape.utils.util import wait_until from ducktape.mark import matrix from rptest.utils.functional import flat_map @@ -221,9 +224,9 @@ def test_self_test_unknown_test_type(self): cloud_storage_enable_remote_read=True, cloud_storage_enable_remote_write=True)) - #Attempt to run with an unknown test type "pandatest" - #and possibly unknown "cloud" test. - #The rest of the tests should proceed as normal. + # Attempt to run with an unknown test type "pandatest" + # and possibly unknown "cloud" test. + # The rest of the tests should proceed as normal. request_json = { 'tests': [{ 'type': 'pandatest' @@ -236,32 +239,32 @@ def test_self_test_unknown_test_type(self): }] } - #Manually invoke self test admin endpoint. + # Manually invoke self test admin endpoint. self.redpanda._admin._request('POST', 'debug/self_test/start', json=request_json) - #Populate list of unknown reports. + # Populate list of unknown reports. unknown_report_types = ['pandatest'] redpanda_versions = [ self.redpanda.get_version_int_tuple(node) for node in self.redpanda.nodes ] - #All nodes should have the same version of - #Redpanda running. + # All nodes should have the same version of + # Redpanda running. assert len(set(redpanda_versions)) == 1 - #Cloudcheck was introduced in 24.2.1. - #Expect that it will be unknown to nodes running - #earlier versions of redpanda. + # Cloudcheck was introduced in 24.2.1. + # Expect that it will be unknown to nodes running + # earlier versions of redpanda. if redpanda_versions[0] < (24, 2, 1): unknown_report_types.append('cloud') # Wait for self test completion. node_reports = self.wait_for_self_test_completion() - #Assert reports are passing, with the exception of unknown tests. + # Assert reports are passing, with the exception of unknown tests. reports = flat_map(lambda node: node['results'], node_reports) assert len(reports) > 0 for report in reports: @@ -270,3 +273,81 @@ def test_self_test_unknown_test_type(self): else: assert 'error' not in report assert 'warning' not in report + + @cluster(num_nodes=3) + def test_self_test_mixed_node_controller_lower_version(self): + """Assert the self test still runs when the controller node + is of a lower version than the rest of the nodes in the cluster. + The upgraded follower nodes should be able to parse the "unknown" + checks (currently just the cloudcheck), and then run and return + their results to the controller node.""" + num_nodes = 3 + + install_opts = InstallOptions(version=RedpandaVersionLine((24, 1)), + num_to_upgrade=2) + self.start_redpanda( + num_nodes=num_nodes, + si_settings=SISettings(test_context=self.test_context), + install_opts=install_opts) + + # Attempt to run with a possibly unknown "cloud" test. + # The controller, which is of a lower version than the other nodes in the cluster, + # doesn't recognize "cloud" as a test, but the other nodes should. + request_json = { + 'tests': [{ + 'type': 'cloud', + 'backoff_ms': 100, + 'timeout_ms': 5000 + }] + } + + redpanda_versions = { + i: self.redpanda.get_version_int_tuple(node) + for (i, node) in enumerate(self.redpanda.nodes) + } + + controller_node_index = min(redpanda_versions, + key=redpanda_versions.get) + controller_node_id = controller_node_index + 1 + # Make sure that the lowest version node is the controller. + self.redpanda._admin.partition_transfer_leadership( + 'redpanda', 'controller', 0, controller_node_id) + wait_until(lambda: self.redpanda._admin.get_partition_leader( + namespace="redpanda", topic="controller", partition=0) == + controller_node_id, + timeout_sec=10, + backoff_sec=1, + err_msg="Leadership did not stabilize") + + # Manually invoke self test admin endpoint, using the lowest version node as the target. + self.redpanda._admin._request( + 'POST', + 'debug/self_test/start', + json=request_json, + node=self.redpanda.nodes[controller_node_index]) + + # Wait for self test completion. + node_reports = self.wait_for_self_test_completion() + + unknown_checks_map = defaultdict(set) + for node, version in redpanda_versions.items(): + node_id = node + 1 + # Cloudcheck was introduced in 24.2.1. + # Expect that it will be unknown to nodes running + # earlier versions of redpanda. + if version < (24, 2, 1): + unknown_checks_map[node_id].add('cloud') + + # Assert reports are passing, with the exception of unknown tests. + assert len(node_reports) > 0 + for report in node_reports: + node = report['node_id'] + results = report['results'] + # Results shouldn't be empty, even for unknown checks. + assert len(results) > 0 + for result in results: + if result['test_type'] in unknown_checks_map[node]: + assert 'error' in result + else: + assert 'error' not in result + assert 'warning' not in result