Skip to content

Commit

Permalink
wasm: remove transform_module::for_each_record
Browse files Browse the repository at this point in the history
This was only used on an alien thread when wasm was executed in a
blocking context, but now we're using the async version so it can be
removed.

Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
  • Loading branch information
rockwotj committed Oct 10, 2023
1 parent 914dda0 commit c9ff0db
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 68 deletions.
64 changes: 0 additions & 64 deletions src/v/wasm/transform_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,70 +34,6 @@ constexpr int32_t NO_ACTIVE_TRANSFORM = -1;
constexpr int32_t INVALID_HANDLE = -2;
constexpr int32_t INVALID_BUFFER = -3;

model::record_batch transform_module::for_each_record(
const model::record_batch* input,
ss::noncopyable_function<void(wasm_call_params)> func) {
vassert(
input->header().attrs.compression() == model::compression::none,
"wasm transforms expect uncompressed batches");

iobuf_const_parser parser(input->data());

auto bh = batch_handle(input->header().crc);

std::vector<record_position> record_positions;
record_positions.reserve(input->record_count());

while (parser.bytes_left() > 0) {
auto start_index = parser.bytes_consumed();
auto [size, amt] = parser.read_varlong();
parser.skip(sizeof(model::record_attributes::type));
auto [timestamp_delta, td] = parser.read_varlong();
parser.skip(size - sizeof(model::record_attributes::type) - td);
record_positions.push_back(
{.start_index = start_index,
.size = size_t(size + amt),
.timestamp_delta = int32_t(timestamp_delta)});
}

_call_ctx.emplace(transform_context{
.input = input,
});

for (const auto& record_position : record_positions) {
_call_ctx->current_record = record_position;
auto current_record_timestamp = input->header().first_timestamp()
+ record_position.timestamp_delta;
try {
func({
.batch_handle = bh,
.record_handle = record_handle(
int32_t(record_position.start_index)),
.record_size = int32_t(record_position.size),
.current_record_offset = int32_t(_call_ctx->output_record_count),
.current_record_timestamp = model::timestamp(
current_record_timestamp),
});
} catch (...) {
_call_ctx = std::nullopt;
std::rethrow_exception(std::current_exception());
}
}

model::record_batch::compressed_records records = std::move(
_call_ctx->output_records);
model::record_batch_header header = _call_ctx->input->header();
header.size_bytes = int32_t(
model::packed_record_batch_header_size + records.size_bytes());
header.record_count = _call_ctx->output_record_count;
model::record_batch batch(
header, std::move(records), model::record_batch::tag_ctor_ng{});
batch.header().crc = model::crc_record_batch(batch);
batch.header().header_crc = model::internal_header_only_crc(batch.header());
_call_ctx = std::nullopt;
return batch;
}

ss::future<model::record_batch> transform_module::for_each_record_async(
const model::record_batch* input,
ss::noncopyable_function<ss::future<>(wasm_call_params)> func) {
Expand Down
4 changes: 0 additions & 4 deletions src/v/wasm/transform_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,6 @@ class transform_module {
* the redpanda_transform_on_record_written function that the guest should
* expose.
*/
model::record_batch for_each_record(
const model::record_batch*,
ss::noncopyable_function<void(wasm_call_params)>);

ss::future<model::record_batch> for_each_record_async(
const model::record_batch*,
ss::noncopyable_function<ss::future<>(wasm_call_params)>);
Expand Down

0 comments on commit c9ff0db

Please sign in to comment.