Skip to content
38 changes: 37 additions & 1 deletion scripts/sync-upstream-patches.sh
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,43 @@ setup_upstream() {
fi

log_info "Fetching versions from upstream..."
git fetch "$UPSTREAM_REMOTE" "$FROM_VERSION" "$TO_VERSION" --no-tags 2>/dev/null

# Detect if FROM_VERSION or TO_VERSION are commit SHAs (not refs)
# SHAs are 7-40 hex characters without 'v' prefix or 'refs/' prefix
local fetch_refs=()

if [[ "$FROM_VERSION" =~ ^[0-9a-f]{7,40}$ ]]; then
log_info "FROM_VERSION appears to be a commit SHA, fetching tags and branches..."
else
fetch_refs+=("$FROM_VERSION")
fi

if [[ "$TO_VERSION" =~ ^[0-9a-f]{7,40}$ ]]; then
log_info "TO_VERSION appears to be a commit SHA, fetching tags and branches..."
else
fetch_refs+=("$TO_VERSION")
fi

# If either version is a SHA, fetch the relevant branch (e.g., 4.0) to get all commits
if [[ "$FROM_VERSION" =~ ^[0-9a-f]{7,40}$ ]] || [[ "$TO_VERSION" =~ ^[0-9a-f]{7,40}$ ]]; then
# Try to infer branch from FROM_VERSION if it's a tag (e.g., v4.0.11 -> 4.0)
local branch=""
if [[ "$FROM_VERSION" =~ ^v([0-9]+\.[0-9]+) ]]; then
branch="${BASH_REMATCH[1]}"
log_info "Fetching branch $branch to include commit SHAs..."
fetch_refs+=("$branch")
else
# Fallback: fetch all tags to find the commits
log_info "Fetching all tags to locate commits..."
git fetch "$UPSTREAM_REMOTE" --tags 2>/dev/null || true
fi
fi

# Fetch the specific refs if any were identified
if [[ ${#fetch_refs[@]} -gt 0 ]]; then
git fetch "$UPSTREAM_REMOTE" "${fetch_refs[@]}" --no-tags 2>/dev/null || true
fi

log_success "Fetched upstream versions"
}

Expand Down
2 changes: 1 addition & 1 deletion source/oss_version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v4.0.11
upstream/4.0
16 changes: 6 additions & 10 deletions source/plugins/in_emitter/emitter.c
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,10 @@ static int in_emitter_start_ring_buffer(struct flb_input_instance *in, struct fl
return -1;
}

return flb_input_set_collector_time(in, in_emitter_ingest_ring_buffer,
1, 0, in->config);
ctx->coll_fd = flb_input_set_collector_time(in,
in_emitter_ingest_ring_buffer,
1, 0, in->config);
return (ctx->coll_fd < 0) ? -1 : 0;
}

/* Initialize plugin */
Expand Down Expand Up @@ -316,15 +318,9 @@ static int cb_emitter_init(struct flb_input_instance *in,
return -1;
}

if (scheduler != config->sched &&
scheduler != NULL &&
ctx->ring_buffer_size == 0) {

if (in->is_threaded == FLB_TRUE && ctx->ring_buffer_size == 0) {
ctx->ring_buffer_size = DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY;

flb_plg_debug(in,
"threaded emitter instances require ring_buffer_size"
" being set, using default value of %u",
flb_plg_debug(in, "threaded: enable emitter ring buffer (size=%u)",
ctx->ring_buffer_size);
}

Expand Down
12 changes: 7 additions & 5 deletions source/plugins/in_splunk/splunk_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn,
ret = handle_hec_payload(ctx, type, tag, request->data.data, request->data.len);
}

return 0;
return ret;
}

static int process_hec_raw_payload(struct flb_splunk *ctx, struct splunk_conn *conn,
Expand Down Expand Up @@ -887,10 +887,11 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
return -1;
}

if (!ret) {
if (ret < 0) {
send_json_message_response(conn, 400, "{\"text\":\"Invalid data format\",\"code\":6}");
} else {
send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":0}");
}
send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":0}");
}
else if (strcasecmp(uri, "/services/collector/event/1.0") == 0 ||
strcasecmp(uri, "/services/collector/event") == 0 ||
Expand All @@ -910,10 +911,11 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
return -1;
}

if (!ret) {
if (ret < 0) {
send_json_message_response(conn, 400, "{\"text\":\"Invalid data format\",\"code\":6}");
} else {
send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":0}");
}
send_json_message_response(conn, 200, "{\"text\":\"Success\",\"code\":0}");
}
else {
send_response(conn, 400, "error: invalid HTTP endpoint\n");
Expand Down
45 changes: 28 additions & 17 deletions source/src/opentelemetry/flb_opentelemetry_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ static int process_json_payload_log_records_entry(
trace_id = &log_records_entry->ptr[result].val;
}

/* trace_id must be a 32 char hex string */
/* trace_id must be a 32 char hex string, or empty (skip validation) */
if (trace_id != NULL) {
if (trace_id->type != MSGPACK_OBJECT_STR) {
if (error_status) {
Expand All @@ -168,20 +168,25 @@ static int process_json_payload_log_records_entry(
return -FLB_OTEL_LOGS_ERR_INVALID_TRACE_ID;
}

if (trace_id->via.str.size != 32) {
/* Skip validation if trace_id is empty */
if (trace_id->via.str.size == 0) {
trace_id = NULL; /* Treat as if not present */
}
else if (trace_id->via.str.size != 32) {
if (error_status) {
*error_status = FLB_OTEL_LOGS_ERR_INVALID_TRACE_ID;
}
return -FLB_OTEL_LOGS_ERR_INVALID_TRACE_ID;
}

/* Validate hex format */
for (i = 0; i < 32; i++) {
if (!isxdigit(trace_id->via.str.ptr[i])) {
if (error_status) {
*error_status = FLB_OTEL_LOGS_ERR_INVALID_TRACE_ID;
else {
/* Validate hex format */
for (i = 0; i < 32; i++) {
if (!isxdigit(trace_id->via.str.ptr[i])) {
if (error_status) {
*error_status = FLB_OTEL_LOGS_ERR_INVALID_TRACE_ID;
}
return -FLB_OTEL_LOGS_ERR_INVALID_TRACE_ID;
}
return -FLB_OTEL_LOGS_ERR_INVALID_TRACE_ID;
}
}
}
Expand All @@ -192,6 +197,7 @@ static int process_json_payload_log_records_entry(
span_id = &log_records_entry->ptr[result].val;
}

/* span_id must be a 16 char hex string, or empty (skip validation) */
if (span_id != NULL) {
if (span_id->type != MSGPACK_OBJECT_STR) {
if (error_status) {
Expand All @@ -200,20 +206,25 @@ static int process_json_payload_log_records_entry(
return -FLB_OTEL_LOGS_ERR_INVALID_SPAN_ID;
}

if (span_id->via.str.size != 16) {
/* Skip validation if span_id is empty */
if (span_id->via.str.size == 0) {
span_id = NULL; /* Treat as if not present */
}
else if (span_id->via.str.size != 16) {
if (error_status) {
*error_status = FLB_OTEL_LOGS_ERR_INVALID_SPAN_ID;
}
return -FLB_OTEL_LOGS_ERR_INVALID_SPAN_ID;
}

/* Validate hex format */
for (i = 0; i < 16; i++) {
if (!isxdigit(span_id->via.str.ptr[i])) {
if (error_status) {
*error_status = FLB_OTEL_LOGS_ERR_INVALID_SPAN_ID;
else {
/* Validate hex format */
for (i = 0; i < 16; i++) {
if (!isxdigit(span_id->via.str.ptr[i])) {
if (error_status) {
*error_status = FLB_OTEL_LOGS_ERR_INVALID_SPAN_ID;
}
return -FLB_OTEL_LOGS_ERR_INVALID_SPAN_ID;
}
return -FLB_OTEL_LOGS_ERR_INVALID_SPAN_ID;
}
}
}
Expand Down
21 changes: 21 additions & 0 deletions source/tests/internal/data/opentelemetry/test_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -1305,5 +1305,26 @@
"log_metadata": {"otlp":{}},
"log_body": {"plain": {"key1": "v1", "key2": 2}}
}
},

"empty_trace_span_ids_skipped": {
"input": {
"resourceLogs": [{
"scopeLogs": [{
"logRecords": [{
"timeUnixNano": "1640995200000000000",
"traceId": "",
"spanId": "",
"body": {"stringValue": "test log with empty trace and span IDs"}
}]
}]
}]
},
"expected": {
"group_metadata": {"schema":"otlp","resource_id":0,"scope_id":0},
"group_body": {"resource":{}},
"log_metadata": {"otlp":{}},
"log_body": {"log": "test log with empty trace and span IDs"}
}
}
}
118 changes: 115 additions & 3 deletions source/tests/runtime/in_forward.c
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ void flb_test_unix_path()
TEST_CHECK(ret == 0);

/* waiting to create socket */
flb_time_msleep(200);
flb_time_msleep(200);

memset(&sun, 0, sizeof(sun));
fd = socket(AF_LOCAL, SOCK_STREAM, 0);
Expand Down Expand Up @@ -504,7 +504,7 @@ void flb_test_unix_perm()
TEST_CHECK(ret == 0);

/* waiting to create socket */
flb_time_msleep(200);
flb_time_msleep(200);

memset(&sun, 0, sizeof(sun));
fd = socket(AF_LOCAL, SOCK_STREAM, 0);
Expand Down Expand Up @@ -565,6 +565,118 @@ void flb_test_unix_perm()
}
#endif /* FLB_HAVE_UNIX_SOCKET */

/*
 * Output callback for the lib output plugin: increments the shared
 * record counter once per flushed record, then releases the record
 * buffer whose ownership is transferred to this callback.
 */
static int cb_count_only(void *record, size_t size, void *data)
{
    (void) size;    /* record size is irrelevant; only arrival counts */
    (void) data;    /* opaque user data is not used by this callback */

    set_output_num(get_output_num() + 1);
    flb_free(record);

    return 0;
}

/*
 * Regression test for issue #10946: a threaded 'forward' input combined
 * with a logs-processor (multiline in partial_message mode) must still
 * deliver records to the output.  Before the fix, the in_emitter ring
 * buffer collector was not registered for threaded instances, so
 * records silently disappeared.
 *
 * Flow: start engine with threaded forward input + multiline processor,
 * send one Forward frame over TCP, wait for a flush, then assert that
 * at least one record reached the lib output.
 */
void flb_test_threaded_forward_issue_10946()
{
    struct flb_lib_out_cb cb = {0};
    flb_ctx_t *ctx;
    int in_ffd, out_ffd, ret;
    int out_count = 0;
    flb_sockfd_t fd;
    char *buf = NULL;
    size_t size = 0;
    int root_type;
    struct flb_processor *proc;
    struct flb_processor_unit *pu;
    struct cfl_variant v_key = {
        .type = CFL_VARIANT_STRING,
        .data.as_string = "log"
    };
    struct cfl_variant v_mode = {
        .type = CFL_VARIANT_STRING,
        .data.as_string = "partial_message"
    };
    char *json = "[\"logs\",1234567890,{\"log\":\"hello\"}]";

    clear_output_num();

    cb.cb = cb_count_only;
    /* cb_count_only ignores 'data'; pass a valid pointer anyway */
    cb.data = &out_count;

    /* Service */
    ctx = flb_create();
    if (!TEST_CHECK(ctx != NULL)) {
        /* no context: nothing further can run safely */
        return;
    }
    flb_service_set(ctx,
                    "Flush", "0.200000000",
                    "Grace", "1",
                    "Log_Level", "error",
                    NULL);

    in_ffd = flb_input(ctx, (char *) "forward", NULL);
    TEST_CHECK(in_ffd >= 0);
    ret = flb_input_set(ctx, in_ffd,
                        "tag", "logs",
                        "threaded", "true",
                        NULL);
    TEST_CHECK(ret == 0);

    /* Attach a logs-processor: multiline (minimal settings).
     * This mirrors the YAML:
     *   processors.logs:
     *     - name: multiline
     *       multiline.key_content: log
     *       mode: partial_message
     */
    proc = flb_processor_create(ctx->config, "ut", NULL, 0);
    TEST_CHECK(proc != NULL);

    pu = flb_processor_unit_create(proc, FLB_PROCESSOR_LOGS, "multiline");
    TEST_CHECK(pu != NULL);

    ret = flb_processor_unit_set_property(pu, "multiline.key_content", &v_key);
    TEST_CHECK(ret == 0);

    ret = flb_processor_unit_set_property(pu, "mode", &v_mode);
    TEST_CHECK(ret == 0);

    ret = flb_input_set_processor(ctx, in_ffd, proc);
    TEST_CHECK(ret == 0);

    /* Output: lib -> count arrivals of tag 'logs' (after processors) */
    out_ffd = flb_output(ctx, (char *) "lib", (void *) &cb);
    TEST_CHECK(out_ffd >= 0);
    ret = flb_output_set(ctx, out_ffd,
                         "match", "logs",
                         "format", "json",
                         NULL);
    TEST_CHECK(ret == 0);

    /* Start engine */
    ret = flb_start(ctx);
    TEST_CHECK(ret == 0);

    /* Send a single Forward frame to 'logs' */
    fd = connect_tcp(NULL, -1);
    TEST_CHECK(fd >= 0);

    /* ["logs", 1234567890, {"log":"hello"}] */
    ret = flb_pack_json(json, strlen(json), &buf, &size, &root_type, NULL);
    TEST_CHECK(ret == 0);
    if (ret == 0) {
        /* buf/size are only defined when flb_pack_json succeeded */
        if (fd >= 0) {
            TEST_CHECK(send(fd, buf, size, 0) == (ssize_t) size);
        }
        flb_free(buf);
    }

    /* Give the engine time to ingest, process and flush */
    flb_time_msleep(1500);

    /* With the fix, at least one record must arrive */
    out_count = get_output_num();
    if (!TEST_CHECK(out_count > 0)) {
        TEST_MSG("no outputs with threaded+multiline; emitter RB/collector likely missing");
    }

    /* Cleanup */
    if (fd >= 0) {
        flb_socket_close(fd);
    }
    flb_stop(ctx);
    flb_destroy(ctx);
}

TEST_LIST = {
{"forward", flb_test_forward},
Expand All @@ -574,6 +686,6 @@ TEST_LIST = {
{"unix_path", flb_test_unix_path},
{"unix_perm", flb_test_unix_perm},
#endif
{"issue_10946", flb_test_threaded_forward_issue_10946},
{NULL, NULL}
};

Loading