Skip to content

Commit

Permalink
Merge pull request #22831 from WillemKauf/unknown_self_test_check_parse
Browse files Browse the repository at this point in the history
`cluster`: Parse unknown self-test checks
  • Loading branch information
WillemKauf authored Aug 16, 2024
2 parents 0e51f17 + bbd6445 commit cf7db12
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 69 deletions.
46 changes: 23 additions & 23 deletions src/v/cluster/self_test_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "base/seastarx.h"
#include "base/vlog.h"
#include "cluster/logger.h"
#include "json/document.h"
#include "ssx/future-util.h"

#include <seastar/core/abort_source.hh>
Expand Down Expand Up @@ -51,14 +52,18 @@ ss::future<> self_test_backend::stop() {
co_await std::move(f);
}

ss::future<std::vector<self_test_result>> self_test_backend::do_start_test(
std::vector<diskcheck_opts> dtos,
std::vector<netcheck_opts> ntos,
std::vector<cloudcheck_opts> ctos,
std::vector<unknown_check> unknown_checks) {
ss::future<std::vector<self_test_result>>
self_test_backend::do_start_test(start_test_request r) {
auto gate_holder = _gate.hold();
std::vector<self_test_result> results;

parse_self_test_checks(r);

auto dtos = std::move(r.dtos);
auto ntos = std::move(r.ntos);
auto ctos = std::move(r.ctos);
auto unparsed_checks = std::move(r.unparsed_checks);

_stage = self_test_stage::disk;
for (auto& dto : dtos) {
try {
Expand Down Expand Up @@ -151,13 +156,13 @@ ss::future<std::vector<self_test_result>> self_test_backend::do_start_test(
}
}

for (const auto& unknown_check : unknown_checks) {
for (const auto& unparsed_check : unparsed_checks) {
results.push_back(self_test_result{
.name = "Unknown",
.test_type = unknown_check.test_type,
.test_type = unparsed_check.test_type,
.error = fmt::format(
"Unknown test type {} requested on node {}",
unknown_check.test_type,
unparsed_check.test_type,
_self)});
}

Expand All @@ -173,21 +178,16 @@ get_status_response self_test_backend::start_test(start_test_request req) {
clusterlog.debug, "Request to start self-tests with id: {}", req.id);
ssx::background
= ssx::spawn_with_gate_then(_gate, [this, req = std::move(req)]() {
return do_start_test(
std::move(req.dtos),
std::move(req.ntos),
std::move(req.ctos),
std::move(req.unknown_checks))
.then([this, id = req.id](auto results) {
for (auto& r : results) {
r.test_id = id;
}
_prev_run = get_status_response{
.id = id,
.status = self_test_status::idle,
.stage = _stage,
.results = std::move(results)};
});
return do_start_test(std::move(req)).then([this](auto results) {
for (auto& r : results) {
r.test_id = _id;
}
_prev_run = get_status_response{
.id = _id,
.status = self_test_status::idle,
.results = std::move(results),
.stage = _stage};
});
}).finally([units = std::move(units)] {});
} else {
vlog(
Expand Down
7 changes: 2 additions & 5 deletions src/v/cluster/self_test_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,8 @@ class self_test_backend {
ss::future<netcheck_response> netcheck(model::node_id, iobuf&&);

private:
ss::future<std::vector<self_test_result>> do_start_test(
std::vector<diskcheck_opts> dtos,
std::vector<netcheck_opts> ntos,
std::vector<cloudcheck_opts> ctos,
std::vector<unknown_check> unknown_checks);
ss::future<std::vector<self_test_result>>
do_start_test(start_test_request r);

struct previous_netcheck_entity {
static const inline model::node_id unassigned{-1};
Expand Down
5 changes: 3 additions & 2 deletions src/v/cluster/self_test_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ ss::future<uuid_t> self_test_frontend::start_test(
}
if (
req.dtos.empty() && req.ntos.empty() && req.ctos.empty()
&& req.unknown_checks.empty()) {
&& req.unparsed_checks.empty()) {
throw self_test_exception("No tests specified to run");
}
/// Validate input
Expand Down Expand Up @@ -197,8 +197,9 @@ ss::future<uuid_t> self_test_frontend::start_test(
.id = test_id,
.dtos = std::move(req.dtos),
.ntos = std::move(new_ntos),
.unparsed_checks = std::move(req.unparsed_checks),
.ctos = std::move(req.ctos),
.unknown_checks = std::move(req.unknown_checks)});
});
});
co_return test_id;
}
Expand Down
28 changes: 28 additions & 0 deletions src/v/cluster/self_test_rpc_types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,32 @@ make_netcheck_request(model::node_id src, size_t sz) {
co_return cluster::netcheck_request{.source = src, .buf = std::move(iob)};
}

void parse_self_test_checks(start_test_request& r) {
static constexpr auto known_checks = std::to_array(
{"disk", "network", "cloud"});
for (auto it = r.unparsed_checks.begin(); it != r.unparsed_checks.end();) {
const auto& test_type = it->test_type;
if (
std::find(known_checks.begin(), known_checks.end(), test_type)
!= known_checks.end()) {
json::Document doc;
if (doc.Parse(it->test_json.c_str()).HasParseError()) {
++it;
continue;
}
const auto& obj = doc.GetObject();
if (test_type == "disk") {
r.dtos.push_back(cluster::diskcheck_opts::from_json(obj));
} else if (test_type == "network") {
r.ntos.push_back(cluster::netcheck_opts::from_json(obj));
} else if (test_type == "cloud") {
r.ctos.push_back(cluster::cloudcheck_opts::from_json(obj));
}
it = r.unparsed_checks.erase(it);
} else {
++it;
}
}
}

} // namespace cluster
38 changes: 25 additions & 13 deletions src/v/cluster/self_test_rpc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,22 +239,21 @@ struct cloudcheck_opts
}
};

// Captures unknown test types passed to self test frontend for logging
// purposes.
struct unknown_check
// Captures unparsed test types passed to self test backend.
struct unparsed_check
: serde::
envelope<unknown_check, serde::version<0>, serde::compat_version<0>> {
envelope<unparsed_check, serde::version<0>, serde::compat_version<0>> {
ss::sstring test_type;
ss::sstring test_json;
auto serde_fields() { return std::tie(test_type, test_json); }

friend std::ostream&
operator<<(std::ostream& o, const unknown_check& unknown_check) {
operator<<(std::ostream& o, const unparsed_check& unparsed_check) {
fmt::print(
o,
"{{test_type: {}, test_json: {}}}",
unknown_check.test_type,
unknown_check.test_json);
unparsed_check.test_type,
unparsed_check.test_json);
return o;
}
};
Expand Down Expand Up @@ -341,15 +340,19 @@ struct empty_request
struct start_test_request
: serde::envelope<
start_test_request,
serde::version<1>,
serde::version<2>,
serde::compat_version<0>> {
using rpc_adl_exempt = std::true_type;

uuid_t id;
std::vector<diskcheck_opts> dtos;
std::vector<netcheck_opts> ntos;
std::vector<unparsed_check> unparsed_checks;
std::vector<cloudcheck_opts> ctos;
std::vector<unknown_check> unknown_checks;

auto serde_fields() {
return std::tie(id, dtos, ntos, unparsed_checks, ctos);
}

friend std::ostream&
operator<<(std::ostream& o, const start_test_request& r) {
Expand All @@ -363,8 +366,8 @@ struct start_test_request
for (const auto& v : r.ctos) {
fmt::print(ss, "cloudcheck_opts: {}", v);
}
for (const auto& v : r.unknown_checks) {
fmt::print(ss, "unknown_check: {}", v);
for (const auto& v : r.unparsed_checks) {
fmt::print(ss, "unparsed_check: {}", v);
}
fmt::print(o, "{{id: {} {}}}", r.id, ss.str());
return o;
Expand All @@ -374,14 +377,14 @@ struct start_test_request
struct get_status_response
: serde::envelope<
get_status_response,
serde::version<0>,
serde::version<1>,
serde::compat_version<0>> {
using rpc_adl_exempt = std::true_type;

uuid_t id{};
self_test_status status{};
self_test_stage stage{};
std::vector<self_test_result> results;
self_test_stage stage{};

friend std::ostream&
operator<<(std::ostream& o, const get_status_response& r) {
Expand Down Expand Up @@ -426,4 +429,13 @@ struct netcheck_response
ss::future<cluster::netcheck_request>
make_netcheck_request(model::node_id src, size_t sz);

// Parses the raw json out of the start_test_request::unparsed_checks vector
// into self-test options for the various tests, utilizing `opt_t::from_json`.
// In the case that the controller node is of a redpanda version lower than
// the current node, some self-test checks may have been left in
// the "unparsed_checks" vector in the request when the server first processes
// the self test request. We will attempt to parse the test json in the
// self_test_backend of the follower node instead, if we recognize it.
void parse_self_test_checks(start_test_request& r);

} // namespace cluster
23 changes: 8 additions & 15 deletions src/v/redpanda/admin/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2821,22 +2821,15 @@ admin_server::self_test_start_handler(std::unique_ptr<ss::http::request> req) {
for (const auto& element : params) {
const auto& obj = element.GetObject();
const ss::sstring test_type(obj["type"].GetString());
if (test_type == "disk") {
r.dtos.push_back(cluster::diskcheck_opts::from_json(obj));
} else if (test_type == "network") {
r.ntos.push_back(cluster::netcheck_opts::from_json(obj));
} else if (test_type == "cloud") {
r.ctos.push_back(cluster::cloudcheck_opts::from_json(obj));
} else {
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
element.Accept(writer);
r.unknown_checks.push_back(cluster::unknown_check{
.test_type = test_type,
.test_json = ss::sstring{
buffer.GetString(), buffer.GetSize()}});
}
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
element.Accept(writer);
r.unparsed_checks.push_back(cluster::unparsed_check{
.test_type = test_type,
.test_json = ss::sstring{
buffer.GetString(), buffer.GetSize()}});
}
cluster::parse_self_test_checks(r);
} else {
/// Default test run is to start 1 disk and 1 network test with
/// default arguments
Expand Down
Loading

0 comments on commit cf7db12

Please sign in to comment.