From cdc03e7eed2f3fba74a725a330c3949e9c041dd9 Mon Sep 17 00:00:00 2001 From: Ben Pope Date: Fri, 28 May 2021 17:07:52 +0100 Subject: [PATCH] schema_registry/service: Populate store on startup Signed-off-by: Ben Pope --- src/v/pandaproxy/schema_registry/service.cc | 30 +++++++++++++++++++++ src/v/pandaproxy/schema_registry/service.h | 1 + 2 files changed, 31 insertions(+) diff --git a/src/v/pandaproxy/schema_registry/service.cc b/src/v/pandaproxy/schema_registry/service.cc index 8abe923c50b7..412f4c4157f0 100644 --- a/src/v/pandaproxy/schema_registry/service.cc +++ b/src/v/pandaproxy/schema_registry/service.cc @@ -10,6 +10,7 @@ #include "pandaproxy/schema_registry/service.h" #include "kafka/protocol/create_topics.h" +#include "kafka/protocol/list_offsets.h" #include "kafka/server/handlers/topics/types.h" #include "model/fundamental.h" #include "model/timeout_clock.h" @@ -18,6 +19,8 @@ #include "pandaproxy/logger.h" #include "pandaproxy/schema_registry/configuration.h" #include "pandaproxy/schema_registry/handlers.h" +#include "pandaproxy/schema_registry/storage.h" +#include "pandaproxy/schema_registry/util.h" #include #include @@ -109,6 +112,32 @@ ss::future<> service::create_internal_topic() { // TODO(Ben): Validate the _schemas topic } +ss::future<> service::fetch_internal_topic() { + constexpr static auto offset_req = []() { + return kafka::list_offsets_request{ + .data = {.topics{ + {{.name{schemas_tp.topic}, + .partitions{ + {{.partition_index{schemas_tp.partition}, + .max_num_offsets = 128}}}}}}}}; + }; + auto offset_res = co_await _client.local().dispatch(offset_req); + auto max_offset = offset_res.data.topics[0].partitions[0].offset; + vlog(plog.debug, "Schema registry: _schemas max_offset: {}", max_offset); + + client_fetcher reader{ + _client.local(), schemas_tp, model::offset{0}, max_offset}; + consume_to_store consumer{_store}; + + co_await ss::do_until( + [this, &reader]() { + return _gate.is_closed() || reader.is_end_of_stream(); + }, + [&consumer, &reader]() { + return std::move(reader).consume(consumer, model::no_timeout); + }); +} + service::service( const YAML::Node& config, ss::smp_service_group smp_sg, @@ -135,6 +164,7 @@ ss::future<> service::start() { (void)ss::with_gate(_gate, [this]() -> ss::future<> { try { co_await create_internal_topic(); + co_await fetch_internal_topic(); } catch (...) { vlog( plog.error, diff --git a/src/v/pandaproxy/schema_registry/service.h b/src/v/pandaproxy/schema_registry/service.h index bb691d747c91..cbf0c668b937 100644 --- a/src/v/pandaproxy/schema_registry/service.h +++ b/src/v/pandaproxy/schema_registry/service.h @@ -44,6 +44,7 @@ class service { private: ss::future<> create_internal_topic(); + ss::future<> fetch_internal_topic(); configuration _config; ss::semaphore _mem_sem; ss::gate _gate;