Skip to content

Commit

Permalink
schema_registry/service: Create internal topic on startup
Browse files Browse the repository at this point in the history
Creating the topic in the test is no longer required

Signed-off-by: Ben Pope <ben@vectorized.io>
  • Loading branch information
BenPope committed May 28, 2021
1 parent 5403997 commit 19f28b9
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 7 deletions.
55 changes: 54 additions & 1 deletion src/v/pandaproxy/schema_registry/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,19 @@

#include "pandaproxy/schema_registry/service.h"

#include "kafka/protocol/create_topics.h"
#include "kafka/server/handlers/topics/types.h"
#include "model/fundamental.h"
#include "model/timeout_clock.h"
#include "pandaproxy/api/api-doc/schema_registry.json.h"
#include "pandaproxy/error.h"
#include "pandaproxy/logger.h"
#include "pandaproxy/schema_registry/configuration.h"
#include "pandaproxy/schema_registry/handlers.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/memory.hh>
#include <seastar/core/std-coroutine.hh>
#include <seastar/http/api_docs.hh>
Expand All @@ -24,6 +30,9 @@ namespace pandaproxy::schema_registry {

using server = ctx_server<service>;

static const model::topic_partition schemas_tp{
model::topic{"_schemas"}, model::partition_id{0}};

template<typename Handler>
auto wrap_with_gate(ss::gate& g, Handler h) {
return
Expand Down Expand Up @@ -66,6 +75,40 @@ server::routes_t get_schema_registry_routes(ss::gate& gate) {
return routes;
}

ss::future<> service::create_internal_topic() {
vlog(plog.debug, "Schema registry: attempting to create internal topic");
static constexpr auto make_schema_topic_req = []() {
return kafka::create_topics_request{.data{.topics{
{.name{schemas_tp.topic},
.num_partitions = 1, // TODO(Ben): Make configurable
.replication_factor = 1, // TODO(Ben): Make configurable
.assignments{},
.configs{
{.name{ss::sstring{kafka::topic_property_cleanup_policy}},
.value{"compact"}}}}}}};
};
auto res = co_await _client.local().dispatch(make_schema_topic_req);
if (res.data.topics.size() != 1) {
throw std::runtime_error("Unexpected topic count");
}

const auto& topic = res.data.topics[0];
if (topic.error_code == kafka::error_code::none) {
vlog(plog.debug, "Schema registry: created internal topic");
} else if (topic.error_code == kafka::error_code::topic_already_exists) {
vlog(plog.debug, "Schema registry: found internal topic");
} else if (topic.error_code == kafka::error_code::not_controller) {
vlog(plog.debug, "Schema registry: not controller");
} else {
throw kafka::exception(
topic.error_code,
topic.error_message.value_or(
kafka::make_error_code(topic.error_code).message()));
}

// TODO(Ben): Validate the _schemas topic
}

service::service(
const YAML::Node& config,
ss::smp_service_group smp_sg,
Expand All @@ -85,10 +128,20 @@ service::service(
ss::future<> service::start() {
static std::vector<model::broker_endpoint> not_advertised{};
_server.routes(get_schema_registry_routes(_gate));
return _server.start(
co_await _server.start(
_config.schema_registry_api(),
_config.schema_registry_api_tls(),
not_advertised);
(void)ss::with_gate(_gate, [this]() -> ss::future<> {
try {
co_await create_internal_topic();
} catch (...) {
vlog(
plog.error,
"Schema registry failed to initialize internal topic: {}",
std::current_exception());
}
});
}

ss::future<> service::stop() {
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class service {
store& schema_store() { return _store; }

private:
ss::future<> create_internal_topic();
configuration _config;
ss::semaphore _mem_sem;
ss::gate _gate;
Expand Down
6 changes: 0 additions & 6 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,6 @@ def test_post_subjects_subject_versions(self):
Verify posting a schema
"""

self.logger.debug("Creating _schemas topic")
self._create_topics(names=["_schemas"],
partitions=1,
replicas=1,
cleanup_policy=TopicSpec.CLEANUP_COMPACT)

topic = create_topic_names(1)[0]

self.logger.debug(f"Register a schema against a subject")
Expand Down

0 comments on commit 19f28b9

Please sign in to comment.