diff --git a/scripts/sync-upstream-patches.sh b/scripts/sync-upstream-patches.sh index 20f66f1d..7571c340 100755 --- a/scripts/sync-upstream-patches.sh +++ b/scripts/sync-upstream-patches.sh @@ -177,7 +177,43 @@ setup_upstream() { fi log_info "Fetching versions from upstream..." - git fetch "$UPSTREAM_REMOTE" "$FROM_VERSION" "$TO_VERSION" --no-tags 2>/dev/null + + # Detect if FROM_VERSION or TO_VERSION are commit SHAs (not refs) + # SHAs are 7-40 hex characters without 'v' prefix or 'refs/' prefix + local fetch_refs=() + + if [[ "$FROM_VERSION" =~ ^[0-9a-f]{7,40}$ ]]; then + log_info "FROM_VERSION appears to be a commit SHA, fetching tags and branches..." + else + fetch_refs+=("$FROM_VERSION") + fi + + if [[ "$TO_VERSION" =~ ^[0-9a-f]{7,40}$ ]]; then + log_info "TO_VERSION appears to be a commit SHA, fetching tags and branches..." + else + fetch_refs+=("$TO_VERSION") + fi + + # If either version is a SHA, fetch the relevant branch (e.g., 4.0) to get all commits + if [[ "$FROM_VERSION" =~ ^[0-9a-f]{7,40}$ ]] || [[ "$TO_VERSION" =~ ^[0-9a-f]{7,40}$ ]]; then + # Try to infer branch from FROM_VERSION if it's a tag (e.g., v4.0.11 -> 4.0) + local branch="" + if [[ "$FROM_VERSION" =~ ^v([0-9]+\.[0-9]+) ]]; then + branch="${BASH_REMATCH[1]}" + log_info "Fetching branch $branch to include commit SHAs..." + fetch_refs+=("$branch") + else + # Fallback: fetch all tags to find the commits + log_info "Fetching all tags to locate commits..." + git fetch "$UPSTREAM_REMOTE" --tags 2>/dev/null || true + fi + fi + + # Fetch the specific refs if any were identified + if [[ ${#fetch_refs[@]} -gt 0 ]]; then + git fetch "$UPSTREAM_REMOTE" "${fetch_refs[@]}" --no-tags 2>/dev/null || true + fi + log_success "Fetched upstream versions" } diff --git a/source/oss_version.txt b/source/oss_version.txt index 33fc096b..f789f9ed 100644 --- a/source/oss_version.txt +++ b/source/oss_version.txt @@ -1 +1 @@ -v4.0.11 +upstream/4.0 diff --git a/source/plugins/in_emitter/emitter.c b/source/plugins/in_emitter/emitter.c index 064415e8..5d1bd44e 100644 --- a/source/plugins/in_emitter/emitter.c +++ b/source/plugins/in_emitter/emitter.c @@ -285,8 +285,10 @@ static int in_emitter_start_ring_buffer(struct flb_input_instance *in, struct fl return -1; } - return flb_input_set_collector_time(in, in_emitter_ingest_ring_buffer, - 1, 0, in->config); + ctx->coll_fd = flb_input_set_collector_time(in, + in_emitter_ingest_ring_buffer, + 1, 0, in->config); + return (ctx->coll_fd < 0) ? -1 : 0; } /* Initialize plugin */ @@ -316,15 +318,9 @@ static int cb_emitter_init(struct flb_input_instance *in, return -1; } - if (scheduler != config->sched && - scheduler != NULL && - ctx->ring_buffer_size == 0) { - + if (in->is_threaded == FLB_TRUE && ctx->ring_buffer_size == 0) { ctx->ring_buffer_size = DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY; - - flb_plg_debug(in, - "threaded emitter instances require ring_buffer_size" - " being set, using default value of %u", + flb_plg_debug(in, "threaded: enable emitter ring buffer (size=%u)", ctx->ring_buffer_size); } diff --git a/source/plugins/in_splunk/splunk_prot.c b/source/plugins/in_splunk/splunk_prot.c index d55433f6..af0ebc0c 100644 --- a/source/plugins/in_splunk/splunk_prot.c +++ b/source/plugins/in_splunk/splunk_prot.c @@ -642,7 +642,7 @@ static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn, ret = handle_hec_payload(ctx, type, tag, request->data.data, request->data.len); } - return 0; + return ret; } static int process_hec_raw_payload(struct flb_splunk *ctx, struct splunk_conn *conn, @@ -887,10 +887,11 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, return -1; } - if (!ret) { + if (ret < 0) { send_json_message_response(conn, 400, "{\"text\":\"Invalid data format\",\"code\":6}"); + } else { + send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":0}"); } - send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":0}"); } else if (strcasecmp(uri, "/services/collector/event/1.0") == 0 || strcasecmp(uri, "/services/collector/event") == 0 || @@ -910,10 +911,11 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, return -1; } - if (!ret) { + if (ret < 0) { send_json_message_response(conn, 400, "{\"text\":\"Invalid data format\",\"code\":6}"); + } else { + send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":0}"); } - send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":0}"); } else { send_response(conn, 400, "error: invalid HTTP endpoint\n"); diff --git a/source/src/opentelemetry/flb_opentelemetry_logs.c b/source/src/opentelemetry/flb_opentelemetry_logs.c index a20a9de6..85894225 100644 --- a/source/src/opentelemetry/flb_opentelemetry_logs.c +++ b/source/src/opentelemetry/flb_opentelemetry_logs.c @@ -159,7 +159,7 @@ static int process_json_payload_log_records_entry( trace_id = &log_records_entry->ptr[result].val; } - /* trace_id must be a 32 char hex string */ + /* trace_id must be a 32 char hex string, or empty (skip validation) */ if (trace_id != NULL) { if (trace_id->type != MSGPACK_OBJECT_STR) { if (error_status) { @@ -168,20 +168,25 @@ static int process_json_payload_log_records_entry( return -FLB_OTEL_LOGS_ERR_INVALID_TRACE_ID; } - if (trace_id->via.str.size != 32) { + /* Skip validation if trace_id is empty */ + if (trace_id->via.str.size == 0) { + trace_id = NULL; /* Treat as if not present */ + } + else if (trace_id->via.str.size != 32) { if (error_status) { *error_status = FLB_OTEL_LOGS_ERR_INVALID_TRACE_ID; } return -FLB_OTEL_LOGS_ERR_INVALID_TRACE_ID; } - - /* Validate hex format */ - for (i = 0; i < 32; i++) { - if (!isxdigit(trace_id->via.str.ptr[i])) { - if (error_status) { - *error_status = FLB_OTEL_LOGS_ERR_INVALID_TRACE_ID; + else { + /* Validate hex format */ + for (i = 0; i < 32; i++) { + if (!isxdigit(trace_id->via.str.ptr[i])) { + if (error_status) { + *error_status = FLB_OTEL_LOGS_ERR_INVALID_TRACE_ID; + } + return -FLB_OTEL_LOGS_ERR_INVALID_TRACE_ID; } - return -FLB_OTEL_LOGS_ERR_INVALID_TRACE_ID; } } } @@ -192,6 +197,7 @@ static int process_json_payload_log_records_entry( span_id = &log_records_entry->ptr[result].val; } + /* span_id must be a 16 char hex string, or empty (skip validation) */ if (span_id != NULL) { if (span_id->type != MSGPACK_OBJECT_STR) { if (error_status) { @@ -200,20 +206,25 @@ static int process_json_payload_log_records_entry( return -FLB_OTEL_LOGS_ERR_INVALID_SPAN_ID; } - if (span_id->via.str.size != 16) { + /* Skip validation if span_id is empty */ + if (span_id->via.str.size == 0) { + span_id = NULL; /* Treat as if not present */ + } + else if (span_id->via.str.size != 16) { if (error_status) { *error_status = FLB_OTEL_LOGS_ERR_INVALID_SPAN_ID; } return -FLB_OTEL_LOGS_ERR_INVALID_SPAN_ID; } - - /* Validate hex format */ - for (i = 0; i < 16; i++) { - if (!isxdigit(span_id->via.str.ptr[i])) { - if (error_status) { - *error_status = FLB_OTEL_LOGS_ERR_INVALID_SPAN_ID; + else { + /* Validate hex format */ + for (i = 0; i < 16; i++) { + if (!isxdigit(span_id->via.str.ptr[i])) { + if (error_status) { + *error_status = FLB_OTEL_LOGS_ERR_INVALID_SPAN_ID; + } + return -FLB_OTEL_LOGS_ERR_INVALID_SPAN_ID; } - return -FLB_OTEL_LOGS_ERR_UNEXPECTED_TIMESTAMP_TYPE; } } } diff --git a/source/tests/internal/data/opentelemetry/test_cases.json b/source/tests/internal/data/opentelemetry/test_cases.json index d63b4c99..640a9694 100644 --- a/source/tests/internal/data/opentelemetry/test_cases.json +++ b/source/tests/internal/data/opentelemetry/test_cases.json @@ -1305,5 +1305,26 @@ "log_metadata": {"otlp":{}}, "log_body": {"plain": {"key1": "v1", "key2": 2}} } + }, + + "empty_trace_span_ids_skipped": { + "input": { + "resourceLogs": [{ + "scopeLogs": [{ + "logRecords": [{ + "timeUnixNano": "1640995200000000000", + "traceId": "", + "spanId": "", + "body": {"stringValue": "test log with empty trace and span IDs"} + }] + }] + }] + }, + "expected": { + "group_metadata": {"schema":"otlp","resource_id":0,"scope_id":0}, + "group_body": {"resource":{}}, + "log_metadata": {"otlp":{}}, + "log_body": {"log": "test log with empty trace and span IDs"} + } } } diff --git a/source/tests/runtime/in_forward.c b/source/tests/runtime/in_forward.c index 6cabfa94..a1627290 100644 --- a/source/tests/runtime/in_forward.c +++ b/source/tests/runtime/in_forward.c @@ -419,7 +419,7 @@ void flb_test_unix_path() TEST_CHECK(ret == 0); /* waiting to create socket */ - flb_time_msleep(200); + flb_time_msleep(200); memset(&sun, 0, sizeof(sun)); fd = socket(AF_LOCAL, SOCK_STREAM, 0); @@ -504,7 +504,7 @@ void flb_test_unix_perm() TEST_CHECK(ret == 0); /* waiting to create socket */ - flb_time_msleep(200); + flb_time_msleep(200); memset(&sun, 0, sizeof(sun)); fd = socket(AF_LOCAL, SOCK_STREAM, 0); @@ -565,6 +565,118 @@ void flb_test_unix_perm() } #endif /* FLB_HAVE_UNIX_SOCKET */ +static int cb_count_only(void *record, size_t size, void *data) +{ + int n = get_output_num(); + set_output_num(n + 1); + flb_free(record); + return 0; +} + +void flb_test_threaded_forward_issue_10946() +{ + struct flb_lib_out_cb cb = {0}; + flb_ctx_t *ctx; + int in_ffd, out_ffd, ret; + int out_count; + flb_sockfd_t fd; + char *buf; + size_t size; + int root_type; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct cfl_variant v_key = { + .type = CFL_VARIANT_STRING, + .data.as_string = "log" + }; + struct cfl_variant v_mode = { + .type = CFL_VARIANT_STRING, + .data.as_string = "partial_message" + }; + char *json = "[\"logs\",1234567890,{\"log\":\"hello\"}]"; + + clear_output_num(); + + cb.cb = cb_count_only; + cb.data = &out_count; + + /* Service */ + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "1", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "forward", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, + "tag", "logs", + "threaded", "true", + NULL); + TEST_CHECK(ret == 0); + + /* Attach a logs-processor: multiline (minimal settings). + * This mirrors the YAML: + * processors.logs: + * - name: multiline + * multiline.key_content: log + * mode: partial_message + */ + proc = flb_processor_create(ctx->config, "ut", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_LOGS, "multiline"); + TEST_CHECK(pu != NULL); + + ret = flb_processor_unit_set_property(pu, "multiline.key_content", &v_key); + TEST_CHECK(ret == 0); + + ret = flb_processor_unit_set_property(pu, "mode", &v_mode); + TEST_CHECK(ret == 0); + + ret = flb_input_set_processor(ctx, in_ffd, proc); + TEST_CHECK(ret == 0); + + /* Output: lib -> count arrivals of tag 'logs' (after processors) */ + out_ffd = flb_output(ctx, (char *) "lib", (void *) &cb); + TEST_CHECK(out_ffd >= 0); + ret = flb_output_set(ctx, out_ffd, + "match", "logs", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start engine */ + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Send a single Forward frame to 'logs' */ + fd = connect_tcp(NULL, -1); + TEST_CHECK(fd >= 0); + + /* ["logs", 1234567890, {"log":"hello"}] */ + ret = flb_pack_json(json, strlen(json), &buf, &size, &root_type, NULL); + TEST_CHECK(ret == 0); + TEST_CHECK(send(fd, buf, size, 0) == (ssize_t) size); + flb_free(buf); + + /* Give it a moment to flush */ + flb_time_msleep(1500); + + /* With the fix, at least one record must arrive */ + out_count = get_output_num(); + TEST_CHECK(out_count > 0); + if (!TEST_CHECK(out_count > 0)) { + TEST_MSG("no outputs with threaded+multiline; emitter RB/collector likely missing"); + } + + /* Cleanup */ + flb_socket_close(fd); + flb_stop(ctx); + flb_destroy(ctx); +} TEST_LIST = { {"forward", flb_test_forward}, @@ -574,6 +686,6 @@ TEST_LIST = { {"unix_path", flb_test_unix_path}, {"unix_perm", flb_test_unix_perm}, #endif + {"issue_10946", flb_test_threaded_forward_issue_10946}, {NULL, NULL} }; -