Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

downstream: PR 6882 rework #6906

Merged
merged 10 commits into from
Feb 24, 2023
2 changes: 1 addition & 1 deletion include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ int flb_input_log_check(struct flb_input_instance *ins, int l);

struct mk_event_loop *flb_input_event_loop_get(struct flb_input_instance *ins);
int flb_input_upstream_set(struct flb_upstream *u, struct flb_input_instance *ins);
int flb_input_downstream_set(struct flb_stream *stream,
int flb_input_downstream_set(struct flb_downstream *stream,
struct flb_input_instance *ins);

#endif
4 changes: 2 additions & 2 deletions plugins/in_elasticsearch/in_elasticsearch.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ static int in_elasticsearch_bulk_init(struct flb_input_instance *ins,
/* Set the context */
flb_input_set_context(ins, ctx);

ctx->evl = config->evl;

port = (unsigned short int) strtoul(ctx->tcp_port, NULL, 10);

if (flb_random_bytes(rand, 16)) {
Expand Down Expand Up @@ -153,6 +151,8 @@ static int in_elasticsearch_bulk_init(struct flb_input_instance *ins,
return -1;
}

flb_input_downstream_set(ctx->downstream, ctx->ins);

/* Collect upon data available on the standard input */
ret = flb_input_set_collector_socket(ins,
in_elasticsearch_bulk_collect,
Expand Down
1 change: 0 additions & 1 deletion plugins/in_elasticsearch/in_elasticsearch.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ struct flb_in_elasticsearch {

struct flb_downstream *downstream; /* Client manager */
struct mk_list connections; /* linked list of connections */
struct mk_event_loop *evl; /* Event loop context */

struct mk_server *server;
struct flb_input_instance *ins;
Expand Down
2 changes: 1 addition & 1 deletion plugins/in_elasticsearch/in_elasticsearch_bulk_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ struct in_elasticsearch_bulk_conn *in_elasticsearch_bulk_conn_add(struct flb_con
conn->buf_size = ctx->buffer_chunk_size;

/* Register instance into the event loop */
ret = mk_event_add(ctx->evl,
ret = mk_event_add(flb_engine_evl_get(),
connection->fd,
FLB_ENGINE_EV_CUSTOM,
MK_EVENT_READ,
Expand Down
5 changes: 3 additions & 2 deletions plugins/in_forward/fw.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_engine.h>
#include <fluent-bit/flb_downstream.h>
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_network.h>
Expand Down Expand Up @@ -227,9 +228,9 @@ static int in_fw_init(struct flb_input_instance *ins,
}
}

flb_net_socket_nonblocking(ctx->downstream->server_fd);
flb_input_downstream_set(ctx->downstream, ctx->ins);

ctx->evl = config->evl;
flb_net_socket_nonblocking(ctx->downstream->server_fd);

/* Collect upon data available on the standard input */
ret = flb_input_set_collector_socket(ins,
Expand Down
1 change: 0 additions & 1 deletion plugins/in_forward/fw.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ struct flb_in_fw_config {
int coll_fd;
struct flb_downstream *downstream; /* Client manager */
struct mk_list connections; /* List of active connections */
struct mk_event_loop *evl; /* Event loop file descriptor */
struct flb_input_instance *ins; /* Input plugin instace */
};

Expand Down
2 changes: 1 addition & 1 deletion plugins/in_forward/fw_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_
conn->in = ctx->ins;

/* Register instance into the event loop */
ret = mk_event_add(ctx->evl,
ret = mk_event_add(flb_engine_evl_get(),
connection->fd,
FLB_ENGINE_EV_CUSTOM,
MK_EVENT_READ,
Expand Down
4 changes: 2 additions & 2 deletions plugins/in_http/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ static int in_http_init(struct flb_input_instance *ins,
/* Set the context */
flb_input_set_context(ins, ctx);

ctx->evl = config->evl;

port = (unsigned short int) strtoul(ctx->tcp_port, NULL, 10);

ctx->downstream = flb_downstream_create(FLB_TRANSPORT_TCP,
Expand All @@ -112,6 +110,8 @@ static int in_http_init(struct flb_input_instance *ins,
return -1;
}

flb_input_downstream_set(ctx->downstream, ctx->ins);

if (ctx->successful_response_code != 200 &&
ctx->successful_response_code != 201 &&
ctx->successful_response_code != 204) {
Expand Down
1 change: 0 additions & 1 deletion plugins/in_http/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ struct flb_http {

struct flb_downstream *downstream; /* Client manager */
struct mk_list connections; /* linked list of connections */
struct mk_event_loop *evl; /* Event loop context */

struct mk_server *server;
struct flb_input_instance *ins;
Expand Down
2 changes: 1 addition & 1 deletion plugins/in_http/http_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ struct http_conn *http_conn_add(struct flb_connection *connection,
conn->buf_size = ctx->buffer_chunk_size;

/* Register instance into the event loop */
ret = mk_event_add(ctx->evl,
ret = mk_event_add(flb_engine_evl_get(),
connection->fd,
FLB_ENGINE_EV_CUSTOM,
MK_EVENT_READ,
Expand Down
2 changes: 1 addition & 1 deletion plugins/in_mqtt/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ static int in_mqtt_init(struct flb_input_instance *in,
return -1;
}

ctx->evl = config->evl;
flb_input_downstream_set(ctx->downstream, ctx->ins);

/* Collect upon data available on the standard input */
ret = flb_input_set_collector_event(in,
Expand Down
1 change: 0 additions & 1 deletion plugins/in_mqtt/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ struct flb_in_mqtt_config {
int msgp_len; /* msgpack data length */
char msgp[MQTT_MSGP_BUF_SIZE]; /* msgpack static buffer */
struct flb_input_instance *ins; /* plugin input instance */
struct mk_event_loop *evl; /* Event loop file descriptor */
struct flb_downstream *downstream; /* Client manager */
struct mk_list conns; /* Active connections */
};
Expand Down
2 changes: 1 addition & 1 deletion plugins/in_mqtt/mqtt_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ struct mqtt_conn *mqtt_conn_add(struct flb_connection *connection,
conn->status = MQTT_NEW;

/* Register instance into the event loop */
ret = mk_event_add(ctx->evl,
ret = mk_event_add(flb_engine_evl_get(),
connection->fd,
FLB_ENGINE_EV_CUSTOM,
MK_EVENT_READ,
Expand Down
2 changes: 1 addition & 1 deletion plugins/in_opentelemetry/http_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ struct http_conn *opentelemetry_conn_add(struct flb_connection *connection,
conn->buf_size = ctx->buffer_chunk_size;

/* Register instance into the event loop */
ret = mk_event_add(ctx->evl,
ret = mk_event_add(flb_engine_evl_get(),
connection->fd,
FLB_ENGINE_EV_CUSTOM,
MK_EVENT_READ,
Expand Down
4 changes: 2 additions & 2 deletions plugins/in_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ static int in_opentelemetry_init(struct flb_input_instance *ins,
return -1;
}

flb_plg_info(ctx->ins, "listening on %s:%s", ctx->listen, ctx->tcp_port);
flb_input_downstream_set(ctx->downstream, ctx->ins);

ctx->evl = config->evl;
flb_plg_info(ctx->ins, "listening on %s:%s", ctx->listen, ctx->tcp_port);

if (ctx->successful_response_code != 200 &&
ctx->successful_response_code != 201 &&
Expand Down
1 change: 0 additions & 1 deletion plugins/in_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ struct flb_opentelemetry {
int collector_id; /* Listener collector id */
struct flb_downstream *downstream; /* Client manager */
struct mk_list connections; /* linked list of connections */
struct mk_event_loop *evl; /* Event loop context */

struct mk_server *server;
struct flb_input_instance *ins;
Expand Down
2 changes: 2 additions & 0 deletions plugins/in_syslog/syslog.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ static int in_syslog_init(struct flb_input_instance *in,
return -1;
}

flb_input_downstream_set(ctx->downstream, ctx->ins);

if (ctx->dgram_mode_flag) {
connection = flb_downstream_conn_get(ctx->downstream);

Expand Down
1 change: 0 additions & 1 deletion plugins/in_syslog/syslog.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ struct flb_syslog {

/* List for connections and event loop */
struct mk_list connections;
struct mk_event_loop *evl;
struct flb_input_instance *ins;
};

Expand Down
1 change: 0 additions & 1 deletion plugins/in_syslog/syslog_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ struct flb_syslog *syslog_conf_create(struct flb_input_instance *ins,
return NULL;
}

ctx->evl = config->evl;
ctx->ins = ins;

mk_list_init(&ctx->connections);
Expand Down
2 changes: 1 addition & 1 deletion plugins/in_syslog/syslog_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ struct syslog_conn *syslog_conn_add(struct flb_connection *connection,
* stream mode (UDP events are received through the collector)
*/
if (!ctx->dgram_mode_flag) {
ret = mk_event_add(ctx->evl,
ret = mk_event_add(flb_engine_evl_get(),
connection->fd,
FLB_ENGINE_EV_CUSTOM,
MK_EVENT_READ,
Expand Down
2 changes: 1 addition & 1 deletion plugins/in_tcp/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ static int in_tcp_init(struct flb_input_instance *in,
return -1;
}

ctx->evl = config->evl;
flb_input_downstream_set(ctx->downstream, ctx->ins);

/* Collect upon data available on the standard input */
ret = flb_input_set_collector_socket(in,
Expand Down
1 change: 0 additions & 1 deletion plugins/in_tcp/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ struct flb_in_tcp_config {
int collector_id; /* Listener collector id */
struct flb_downstream *downstream; /* Client manager */
struct mk_list connections; /* List of active connections */
struct mk_event_loop *evl; /* Event loop file descriptor */
struct flb_input_instance *ins; /* Input plugin instace */
};

Expand Down
2 changes: 1 addition & 1 deletion plugins/in_tcp/tcp_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ struct tcp_conn *tcp_conn_add(struct flb_connection *connection,
}

/* Register instance into the event loop */
ret = mk_event_add(ctx->evl,
ret = mk_event_add(flb_engine_evl_get(),
connection->fd,
FLB_ENGINE_EV_CUSTOM,
MK_EVENT_READ,
Expand Down
2 changes: 1 addition & 1 deletion plugins/in_udp/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ static int in_udp_init(struct flb_input_instance *in,
return -1;
}

ctx->evl = config->evl;
flb_input_downstream_set(ctx->downstream, ctx->ins);

connection = flb_downstream_conn_get(ctx->downstream);

Expand Down
1 change: 0 additions & 1 deletion plugins/in_udp/udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ struct flb_in_udp_config {
int collector_id; /* Listener collector id */
struct flb_downstream *downstream; /* Client manager */
struct udp_conn *dummy_conn; /* Datagram dummy connection */
struct mk_event_loop *evl; /* Event loop file descriptor */
struct flb_input_instance *ins; /* Input plugin instace */
};

Expand Down
4 changes: 2 additions & 2 deletions plugins/in_unix_socket/unix_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ static int in_unix_socket_init(struct flb_input_instance *in,
return -1;
}

flb_input_downstream_set(ctx->downstream, ctx->ins);

if (ctx->socket_permissions != NULL) {
ret = chmod(ctx->listen, ctx->socket_acl);

Expand All @@ -189,8 +191,6 @@ static int in_unix_socket_init(struct flb_input_instance *in,
}
}

ctx->evl = config->evl;

if (ctx->dgram_mode_flag) {
connection = flb_downstream_conn_get(ctx->downstream);

Expand Down
1 change: 0 additions & 1 deletion plugins/in_unix_socket/unix_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ struct flb_in_unix_socket_config {
struct flb_downstream *downstream; /* Client manager */
struct unix_socket_conn *dummy_conn;/* Datagram dummy connection */
struct mk_list connections; /* List of active connections */
struct mk_event_loop *evl; /* Event loop file descriptor */
struct flb_input_instance *ins; /* Input plugin instace */
};

Expand Down
2 changes: 1 addition & 1 deletion plugins/in_unix_socket/unix_socket_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ struct unix_socket_conn *unix_socket_conn_add(struct flb_connection *connection,
}

/* Register instance into the event loop */
ret = mk_event_add(ctx->evl,
ret = mk_event_add(flb_engine_evl_get(),
connection->fd,
FLB_ENGINE_EV_CUSTOM,
MK_EVENT_READ,
Expand Down
29 changes: 3 additions & 26 deletions src/flb_input.c
Original file line number Diff line number Diff line change
Expand Up @@ -1909,7 +1909,7 @@ int flb_input_upstream_set(struct flb_upstream *u, struct flb_input_instance *in
return 0;
}

int flb_input_downstream_set(struct flb_stream *stream,
int flb_input_downstream_set(struct flb_downstream *stream,
struct flb_input_instance *ins)
{
int flags = 0;
Expand All @@ -1918,38 +1918,15 @@ int flb_input_downstream_set(struct flb_stream *stream,
return -1;
}

/* TLS */
#ifdef FLB_HAVE_TLS
if (ins->use_tls == FLB_TRUE) {
flags |= FLB_IO_TLS;
}
else {
flags |= FLB_IO_TCP;
}
#else
flags |= FLB_IO_TCP;
#endif

/* IPv6 */
if (ins->host.ipv6 == FLB_TRUE) {
flags |= FLB_IO_IPV6;
}

/* Set flags */
flb_stream_enable_flags(stream, flags);

/*
* If the input plugin will run in multiple threads, enable
* the thread safe mode for the Downstream context.
*/
if (flb_input_is_threaded(ins)) {
flb_stream_enable_thread_safety(stream);
flb_stream_enable_thread_safety(&stream->base);

mk_list_add(&stream->_head, &ins->downstreams);
mk_list_add(&stream->base._head, &ins->downstreams);
}

/* Set networking options 'net.*' received through instance properties */
memcpy(&stream->net, &ins->net_setup, sizeof(struct flb_net_setup));

return 0;
}