diff --git a/include/fluent-bit/flb_lua.h b/include/fluent-bit/flb_lua.h index c05070cca42..c87f0050628 100644 --- a/include/fluent-bit/flb_lua.h +++ b/include/fluent-bit/flb_lua.h @@ -100,5 +100,5 @@ void flb_lua_tompack(lua_State *l, struct flb_lua_l2c_config *l2cc); void flb_lua_dump_stack(FILE *out, lua_State *l); int flb_lua_enable_flb_null(lua_State *l); - +void flb_lua_bulk_process(lua_State *l, struct flb_time *t, msgpack_object *o, int table_index); #endif diff --git a/plugins/filter_lua/lua.c b/plugins/filter_lua/lua.c index 5186955b8ae..5ddef3deaa7 100644 --- a/plugins/filter_lua/lua.c +++ b/plugins/filter_lua/lua.c @@ -428,17 +428,14 @@ static int pack_result (struct lua_filter *ctx, struct flb_time *ts, msgpack_unpacked_init(&result); ret = msgpack_unpack_next(&result, data, bytes, &off); - if (ret != MSGPACK_UNPACK_SUCCESS) { msgpack_unpacked_destroy(&result); return FLB_FALSE; } - if (result.data.type == MSGPACK_OBJECT_MAP) { ret = pack_record(ctx, log_encoder, ts, metadata, &result.data); - msgpack_unpacked_destroy(&result); if (ret != FLB_EVENT_ENCODER_SUCCESS) { @@ -491,6 +488,8 @@ static int cb_lua_filter(const void *data, size_t bytes, struct flb_time t_orig; struct flb_time t; struct lua_filter *ctx = filter_context; + int i; + int array_size; /* Lua return values */ int l_code; double l_timestamp; @@ -500,6 +499,7 @@ static int cb_lua_filter(const void *data, size_t bytes, struct flb_log_event_decoder log_decoder; struct flb_log_event log_event; + (void) f_ins; (void) i_ins; (void) config; @@ -525,140 +525,305 @@ static int cb_lua_filter(const void *data, size_t bytes, return FLB_FILTER_NOTOUCH; } - while ((ret = flb_log_event_decoder_next( - &log_decoder, - &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - msgpack_sbuffer_init(&data_sbuf); - msgpack_packer_init(&data_pck, &data_sbuf, msgpack_sbuffer_write); + if (ctx->chunk_mode) { + if (ctx->time_as_table != FLB_TRUE) { + flb_plg_error(ctx->ins, + "time_as_table needed in chunk_mode"); - /* Get timestamp */ - flb_time_copy(&t, &log_event.timestamp); - flb_time_copy(&t_orig, &log_event.timestamp); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); - /* Prepare function call, pass 3 arguments, expect 3 return values */ + return FLB_FILTER_NOTOUCH; + } lua_getglobal(ctx->lua->state, ctx->call); - lua_pushstring(ctx->lua->state, tag); - /* Timestamp */ - if (ctx->time_as_table == FLB_TRUE) { - flb_lua_pushtimetable(ctx->lua->state, &t); - } - else { - ts = flb_time_to_double(&t); - lua_pushnumber(ctx->lua->state, ts); - } + /* Create Lua table outside the loop */ + lua_createtable(ctx->lua->state, 0, 0); - flb_lua_pushmsgpack(ctx->lua->state, log_event.body); - if (ctx->protected_mode) { - ret = lua_pcall(ctx->lua->state, 3, 3, 0); - if (ret != 0) { - flb_plg_error(ctx->ins, "error code %d: %s", - ret, lua_tostring(ctx->lua->state, -1)); - lua_pop(ctx->lua->state, 1); + /* Push all timestamp + record tuples into the table */ + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { - msgpack_sbuffer_destroy(&data_sbuf); - flb_log_event_decoder_destroy(&log_decoder); - flb_log_event_encoder_destroy(&log_encoder); + /* Copy the current timestamp */ + flb_time_copy(&t, &log_event.timestamp); - return FLB_FILTER_NOTOUCH; + /* Add record to be send to lua */ + flb_lua_bulk_process(ctx->lua->state, &t, log_event.body, -2); } - } - else { - lua_call(ctx->lua->state, 3, 3); - } + /* Push the tag to the lua table. Same for this whole chunk */ + lua_pushstring(ctx->lua->state, tag); + lua_insert(ctx->lua->state, -2); - /* Initialize Return values */ - l_code = 0; - l_timestamp = ts; + if (ctx->protected_mode) { + /* Prepare function call, pass tag + records, expect records */ + ret = lua_pcall(ctx->lua->state, 2, 1, 0); + if (ret != 0) { + flb_plg_error(ctx->ins, "error code %d: %s", + ret, lua_tostring(ctx->lua->state, -1)); + lua_pop(ctx->lua->state, 1); - flb_lua_tomsgpack(ctx->lua->state, &data_pck, 0, &ctx->l2cc); - lua_pop(ctx->lua->state, 1); + msgpack_sbuffer_destroy(&data_sbuf); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + + return FLB_FILTER_NOTOUCH; + } + } + else { + /* Prepare function call, pass tag + records, expect records */ + lua_call(ctx->lua->state, 2, 1); + } + + if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) { + array_size = lua_objlen(ctx->lua->state, -1); + for (i = 1; i <= array_size; i++) { + /* Retrieve each record entry from records table */ + lua_rawgeti(ctx->lua->state, -1, i); + + if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) { + /* Retrieve timestamp */ + lua_getfield(ctx->lua->state, -1, "timestamp"); + if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) { + /* Retrieve seconds */ + lua_getfield(ctx->lua->state, -1, "sec"); + t.tm.tv_sec = lua_tointeger(ctx->lua->state, -1); + lua_pop(ctx->lua->state, 1); + + /* Retrieve nsec */ + lua_getfield(ctx->lua->state, -1, "nsec"); + t.tm.tv_nsec = lua_tointeger(ctx->lua->state, -1); + lua_pop(ctx->lua->state, 2); + + flb_plg_debug(ctx->ins, + "Timestamp: sec=%ld, nsec=%ld", + (long)t.tm.tv_sec, (long)t.tm.tv_nsec); + } else { + flb_plg_error(ctx->ins, + "invalid field type returned. " + "Chunk will not be processed"); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + return FLB_FILTER_NOTOUCH; + } + + /* Retrieve record from table*/ + lua_getfield(ctx->lua->state, -1, "record"); + + /* Check the type of 'record' field */ + if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) { + msgpack_sbuffer_init(&data_sbuf); + msgpack_packer_init(&data_pck, + &data_sbuf, + msgpack_sbuffer_write); + /* Create msgpack object for every record */ + flb_plg_debug(ctx->ins, + "Creating msgpack object for record"); + flb_lua_tomsgpack(ctx->lua->state, + &data_pck, + 0, + &ctx->l2cc); + ret = flb_log_event_encoder_begin_record( + &log_encoder); + if ( ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Failed to begin record. " + "Chunk will not be processed"); + msgpack_sbuffer_destroy(&data_sbuf); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + return FLB_FILTER_NOTOUCH; + } + ret = flb_log_event_encoder_set_timestamp( + &log_encoder, &t); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Failed to set timestamp. " + "Chunk will not be processed"); + msgpack_sbuffer_destroy(&data_sbuf); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + return FLB_FILTER_NOTOUCH; + } + ret = flb_log_event_encoder_set_body_from_raw_msgpack( + &log_encoder, + data_sbuf.data, + data_sbuf.size); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Failed to set body. " + "Chunk will not be processed"); + msgpack_sbuffer_destroy(&data_sbuf); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + return FLB_FILTER_NOTOUCH; + } + ret = flb_log_event_encoder_commit_record( + &log_encoder); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Failed to emit record. " + "Chunk will not be processed"); + msgpack_sbuffer_destroy(&data_sbuf); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + return FLB_FILTER_NOTOUCH; + } + + msgpack_sbuffer_destroy(&data_sbuf); + flb_plg_debug(ctx->ins, + "Msgpack object created for record."); + } else { + flb_plg_error(ctx->ins, + "invalid record field type returned"); + } + + lua_pop(ctx->lua->state, 1); + } else { + flb_plg_error(ctx->ins, + "invalid lua record entry type returned"); + } - /* Lua table */ - if (ctx->time_as_table == FLB_TRUE) { - if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) { - /* Retrieve seconds */ - lua_getfield(ctx->lua->state, -1, "sec"); - t.tm.tv_sec = lua_tointeger(ctx->lua->state, -1); lua_pop(ctx->lua->state, 1); + } + } else { + flb_plg_error(ctx->ins, "invalid lua table entry returned"); + } + lua_pop(ctx->lua->state, 1); + } + else { + while ((ret = flb_log_event_decoder_next( + &log_decoder, + &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + msgpack_sbuffer_init(&data_sbuf); + msgpack_packer_init(&data_pck, &data_sbuf, msgpack_sbuffer_write); + + /* Get timestamp */ + flb_time_copy(&t, &log_event.timestamp); + flb_time_copy(&t_orig, &log_event.timestamp); + + /* Prepare function call, pass 3 arguments, expect 3 return values */ + lua_getglobal(ctx->lua->state, ctx->call); + lua_pushstring(ctx->lua->state, tag); + + /* Timestamp */ + if (ctx->time_as_table == FLB_TRUE) { + flb_lua_pushtimetable(ctx->lua->state, &t); + } + else { + ts = flb_time_to_double(&t); + lua_pushnumber(ctx->lua->state, ts); + } - /* Retrieve nanoseconds */ - lua_getfield(ctx->lua->state, -1, "nsec"); - t.tm.tv_nsec = lua_tointeger(ctx->lua->state, -1); - lua_pop(ctx->lua->state, 2); + flb_lua_pushmsgpack(ctx->lua->state, log_event.body); + if (ctx->protected_mode) { + ret = lua_pcall(ctx->lua->state, 3, 3, 0); + if (ret != 0) { + flb_plg_error(ctx->ins, "error code %d: %s", + ret, lua_tostring(ctx->lua->state, -1)); + lua_pop(ctx->lua->state, 1); + + msgpack_sbuffer_destroy(&data_sbuf); + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + + return FLB_FILTER_NOTOUCH; + } } else { - flb_plg_error(ctx->ins, "invalid lua timestamp type returned"); - t = t_orig; + lua_call(ctx->lua->state, 3, 3); } - } - else { - l_timestamp = (double) lua_tonumber(ctx->lua->state, -1); - lua_pop(ctx->lua->state, 1); - } - l_code = (int) lua_tointeger(ctx->lua->state, -1); - lua_pop(ctx->lua->state, 1); + /* Initialize Return values */ + l_code = 0; + l_timestamp = ts; - if (l_code == -1) { /* Skip record */ - msgpack_sbuffer_destroy(&data_sbuf); - continue; - } - else if (l_code == 1 || l_code == 2) { /* Modified, pack new data */ - if (l_code == 1) { - if (ctx->time_as_table == FLB_FALSE) { - flb_time_from_double(&t, l_timestamp); + flb_lua_tomsgpack(ctx->lua->state, &data_pck, 0, &ctx->l2cc); + lua_pop(ctx->lua->state, 1); + + /* Lua table */ + if (ctx->time_as_table == FLB_TRUE) { + if (lua_type(ctx->lua->state, -1) == LUA_TTABLE) { + /* Retrieve seconds */ + lua_getfield(ctx->lua->state, -1, "sec"); + t.tm.tv_sec = lua_tointeger(ctx->lua->state, -1); + lua_pop(ctx->lua->state, 1); + + /* Retrieve nanoseconds */ + lua_getfield(ctx->lua->state, -1, "nsec"); + t.tm.tv_nsec = lua_tointeger(ctx->lua->state, -1); + lua_pop(ctx->lua->state, 2); + } + else { + flb_plg_error(ctx->ins, "invalid lua timestamp type returned"); + t = t_orig; } } - else if (l_code == 2) { - /* Keep the timestamp */ - t = t_orig; + else { + l_timestamp = (double) lua_tonumber(ctx->lua->state, -1); + lua_pop(ctx->lua->state, 1); } - ret = pack_result(ctx, &t, log_event.metadata, &log_encoder, - data_sbuf.data, data_sbuf.size); + l_code = (int) lua_tointeger(ctx->lua->state, -1); + lua_pop(ctx->lua->state, 1); - if (ret == FLB_FALSE) { - flb_plg_error(ctx->ins, "invalid table returned at %s(), %s", - ctx->call, ctx->script); + if (l_code == -1) { /* Skip record */ msgpack_sbuffer_destroy(&data_sbuf); + continue; + } + else if (l_code == 1 || l_code == 2) { /* Modified, pack new data */ + if (l_code == 1) { + if (ctx->time_as_table == FLB_FALSE) { + flb_time_from_double(&t, l_timestamp); + } + } + else if (l_code == 2) { + /* Keep the timestamp */ + t = t_orig; + } - flb_log_event_decoder_destroy(&log_decoder); - flb_log_event_encoder_destroy(&log_encoder); + ret = pack_result(ctx, &t, log_event.metadata, &log_encoder, + data_sbuf.data, data_sbuf.size); - return FLB_FILTER_NOTOUCH; - } - } - else { /* Unexpected return code, keep original content */ - /* Code 0 means Keep record, so we don't emit the warning */ - if (l_code != 0) { - flb_plg_error(ctx->ins, - "unexpected Lua script return code %i, " - "original record will be kept." , l_code); + if (ret == FLB_FALSE) { + flb_plg_error(ctx->ins, "invalid table returned at %s(), %s", + ctx->call, ctx->script); + msgpack_sbuffer_destroy(&data_sbuf); + + flb_log_event_decoder_destroy(&log_decoder); + flb_log_event_encoder_destroy(&log_encoder); + + return FLB_FILTER_NOTOUCH; + } } + else { /* Unexpected return code, keep original content */ + /* Code 0 means Keep record, so we don't emit the warning */ + if (l_code != 0) { + flb_plg_error(ctx->ins, + "unexpected Lua script return code %i, " + "original record will be kept." , l_code); + } - ret = flb_log_event_encoder_emit_raw_record( - &log_encoder, - log_decoder.record_base, - log_decoder.record_length); + ret = flb_log_event_encoder_emit_raw_record( + &log_encoder, + log_decoder.record_base, + log_decoder.record_length); - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_error(ctx->ins, - "Log event encoder error : %d", ret); + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, + "Log event encoder error : %d", ret); + } } - } - msgpack_sbuffer_destroy(&data_sbuf); + msgpack_sbuffer_destroy(&data_sbuf); + } } - if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA) { ret = FLB_EVENT_ENCODER_SUCCESS; } - if (ret == FLB_EVENT_ENCODER_SUCCESS) { *out_buf = log_encoder.output_buffer; *out_bytes = log_encoder.output_length; - ret = FLB_FILTER_MODIFIED; flb_log_event_encoder_claim_internal_buffer_ownership(&log_encoder); @@ -669,7 +834,6 @@ static int cb_lua_filter(const void *data, size_t bytes, ret = FLB_FILTER_NOTOUCH; } - flb_log_event_decoder_destroy(&log_decoder); flb_log_event_encoder_destroy(&log_encoder); @@ -722,6 +886,12 @@ static struct flb_config_map config_map[] = { "If enabled, Lua script will be executed in protected mode. " "It prevents to crash when invalid Lua script is executed." }, + { + FLB_CONFIG_MAP_BOOL, "chunk_mode", "false", + 0, FLB_TRUE, offsetof(struct lua_filter, chunk_mode), + "If enabled, a whole chunk will be sent to Lua script as table. " + "It may be used for e.g. parallel execution inside Lua script." + }, { FLB_CONFIG_MAP_BOOL, "time_as_table", "false", 0, FLB_TRUE, offsetof(struct lua_filter, time_as_table), diff --git a/plugins/filter_lua/lua_config.h b/plugins/filter_lua/lua_config.h index e67beb38cb9..97de1c825b1 100644 --- a/plugins/filter_lua/lua_config.h +++ b/plugins/filter_lua/lua_config.h @@ -34,6 +34,7 @@ struct lua_filter { flb_sds_t call; /* function name */ flb_sds_t buffer; /* json dec buffer */ int protected_mode; /* exec lua function in protected mode */ + int chunk_mode; /* pass whole chunk to lua script */ int time_as_table; /* timestamp as a Lua table */ int enable_flb_null; /* Use flb_null in Lua */ struct flb_lua_l2c_config l2cc; /* lua -> C config */ diff --git a/src/flb_lua.c b/src/flb_lua.c index 4eca5b0ca8c..fdb6313d24b 100644 --- a/src/flb_lua.c +++ b/src/flb_lua.c @@ -173,7 +173,6 @@ void flb_lua_pushmsgpack(lua_State *l, msgpack_object *o) struct flb_lua_metadata meta; lua_checkstack(l, 3); - switch(o->type) { case MSGPACK_OBJECT_NIL: lua_getglobal(l, FLB_LUA_VAR_FLB_NULL); @@ -839,3 +838,23 @@ void flb_lua_dump_stack(FILE *out, lua_State *l) } fprintf(out, "======\n"); } + +void flb_lua_bulk_process(lua_State *l, struct flb_time *t, msgpack_object *o, int table_index) { + int i; + struct flb_lua_metadata meta; + + lua_checkstack(l, 3); + + lua_createtable(l, 0, 2); + + lua_pushstring(l, "timestamp"); + flb_lua_pushtimetable(l, t); + lua_rawset(l, -3); + + lua_pushstring(l, "record"); + flb_lua_pushmsgpack(l, o); + lua_rawset(l, -3); + + /* Now, append this table to the global Lua table at the given index */ + lua_rawseti(l, table_index, lua_objlen(l, table_index) + 1); +} diff --git a/tests/runtime/filter_lua.c b/tests/runtime/filter_lua.c index 1e3111266d0..51690d19bc7 100644 --- a/tests/runtime/filter_lua.c +++ b/tests/runtime/filter_lua.c @@ -968,6 +968,313 @@ void flb_test_invalid_metatable(void) flb_destroy(ctx); } +void flb_test_chunk_processing(void){ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int filter_ffd; + flb_sds_t outbuf = flb_sds_create(""); + char *input = "[0, {\"key\":\"value\"}]"; + struct flb_lib_out_cb cb_data; + + const char *expected = "[0.000000,{\"key\":\"mytest\"}]"; + + char *script_body = "" + "function lua_main(tag, records)\n" + " local results = {}\n" + " for i, log_event in ipairs(records) do\n" + " local ts = log_event.timestamp\n" + " local modified_record = log_event.record\n" + " modified_record[\"key\"] = \"mytest\"\n" + " local result = {timestamp = ts, record = modified_record}" + " table.insert(results, result)\n" + " end\n" + " return results\n" + "end\n"; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", FLUSH_INTERVAL, "grace", "1", NULL); + + /* Prepare output callback context*/ + cb_data.cb = callback_cat; + cb_data.data = &outbuf; + + ret = create_script(script_body, strlen(script_body)); + TEST_CHECK(ret == 0); + /* Filter */ + filter_ffd = flb_filter(ctx, (char *) "lua", NULL); + TEST_CHECK(filter_ffd >= 0); + ret = flb_filter_set(ctx, filter_ffd, + "Match", "*", + "call", "lua_main", + "script", TMP_LUA_PATH, + "chunk_mode", "on", + "time_as_table", "on", + NULL); + + /* Input */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Lib output */ + out_ffd = flb_output(ctx, (char *) "lib", (void *)&cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "format", "json", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret==0); + + flb_lib_push(ctx, in_ffd, input, strlen(input)); + wait_with_timeout(2000, &output); + if (!TEST_CHECK(!strcmp(outbuf, expected))) { + TEST_MSG("expected:\n%s\ngot:\n%s\n", expected, outbuf); + } + + /* clean up */ + flb_lib_free(output); + delete_script(); + + flb_stop(ctx); + flb_destroy(ctx); + flb_sds_destroy(outbuf); +} + +void flb_test_empty_array_chunk(void) +{ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int filter_ffd; + flb_sds_t outbuf = flb_sds_create(""); + char *input = "[0, {\"key\":[]}]"; + struct flb_lib_out_cb cb_data; + + const char *expected = ""; + + char *script_body = "" + "function lua_main(tag, records)\n" + " local results = {}\n" + " return results\n" + "end\n"; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", FLUSH_INTERVAL, "grace", "1", NULL); + + /* Prepare output callback context*/ + cb_data.cb = callback_cat; + cb_data.data = &outbuf; + + ret = create_script(script_body, strlen(script_body)); + TEST_CHECK(ret == 0); + /* Filter */ + filter_ffd = flb_filter(ctx, (char *) "lua", NULL); + TEST_CHECK(filter_ffd >= 0); + ret = flb_filter_set(ctx, filter_ffd, + "Match", "*", + "call", "lua_main", + "script", TMP_LUA_PATH, + "chunk_mode", "on", + "time_as_table", "on", + NULL); + + /* Input */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Lib output */ + out_ffd = flb_output(ctx, (char *) "lib", (void *)&cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "format", "json", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret==0); + + flb_lib_push(ctx, in_ffd, input, strlen(input)); + wait_with_timeout(2000, &output); + if (!TEST_CHECK(!strcmp(outbuf, expected))) { + TEST_MSG("expected:\n%s\ngot:\n%s\n", expected, outbuf); + } + + /* clean up */ + flb_lib_free(output); + delete_script(); + + flb_stop(ctx); + flb_destroy(ctx); + flb_sds_destroy(outbuf); +} + +void flb_test_empty_record_chunk(void){ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int filter_ffd; + flb_sds_t outbuf = flb_sds_create(""); + char *input = "[0, {\"key\":[]}]"; + struct flb_lib_out_cb cb_data; + + /* This test should return with FLB_FILTER_NOTOUCH */ + const char *expected = "[0.000000,{\"key\":[]}]"; + + char *script_body = "" + "function lua_main(tag, records)\n" + " local results = {}\n" + " local timestamp = 5\n" + " local record = nil\n" + " table.insert(results, {timestamp, record})\n" + " return results\n" + "end\n"; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", FLUSH_INTERVAL, "grace", "1", NULL); + + /* Prepare output callback context*/ + cb_data.cb = callback_cat; + cb_data.data = &outbuf; + + ret = create_script(script_body, strlen(script_body)); + TEST_CHECK(ret == 0); + /* Filter */ + filter_ffd = flb_filter(ctx, (char *) "lua", NULL); + TEST_CHECK(filter_ffd >= 0); + ret = flb_filter_set(ctx, filter_ffd, + "Match", "*", + "call", "lua_main", + "script", TMP_LUA_PATH, + "chunk_mode", "on", + "time_as_table", "on", + NULL); + + /* Input */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Lib output */ + out_ffd = flb_output(ctx, (char *) "lib", (void *)&cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "format", "json", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret==0); + + flb_lib_push(ctx, in_ffd, input, strlen(input)); + wait_with_timeout(2000, &output); + if (!TEST_CHECK(!strcmp(outbuf, expected))) { + TEST_MSG("expected:\n%s\ngot:\n%s\n", expected, outbuf); + } + + /* clean up */ + flb_lib_free(output); + delete_script(); + + flb_stop(ctx); + flb_destroy(ctx); + flb_sds_destroy(outbuf); +} + +void flb_test_empty_timestamp_chunk(void){ + int ret; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int filter_ffd; + flb_sds_t outbuf = flb_sds_create(""); + char *input = "[0, {\"key\":[]}]"; + struct flb_lib_out_cb cb_data; + + /* This test should return with FLB_FILTER_NOTOUCH */ + const char *expected = "[0.000000,{\"key\":[]}]"; + + char *script_body = "" + "function lua_main(tag, records)\n" + " local results = {}\n" + " for i, log_event in ipairs(records) do\n" + " local timestamp = nil\n" + " local record = log_event.record\n" + " table.insert(results, {timestamp, record})\n" + " end\n" + " return results\n" + "end\n"; + + clear_output_num(); + + /* Create context, flush every second (some checks omitted here) */ + ctx = flb_create(); + flb_service_set(ctx, "flush", FLUSH_INTERVAL, "grace", "1", NULL); + + /* Prepare output callback context*/ + cb_data.cb = callback_cat; + cb_data.data = &outbuf; + + ret = create_script(script_body, strlen(script_body)); + TEST_CHECK(ret == 0); + /* Filter */ + filter_ffd = flb_filter(ctx, (char *) "lua", NULL); + TEST_CHECK(filter_ffd >= 0); + ret = flb_filter_set(ctx, filter_ffd, + "Match", "*", + "call", "lua_main", + "script", TMP_LUA_PATH, + "chunk_mode", "on", + "time_as_table", "on", + NULL); + + /* Input */ + in_ffd = flb_input(ctx, (char *) "lib", NULL); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + TEST_CHECK(in_ffd >= 0); + + /* Lib output */ + out_ffd = flb_output(ctx, (char *) "lib", (void *)&cb_data); + TEST_CHECK(out_ffd >= 0); + flb_output_set(ctx, out_ffd, + "match", "test", + "format", "json", + NULL); + + ret = flb_start(ctx); + TEST_CHECK(ret==0); + + flb_lib_push(ctx, in_ffd, input, strlen(input)); + wait_with_timeout(2000, &output); + if (!TEST_CHECK(!strcmp(outbuf, expected))) { + TEST_MSG("expected:\n%s\ngot:\n%s\n", expected, outbuf); + } + + /* clean up */ + flb_lib_free(output); + delete_script(); + + flb_stop(ctx); + flb_destroy(ctx); + flb_sds_destroy(outbuf); +} + + TEST_LIST = { {"hello_world", flb_test_helloworld}, {"append_tag", flb_test_append_tag}, @@ -980,5 +1287,9 @@ TEST_LIST = { {"split_record", flb_test_split_record}, {"empty_array", flb_test_empty_array}, {"invalid_metatable", flb_test_invalid_metatable}, + {"chunk_processing", flb_test_chunk_processing}, + {"chunk_processing_empty_array", flb_test_empty_array_chunk}, + {"chunk_processing_empty_record", flb_test_empty_record_chunk}, + {"chunk_processing_empty_timestamp", flb_test_empty_timestamp_chunk}, {NULL, NULL} };