From 7fff66014464893b519ac944979f11f38b6b2c3a Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 3 May 2022 07:37:35 +0200 Subject: [PATCH] Add UDP protocol support to Promtail's syslog target (#5790) Until now, the syslog target in Promtail did only support TCP. This PR adds support for sending syslog messages to Promtail (via ng-syslog or rsyslog) via UDP. UPD for syslogs can be used if you prefer speed over guaranteed delivery. Signed-off-by: Christian Haudum --- CHANGELOG.md | 1 + .../pkg/promtail/scrapeconfig/scrapeconfig.go | 4 + .../syslog/syslogparser/syslogparser.go | 8 +- .../promtail/targets/syslog/syslogtarget.go | 240 ++--------- .../targets/syslog/syslogtarget_test.go | 355 ++++++++++----- .../pkg/promtail/targets/syslog/transport.go | 406 ++++++++++++++++++ docs/sources/clients/promtail/scraping.md | 19 +- 7 files changed, 705 insertions(+), 328 deletions(-) create mode 100644 clients/pkg/promtail/targets/syslog/transport.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 892819b080234..ed4d4208fffb3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +* [5790](https://github.com/grafana/loki/pull/5790) **chaudum**: Add UDP support for Promtail's syslog target. * [5984](https://github.com/grafana/loki/pull/5984) **dannykopping** and **salvacorts**: Querier: prevent unnecessary calls to ingesters. * [5943](https://github.com/grafana/loki/pull/5943) **tpaschalis**: Add support for exclusion patterns in Promtail's static_config * [5879](https://github.com/grafana/loki/pull/5879) **MichelHollands**: Remove lines matching delete request expression when using "filter-and-delete" deletion mode. diff --git a/clients/pkg/promtail/scrapeconfig/scrapeconfig.go b/clients/pkg/promtail/scrapeconfig/scrapeconfig.go index f05a4cacbceff..769755c6c662f 100644 --- a/clients/pkg/promtail/scrapeconfig/scrapeconfig.go +++ b/clients/pkg/promtail/scrapeconfig/scrapeconfig.go @@ -165,6 +165,10 @@ type SyslogTargetConfig struct { // ListenAddress is the address to listen on for syslog messages. ListenAddress string `yaml:"listen_address"` + // ListenProtocol is the protocol used to listen for syslog messages. + // Must be either `tcp` (default) or `udp` + ListenProtocol string `yaml:"listen_protocol"` + // IdleTimeout is the idle timeout for tcp connections. IdleTimeout time.Duration `yaml:"idle_timeout"` diff --git a/clients/pkg/promtail/targets/syslog/syslogparser/syslogparser.go b/clients/pkg/promtail/targets/syslog/syslogparser/syslogparser.go index adfc44d2d525a..dfc76bb3538aa 100644 --- a/clients/pkg/promtail/targets/syslog/syslogparser/syslogparser.go +++ b/clients/pkg/promtail/targets/syslog/syslogparser/syslogparser.go @@ -15,20 +15,20 @@ import ( // detects octet counting. // The function returns on EOF or unrecoverable errors. func ParseStream(r io.Reader, callback func(res *syslog.Result), maxMessageLength int) error { - buf := bufio.NewReader(r) + buf := bufio.NewReaderSize(r, 1<<10) - firstByte, err := buf.Peek(1) + b, err := buf.ReadByte() if err != nil { return err } + _ = buf.UnreadByte() - b := firstByte[0] if b == '<' { nontransparent.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf) } else if b >= '0' && b <= '9' { octetcounting.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf) } else { - return fmt.Errorf("invalid or unsupported framing. first byte: '%s'", firstByte) + return fmt.Errorf("invalid or unsupported framing. first byte: '%s'", string(b)) } return nil diff --git a/clients/pkg/promtail/targets/syslog/syslogtarget.go b/clients/pkg/promtail/targets/syslog/syslogtarget.go index c2ff44da00f89..57d2efd991b7e 100644 --- a/clients/pkg/promtail/targets/syslog/syslogtarget.go +++ b/clients/pkg/promtail/targets/syslog/syslogtarget.go @@ -1,30 +1,22 @@ package syslog import ( - "context" - "crypto/tls" - "crypto/x509" "errors" "fmt" - "io/ioutil" "net" "strings" - "sync" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/dskit/backoff" "github.com/influxdata/go-syslog/v3" "github.com/influxdata/go-syslog/v3/rfc5424" - "github.com/mwitkow/go-conntrack" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" "github.com/grafana/loki/clients/pkg/promtail/api" "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" - "github.com/grafana/loki/clients/pkg/promtail/targets/syslog/syslogparser" "github.com/grafana/loki/clients/pkg/promtail/targets/target" "github.com/grafana/loki/pkg/logproto" @@ -33,6 +25,7 @@ import ( var ( defaultIdleTimeout = 120 * time.Second defaultMaxMessageLength = 8192 + defaultProtocol = protocolTCP ) // SyslogTarget listens to syslog messages. @@ -44,12 +37,10 @@ type SyslogTarget struct { config *scrapeconfig.SyslogTargetConfig relabelConfig []*relabel.Config - listener net.Listener - messages chan message + transport Transport - ctx context.Context - ctxCancel context.CancelFunc - openConnections *sync.WaitGroup + messages chan message + messagesDone chan struct{} } type message struct { @@ -67,144 +58,42 @@ func NewSyslogTarget( config *scrapeconfig.SyslogTargetConfig, ) (*SyslogTarget, error) { - ctx, cancel := context.WithCancel(context.Background()) - t := &SyslogTarget{ metrics: metrics, logger: logger, handler: handler, config: config, relabelConfig: relabel, - - ctx: ctx, - ctxCancel: cancel, - openConnections: new(sync.WaitGroup), + messagesDone: make(chan struct{}), + } + + switch t.transportProtocol() { + case protocolTCP: + t.transport = NewSyslogTCPTransport( + config, + t.handleMessage, + t.handleMessageError, + logger, + ) + case protocolUDP: + t.transport = NewSyslogUDPTransport( + config, + t.handleMessage, + t.handleMessageError, + logger, + ) + default: + return nil, fmt.Errorf("invalid transport protocol. expected 'tcp' or 'udp', got '%s'", t.transportProtocol()) } t.messages = make(chan message) go t.messageSender(handler.Chan()) - err := t.run() - return t, err -} - -func (t *SyslogTarget) run() error { - l, err := net.Listen("tcp", t.config.ListenAddress) - l = conntrack.NewListener(l, conntrack.TrackWithName("syslog_target/"+t.config.ListenAddress)) + err := t.transport.Run() if err != nil { - return fmt.Errorf("error setting up syslog target: %w", err) - } - - tlsEnabled := t.config.TLSConfig.CertFile != "" || t.config.TLSConfig.KeyFile != "" || t.config.TLSConfig.CAFile != "" - if tlsEnabled { - tlsConfig, err := newTLSConfig(t.config.TLSConfig.CertFile, t.config.TLSConfig.KeyFile, t.config.TLSConfig.CAFile) - if err != nil { - return fmt.Errorf("error setting up syslog target: %w", err) - } - l = tls.NewListener(l, tlsConfig) - } - - t.listener = l - level.Info(t.logger).Log("msg", "syslog listening on address", "address", t.ListenAddress().String(), "tls", tlsEnabled) - - t.openConnections.Add(1) - go t.acceptConnections() - - return nil -} - -func newTLSConfig(certFile string, keyFile string, caFile string) (*tls.Config, error) { - if certFile == "" || keyFile == "" { - return nil, fmt.Errorf("certificate and key files are required") - } - - certs, err := tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - return nil, fmt.Errorf("unable to load server certificate or key: %w", err) - } - - tlsConfig := &tls.Config{ - Certificates: []tls.Certificate{certs}, - } - - if caFile != "" { - caCert, err := ioutil.ReadFile(caFile) - if err != nil { - return nil, fmt.Errorf("unable to load client CA certificate: %w", err) - } - - caCertPool := x509.NewCertPool() - if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { - return nil, fmt.Errorf("unable to parse client CA certificate") - } - - tlsConfig.ClientCAs = caCertPool - tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert - } - - return tlsConfig, nil -} - -func (t *SyslogTarget) acceptConnections() { - defer t.openConnections.Done() - - l := log.With(t.logger, "address", t.listener.Addr().String()) - - backoff := backoff.New(t.ctx, backoff.Config{ - MinBackoff: 5 * time.Millisecond, - MaxBackoff: 1 * time.Second, - }) - - for { - c, err := t.listener.Accept() - if err != nil { - if t.ctx.Err() != nil { - level.Info(l).Log("msg", "syslog server shutting down") - return - } - - if ne, ok := err.(net.Error); ok && ne.Temporary() { - level.Warn(l).Log("msg", "failed to accept syslog connection", "err", err, "num_retries", backoff.NumRetries()) - backoff.Wait() - continue - } - - level.Error(l).Log("msg", "failed to accept syslog connection. quiting", "err", err) - return - } - backoff.Reset() - - t.openConnections.Add(1) - go t.handleConnection(c) - } - -} - -func (t *SyslogTarget) handleConnection(cn net.Conn) { - defer t.openConnections.Done() - - c := &idleTimeoutConn{cn, t.idleTimeout()} - - handlerCtx, cancel := context.WithCancel(t.ctx) - defer cancel() - go func() { - <-handlerCtx.Done() - _ = c.Close() - }() - - connLabels := t.connectionLabels(c) - - err := syslogparser.ParseStream(c, func(result *syslog.Result) { - if err := result.Error; err != nil { - t.handleMessageError(err) - return - } - t.handleMessage(connLabels.Copy(), result.Message) - }, t.maxMessageLength()) - - if err != nil { - level.Warn(t.logger).Log("msg", "error initializing syslog stream", "err", err) + return nil, err } + return t, nil } func (t *SyslogTarget) handleMessageError(err error) { @@ -247,7 +136,7 @@ func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Messag if t.config.LabelStructuredData && rfc5424Msg.StructuredData != nil { for id, params := range *rfc5424Msg.StructuredData { - id = strings.Replace(id, "@", "_", -1) + id = strings.ReplaceAll(id, "@", "_") for name, value := range params { key := "__syslog_message_sd_" + id + "_" + name lb.Set(key, value) @@ -295,33 +184,7 @@ func (t *SyslogTarget) messageSender(entries chan<- api.Entry) { } t.metrics.syslogEntries.Inc() } -} - -func (t *SyslogTarget) connectionLabels(c net.Conn) labels.Labels { - lb := labels.NewBuilder(nil) - for k, v := range t.config.Labels { - lb.Set(string(k), string(v)) - } - - ip := ipFromConn(c).String() - lb.Set("__syslog_connection_ip_address", ip) - lb.Set("__syslog_connection_hostname", lookupAddr(ip)) - - return lb.Labels() -} - -func ipFromConn(c net.Conn) net.IP { - switch addr := c.RemoteAddr().(type) { - case *net.TCPAddr: - return addr.IP - } - - return nil -} - -func lookupAddr(addr string) string { - names, _ := net.LookupAddr(addr) - return strings.Join(names, ",") + t.messagesDone <- struct{}{} } // Type returns SyslogTargetType. @@ -331,7 +194,7 @@ func (t *SyslogTarget) Type() target.TargetType { // Ready indicates whether or not the syslog target is ready to be read from. func (t *SyslogTarget) Ready() bool { - return true + return t.transport.Ready() } // DiscoveredLabels returns the set of labels discovered by the syslog target, which @@ -353,48 +216,23 @@ func (t *SyslogTarget) Details() interface{} { // Stop shuts down the SyslogTarget. func (t *SyslogTarget) Stop() error { - t.ctxCancel() - err := t.listener.Close() - t.openConnections.Wait() + err := t.transport.Close() + t.transport.Wait() close(t.messages) + // wait for all pending messages to be processed and sent to handler + <-t.messagesDone t.handler.Stop() return err } // ListenAddress returns the address SyslogTarget is listening on. func (t *SyslogTarget) ListenAddress() net.Addr { - return t.listener.Addr() -} - -func (t *SyslogTarget) idleTimeout() time.Duration { - if t.config.IdleTimeout != 0 { - return t.config.IdleTimeout - } - return defaultIdleTimeout + return t.transport.Addr() } -func (t *SyslogTarget) maxMessageLength() int { - if t.config.MaxMessageLength != 0 { - return t.config.MaxMessageLength +func (t *SyslogTarget) transportProtocol() string { + if t.config.ListenProtocol != "" { + return t.config.ListenProtocol } - return defaultMaxMessageLength -} - -type idleTimeoutConn struct { - net.Conn - idleTimeout time.Duration -} - -func (c *idleTimeoutConn) Write(p []byte) (int, error) { - c.setDeadline() - return c.Conn.Write(p) -} - -func (c *idleTimeoutConn) Read(b []byte) (int, error) { - c.setDeadline() - return c.Conn.Read(b) -} - -func (c *idleTimeoutConn) setDeadline() { - _ = c.Conn.SetDeadline(time.Now().Add(c.idleTimeout)) + return defaultProtocol } diff --git a/clients/pkg/promtail/targets/syslog/syslogtarget_test.go b/clients/pkg/promtail/targets/syslog/syslogtarget_test.go index cf72318ed2605..696d3461208b9 100644 --- a/clients/pkg/promtail/targets/syslog/syslogtarget_test.go +++ b/clients/pkg/promtail/targets/syslog/syslogtarget_test.go @@ -13,6 +13,7 @@ import ( "unicode/utf8" "github.com/go-kit/log" + "github.com/influxdata/go-syslog/v3" promconfig "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/relabel" @@ -21,6 +22,7 @@ import ( "github.com/grafana/loki/clients/pkg/promtail/client/fake" "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" + "github.com/grafana/loki/clients/pkg/promtail/targets/syslog/syslogparser" ) var ( @@ -231,64 +233,152 @@ o+KrhWQRriAj+GFMIpnT0r28EhOWS/d+f9ISk/it796YtDhfMb9GmV9VI7o= `) ) -func TestSyslogTarget_NewlineSeparatedMessages(t *testing.T) { - testSyslogTarget(t, false) -} - -func TestSyslogTarget_OctetCounting(t *testing.T) { - testSyslogTarget(t, true) -} - -func testSyslogTarget(t *testing.T, octetCounting bool) { - w := log.NewSyncWriter(os.Stderr) - logger := log.NewLogfmtLogger(w) - client := fake.New(func() {}) +type formatFunc func(string) string - metrics := NewMetrics(nil) - tgt, err := NewSyslogTarget(metrics, logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{ - ListenAddress: "127.0.0.1:0", - LabelStructuredData: true, - Labels: model.LabelSet{ - "test": "syslog_target", - }, - }) - require.NoError(t, err) - defer func() { - require.NoError(t, tgt.Stop()) - }() - - addr := tgt.ListenAddress().String() - c, err := net.Dial("tcp", addr) - require.NoError(t, err) +var ( + fmtOctetCounting = func(s string) string { return fmt.Sprintf("%d %s", len(s), s) } + fmtNewline = func(s string) string { return s + "\n" } +) - messages := []string{ - `<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey="1"] An application event log entry...`, - `<165>1 2018-10-11T22:14:15.005Z host5 e - id2 [custom@32473 exkey="2"] An application event log entry...`, - `<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey="3"] An application event log entry...`, +func Benchmark_SyslogTarget(b *testing.B) { + for _, tt := range []struct { + name string + protocol string + formatFunc formatFunc + }{ + {"tcp", protocolTCP, fmtOctetCounting}, + {"udp", protocolUDP, fmtOctetCounting}, + } { + tt := tt + b.Run(tt.name, func(b *testing.B) { + client := fake.New(func() {}) + + metrics := NewMetrics(nil) + tgt, _ := NewSyslogTarget(metrics, log.NewNopLogger(), client, []*relabel.Config{}, &scrapeconfig.SyslogTargetConfig{ + ListenAddress: "127.0.0.1:0", + ListenProtocol: tt.protocol, + LabelStructuredData: true, + Labels: model.LabelSet{ + "test": "syslog_target", + }, + }) + b.Cleanup(func() { + require.NoError(b, tgt.Stop()) + }) + require.Eventually(b, tgt.Ready, time.Second, 10*time.Millisecond) + + addr := tgt.ListenAddress().String() + + messages := []string{ + `<165>1 2022-04-08T22:14:10.001Z host1 app - id1 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:11.002Z host2 app - id2 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:12.003Z host1 app - id3 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:13.004Z host2 app - id4 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:14.005Z host1 app - id5 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:15.002Z host2 app - id6 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:16.003Z host1 app - id7 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:17.004Z host2 app - id8 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:18.005Z host1 app - id9 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:19.001Z host2 app - id10 [custom@32473 exkey="1"] An application event log entry...`, + } + + b.ReportAllocs() + b.ResetTimer() + + c, _ := net.Dial(tt.protocol, addr) + for n := 0; n < b.N; n++ { + _ = writeMessagesToStream(c, messages, tt.formatFunc) + } + c.Close() + + require.Eventuallyf(b, func() bool { + return len(client.Received()) == len(messages)*b.N + }, 15*time.Second, time.Second, "expected: %d got:%d", len(messages)*b.N, len(client.Received())) + + }) } +} - err = writeMessagesToStream(c, messages, octetCounting) - require.NoError(t, err) - require.NoError(t, c.Close()) - - require.Eventuallyf(t, func() bool { - return len(client.Received()) == len(messages) - }, time.Second, time.Millisecond, "Expected to receive %d messages, got %d.", len(messages), len(client.Received())) - - require.Equal(t, model.LabelSet{ - "test": "syslog_target", - - "severity": "notice", - "facility": "local4", - "hostname": "host5", - "app_name": "e", - "msg_id": "id1", - - "sd_custom_exkey": "1", - }, client.Received()[0].Labels) - require.Equal(t, "An application event log entry...", client.Received()[0].Line) - - require.NotZero(t, client.Received()[0].Timestamp) +func TestSyslogTarget(t *testing.T) { + for _, tt := range []struct { + name string + protocol string + fmtFunc formatFunc + }{ + {"tpc newline separated", protocolTCP, fmtNewline}, + {"tpc octetcounting", protocolTCP, fmtOctetCounting}, + {"udp newline separated", protocolUDP, fmtNewline}, + {"udp octetcounting", protocolUDP, fmtOctetCounting}, + } { + tt := tt + t.Run(tt.name, func(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + client := fake.New(func() {}) + + metrics := NewMetrics(nil) + tgt, err := NewSyslogTarget(metrics, logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{ + MaxMessageLength: 1 << 12, // explicitly not use default value + ListenAddress: "127.0.0.1:0", + ListenProtocol: tt.protocol, + LabelStructuredData: true, + Labels: model.LabelSet{ + "test": "syslog_target", + }, + }) + require.NoError(t, err) + + require.Eventually(t, tgt.Ready, time.Second, 10*time.Millisecond) + + addr := tgt.ListenAddress().String() + c, err := net.Dial(tt.protocol, addr) + require.NoError(t, err) + + messages := []string{ + `<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2018-10-11T22:14:15.005Z host5 e - id2 [custom@32473 exkey="2"] An application event log entry...`, + `<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey="3"] An application event log entry...`, + } + + err = writeMessagesToStream(c, messages, tt.fmtFunc) + require.NoError(t, err) + require.NoError(t, c.Close()) + + if tt.protocol == protocolUDP { + time.Sleep(time.Second) + require.NoError(t, tgt.Stop()) + } else { + defer func() { + require.NoError(t, tgt.Stop()) + }() + } + + require.Eventuallyf(t, func() bool { + return len(client.Received()) == len(messages) + }, time.Second, 10*time.Millisecond, "Expected to receive %d messages.", len(messages)) + + labels := make([]model.LabelSet, 0, len(messages)) + for _, entry := range client.Received() { + labels = append(labels, entry.Labels) + } + // we only check if one of the received entries contain the wanted label set + // because UDP does not guarantee the order of the messages + require.Contains(t, labels, model.LabelSet{ + "test": "syslog_target", + + "severity": "notice", + "facility": "local4", + "hostname": "host5", + "app_name": "e", + "msg_id": "id1", + + "sd_custom_exkey": "1", + }) + require.Equal(t, "An application event log entry...", client.Received()[0].Line) + + require.NotZero(t, client.Received()[0].Timestamp) + }) + } } func relabelConfig(t *testing.T) []*relabel.Config { @@ -316,72 +406,73 @@ func relabelConfig(t *testing.T) []*relabel.Config { return relabels } -func writeMessagesToStream(w io.Writer, messages []string, octetCounting bool) error { - var formatter func(string) string - - if octetCounting { - formatter = func(s string) string { - return fmt.Sprintf("%d %s", len(s), s) - } - } else { - formatter = func(s string) string { - return s + "\n" - } - } - +func writeMessagesToStream(w io.Writer, messages []string, formatter formatFunc) error { for _, msg := range messages { _, err := fmt.Fprint(w, formatter(msg)) if err != nil { return err } } - return nil } func TestSyslogTarget_RFC5424Messages(t *testing.T) { - w := log.NewSyncWriter(os.Stderr) - logger := log.NewLogfmtLogger(w) - client := fake.New(func() {}) - - metrics := NewMetrics(nil) - tgt, err := NewSyslogTarget(metrics, logger, client, []*relabel.Config{}, &scrapeconfig.SyslogTargetConfig{ - ListenAddress: "127.0.0.1:0", - LabelStructuredData: true, - Labels: model.LabelSet{ - "test": "syslog_target", - }, - UseRFC5424Message: true, - }) - require.NoError(t, err) - defer func() { - require.NoError(t, tgt.Stop()) - }() - - addr := tgt.ListenAddress().String() - c, err := net.Dial("tcp", addr) - require.NoError(t, err) - - messages := []string{ - `<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey="1"] An application event log entry...`, - `<165>1 2018-10-11T22:14:15.005Z host5 e - id2 [custom@32473 exkey="2"] An application event log entry...`, - `<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey="3"] An application event log entry...`, - } - - err = writeMessagesToStream(c, messages, false) - require.NoError(t, err) - require.NoError(t, c.Close()) - - require.Eventuallyf(t, func() bool { - return len(client.Received()) == len(messages) - }, time.Second, time.Millisecond, "Expected to receive %d messages, got %d.", len(messages), len(client.Received())) - - for i, m := range messages { - require.Equal(t, model.LabelSet{ - "test": "syslog_target", - }, client.Received()[i].Labels) - require.Equal(t, m, client.Received()[i].Line) - require.NotZero(t, client.Received()[i].Timestamp) + for _, tt := range []struct { + name string + protocol string + fmtFunc formatFunc + }{ + {"tpc newline separated", protocolTCP, fmtNewline}, + {"tpc octetcounting", protocolTCP, fmtOctetCounting}, + } { + tt := tt + t.Run(tt.name, func(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + client := fake.New(func() {}) + + metrics := NewMetrics(nil) + tgt, err := NewSyslogTarget(metrics, logger, client, []*relabel.Config{}, &scrapeconfig.SyslogTargetConfig{ + ListenAddress: "127.0.0.1:0", + ListenProtocol: tt.protocol, + LabelStructuredData: true, + Labels: model.LabelSet{ + "test": "syslog_target", + }, + UseRFC5424Message: true, + }) + require.NoError(t, err) + require.Eventually(t, tgt.Ready, time.Second, 10*time.Millisecond) + defer func() { + require.NoError(t, tgt.Stop()) + }() + + addr := tgt.ListenAddress().String() + c, err := net.Dial(tt.protocol, addr) + require.NoError(t, err) + + messages := []string{ + `<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2018-10-11T22:14:15.005Z host5 e - id2 [custom@32473 exkey="2"] An application event log entry...`, + `<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey="3"] An application event log entry...`, + } + + err = writeMessagesToStream(c, messages, tt.fmtFunc) + require.NoError(t, err) + require.NoError(t, c.Close()) + + require.Eventuallyf(t, func() bool { + return len(client.Received()) == len(messages) + }, time.Second, time.Millisecond, "Expected to receive %d messages, got %d.", len(messages), len(client.Received())) + + for i := range messages { + require.Equal(t, model.LabelSet{ + "test": "syslog_target", + }, client.Received()[i].Labels) + require.Contains(t, messages, client.Received()[i].Line) + require.NotZero(t, client.Received()[i].Timestamp) + } + }) } } @@ -417,14 +508,14 @@ func TestSyslogTarget_TLSConfigWithoutServerKey(t *testing.T) { func TestSyslogTarget_TLSConfig(t *testing.T) { t.Run("NewlineSeparatedMessages", func(t *testing.T) { - testSyslogTargetWithTLS(t, false) + testSyslogTargetWithTLS(t, fmtNewline) }) t.Run("OctetCounting", func(t *testing.T) { - testSyslogTargetWithTLS(t, true) + testSyslogTargetWithTLS(t, fmtOctetCounting) }) } -func testSyslogTargetWithTLS(t *testing.T, octetCounting bool) { +func testSyslogTargetWithTLS(t *testing.T, fmtFunc formatFunc) { caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) @@ -483,7 +574,7 @@ func testSyslogTargetWithTLS(t *testing.T, octetCounting bool) { } messages := append(malformeddMessages, validMessages...) - err = writeMessagesToStream(c, messages, octetCounting) + err = writeMessagesToStream(c, messages, fmtFunc) require.NoError(t, err) require.NoError(t, c.Close()) @@ -526,14 +617,14 @@ func createTempFile(data []byte) (*os.File, error) { func TestSyslogTarget_TLSConfigVerifyClientCertificate(t *testing.T) { t.Run("NewlineSeparatedMessages", func(t *testing.T) { - testSyslogTargetWithTLSVerifyClientCertificate(t, false) + testSyslogTargetWithTLSVerifyClientCertificate(t, fmtNewline) }) t.Run("OctetCounting", func(t *testing.T) { - testSyslogTargetWithTLSVerifyClientCertificate(t, true) + testSyslogTargetWithTLSVerifyClientCertificate(t, fmtOctetCounting) }) } -func testSyslogTargetWithTLSVerifyClientCertificate(t *testing.T, octetCounting bool) { +func testSyslogTargetWithTLSVerifyClientCertificate(t *testing.T, fmtFunc formatFunc) { caCertFile, err := createTempFile(caCert) if err != nil { t.Fatalf("Unable to create CA certificate temporary file: %s", err) @@ -624,7 +715,7 @@ func testSyslogTargetWithTLSVerifyClientCertificate(t *testing.T, octetCounting `<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey="3"] An application event log entry...`, } - err = writeMessagesToStream(c, messages, octetCounting) + err = writeMessagesToStream(c, messages, fmtFunc) require.NoError(t, err) require.NoError(t, c.Close()) @@ -706,7 +797,7 @@ func TestSyslogTarget_NonUTF8Message(t *testing.T) { err = writeMessagesToStream(c, []string{ "<165>1 - - - - - - " + msg1, "<123>1 - - - - - - " + msg2, - }, true) + }, fmtOctetCounting) require.NoError(t, err) require.NoError(t, c.Close()) @@ -747,3 +838,29 @@ func TestSyslogTarget_IdleTimeout(t *testing.T) { _, err = c.Read(buf) require.EqualError(t, err, "EOF") } + +func TestParseStream_WithAsyncPipe(t *testing.T) { + lines := [3]string{ + "<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey=\"1\"] An application event log entry...\n", + "<165>1 2018-10-11T22:14:15.005Z host5 e - id2 [custom@32473 exkey=\"2\"] An application event log entry...\n", + "<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey=\"3\"] An application event log entry...\n", + } + + addr := &net.UDPAddr{IP: net.IP{127, 0, 0, 1}, Port: 1514} + pipe := NewConnPipe(addr) + go func() { + for _, line := range lines { + _, _ = pipe.Write([]byte(line)) + } + pipe.Close() + }() + + results := make([]*syslog.Result, 0) + cb := func(res *syslog.Result) { + results = append(results, res) + } + + err := syslogparser.ParseStream(pipe, cb, defaultMaxMessageLength) + require.NoError(t, err) + require.Equal(t, 3, len(results)) +} diff --git a/clients/pkg/promtail/targets/syslog/transport.go b/clients/pkg/promtail/targets/syslog/transport.go new file mode 100644 index 0000000000000..0d88d5a784ea1 --- /dev/null +++ b/clients/pkg/promtail/targets/syslog/transport.go @@ -0,0 +1,406 @@ +package syslog + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io" + "io/ioutil" + "net" + "strings" + "sync" + "time" + + "github.com/grafana/dskit/backoff" + "github.com/mwitkow/go-conntrack" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" + "github.com/grafana/loki/clients/pkg/promtail/targets/syslog/syslogparser" + "github.com/influxdata/go-syslog/v3" + "github.com/prometheus/prometheus/model/labels" +) + +var ( + protocolUDP = "udp" + protocolTCP = "tcp" +) + +type Transport interface { + Run() error + Addr() net.Addr + Ready() bool + Close() error + Wait() +} + +type handleMessage func(labels.Labels, syslog.Message) +type handleMessageError func(error) + +type baseTransport struct { + config *scrapeconfig.SyslogTargetConfig + logger log.Logger + + openConnections *sync.WaitGroup + + handleMessage handleMessage + handleMessageError handleMessageError + + ctx context.Context + ctxCancel context.CancelFunc +} + +func (t *baseTransport) close() { + t.ctxCancel() +} + +// Ready implements SyslogTransport +func (t *baseTransport) Ready() bool { + return t.ctx.Err() == nil +} + +func (t *baseTransport) idleTimeout() time.Duration { + if t.config.IdleTimeout != 0 { + return t.config.IdleTimeout + } + return defaultIdleTimeout +} + +func (t *baseTransport) maxMessageLength() int { + if t.config.MaxMessageLength != 0 { + return t.config.MaxMessageLength + } + return defaultMaxMessageLength +} + +func (t *baseTransport) connectionLabels(ip string) labels.Labels { + lb := labels.NewBuilder(nil) + for k, v := range t.config.Labels { + lb.Set(string(k), string(v)) + } + + lb.Set("__syslog_connection_ip_address", ip) + lb.Set("__syslog_connection_hostname", lookupAddr(ip)) + + return lb.Labels() +} + +func ipFromConn(c net.Conn) net.IP { + switch addr := c.RemoteAddr().(type) { + case *net.TCPAddr: + return addr.IP + } + + return nil +} + +func lookupAddr(addr string) string { + names, _ := net.LookupAddr(addr) + return strings.Join(names, ",") +} + +func newBaseTransport(config *scrapeconfig.SyslogTargetConfig, handleMessage handleMessage, handleError handleMessageError, logger log.Logger) *baseTransport { + ctx, cancel := context.WithCancel(context.Background()) + return &baseTransport{ + config: config, + logger: logger, + openConnections: new(sync.WaitGroup), + handleMessage: handleMessage, + handleMessageError: handleError, + ctx: ctx, + ctxCancel: cancel, + } +} + +type idleTimeoutConn struct { + net.Conn + idleTimeout time.Duration +} + +func (c *idleTimeoutConn) Write(p []byte) (int, error) { + c.setDeadline() + return c.Conn.Write(p) +} + +func (c *idleTimeoutConn) Read(b []byte) (int, error) { + c.setDeadline() + return c.Conn.Read(b) +} + +func (c *idleTimeoutConn) setDeadline() { + _ = c.Conn.SetDeadline(time.Now().Add(c.idleTimeout)) +} + +type ConnPipe struct { + addr net.Addr + *io.PipeReader + *io.PipeWriter +} + +func NewConnPipe(addr net.Addr) *ConnPipe { + pr, pw := io.Pipe() + return &ConnPipe{ + addr: addr, + PipeReader: pr, + PipeWriter: pw, + } +} + +func (pipe *ConnPipe) Close() error { + if err := pipe.PipeWriter.Close(); err != nil { + return err + } + return nil +} + +type TCPTransport struct { + *baseTransport + listener net.Listener +} + +func NewSyslogTCPTransport(config *scrapeconfig.SyslogTargetConfig, handleMessage handleMessage, handleError handleMessageError, logger log.Logger) Transport { + return &TCPTransport{ + baseTransport: newBaseTransport(config, handleMessage, handleError, logger), + } +} + +// Run implements SyslogTransport +func (t *TCPTransport) Run() error { + l, err := net.Listen(protocolTCP, t.config.ListenAddress) + l = conntrack.NewListener(l, conntrack.TrackWithName("syslog_target/"+t.config.ListenAddress)) + if err != nil { + return fmt.Errorf("error setting up syslog target: %w", err) + } + + tlsEnabled := t.config.TLSConfig.CertFile != "" || t.config.TLSConfig.KeyFile != "" || t.config.TLSConfig.CAFile != "" + if tlsEnabled { + tlsConfig, err := newTLSConfig(t.config.TLSConfig.CertFile, t.config.TLSConfig.KeyFile, t.config.TLSConfig.CAFile) + if err != nil { + return fmt.Errorf("error setting up syslog target: %w", err) + } + l = tls.NewListener(l, tlsConfig) + } + + t.listener = l + level.Info(t.logger).Log("msg", "syslog listening on address", "address", t.Addr().String(), "protocol", protocolTCP, "tls", tlsEnabled) + + t.openConnections.Add(1) + go t.acceptConnections() + + return nil +} + +func newTLSConfig(certFile string, keyFile string, caFile string) (*tls.Config, error) { + if certFile == "" || keyFile == "" { + return nil, fmt.Errorf("certificate and key files are required") + } + + certs, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, fmt.Errorf("unable to load server certificate or key: %w", err) + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{certs}, + } + + if caFile != "" { + caCert, err := ioutil.ReadFile(caFile) + if err != nil { + return nil, fmt.Errorf("unable to load client CA certificate: %w", err) + } + + caCertPool := x509.NewCertPool() + if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { + return nil, fmt.Errorf("unable to parse client CA certificate") + } + + tlsConfig.ClientCAs = caCertPool + tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert + } + + return tlsConfig, nil +} + +func (t *TCPTransport) acceptConnections() { + defer t.openConnections.Done() + + l := log.With(t.logger, "address", t.listener.Addr().String()) + + backoff := backoff.New(t.ctx, backoff.Config{ + MinBackoff: 5 * time.Millisecond, + MaxBackoff: 1 * time.Second, + }) + + for { + c, err := t.listener.Accept() + if err != nil { + if !t.Ready() { + level.Info(l).Log("msg", "syslog server shutting down", "protocol", protocolTCP, "err", t.ctx.Err()) + return + } + + if ne, ok := err.(net.Error); ok && ne.Temporary() { + level.Warn(l).Log("msg", "failed to accept syslog connection", "err", err, "num_retries", backoff.NumRetries()) + backoff.Wait() + continue + } + + level.Error(l).Log("msg", "failed to accept syslog connection. quiting", "err", err) + return + } + backoff.Reset() + + t.openConnections.Add(1) + go t.handleConnection(c) + } + +} + +func (t *TCPTransport) handleConnection(cn net.Conn) { + defer t.openConnections.Done() + + c := &idleTimeoutConn{cn, t.idleTimeout()} + + handlerCtx, cancel := context.WithCancel(t.ctx) + defer cancel() + go func() { + <-handlerCtx.Done() + _ = c.Close() + }() + + lbs := t.connectionLabels(ipFromConn(c).String()) + + err := syslogparser.ParseStream(c, func(result *syslog.Result) { + if err := result.Error; err != nil { + t.handleMessageError(err) + return + } + t.handleMessage(lbs.Copy(), result.Message) + }, t.maxMessageLength()) + + if err != nil { + level.Warn(t.logger).Log("msg", "error initializing syslog stream", "err", err) + } +} + +// Close implements SyslogTransport +func (t *TCPTransport) Close() error { + t.baseTransport.close() + return t.listener.Close() +} + +// Wait implements SyslogTransport +func (t *TCPTransport) Wait() { + t.openConnections.Wait() +} + +// Addr implements SyslogTransport +func (t *TCPTransport) Addr() net.Addr { + return t.listener.Addr() +} + +type UDPTransport struct { + *baseTransport + udpConn *net.UDPConn +} + +func NewSyslogUDPTransport(config *scrapeconfig.SyslogTargetConfig, handleMessage handleMessage, handleError handleMessageError, logger log.Logger) Transport { + return &UDPTransport{ + baseTransport: newBaseTransport(config, handleMessage, handleError, logger), + } +} + +// Run implements SyslogTransport +func (t *UDPTransport) Run() error { + var err error + addr, err := net.ResolveUDPAddr(protocolUDP, t.config.ListenAddress) + if err != nil { + return fmt.Errorf("error resolving UDP address: %w", err) + } + t.udpConn, err = net.ListenUDP(protocolUDP, addr) + if err != nil { + return fmt.Errorf("error setting up syslog target: %w", err) + } + _ = t.udpConn.SetReadBuffer(1024 * 1024) + level.Info(t.logger).Log("msg", "syslog listening on address", "address", t.Addr().String(), "protocol", protocolUDP) + + t.openConnections.Add(1) + go t.acceptPackets() + return nil +} + +// Close implements SyslogTransport +func (t *UDPTransport) Close() error { + t.baseTransport.close() + return t.udpConn.Close() +} + +func (t *UDPTransport) acceptPackets() { + defer t.openConnections.Done() + + var ( + n int + addr net.Addr + err error + ) + streams := make(map[string]*ConnPipe) + buf := make([]byte, t.maxMessageLength()) + + for { + if !t.Ready() { + level.Info(t.logger).Log("msg", "syslog server shutting down", "protocol", protocolUDP, "err", t.ctx.Err()) + for _, stream := range streams { + if err = stream.Close(); err != nil { + level.Error(t.logger).Log("msg", "failed to close pipe", "err", err) + } + } + return + } + n, addr, err = t.udpConn.ReadFrom(buf) + if n <= 0 && err != nil { + level.Warn(t.logger).Log("msg", "failed to read packets", "addr", addr, "err", err) + continue + } + + stream, ok := streams[addr.String()] + if !ok { + stream = NewConnPipe(addr) + streams[addr.String()] = stream + t.openConnections.Add(1) + go t.handleRcv(stream) + } + if _, err := stream.Write(buf[:n]); err != nil { + level.Warn(t.logger).Log("msg", "failed to write to stream", "addr", addr, "err", err) + } + } +} + +func (t *UDPTransport) handleRcv(c *ConnPipe) { + defer t.openConnections.Done() + + lbs := t.connectionLabels(c.addr.String()) + err := syslogparser.ParseStream(c, func(result *syslog.Result) { + if err := result.Error; err != nil { + t.handleMessageError(err) + } else { + t.handleMessage(lbs.Copy(), result.Message) + } + }, t.maxMessageLength()) + + if err != nil { + level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err) + } +} + +// Wait implements SyslogTransport +func (t *UDPTransport) Wait() { + t.openConnections.Wait() +} + +// Addr implements SyslogTransport +func (t *UDPTransport) Addr() net.Addr { + return t.udpConn.LocalAddr() +} diff --git a/docs/sources/clients/promtail/scraping.md b/docs/sources/clients/promtail/scraping.md index 6be07a852de70..c530a57869b3d 100644 --- a/docs/sources/clients/promtail/scraping.md +++ b/docs/sources/clients/promtail/scraping.md @@ -226,7 +226,7 @@ When Promtail receives GCP logs, various internal labels are made available for ## Syslog Receiver Promtail supports receiving [IETF Syslog (RFC5424)](https://tools.ietf.org/html/rfc5424) -messages from a tcp stream. Receiving syslog messages is defined in a `syslog` +messages from a TCP or UDP stream. Receiving syslog messages is defined in a `syslog` stanza: ```yaml @@ -234,6 +234,7 @@ scrape_configs: - job_name: syslog syslog: listen_address: 0.0.0.0:1514 + listen_protocol: tcp idle_timeout: 60s label_structured_data: yes labels: @@ -244,8 +245,11 @@ scrape_configs: ``` The only required field in the syslog section is the `listen_address` field, -where a valid network address should be provided. The `idle_timeout` can help -with cleaning up stale syslog connections. If `label_structured_data` is set, +where a valid network address must be provided. The default protocol for +receiving messages is TCP. To change the protocol, the `listen_protocol` field +can be changed to `udp`. Note, that UDP does not support TLS. +The `idle_timeout` can help with cleaning up stale syslog connections. +If `label_structured_data` is set, [structured data](https://tools.ietf.org/html/rfc5424#section-6.3) in the syslog header will be translated to internal labels in the form of `__syslog_message_sd__`. @@ -275,10 +279,17 @@ destination d_loki { ### Rsyslog Output Configuration +For sending messages via TCP: + ``` -action(type="omfwd" protocol="tcp" port="" Template="RSYSLOG_SyslogProtocol23Format" TCP_Framing="octet-counted") +*.* action(type="omfwd" protocol="tcp" target="" port="" Template="RSYSLOG_SyslogProtocol23Format" TCP_Framing="octet-counted" KeepAlive="on") ``` +For sending messages via UDP: + +``` +*.* action(type="omfwd" protocol="udp" target="" port="" Template="RSYSLOG_SyslogProtocol23Format") +``` ## Kafka