Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wasm: run wasmtime on the reactor threads #14046

Merged
merged 16 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fetch_dep(hdrhistogram
FetchContent_Declare(
wasmtime
GIT_REPOSITORY https://github.com/bytecodealliance/wasmtime
GIT_TAG ec07c89b9b57a2e4d377b3c730f1b8cf8e31bd3a
GIT_TAG b77b407b25c3c158be209b8df6d9054ac6e43203
# Remove the features we don't use.
PATCH_COMMAND
sed -i "s/default \\\\= \\\\['jitdump', 'wat', 'wasi', 'cache', 'parallel\\\\-compilation', 'async'\\\\]/default = ['async']/g" crates/c-api/Cargo.toml
Expand Down
19 changes: 19 additions & 0 deletions src/v/wasm/tests/wasm_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,25 @@ class fake_schema_registry : public wasm::schema_registry {
private:
std::vector<ppsr::subject_schema> _schemas;
};

void WasmTestFixture::SetUpTestSuite() {
// This is a bit of a hack to set the signal set for the ss::async thread
// that runs the tests.
// In debug mode (only) ss::thread uses swapcontext to switch stacks,
// and swapcontext *also* keeps track of the signalset and swaps those.
// so depending on the running context we can get the signals that wasmtime
// needs can be blocked, as these signals are blocked by default and we only
// unblock the signals on the reactor threads in wasm::runtime::start
//
// In release mode, longjmp and setjmp is used instead of swapcontext, so
// there is nothing that resets the signalset.
auto mask = ss::make_empty_sigset_mask();
sigaddset(&mask, SIGSEGV);
sigaddset(&mask, SIGILL);
sigaddset(&mask, SIGFPE);
ss::throw_pthread_error(::pthread_sigmask(SIG_UNBLOCK, &mask, nullptr));
}

void WasmTestFixture::SetUp() {
_probe = std::make_unique<wasm::transform_probe>();
auto sr = std::make_unique<fake_schema_registry>();
Expand Down
2 changes: 2 additions & 0 deletions src/v/wasm/tests/wasm_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class WasmTestFixture : public ::testing::Test {
public:
static constexpr model::timestamp NOW = model::timestamp(1687201340524ULL);

static void SetUpTestSuite();

void SetUp() override;
void TearDown() override;

Expand Down
8 changes: 4 additions & 4 deletions src/v/wasm/transform_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ constexpr int32_t NO_ACTIVE_TRANSFORM = -1;
constexpr int32_t INVALID_HANDLE = -2;
constexpr int32_t INVALID_BUFFER = -3;

model::record_batch transform_module::for_each_record(
ss::future<model::record_batch> transform_module::for_each_record_async(
const model::record_batch* input,
ss::noncopyable_function<void(wasm_call_params)> func) {
ss::noncopyable_function<ss::future<>(wasm_call_params)> func) {
vassert(
input->header().attrs.compression() == model::compression::none,
"wasm transforms expect uncompressed batches");
Expand Down Expand Up @@ -69,7 +69,7 @@ model::record_batch transform_module::for_each_record(
auto current_record_timestamp = input->header().first_timestamp()
+ record_position.timestamp_delta;
try {
func({
co_await func({
.batch_handle = bh,
.record_handle = record_handle(
int32_t(record_position.start_index)),
Expand All @@ -95,7 +95,7 @@ model::record_batch transform_module::for_each_record(
batch.header().crc = model::crc_record_batch(batch);
batch.header().header_crc = model::internal_header_only_crc(batch.header());
_call_ctx = std::nullopt;
return batch;
co_return batch;
}

// NOLINTBEGIN(bugprone-easily-swappable-parameters)
Expand Down
4 changes: 2 additions & 2 deletions src/v/wasm/transform_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ class transform_module {
* the redpanda_transform_on_record_written function that the guest should
* expose.
*/
model::record_batch for_each_record(
ss::future<model::record_batch> for_each_record_async(
const model::record_batch*,
ss::noncopyable_function<void(wasm_call_params)>);
ss::noncopyable_function<ss::future<>(wasm_call_params)>);

// Start ABI exports

Expand Down
Loading
Loading