Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions cmake/windows-setup.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,24 @@
# Not all plugins are supported on Windows yet. This file tweaks
# the build flags so that we can compile fluent-bit on it.

set(FLB_ARCH "")

if(CMAKE_SYSTEM_PROCESSOR MATCHES "^(ARM64|AARCH64|arm64)$")
set(FLB_ARCH "arm64")

elseif(CMAKE_SIZEOF_VOID_P EQUAL 8)
set(FLB_ARCH "x64")
elseif(CMAKE_SIZEOF_VOID_P EQUAL 4)
set(FLB_ARCH "x86")
endif()

if(NOT FLB_ARCH)
message(WARNING "Unknown architecture; falling back to x64 heuristics")
set(FLB_ARCH "x64")
endif()

message(STATUS "Detected target architecture: ${FLB_ARCH}")

if(FLB_WINDOWS_DEFAULTS)
message(STATUS "Overriding setttings with windows-setup.cmake")
set(FLB_REGEX Yes)
Expand All @@ -17,7 +35,13 @@ if(FLB_WINDOWS_DEFAULTS)
if (NOT FLB_LIBYAML_DIR)
set(FLB_CONFIG_YAML No)
endif ()
set(FLB_WASM No)
if(FLB_ARCH STREQUAL "arm64")
set(FLB_WASM No)
elseif(FLB_ARCH STREQUAL "x64")
set(FLB_WASM Yes)
elseif(FLB_ARCH STREQUAL "x86")
set(FLB_WASM Yes)
endif()
set(FLB_WAMRC No)

# INPUT plugins
Expand Down Expand Up @@ -107,7 +131,13 @@ if(FLB_WINDOWS_DEFAULTS)
set(FLB_FILTER_GEOIP2 Yes)
set(FLB_FILTER_AWS Yes)
set(FLB_FILTER_ECS Yes)
set(FLB_FILTER_WASM No)
if(FLB_ARCH STREQUAL "arm64")
set(FLB_FILTER_WASM No)
elseif(FLB_ARCH STREQUAL "x64")
set(FLB_FILTER_WASM Yes)
elseif(FLB_ARCH STREQUAL "x86")
set(FLB_FILTER_WASM Yes)
endif()
endif()

# Search bison and flex executables
Expand Down
79 changes: 52 additions & 27 deletions plugins/filter_wasm/filter_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ static int cb_wasm_filter(const void *data, size_t bytes,
char *json_buf = NULL;
size_t json_size;
int root_type;
struct flb_wasm *wasm = NULL;
/* Get the persistent WASM instance from the filter context. */
struct flb_filter_wasm *ctx = filter_context;
struct flb_wasm *wasm = ctx->wasm;
size_t buf_size;

struct flb_filter_wasm *ctx = filter_context;
struct flb_log_event_encoder log_encoder;
struct flb_log_event_decoder log_decoder;
struct flb_log_event log_event;
Expand All @@ -67,6 +68,12 @@ static int cb_wasm_filter(const void *data, size_t bytes,
(void) i_ins;
(void) config;

/* Safeguard in case initialization failed. */
if (!wasm) {
flb_plg_error(ctx->ins, "WASM instance is not available, skipping.");
return FLB_FILTER_NOTOUCH;
}

ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);

if (ret != FLB_EVENT_DECODER_SUCCESS) {
Expand All @@ -88,13 +95,6 @@ static int cb_wasm_filter(const void *data, size_t bytes,
return FLB_FILTER_NOTOUCH;
}

wasm = flb_wasm_instantiate(config, ctx->wasm_path, ctx->accessible_dir_list,
ctx->wasm_conf);
if (wasm == NULL) {
flb_plg_debug(ctx->ins, "instantiate wasm [%s] failed", ctx->wasm_path);
goto on_error;
}

while ((ret = flb_log_event_decoder_next(
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
Expand All @@ -117,8 +117,8 @@ static int cb_wasm_filter(const void *data, size_t bytes,
}
else {
flb_plg_error(ctx->ins, "encode as JSON from msgpack is failed");

goto on_error;
/* Go to on_error without destroying the persistent wasm instance */
goto on_error_without_wasm_destroy;
}
break;
case FLB_FILTER_WASM_FMT_MSGPACK:
Expand All @@ -127,8 +127,8 @@ static int cb_wasm_filter(const void *data, size_t bytes,
(void **)&buf, &buf_size);
if (ret < 0) {
flb_plg_error(ctx->ins, "format msgpack is failed");

goto on_error;
/* Go to on_error without destroying the persistent wasm instance */
goto on_error_without_wasm_destroy;
}

/* Execute WASM program */
Expand Down Expand Up @@ -227,9 +227,6 @@ static int cb_wasm_filter(const void *data, size_t bytes,
}
}

/* Teardown WASM context */
flb_wasm_destroy(wasm);

*out_buf = log_encoder.output_buffer;
*out_bytes = log_encoder.output_length;

Expand All @@ -240,14 +237,10 @@ static int cb_wasm_filter(const void *data, size_t bytes,

return FLB_FILTER_MODIFIED;

on_error:
/* A new error handler that doesn't destroy the persistent wasm instance */
on_error_without_wasm_destroy:
flb_log_event_decoder_destroy(&log_decoder);
flb_log_event_encoder_destroy(&log_encoder);

if (wasm != NULL) {
flb_wasm_destroy(wasm);
}

return FLB_FILTER_NOTOUCH;
}

Expand Down Expand Up @@ -313,6 +306,7 @@ static int cb_wasm_pre_run(struct flb_filter_instance *f_ins,
/* Check accessibility for the wasm path */
ret = access(ctx->wasm_path, R_OK);
if (ret != 0) {
flb_plg_error(f_ins, "cannot access wasm program at %s", ctx->wasm_path);
goto pre_run_error;
}

Expand Down Expand Up @@ -341,6 +335,9 @@ static int cb_wasm_init(struct flb_filter_instance *f_ins,
if (!ctx) {
return -1;
}
/* Initialize wasm pointer to NULL */
ctx->wasm = NULL;


/* Initialize exec config */
ret = filter_wasm_config_read(ctx, f_ins, config);
Expand Down Expand Up @@ -378,22 +375,50 @@ static int cb_wasm_init(struct flb_filter_instance *f_ins,
wasm_conf->stack_size = ctx->wasm_stack_size;
}

/* Set context */
/* Set context before instantiating */
flb_filter_set_context(f_ins, ctx);

/* Instantiate the WASM module once and store it in the context */
ctx->wasm = flb_wasm_instantiate(config, ctx->wasm_path,
ctx->accessible_dir_list,
ctx->wasm_conf);

if (ctx->wasm == NULL) {
flb_plg_error(ctx->ins, "failed to instantiate wasm program: %s",
ctx->wasm_path);
goto init_error;
}

return 0;

init_error:
delete_wasm_config(ctx);

if (ctx) {
if (ctx->wasm_conf) {
flb_wasm_config_destroy(ctx->wasm_conf);
ctx->wasm_conf = NULL;
}
delete_wasm_config(ctx);
}
flb_filter_set_context(f_ins, NULL);
return -1;
}

static int cb_wasm_exit(void *data, struct flb_config *config)
{
struct flb_filter_wasm *ctx = data;

flb_wasm_config_destroy(ctx->wasm_conf);
flb_wasm_destroy_all(config);
if (!ctx) {
return 0;
}

/* Destroy the single, persistent WASM instance */
if (ctx->wasm) {
flb_wasm_destroy(ctx->wasm);
}
/* Destroy the WASM configuration */
if (ctx->wasm_conf) {
flb_wasm_config_destroy(ctx->wasm_conf);
}
delete_wasm_config(ctx);
return 0;
}
Expand Down
Loading