Skip to content

Commit

Permalink
pandaproxy/rest: Use iobuf_writer
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed Jul 16, 2024
1 parent 609db2e commit 0229940
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 17 deletions.
12 changes: 4 additions & 8 deletions src/v/pandaproxy/json/iobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
#pragma once

#include "bytes/iobuf.h"
#include "bytes/iobuf_parser.h"
#include "json/iobuf_writer.h"
#include "json/reader.h"
#include "json/stream.h"
#include "json/writer.h"
#include "pandaproxy/json/rjson_util.h"
#include "utils/base64.h"
Expand Down Expand Up @@ -65,7 +64,7 @@ class rjson_serialize_impl<iobuf> {
: _fmt(fmt) {}

template<typename Buffer>
bool operator()(::json::Writer<Buffer>& w, iobuf buf) {
bool operator()(::json::iobuf_writer<Buffer>& w, iobuf buf) {
switch (_fmt) {
case serialization_format::none:
[[fallthrough]];
Expand All @@ -81,7 +80,7 @@ class rjson_serialize_impl<iobuf> {
}

template<typename Buffer>
bool encode_base64(::json::Writer<Buffer>& w, iobuf buf) {
bool encode_base64(::json::iobuf_writer<Buffer>& w, iobuf buf) {
if (buf.empty()) {
return w.Null();
}
Expand All @@ -94,11 +93,8 @@ class rjson_serialize_impl<iobuf> {
if (buf.empty()) {
return w.Null();
}
iobuf_parser p{std::move(buf)};
auto str = p.read_string(p.bytes_left());
static_assert(str.padding(), "StringStream requires null termination");
::json::chunked_input_stream ss{std::move(buf)};
::json::Reader reader;
::json::StringStream ss{str.c_str()};
return reader.Parse(ss, w);
};

Expand Down
5 changes: 3 additions & 2 deletions src/v/pandaproxy/json/requests/fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class rjson_serialize_impl<model::record> {
, _base_offset(base_offset) {}

template<typename Buffer>
bool operator()(::json::Writer<Buffer>& w, model::record record) {
bool operator()(::json::iobuf_writer<Buffer>& w, model::record record) {
auto offset = _base_offset() + record.offset_delta();

w.StartObject();
Expand Down Expand Up @@ -93,7 +93,8 @@ class rjson_serialize_impl<kafka::fetch_response> {
: _fmt(fmt) {}

template<typename Buffer>
bool operator()(::json::Writer<Buffer>& w, kafka::fetch_response&& res) {
bool
operator()(::json::iobuf_writer<Buffer>& w, kafka::fetch_response&& res) {
// Eager check for errors
for (auto& v : res) {
if (v.partition_response->error_code != kafka::error_code::none) {
Expand Down
4 changes: 2 additions & 2 deletions src/v/pandaproxy/json/requests/test/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ SEASTAR_THREAD_TEST_CASE(test_produce_fetch_empty) {
auto fmt = ppj::serialization_format::binary_v2;

::json::StringBuffer str_buf;
::json::Writer<::json::StringBuffer> w(str_buf);
::json::iobuf_writer<::json::StringBuffer> w(str_buf);
ppj::rjson_serialize_fmt(fmt)(w, std::move(res));

auto expected = R"([])";
Expand All @@ -85,7 +85,7 @@ SEASTAR_THREAD_TEST_CASE(test_produce_fetch_one) {
auto fmt = ppj::serialization_format::binary_v2;

::json::StringBuffer str_buf;
::json::Writer<::json::StringBuffer> w(str_buf);
::json::iobuf_writer<::json::StringBuffer> w(str_buf);
ppj::rjson_serialize_fmt(fmt)(w, std::move(res));

auto expected
Expand Down
2 changes: 1 addition & 1 deletion src/v/pandaproxy/json/rjson_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ struct rjson_serialize_fmt_impl {
std::forward<T>(t));
}
template<typename Buffer, typename T>
bool operator()(::json::Writer<Buffer>& w, T&& t) {
bool operator()(::json::iobuf_writer<Buffer>& w, T&& t) {
return rjson_serialize_impl<std::remove_reference_t<T>>{fmt}(
w, std::forward<T>(t));
}
Expand Down
4 changes: 2 additions & 2 deletions src/v/pandaproxy/json/test/iobuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
#include "pandaproxy/json/iobuf.h"

#include "bytes/iobuf_parser.h"
#include "json/iobuf_writer.h"
#include "json/stringbuffer.h"
#include "json/writer.h"
#include "pandaproxy/json/rjson_util.h"

#include <seastar/testing/thread_test_case.hh>
Expand Down Expand Up @@ -42,7 +42,7 @@ SEASTAR_THREAD_TEST_CASE(test_iobuf_serialize_binary) {
in_buf.append(input.data(), input.size());

::json::StringBuffer out_buf;
::json::Writer<::json::StringBuffer> w(out_buf);
::json::iobuf_writer<::json::StringBuffer> w(out_buf);
ppj::rjson_serialize_fmt(ppj::serialization_format::binary_v2)(
w, std::move(in_buf));
ss::sstring output{out_buf.GetString(), out_buf.GetSize()};
Expand Down
4 changes: 2 additions & 2 deletions src/v/pandaproxy/rest/handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ get_topics_records(server::request_t rq, server::reply_t rp) {
.fetch_partition(std::move(tp), offset, max_bytes, timeout)
.then([res_fmt](kafka::fetch_response res) {
::json::chunked_buffer buf;
::json::Writer<::json::chunked_buffer> w(buf);
::json::iobuf_writer<::json::chunked_buffer> w(buf);

ppj::rjson_serialize_fmt(res_fmt)(w, std::move(res));
return buf;
Expand Down Expand Up @@ -391,7 +391,7 @@ consumer_fetch(server::request_t rq, server::reply_t rp) {
return client.consumer_fetch(group_id, name, timeout, max_bytes)
.then([res_fmt, rp{std::move(rp)}](auto res) mutable {
::json::chunked_buffer buf;
::json::Writer<::json::chunked_buffer> w(buf);
::json::iobuf_writer<::json::chunked_buffer> w(buf);

ppj::rjson_serialize_fmt(res_fmt)(w, std::move(res));

Expand Down

0 comments on commit 0229940

Please sign in to comment.