Skip to content

Commit

Permalink
schema_registry/service: Populate store on startup
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Pope <ben@vectorized.io>
  • Loading branch information
BenPope committed May 28, 2021
1 parent 19f28b9 commit cdc03e7
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
30 changes: 30 additions & 0 deletions src/v/pandaproxy/schema_registry/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "pandaproxy/schema_registry/service.h"

#include "kafka/protocol/create_topics.h"
#include "kafka/protocol/list_offsets.h"
#include "kafka/server/handlers/topics/types.h"
#include "model/fundamental.h"
#include "model/timeout_clock.h"
Expand All @@ -18,6 +19,8 @@
#include "pandaproxy/logger.h"
#include "pandaproxy/schema_registry/configuration.h"
#include "pandaproxy/schema_registry/handlers.h"
#include "pandaproxy/schema_registry/storage.h"
#include "pandaproxy/schema_registry/util.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/future-util.hh>
Expand Down Expand Up @@ -109,6 +112,32 @@ ss::future<> service::create_internal_topic() {
// TODO(Ben): Validate the _schemas topic
}

ss::future<> service::fetch_internal_topic() {
constexpr static auto offset_req = []() {
return kafka::list_offsets_request{
.data = {.topics{
{{.name{schemas_tp.topic},
.partitions{
{{.partition_index{schemas_tp.partition},
.max_num_offsets = 128}}}}}}}};
};
auto offset_res = co_await _client.local().dispatch(offset_req);
auto max_offset = offset_res.data.topics[0].partitions[0].offset;
vlog(plog.debug, "Schema registry: _schemas max_offset: {}", max_offset);

client_fetcher reader{
_client.local(), schemas_tp, model::offset{0}, max_offset};
consume_to_store consumer{_store};

co_await ss::do_until(
[this, &reader]() {
return _gate.is_closed() || reader.is_end_of_stream();
},
[&consumer, &reader]() {
return std::move(reader).consume(consumer, model::no_timeout);
});
}

service::service(
const YAML::Node& config,
ss::smp_service_group smp_sg,
Expand All @@ -135,6 +164,7 @@ ss::future<> service::start() {
(void)ss::with_gate(_gate, [this]() -> ss::future<> {
try {
co_await create_internal_topic();
co_await fetch_internal_topic();
} catch (...) {
vlog(
plog.error,
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class service {

private:
ss::future<> create_internal_topic();
ss::future<> fetch_internal_topic();
configuration _config;
ss::semaphore _mem_sem;
ss::gate _gate;
Expand Down

0 comments on commit cdc03e7

Please sign in to comment.