Skip to content

Commit

Permalink
admin: added handlers for topic mount/unmount operations
Browse files Browse the repository at this point in the history
Added handlers that provide a wrapper around raw data migration APIs.
The operations support mounting and unmounting multiple topics at a
time. The handler creates a corresponding data migration objects and
returns underlying migration id.

Signed-off-by: Michał Maślanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Sep 3, 2024
1 parent 497f45c commit ae7172f
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/v/redpanda/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ v_cc_library(
admin/kafka.cc
admin/util.cc
admin/migrations.cc
admin/topics.cc
admin/data_migration_utils.cc
cli_parser.cc
application.cc
Expand Down
1 change: 1 addition & 0 deletions src/v/redpanda/admin/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ redpanda_cc_library(
"recovery.cc",
"security.cc",
"server.cc",
"topics.cc",
"transaction.cc",
"transform.cc",
"usage.cc",
Expand Down
1 change: 1 addition & 0 deletions src/v/redpanda/admin/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ void admin_server::configure_admin_routes() {
register_shadow_indexing_routes();
register_wasm_transform_routes();
register_data_migration_routes();
register_topic_routes();
/**
* Special REST apis active only in recovery mode
*/
Expand Down
7 changes: 7 additions & 0 deletions src/v/redpanda/admin/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ class admin_server {
void register_wasm_transform_routes();
void register_recovery_mode_routes();
void register_data_migration_routes();
void register_topic_routes();

ss::future<ss::json::json_return_type> patch_cluster_config_handler(
std::unique_ptr<ss::http::request>, const request_auth_result&);
Expand Down Expand Up @@ -656,6 +657,12 @@ class admin_server {
ss::future<ss::json::json_return_type>
delete_migration(std::unique_ptr<ss::http::request>);

// Topic routes
ss::future<ss::json::json_return_type>
mount_topics(std::unique_ptr<ss::http::request>);
ss::future<ss::json::json_return_type>
unmount_topics(std::unique_ptr<ss::http::request>);

ss::future<> throw_on_error(
ss::http::request& req,
std::error_code ec,
Expand Down
190 changes: 190 additions & 0 deletions src/v/redpanda/admin/topics.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#include "base/vlog.h"
#include "cluster/controller.h"
#include "cluster/data_migration_frontend.h"
#include "cluster/data_migration_types.h"
#include "container/fragmented_vector.h"
#include "json/validator.h"
#include "redpanda/admin/api-doc/migration.json.hh"
#include "redpanda/admin/data_migration_utils.h"
#include "redpanda/admin/server.h"
#include "redpanda/admin/util.h"

using admin::apply_validator;

namespace {

json::validator make_mount_configuration_validator() {
const std::string schema = R"(
{
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"additionalProperties": false,
"required": [
"topics"
],
"properties": {
"topics": {
"type": "array",
"items": {
"$ref": "#/definitions/inbound_topic"
},
"description": "List of topics to mount"
}
},
"definitions": {
"namespaced_topic": {
"type": "object",
"required": [
"topic"
],
"properties": {
"topic": {
"type": "string"
},
"ns": {
"type": "string"
}
},
"additionalProperties": false
},
"inbound_topic": {
"type": "object",
"required": [
"source_topic"
],
"properties": {
"source_topic": {
"$ref": "#/definitions/namespaced_topic"
},
"alias": {
"$ref": "#/definitions/namespaced_topic"
},
"location": {
"type": "string"
}
},
"additionalProperties": false
}
}
})";
return json::validator(schema);
}

json::validator make_unmount_array_validator() {
const std::string schema = R"(
{
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"additionalProperties": false,
"required": [
"topics"
],
"properties": {
"topics": {
"type": "array",
"items": {
"$ref": "#/definitions/namespaced_topic"
},
"description": "List of topics to unmount"
}
},
"definitions": {
"namespaced_topic": {
"type": "object",
"required": [
"topic"
],
"properties": {
"topic": {
"type": "string"
},
"ns": {
"type": "string"
}
},
"additionalProperties": false
}
}
})";
return json::validator(schema);
}

} // namespace

void admin_server::register_topic_routes() {
register_route<superuser>(
ss::httpd::migration_json::mount_topics,
[this](std::unique_ptr<ss::http::request> req) {
return mount_topics(std::move(req));
});
register_route<superuser>(
ss::httpd::migration_json::unmount_topics,
[this](std::unique_ptr<ss::http::request> req) {
return unmount_topics(std::move(req));
});
}

ss::future<ss::json::json_return_type>
admin_server::mount_topics(std::unique_ptr<ss::http::request> req) {
static thread_local json::validator validator
= make_mount_configuration_validator();
auto json_doc = co_await parse_json_body(req.get());
apply_validator(validator, json_doc);
cluster::data_migrations::inbound_migration migration;

migration.topics = parse_inbound_topics(json_doc);
auto result = co_await _controller->get_data_migration_frontend()
.local()
.create_migration(std::move(migration));
if (!result) {
vlog(
adminlog.warn,
"unable to create data migration for topic mount - error: {}",
result.error());
co_await throw_on_error(*req, result.error(), model::controller_ntp);
throw ss::httpd::server_error_exception(
"unknown error when creating data migration for mounting topics");
}

ss::httpd::migration_json::migration_info reply;
reply.id = result.value();
co_return std::move(reply);
}

ss::future<ss::json::json_return_type>
admin_server::unmount_topics(std::unique_ptr<ss::http::request> req) {
static thread_local json::validator validator
= make_unmount_array_validator();
auto json_doc = co_await parse_json_body(req.get());
apply_validator(validator, json_doc);
cluster::data_migrations::outbound_migration migration;

migration.topics = parse_topics(json_doc);
auto result = co_await _controller->get_data_migration_frontend()
.local()
.create_migration(std::move(migration));
if (!result) {
vlog(
adminlog.warn,
"unable to create data migration for topic unmount - error: {}",
result.error());
co_await throw_on_error(*req, result.error(), model::controller_ntp);
throw ss::httpd::server_error_exception(
"unknown error when creating data migration for unmounting topics");
}

ss::httpd::migration_json::migration_info reply;
reply.id = result.value();
co_return std::move(reply);
}

0 comments on commit ae7172f

Please sign in to comment.