Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HAPROXY] use $SOURCEIP instead of $PROXIED_SRCIP #361

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a56dc57
light: allow the use of "auto" transport for syslog() source drivers
bazsi Feb 1, 2025
a9054df
logproto: rename prepare() methods to poll_prepare
bazsi Nov 8, 2024
b2c5cc7
transport-stack: add poll_prepare() methods for LogTransport and LogT…
bazsi Nov 8, 2024
e3eae2a
transport-stack: add LogTransportStack level read/write/writev methods
bazsi Nov 10, 2024
97b8259
transport-adapter: add free_method
bazsi Feb 1, 2025
5fa97df
transport/tls-context: simplify compression setup
bazsi Feb 1, 2025
2b3296a
transport-tls: convert to a LogTransportAdapter
bazsi Nov 8, 2024
8aeb9ff
transport-aux-data: add log_transport_aux_data_move()
bazsi Nov 11, 2024
919bccd
transport-stack: make it possible to add aux data from the LogTranspo…
bazsi Nov 10, 2024
aebabff
logproto: call transport-stack level I/O methods
bazsi Nov 8, 2024
f0704b0
logproto/logproto-auto-server: remove RFC6587 references from log mes…
bazsi Feb 1, 2025
02e1fd9
templates: add $SOURCEPORT macro
bazsi Feb 1, 2025
bc95df8
transport-haproxy: use the normal $SOURCEIP, $DESTIP, $DESTPORT macros
bazsi Nov 2, 2024
dd9f318
transport-haproxy: use the LogTransportStack->aux to save src/dst add…
bazsi Nov 10, 2024
b4f0552
transport-haproxy: implement transport switch instead of changing bas…
bazsi Nov 10, 2024
d038370
transport-haproxy: add more information to the proxy detection failur…
bazsi Nov 10, 2024
7511b97
afsocket: refactor TransportMapperInet transport setup logic
bazsi Feb 1, 2025
b52ecdb
afsocket: transport(auto) with tls() should immediately start using TLS
bazsi Feb 1, 2025
96cc1f1
transport/transport-factory-haproxy: use factory for creating HAProxy…
bazsi Feb 1, 2025
cc9260a
light: generalize test_pp_network and test_pp_syslog test cases
bazsi Feb 1, 2025
0a44754
light: fix up files included in the dist
bazsi Feb 2, 2025
6344ea8
news: added news file
bazsi Dec 3, 2024
d4a7589
transport: remove unused files
bazsi Feb 6, 2025
704ded6
transport: free LogTransportBIO BIO_meth instance at shutdown
bazsi Feb 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/apphook.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "timeutils/cache.h"
#include "multi-line/multi-line-factory.h"
#include "filterx/filterx-globals.h"
#include "transport/transport-globals.h"

#include <iv.h>
#include <iv_work.h>
Expand Down Expand Up @@ -243,6 +244,7 @@ app_startup(void)
timeutils_global_init();
multi_line_global_init();
filterx_global_init();
log_transport_global_init();
}

void
Expand Down Expand Up @@ -270,6 +272,7 @@ app_shutdown(void)
msg_stats_deinit();
run_application_hook(AH_SHUTDOWN);

log_transport_global_deinit();
filterx_global_deinit();
multi_line_global_deinit();
main_loop_thread_resource_deinit();
Expand Down
26 changes: 13 additions & 13 deletions lib/logproto/logproto-auto-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,35 +33,36 @@ typedef struct _LogProtoAutoServer
static LogProtoServer *
_construct_detected_proto(LogProtoAutoServer *self, const gchar *detect_buffer, gsize detect_buffer_len)
{
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);
gint fd = self->super.transport_stack.fd;

if (g_ascii_isdigit(detect_buffer[0]))
{
msg_debug("Auto-detected octet-counted-framing on RFC6587 connection, using framed protocol",
evt_tag_int("fd", transport->fd));
msg_debug("Auto-detected octet-counted-framing, using framed protocol",
evt_tag_int("fd", fd));
return log_proto_framed_server_new(NULL, self->super.options);
}
if (detect_buffer[0] == '<')
{
msg_debug("Auto-detected non-transparent-framing on RFC6587 connection, using simple text protocol",
evt_tag_int("fd", transport->fd));
msg_debug("Auto-detected non-transparent-framing, using simple text protocol",
evt_tag_int("fd", fd));
}
else
{
msg_debug("Unable to detect framing on RFC6587 connection, falling back to simple text transport",
evt_tag_int("fd", transport->fd),
msg_debug("Unable to detect framing, falling back to simple text transport",
evt_tag_int("fd", fd),
evt_tag_mem("detect_buffer", detect_buffer, detect_buffer_len));
}
return log_proto_text_server_new(NULL, self->super.options);
}

static LogProtoPrepareAction
log_proto_auto_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED)
log_proto_auto_server_poll_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED)
{
LogProtoAutoServer *self = (LogProtoAutoServer *) s;
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);

*cond = transport->cond;
if (log_transport_stack_poll_prepare(&self->super.transport_stack, cond))
return LPPA_FORCE_SCHEDULE_FETCH;

if (*cond == 0)
*cond = G_IO_IN;

Expand All @@ -72,12 +73,11 @@ static LogProtoStatus
log_proto_auto_handshake(LogProtoServer *s, gboolean *handshake_finished, LogProtoServer **proto_replacement)
{
LogProtoAutoServer *self = (LogProtoAutoServer *) s;
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);
gchar detect_buffer[8];
gboolean moved_forward;
gint rc;

rc = log_transport_read_ahead(transport, detect_buffer, sizeof(detect_buffer), &moved_forward);
rc = log_transport_stack_read_ahead(&self->super.transport_stack, detect_buffer, sizeof(detect_buffer), &moved_forward);
if (rc == 0)
return LPS_EOF;
else if (rc < 0)
Expand All @@ -101,6 +101,6 @@ log_proto_auto_server_new(LogTransport *transport, const LogProtoServerOptions *

log_proto_server_init(&self->super, transport, options);
self->super.handshake = log_proto_auto_handshake;
self->super.prepare = log_proto_auto_server_prepare;
self->super.poll_prepare = log_proto_auto_server_poll_prepare;
return &self->super;
}
17 changes: 7 additions & 10 deletions lib/logproto/logproto-buffered-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,9 @@ log_proto_buffered_server_apply_state(LogProtoBufferedServer *self, PersistEntry
struct stat st;
gint64 ofs = 0;
LogProtoBufferedServerState *state;
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);
gint fd;

fd = transport->fd;
fd = self->super.transport_stack.fd;
self->persist_handle = handle;

if (fstat(fd, &st) < 0)
Expand Down Expand Up @@ -255,7 +254,7 @@ log_proto_buffered_server_apply_state(LogProtoBufferedServer *self, PersistEntry
raw_buffer = g_alloca(state->raw_buffer_size);
}

rc = log_transport_read(transport, raw_buffer, state->raw_buffer_size, NULL);
rc = log_transport_stack_read(&self->super.transport_stack, raw_buffer, state->raw_buffer_size, NULL);
if (rc != state->raw_buffer_size)
{
msg_notice("Error re-reading buffer contents of the file to be continued, restarting from the beginning",
Expand Down Expand Up @@ -584,12 +583,12 @@ log_proto_buffered_server_restart_with_state(LogProtoServer *s, PersistState *pe
}

LogProtoPrepareAction
log_proto_buffered_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED)
log_proto_buffered_server_poll_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED)
{
LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);

*cond = transport->cond;
if (log_transport_stack_poll_prepare(&self->super.transport_stack, cond))
return LPPA_FORCE_SCHEDULE_FETCH;

/* if there's no pending I/O in the transport layer, then we want to do a read */
if (*cond == 0)
Expand All @@ -602,9 +601,7 @@ static gint
log_proto_buffered_server_read_data_method(LogProtoBufferedServer *self, guchar *buf, gsize len,
LogTransportAuxData *aux)
{
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);

return log_transport_read(transport, buf, len, aux);
return log_transport_stack_read(&self->super.transport_stack, buf, len, aux);
}

static void
Expand Down Expand Up @@ -1081,7 +1078,7 @@ log_proto_buffered_server_init(LogProtoBufferedServer *self, LogTransport *trans
const LogProtoServerOptions *options)
{
log_proto_server_init(&self->super, transport, options);
self->super.prepare = log_proto_buffered_server_prepare;
self->super.poll_prepare = log_proto_buffered_server_poll_prepare;
self->super.fetch = log_proto_buffered_server_fetch;
self->super.free_fn = log_proto_buffered_server_free_method;
self->super.restart_with_state = log_proto_buffered_server_restart_with_state;
Expand Down
4 changes: 2 additions & 2 deletions lib/logproto/logproto-buffered-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ log_proto_buffered_server_cue_flush(LogProtoBufferedServer *self)
self->flush_partial_message = TRUE;
}

LogProtoPrepareAction log_proto_buffered_server_prepare(LogProtoServer *s, GIOCondition *cond,
gint *timeout G_GNUC_UNUSED);
LogProtoPrepareAction log_proto_buffered_server_poll_prepare(LogProtoServer *s, GIOCondition *cond,
gint *timeout G_GNUC_UNUSED);
LogProtoBufferedServerState *log_proto_buffered_server_get_state(LogProtoBufferedServer *self);
void log_proto_buffered_server_put_state(LogProtoBufferedServer *self);
gboolean log_proto_buffered_server_restart_with_state(LogProtoServer *s,
Expand Down
7 changes: 3 additions & 4 deletions lib/logproto/logproto-client.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ struct _LogProtoClient
LogProtoStatus status;
const LogProtoClientOptions *options;
LogTransportStack transport_stack;
/* FIXME: rename to something else */
gboolean (*prepare)(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout);
gboolean (*poll_prepare)(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout);
LogProtoStatus (*post)(LogProtoClient *s, LogMessage *logmsg, guchar *msg, gsize msg_len, gboolean *consumed);
LogProtoStatus (*process_in)(LogProtoClient *s);
LogProtoStatus (*flush)(LogProtoClient *s);
Expand Down Expand Up @@ -124,9 +123,9 @@ log_proto_client_handshake(LogProtoClient *s, gboolean *handshake_finished)
}

static inline gboolean
log_proto_client_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout)
log_proto_client_poll_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout)
{
gboolean result = s->prepare(s, fd, cond, timeout);
gboolean result = s->poll_prepare(s, fd, cond, timeout);

if (!result && *timeout < 0)
*timeout = s->options->idle_timeout;
Expand Down
18 changes: 9 additions & 9 deletions lib/logproto/logproto-framed-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ typedef struct _LogProtoFramedServer
} LogProtoFramedServer;

static LogProtoPrepareAction
log_proto_framed_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED)
log_proto_framed_server_poll_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED)
{
LogProtoFramedServer *self = (LogProtoFramedServer *) s;
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);

*cond = transport->cond;
if (log_transport_stack_poll_prepare(&self->super.transport_stack, cond))
return LPPA_FORCE_SCHEDULE_FETCH;

/* there is a half message in our buffer so try to wait */
if (!self->half_message_in_buffer)
Expand All @@ -97,7 +97,6 @@ static gboolean
log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_read,
LogProtoStatus *status)
{
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);
gint rc;
*status = LPS_SUCCESS;

Expand All @@ -111,15 +110,16 @@ log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_rea
return FALSE;

log_transport_aux_data_reinit(&self->buffer_aux);
rc = log_transport_read(transport, &self->buffer[self->buffer_end], self->buffer_size - self->buffer_end,
&self->buffer_aux);
rc = log_transport_stack_read(&self->super.transport_stack,
&self->buffer[self->buffer_end], self->buffer_size - self->buffer_end,
&self->buffer_aux);

if (rc < 0)
{
if (errno != EAGAIN)
{
msg_error("Error reading RFC6587 style framed data",
evt_tag_int("fd", transport->fd),
evt_tag_int("fd", self->super.transport_stack.fd),
evt_tag_error("error"));
*status = LPS_ERROR;
}
Expand All @@ -134,7 +134,7 @@ log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_rea
if (rc == 0)
{
msg_trace("EOF occurred while reading",
evt_tag_int(EVT_TAG_FD, transport->fd));
evt_tag_int(EVT_TAG_FD, self->super.transport_stack.fd));
*status = LPS_EOF;
return FALSE;
}
Expand Down Expand Up @@ -430,7 +430,7 @@ log_proto_framed_server_new(LogTransport *transport, const LogProtoServerOptions
LogProtoFramedServer *self = g_new0(LogProtoFramedServer, 1);

log_proto_server_init(&self->super, transport, options);
self->super.prepare = log_proto_framed_server_prepare;
self->super.poll_prepare = log_proto_framed_server_poll_prepare;
self->super.fetch = log_proto_framed_server_fetch;
self->super.free_fn = log_proto_framed_server_free;
self->half_message_in_buffer = FALSE;
Expand Down
5 changes: 2 additions & 3 deletions lib/logproto/logproto-record-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,16 @@ static gint
log_proto_record_server_read_data(LogProtoBufferedServer *s, guchar *buf, gsize len, LogTransportAuxData *aux)
{
LogProtoRecordServer *self = (LogProtoRecordServer *) s;
LogTransport *transport = log_transport_stack_get_active(&self->super.super.transport_stack);
gint rc;

/* assert that we have enough space in the buffer to read record_size bytes */
g_assert(len >= self->record_size);
len = self->record_size;
rc = log_transport_read(transport, buf, len, aux);
rc = log_transport_stack_read(&s->super.transport_stack, buf, len, aux);
if (rc > 0 && rc != self->record_size)
{
msg_error("Record size was set, and couldn't read enough bytes",
evt_tag_int(EVT_TAG_FD, transport->fd),
evt_tag_int(EVT_TAG_FD, s->super.transport_stack.fd),
evt_tag_int("record_size", self->record_size),
evt_tag_int("read", rc));
errno = EIO;
Expand Down
7 changes: 3 additions & 4 deletions lib/logproto/logproto-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ struct _LogProtoServer
AckTracker *ack_tracker;

LogProtoServerWakeupCallback wakeup_callback;
/* FIXME: rename to something else */
LogProtoPrepareAction (*prepare)(LogProtoServer *s, GIOCondition *cond, gint *timeout);
LogProtoPrepareAction (*poll_prepare)(LogProtoServer *s, GIOCondition *cond, gint *timeout);
gboolean (*restart_with_state)(LogProtoServer *s, PersistState *state, const gchar *persist_name);
LogProtoStatus (*fetch)(LogProtoServer *s, const guchar **msg, gsize *msg_len, gboolean *may_read,
LogTransportAuxData *aux, Bookmark *bookmark);
Expand Down Expand Up @@ -125,9 +124,9 @@ log_proto_server_set_options(LogProtoServer *self, const LogProtoServerOptions *
}

static inline LogProtoPrepareAction
log_proto_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout)
log_proto_server_poll_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout)
{
LogProtoPrepareAction result = s->prepare(s, cond, timeout);
LogProtoPrepareAction result = s->poll_prepare(s, cond, timeout);

if (result == LPPA_POLL_IO && *timeout < 0)
*timeout = s->options->idle_timeout;
Expand Down
26 changes: 14 additions & 12 deletions lib/logproto/logproto-text-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
#include <errno.h>

static gboolean
log_proto_text_client_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout)
log_proto_text_client_poll_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout)
{
LogProtoTextClient *self = (LogProtoTextClient *) s;
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);

*fd = transport->fd;
*cond = transport->cond;
if (log_transport_stack_poll_prepare(&self->super.transport_stack, cond))
return TRUE;

*fd = self->super.transport_stack.fd;

/* if there's no pending I/O in the transport layer, then we want to do a write */
if (*cond == 0)
Expand All @@ -46,24 +47,26 @@ static LogProtoStatus
log_proto_text_client_drop_input(LogProtoClient *s)
{
LogProtoTextClient *self = (LogProtoTextClient *) s;
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);
guchar buf[1024];
gint rc = -1;

do
{
rc = log_transport_read(transport, buf, sizeof(buf), NULL);
rc = log_transport_stack_read(&self->super.transport_stack, buf, sizeof(buf), NULL);
}
while (rc > 0);

if (rc == -1 && errno != EAGAIN)
{
msg_error("Error reading data", evt_tag_int("fd", transport->fd), evt_tag_error("error"));
msg_error("Error reading data",
evt_tag_int("fd", self->super.transport_stack.fd),
evt_tag_error("error"));
return LPS_ERROR;
}
else if (rc == 0)
{
msg_error("EOF occurred while idle", evt_tag_int("fd", transport->fd));
msg_error("EOF occurred while idle",
evt_tag_int("fd", self->super.transport_stack.fd));
return LPS_ERROR;
}

Expand All @@ -74,7 +77,6 @@ static LogProtoStatus
log_proto_text_client_flush(LogProtoClient *s)
{
LogProtoTextClient *self = (LogProtoTextClient *) s;
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);
gint rc;

if (!self->partial)
Expand All @@ -85,13 +87,13 @@ log_proto_text_client_flush(LogProtoClient *s)
/* attempt to flush previously buffered data */
gint len = self->partial_len - self->partial_pos;

rc = log_transport_write(transport, &self->partial[self->partial_pos], len);
rc = log_transport_stack_write(&self->super.transport_stack, &self->partial[self->partial_pos], len);
if (rc < 0)
{
if (errno != EAGAIN && errno != EINTR)
{
msg_error("I/O error occurred while writing",
evt_tag_int("fd", transport->fd),
evt_tag_int("fd", self->super.transport_stack.fd),
evt_tag_error(EVT_TAG_OSERROR));
return LPS_ERROR;
}
Expand Down Expand Up @@ -194,7 +196,7 @@ void
log_proto_text_client_init(LogProtoTextClient *self, LogTransport *transport, const LogProtoClientOptions *options)
{
log_proto_client_init(&self->super, transport, options);
self->super.prepare = log_proto_text_client_prepare;
self->super.poll_prepare = log_proto_text_client_poll_prepare;
self->super.flush = log_proto_text_client_flush;
if (options->drop_input)
self->super.process_in = log_proto_text_client_drop_input;
Expand Down
6 changes: 3 additions & 3 deletions lib/logproto/logproto-text-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
#include <string.h>

LogProtoPrepareAction
log_proto_text_server_prepare_method(LogProtoServer *s, GIOCondition *cond, gint *timeout)
log_proto_text_server_poll_prepare_method(LogProtoServer *s, GIOCondition *cond, gint *timeout)
{
LogProtoTextServer *self = (LogProtoTextServer *) s;
gboolean avail;

LogProtoPrepareAction action = log_proto_buffered_server_prepare(s, cond, timeout);
LogProtoPrepareAction action = log_proto_buffered_server_poll_prepare(s, cond, timeout);
if (action != LPPA_POLL_IO)
return action;

Expand Down Expand Up @@ -331,7 +331,7 @@ void
log_proto_text_server_init(LogProtoTextServer *self, LogTransport *transport, const LogProtoServerOptions *options)
{
log_proto_buffered_server_init(&self->super, transport, options);
self->super.super.prepare = log_proto_text_server_prepare_method;
self->super.super.poll_prepare = log_proto_text_server_poll_prepare_method;
self->super.super.free_fn = log_proto_text_server_free;
self->super.super.restart_with_state = log_proto_text_server_restart_with_state;
self->super.fetch_from_buffer = log_proto_text_server_fetch_from_buffer;
Expand Down
Loading