From 9e65982e660b7c9908f1c0bc248fca57b30bca45 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 4 Apr 2022 11:32:59 +0200 Subject: [PATCH] Add UDP protocol support to Promtail's syslog target Until now, the syslog target in Promtail did only support TCP. This PR adds support for sending syslog messages to Promtail (via ng-syslog or rsyslog) via UDP. UPD for syslogs can be used if you prefer speed over guaranteed delivery. Signed-off-by: Christian Haudum --- CHANGELOG.md | 1 + .../pkg/promtail/scrapeconfig/scrapeconfig.go | 4 + .../syslog/syslogparser/syslogparser.go | 8 +- .../promtail/targets/syslog/syslogtarget.go | 240 ++--------- .../targets/syslog/syslogtarget_test.go | 355 ++++++++++----- .../pkg/promtail/targets/syslog/transport.go | 406 ++++++++++++++++++ docs/sources/clients/promtail/scraping.md | 19 +- 7 files changed, 705 insertions(+), 328 deletions(-) create mode 100644 clients/pkg/promtail/targets/syslog/transport.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 892819b08023..ed4d4208fffb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +* [5790](https://github.com/grafana/loki/pull/5790) **chaudum**: Add UDP support for Promtail's syslog target. * [5984](https://github.com/grafana/loki/pull/5984) **dannykopping** and **salvacorts**: Querier: prevent unnecessary calls to ingesters. * [5943](https://github.com/grafana/loki/pull/5943) **tpaschalis**: Add support for exclusion patterns in Promtail's static_config * [5879](https://github.com/grafana/loki/pull/5879) **MichelHollands**: Remove lines matching delete request expression when using "filter-and-delete" deletion mode. diff --git a/clients/pkg/promtail/scrapeconfig/scrapeconfig.go b/clients/pkg/promtail/scrapeconfig/scrapeconfig.go index f05a4cacbcef..769755c6c662 100644 --- a/clients/pkg/promtail/scrapeconfig/scrapeconfig.go +++ b/clients/pkg/promtail/scrapeconfig/scrapeconfig.go @@ -165,6 +165,10 @@ type SyslogTargetConfig struct { // ListenAddress is the address to listen on for syslog messages. ListenAddress string `yaml:"listen_address"` + // ListenProtocol is the protocol used to listen for syslog messages. + // Must be either `tcp` (default) or `udp` + ListenProtocol string `yaml:"listen_protocol"` + // IdleTimeout is the idle timeout for tcp connections. IdleTimeout time.Duration `yaml:"idle_timeout"` diff --git a/clients/pkg/promtail/targets/syslog/syslogparser/syslogparser.go b/clients/pkg/promtail/targets/syslog/syslogparser/syslogparser.go index adfc44d2d525..dfc76bb3538a 100644 --- a/clients/pkg/promtail/targets/syslog/syslogparser/syslogparser.go +++ b/clients/pkg/promtail/targets/syslog/syslogparser/syslogparser.go @@ -15,20 +15,20 @@ import ( // detects octet counting. // The function returns on EOF or unrecoverable errors. func ParseStream(r io.Reader, callback func(res *syslog.Result), maxMessageLength int) error { - buf := bufio.NewReader(r) + buf := bufio.NewReaderSize(r, 1<<10) - firstByte, err := buf.Peek(1) + b, err := buf.ReadByte() if err != nil { return err } + _ = buf.UnreadByte() - b := firstByte[0] if b == '<' { nontransparent.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf) } else if b >= '0' && b <= '9' { octetcounting.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf) } else { - return fmt.Errorf("invalid or unsupported framing. first byte: '%s'", firstByte) + return fmt.Errorf("invalid or unsupported framing. first byte: '%s'", string(b)) } return nil diff --git a/clients/pkg/promtail/targets/syslog/syslogtarget.go b/clients/pkg/promtail/targets/syslog/syslogtarget.go index c2ff44da00f8..57d2efd991b7 100644 --- a/clients/pkg/promtail/targets/syslog/syslogtarget.go +++ b/clients/pkg/promtail/targets/syslog/syslogtarget.go @@ -1,30 +1,22 @@ package syslog import ( - "context" - "crypto/tls" - "crypto/x509" "errors" "fmt" - "io/ioutil" "net" "strings" - "sync" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/dskit/backoff" "github.com/influxdata/go-syslog/v3" "github.com/influxdata/go-syslog/v3/rfc5424" - "github.com/mwitkow/go-conntrack" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" "github.com/grafana/loki/clients/pkg/promtail/api" "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" - "github.com/grafana/loki/clients/pkg/promtail/targets/syslog/syslogparser" "github.com/grafana/loki/clients/pkg/promtail/targets/target" "github.com/grafana/loki/pkg/logproto" @@ -33,6 +25,7 @@ import ( var ( defaultIdleTimeout = 120 * time.Second defaultMaxMessageLength = 8192 + defaultProtocol = protocolTCP ) // SyslogTarget listens to syslog messages. @@ -44,12 +37,10 @@ type SyslogTarget struct { config *scrapeconfig.SyslogTargetConfig relabelConfig []*relabel.Config - listener net.Listener - messages chan message + transport Transport - ctx context.Context - ctxCancel context.CancelFunc - openConnections *sync.WaitGroup + messages chan message + messagesDone chan struct{} } type message struct { @@ -67,144 +58,42 @@ func NewSyslogTarget( config *scrapeconfig.SyslogTargetConfig, ) (*SyslogTarget, error) { - ctx, cancel := context.WithCancel(context.Background()) - t := &SyslogTarget{ metrics: metrics, logger: logger, handler: handler, config: config, relabelConfig: relabel, - - ctx: ctx, - ctxCancel: cancel, - openConnections: new(sync.WaitGroup), + messagesDone: make(chan struct{}), + } + + switch t.transportProtocol() { + case protocolTCP: + t.transport = NewSyslogTCPTransport( + config, + t.handleMessage, + t.handleMessageError, + logger, + ) + case protocolUDP: + t.transport = NewSyslogUDPTransport( + config, + t.handleMessage, + t.handleMessageError, + logger, + ) + default: + return nil, fmt.Errorf("invalid transport protocol. expected 'tcp' or 'udp', got '%s'", t.transportProtocol()) } t.messages = make(chan message) go t.messageSender(handler.Chan()) - err := t.run() - return t, err -} - -func (t *SyslogTarget) run() error { - l, err := net.Listen("tcp", t.config.ListenAddress) - l = conntrack.NewListener(l, conntrack.TrackWithName("syslog_target/"+t.config.ListenAddress)) + err := t.transport.Run() if err != nil { - return fmt.Errorf("error setting up syslog target: %w", err) - } - - tlsEnabled := t.config.TLSConfig.CertFile != "" || t.config.TLSConfig.KeyFile != "" || t.config.TLSConfig.CAFile != "" - if tlsEnabled { - tlsConfig, err := newTLSConfig(t.config.TLSConfig.CertFile, t.config.TLSConfig.KeyFile, t.config.TLSConfig.CAFile) - if err != nil { - return fmt.Errorf("error setting up syslog target: %w", err) - } - l = tls.NewListener(l, tlsConfig) - } - - t.listener = l - level.Info(t.logger).Log("msg", "syslog listening on address", "address", t.ListenAddress().String(), "tls", tlsEnabled) - - t.openConnections.Add(1) - go t.acceptConnections() - - return nil -} - -func newTLSConfig(certFile string, keyFile string, caFile string) (*tls.Config, error) { - if certFile == "" || keyFile == "" { - return nil, fmt.Errorf("certificate and key files are required") - } - - certs, err := tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - return nil, fmt.Errorf("unable to load server certificate or key: %w", err) - } - - tlsConfig := &tls.Config{ - Certificates: []tls.Certificate{certs}, - } - - if caFile != "" { - caCert, err := ioutil.ReadFile(caFile) - if err != nil { - return nil, fmt.Errorf("unable to load client CA certificate: %w", err) - } - - caCertPool := x509.NewCertPool() - if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { - return nil, fmt.Errorf("unable to parse client CA certificate") - } - - tlsConfig.ClientCAs = caCertPool - tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert - } - - return tlsConfig, nil -} - -func (t *SyslogTarget) acceptConnections() { - defer t.openConnections.Done() - - l := log.With(t.logger, "address", t.listener.Addr().String()) - - backoff := backoff.New(t.ctx, backoff.Config{ - MinBackoff: 5 * time.Millisecond, - MaxBackoff: 1 * time.Second, - }) - - for { - c, err := t.listener.Accept() - if err != nil { - if t.ctx.Err() != nil { - level.Info(l).Log("msg", "syslog server shutting down") - return - } - - if ne, ok := err.(net.Error); ok && ne.Temporary() { - level.Warn(l).Log("msg", "failed to accept syslog connection", "err", err, "num_retries", backoff.NumRetries()) - backoff.Wait() - continue - } - - level.Error(l).Log("msg", "failed to accept syslog connection. quiting", "err", err) - return - } - backoff.Reset() - - t.openConnections.Add(1) - go t.handleConnection(c) - } - -} - -func (t *SyslogTarget) handleConnection(cn net.Conn) { - defer t.openConnections.Done() - - c := &idleTimeoutConn{cn, t.idleTimeout()} - - handlerCtx, cancel := context.WithCancel(t.ctx) - defer cancel() - go func() { - <-handlerCtx.Done() - _ = c.Close() - }() - - connLabels := t.connectionLabels(c) - - err := syslogparser.ParseStream(c, func(result *syslog.Result) { - if err := result.Error; err != nil { - t.handleMessageError(err) - return - } - t.handleMessage(connLabels.Copy(), result.Message) - }, t.maxMessageLength()) - - if err != nil { - level.Warn(t.logger).Log("msg", "error initializing syslog stream", "err", err) + return nil, err } + return t, nil } func (t *SyslogTarget) handleMessageError(err error) { @@ -247,7 +136,7 @@ func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Messag if t.config.LabelStructuredData && rfc5424Msg.StructuredData != nil { for id, params := range *rfc5424Msg.StructuredData { - id = strings.Replace(id, "@", "_", -1) + id = strings.ReplaceAll(id, "@", "_") for name, value := range params { key := "__syslog_message_sd_" + id + "_" + name lb.Set(key, value) @@ -295,33 +184,7 @@ func (t *SyslogTarget) messageSender(entries chan<- api.Entry) { } t.metrics.syslogEntries.Inc() } -} - -func (t *SyslogTarget) connectionLabels(c net.Conn) labels.Labels { - lb := labels.NewBuilder(nil) - for k, v := range t.config.Labels { - lb.Set(string(k), string(v)) - } - - ip := ipFromConn(c).String() - lb.Set("__syslog_connection_ip_address", ip) - lb.Set("__syslog_connection_hostname", lookupAddr(ip)) - - return lb.Labels() -} - -func ipFromConn(c net.Conn) net.IP { - switch addr := c.RemoteAddr().(type) { - case *net.TCPAddr: - return addr.IP - } - - return nil -} - -func lookupAddr(addr string) string { - names, _ := net.LookupAddr(addr) - return strings.Join(names, ",") + t.messagesDone <- struct{}{} } // Type returns SyslogTargetType. @@ -331,7 +194,7 @@ func (t *SyslogTarget) Type() target.TargetType { // Ready indicates whether or not the syslog target is ready to be read from. func (t *SyslogTarget) Ready() bool { - return true + return t.transport.Ready() } // DiscoveredLabels returns the set of labels discovered by the syslog target, which @@ -353,48 +216,23 @@ func (t *SyslogTarget) Details() interface{} { // Stop shuts down the SyslogTarget. func (t *SyslogTarget) Stop() error { - t.ctxCancel() - err := t.listener.Close() - t.openConnections.Wait() + err := t.transport.Close() + t.transport.Wait() close(t.messages) + // wait for all pending messages to be processed and sent to handler + <-t.messagesDone t.handler.Stop() return err } // ListenAddress returns the address SyslogTarget is listening on. func (t *SyslogTarget) ListenAddress() net.Addr { - return t.listener.Addr() -} - -func (t *SyslogTarget) idleTimeout() time.Duration { - if t.config.IdleTimeout != 0 { - return t.config.IdleTimeout - } - return defaultIdleTimeout + return t.transport.Addr() } -func (t *SyslogTarget) maxMessageLength() int { - if t.config.MaxMessageLength != 0 { - return t.config.MaxMessageLength +func (t *SyslogTarget) transportProtocol() string { + if t.config.ListenProtocol != "" { + return t.config.ListenProtocol } - return defaultMaxMessageLength -} - -type idleTimeoutConn struct { - net.Conn - idleTimeout time.Duration -} - -func (c *idleTimeoutConn) Write(p []byte) (int, error) { - c.setDeadline() - return c.Conn.Write(p) -} - -func (c *idleTimeoutConn) Read(b []byte) (int, error) { - c.setDeadline() - return c.Conn.Read(b) -} - -func (c *idleTimeoutConn) setDeadline() { - _ = c.Conn.SetDeadline(time.Now().Add(c.idleTimeout)) + return defaultProtocol } diff --git a/clients/pkg/promtail/targets/syslog/syslogtarget_test.go b/clients/pkg/promtail/targets/syslog/syslogtarget_test.go index cf72318ed260..696d3461208b 100644 --- a/clients/pkg/promtail/targets/syslog/syslogtarget_test.go +++ b/clients/pkg/promtail/targets/syslog/syslogtarget_test.go @@ -13,6 +13,7 @@ import ( "unicode/utf8" "github.com/go-kit/log" + "github.com/influxdata/go-syslog/v3" promconfig "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/relabel" @@ -21,6 +22,7 @@ import ( "github.com/grafana/loki/clients/pkg/promtail/client/fake" "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" + "github.com/grafana/loki/clients/pkg/promtail/targets/syslog/syslogparser" ) var ( @@ -231,64 +233,152 @@ o+KrhWQRriAj+GFMIpnT0r28EhOWS/d+f9ISk/it796YtDhfMb9GmV9VI7o= `) ) -func TestSyslogTarget_NewlineSeparatedMessages(t *testing.T) { - testSyslogTarget(t, false) -} - -func TestSyslogTarget_OctetCounting(t *testing.T) { - testSyslogTarget(t, true) -} - -func testSyslogTarget(t *testing.T, octetCounting bool) { - w := log.NewSyncWriter(os.Stderr) - logger := log.NewLogfmtLogger(w) - client := fake.New(func() {}) +type formatFunc func(string) string - metrics := NewMetrics(nil) - tgt, err := NewSyslogTarget(metrics, logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{ - ListenAddress: "127.0.0.1:0", - LabelStructuredData: true, - Labels: model.LabelSet{ - "test": "syslog_target", - }, - }) - require.NoError(t, err) - defer func() { - require.NoError(t, tgt.Stop()) - }() - - addr := tgt.ListenAddress().String() - c, err := net.Dial("tcp", addr) - require.NoError(t, err) +var ( + fmtOctetCounting = func(s string) string { return fmt.Sprintf("%d %s", len(s), s) } + fmtNewline = func(s string) string { return s + "\n" } +) - messages := []string{ - `<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey="1"] An application event log entry...`, - `<165>1 2018-10-11T22:14:15.005Z host5 e - id2 [custom@32473 exkey="2"] An application event log entry...`, - `<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey="3"] An application event log entry...`, +func Benchmark_SyslogTarget(b *testing.B) { + for _, tt := range []struct { + name string + protocol string + formatFunc formatFunc + }{ + {"tcp", protocolTCP, fmtOctetCounting}, + {"udp", protocolUDP, fmtOctetCounting}, + } { + tt := tt + b.Run(tt.name, func(b *testing.B) { + client := fake.New(func() {}) + + metrics := NewMetrics(nil) + tgt, _ := NewSyslogTarget(metrics, log.NewNopLogger(), client, []*relabel.Config{}, &scrapeconfig.SyslogTargetConfig{ + ListenAddress: "127.0.0.1:0", + ListenProtocol: tt.protocol, + LabelStructuredData: true, + Labels: model.LabelSet{ + "test": "syslog_target", + }, + }) + b.Cleanup(func() { + require.NoError(b, tgt.Stop()) + }) + require.Eventually(b, tgt.Ready, time.Second, 10*time.Millisecond) + + addr := tgt.ListenAddress().String() + + messages := []string{ + `<165>1 2022-04-08T22:14:10.001Z host1 app - id1 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:11.002Z host2 app - id2 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:12.003Z host1 app - id3 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:13.004Z host2 app - id4 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:14.005Z host1 app - id5 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:15.002Z host2 app - id6 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:16.003Z host1 app - id7 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:17.004Z host2 app - id8 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:18.005Z host1 app - id9 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2022-04-08T22:14:19.001Z host2 app - id10 [custom@32473 exkey="1"] An application event log entry...`, + } + + b.ReportAllocs() + b.ResetTimer() + + c, _ := net.Dial(tt.protocol, addr) + for n := 0; n < b.N; n++ { + _ = writeMessagesToStream(c, messages, tt.formatFunc) + } + c.Close() + + require.Eventuallyf(b, func() bool { + return len(client.Received()) == len(messages)*b.N + }, 15*time.Second, time.Second, "expected: %d got:%d", len(messages)*b.N, len(client.Received())) + + }) } +} - err = writeMessagesToStream(c, messages, octetCounting) - require.NoError(t, err) - require.NoError(t, c.Close()) - - require.Eventuallyf(t, func() bool { - return len(client.Received()) == len(messages) - }, time.Second, time.Millisecond, "Expected to receive %d messages, got %d.", len(messages), len(client.Received())) - - require.Equal(t, model.LabelSet{ - "test": "syslog_target", - - "severity": "notice", - "facility": "local4", - "hostname": "host5", - "app_name": "e", - "msg_id": "id1", - - "sd_custom_exkey": "1", - }, client.Received()[0].Labels) - require.Equal(t, "An application event log entry...", client.Received()[0].Line) - - require.NotZero(t, client.Received()[0].Timestamp) +func TestSyslogTarget(t *testing.T) { + for _, tt := range []struct { + name string + protocol string + fmtFunc formatFunc + }{ + {"tpc newline separated", protocolTCP, fmtNewline}, + {"tpc octetcounting", protocolTCP, fmtOctetCounting}, + {"udp newline separated", protocolUDP, fmtNewline}, + {"udp octetcounting", protocolUDP, fmtOctetCounting}, + } { + tt := tt + t.Run(tt.name, func(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + client := fake.New(func() {}) + + metrics := NewMetrics(nil) + tgt, err := NewSyslogTarget(metrics, logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{ + MaxMessageLength: 1 << 12, // explicitly not use default value + ListenAddress: "127.0.0.1:0", + ListenProtocol: tt.protocol, + LabelStructuredData: true, + Labels: model.LabelSet{ + "test": "syslog_target", + }, + }) + require.NoError(t, err) + + require.Eventually(t, tgt.Ready, time.Second, 10*time.Millisecond) + + addr := tgt.ListenAddress().String() + c, err := net.Dial(tt.protocol, addr) + require.NoError(t, err) + + messages := []string{ + `<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2018-10-11T22:14:15.005Z host5 e - id2 [custom@32473 exkey="2"] An application event log entry...`, + `<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey="3"] An application event log entry...`, + } + + err = writeMessagesToStream(c, messages, tt.fmtFunc) + require.NoError(t, err) + require.NoError(t, c.Close()) + + if tt.protocol == protocolUDP { + time.Sleep(time.Second) + require.NoError(t, tgt.Stop()) + } else { + defer func() { + require.NoError(t, tgt.Stop()) + }() + } + + require.Eventuallyf(t, func() bool { + return len(client.Received()) == len(messages) + }, time.Second, 10*time.Millisecond, "Expected to receive %d messages.", len(messages)) + + labels := make([]model.LabelSet, 0, len(messages)) + for _, entry := range client.Received() { + labels = append(labels, entry.Labels) + } + // we only check if one of the received entries contain the wanted label set + // because UDP does not guarantee the order of the messages + require.Contains(t, labels, model.LabelSet{ + "test": "syslog_target", + + "severity": "notice", + "facility": "local4", + "hostname": "host5", + "app_name": "e", + "msg_id": "id1", + + "sd_custom_exkey": "1", + }) + require.Equal(t, "An application event log entry...", client.Received()[0].Line) + + require.NotZero(t, client.Received()[0].Timestamp) + }) + } } func relabelConfig(t *testing.T) []*relabel.Config { @@ -316,72 +406,73 @@ func relabelConfig(t *testing.T) []*relabel.Config { return relabels } -func writeMessagesToStream(w io.Writer, messages []string, octetCounting bool) error { - var formatter func(string) string - - if octetCounting { - formatter = func(s string) string { - return fmt.Sprintf("%d %s", len(s), s) - } - } else { - formatter = func(s string) string { - return s + "\n" - } - } - +func writeMessagesToStream(w io.Writer, messages []string, formatter formatFunc) error { for _, msg := range messages { _, err := fmt.Fprint(w, formatter(msg)) if err != nil { return err } } - return nil } func TestSyslogTarget_RFC5424Messages(t *testing.T) { - w := log.NewSyncWriter(os.Stderr) - logger := log.NewLogfmtLogger(w) - client := fake.New(func() {}) - - metrics := NewMetrics(nil) - tgt, err := NewSyslogTarget(metrics, logger, client, []*relabel.Config{}, &scrapeconfig.SyslogTargetConfig{ - ListenAddress: "127.0.0.1:0", - LabelStructuredData: true, - Labels: model.LabelSet{ - "test": "syslog_target", - }, - UseRFC5424Message: true, - }) - require.NoError(t, err) - defer func() { - require.NoError(t, tgt.Stop()) - }() - - addr := tgt.ListenAddress().String() - c, err := net.Dial("tcp", addr) - require.NoError(t, err) - - messages := []string{ - `<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey="1"] An application event log entry...`, - `<165>1 2018-10-11T22:14:15.005Z host5 e - id2 [custom@32473 exkey="2"] An application event log entry...`, - `<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey="3"] An application event log entry...`, - } - - err = writeMessagesToStream(c, messages, false) - require.NoError(t, err) - require.NoError(t, c.Close()) - - require.Eventuallyf(t, func() bool { - return len(client.Received()) == len(messages) - }, time.Second, time.Millisecond, "Expected to receive %d messages, got %d.", len(messages), len(client.Received())) - - for i, m := range messages { - require.Equal(t, model.LabelSet{ - "test": "syslog_target", - }, client.Received()[i].Labels) - require.Equal(t, m, client.Received()[i].Line) - require.NotZero(t, client.Received()[i].Timestamp) + for _, tt := range []struct { + name string + protocol string + fmtFunc formatFunc + }{ + {"tpc newline separated", protocolTCP, fmtNewline}, + {"tpc octetcounting", protocolTCP, fmtOctetCounting}, + } { + tt := tt + t.Run(tt.name, func(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + client := fake.New(func() {}) + + metrics := NewMetrics(nil) + tgt, err := NewSyslogTarget(metrics, logger, client, []*relabel.Config{}, &scrapeconfig.SyslogTargetConfig{ + ListenAddress: "127.0.0.1:0", + ListenProtocol: tt.protocol, + LabelStructuredData: true, + Labels: model.LabelSet{ + "test": "syslog_target", + }, + UseRFC5424Message: true, + }) + require.NoError(t, err) + require.Eventually(t, tgt.Ready, time.Second, 10*time.Millisecond) + defer func() { + require.NoError(t, tgt.Stop()) + }() + + addr := tgt.ListenAddress().String() + c, err := net.Dial(tt.protocol, addr) + require.NoError(t, err) + + messages := []string{ + `<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey="1"] An application event log entry...`, + `<165>1 2018-10-11T22:14:15.005Z host5 e - id2 [custom@32473 exkey="2"] An application event log entry...`, + `<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey="3"] An application event log entry...`, + } + + err = writeMessagesToStream(c, messages, tt.fmtFunc) + require.NoError(t, err) + require.NoError(t, c.Close()) + + require.Eventuallyf(t, func() bool { + return len(client.Received()) == len(messages) + }, time.Second, time.Millisecond, "Expected to receive %d messages, got %d.", len(messages), len(client.Received())) + + for i := range messages { + require.Equal(t, model.LabelSet{ + "test": "syslog_target", + }, client.Received()[i].Labels) + require.Contains(t, messages, client.Received()[i].Line) + require.NotZero(t, client.Received()[i].Timestamp) + } + }) } } @@ -417,14 +508,14 @@ func TestSyslogTarget_TLSConfigWithoutServerKey(t *testing.T) { func TestSyslogTarget_TLSConfig(t *testing.T) { t.Run("NewlineSeparatedMessages", func(t *testing.T) { - testSyslogTargetWithTLS(t, false) + testSyslogTargetWithTLS(t, fmtNewline) }) t.Run("OctetCounting", func(t *testing.T) { - testSyslogTargetWithTLS(t, true) + testSyslogTargetWithTLS(t, fmtOctetCounting) }) } -func testSyslogTargetWithTLS(t *testing.T, octetCounting bool) { +func testSyslogTargetWithTLS(t *testing.T, fmtFunc formatFunc) { caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) @@ -483,7 +574,7 @@ func testSyslogTargetWithTLS(t *testing.T, octetCounting bool) { } messages := append(malformeddMessages, validMessages...) - err = writeMessagesToStream(c, messages, octetCounting) + err = writeMessagesToStream(c, messages, fmtFunc) require.NoError(t, err) require.NoError(t, c.Close()) @@ -526,14 +617,14 @@ func createTempFile(data []byte) (*os.File, error) { func TestSyslogTarget_TLSConfigVerifyClientCertificate(t *testing.T) { t.Run("NewlineSeparatedMessages", func(t *testing.T) { - testSyslogTargetWithTLSVerifyClientCertificate(t, false) + testSyslogTargetWithTLSVerifyClientCertificate(t, fmtNewline) }) t.Run("OctetCounting", func(t *testing.T) { - testSyslogTargetWithTLSVerifyClientCertificate(t, true) + testSyslogTargetWithTLSVerifyClientCertificate(t, fmtOctetCounting) }) } -func testSyslogTargetWithTLSVerifyClientCertificate(t *testing.T, octetCounting bool) { +func testSyslogTargetWithTLSVerifyClientCertificate(t *testing.T, fmtFunc formatFunc) { caCertFile, err := createTempFile(caCert) if err != nil { t.Fatalf("Unable to create CA certificate temporary file: %s", err) @@ -624,7 +715,7 @@ func testSyslogTargetWithTLSVerifyClientCertificate(t *testing.T, octetCounting `<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey="3"] An application event log entry...`, } - err = writeMessagesToStream(c, messages, octetCounting) + err = writeMessagesToStream(c, messages, fmtFunc) require.NoError(t, err) require.NoError(t, c.Close()) @@ -706,7 +797,7 @@ func TestSyslogTarget_NonUTF8Message(t *testing.T) { err = writeMessagesToStream(c, []string{ "<165>1 - - - - - - " + msg1, "<123>1 - - - - - - " + msg2, - }, true) + }, fmtOctetCounting) require.NoError(t, err) require.NoError(t, c.Close()) @@ -747,3 +838,29 @@ func TestSyslogTarget_IdleTimeout(t *testing.T) { _, err = c.Read(buf) require.EqualError(t, err, "EOF") } + +func TestParseStream_WithAsyncPipe(t *testing.T) { + lines := [3]string{ + "<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey=\"1\"] An application event log entry...\n", + "<165>1 2018-10-11T22:14:15.005Z host5 e - id2 [custom@32473 exkey=\"2\"] An application event log entry...\n", + "<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey=\"3\"] An application event log entry...\n", + } + + addr := &net.UDPAddr{IP: net.IP{127, 0, 0, 1}, Port: 1514} + pipe := NewConnPipe(addr) + go func() { + for _, line := range lines { + _, _ = pipe.Write([]byte(line)) + } + pipe.Close() + }() + + results := make([]*syslog.Result, 0) + cb := func(res *syslog.Result) { + results = append(results, res) + } + + err := syslogparser.ParseStream(pipe, cb, defaultMaxMessageLength) + require.NoError(t, err) + require.Equal(t, 3, len(results)) +} diff --git a/clients/pkg/promtail/targets/syslog/transport.go b/clients/pkg/promtail/targets/syslog/transport.go new file mode 100644 index 000000000000..0d88d5a784ea --- /dev/null +++ b/clients/pkg/promtail/targets/syslog/transport.go @@ -0,0 +1,406 @@ +package syslog + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io" + "io/ioutil" + "net" + "strings" + "sync" + "time" + + "github.com/grafana/dskit/backoff" + "github.com/mwitkow/go-conntrack" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" + "github.com/grafana/loki/clients/pkg/promtail/targets/syslog/syslogparser" + "github.com/influxdata/go-syslog/v3" + "github.com/prometheus/prometheus/model/labels" +) + +var ( + protocolUDP = "udp" + protocolTCP = "tcp" +) + +type Transport interface { + Run() error + Addr() net.Addr + Ready() bool + Close() error + Wait() +} + +type handleMessage func(labels.Labels, syslog.Message) +type handleMessageError func(error) + +type baseTransport struct { + config *scrapeconfig.SyslogTargetConfig + logger log.Logger + + openConnections *sync.WaitGroup + + handleMessage handleMessage + handleMessageError handleMessageError + + ctx context.Context + ctxCancel context.CancelFunc +} + +func (t *baseTransport) close() { + t.ctxCancel() +} + +// Ready implements SyslogTransport +func (t *baseTransport) Ready() bool { + return t.ctx.Err() == nil +} + +func (t *baseTransport) idleTimeout() time.Duration { + if t.config.IdleTimeout != 0 { + return t.config.IdleTimeout + } + return defaultIdleTimeout +} + +func (t *baseTransport) maxMessageLength() int { + if t.config.MaxMessageLength != 0 { + return t.config.MaxMessageLength + } + return defaultMaxMessageLength +} + +func (t *baseTransport) connectionLabels(ip string) labels.Labels { + lb := labels.NewBuilder(nil) + for k, v := range t.config.Labels { + lb.Set(string(k), string(v)) + } + + lb.Set("__syslog_connection_ip_address", ip) + lb.Set("__syslog_connection_hostname", lookupAddr(ip)) + + return lb.Labels() +} + +func ipFromConn(c net.Conn) net.IP { + switch addr := c.RemoteAddr().(type) { + case *net.TCPAddr: + return addr.IP + } + + return nil +} + +func lookupAddr(addr string) string { + names, _ := net.LookupAddr(addr) + return strings.Join(names, ",") +} + +func newBaseTransport(config *scrapeconfig.SyslogTargetConfig, handleMessage handleMessage, handleError handleMessageError, logger log.Logger) *baseTransport { + ctx, cancel := context.WithCancel(context.Background()) + return &baseTransport{ + config: config, + logger: logger, + openConnections: new(sync.WaitGroup), + handleMessage: handleMessage, + handleMessageError: handleError, + ctx: ctx, + ctxCancel: cancel, + } +} + +type idleTimeoutConn struct { + net.Conn + idleTimeout time.Duration +} + +func (c *idleTimeoutConn) Write(p []byte) (int, error) { + c.setDeadline() + return c.Conn.Write(p) +} + +func (c *idleTimeoutConn) Read(b []byte) (int, error) { + c.setDeadline() + return c.Conn.Read(b) +} + +func (c *idleTimeoutConn) setDeadline() { + _ = c.Conn.SetDeadline(time.Now().Add(c.idleTimeout)) +} + +type ConnPipe struct { + addr net.Addr + *io.PipeReader + *io.PipeWriter +} + +func NewConnPipe(addr net.Addr) *ConnPipe { + pr, pw := io.Pipe() + return &ConnPipe{ + addr: addr, + PipeReader: pr, + PipeWriter: pw, + } +} + +func (pipe *ConnPipe) Close() error { + if err := pipe.PipeWriter.Close(); err != nil { + return err + } + return nil +} + +type TCPTransport struct { + *baseTransport + listener net.Listener +} + +func NewSyslogTCPTransport(config *scrapeconfig.SyslogTargetConfig, handleMessage handleMessage, handleError handleMessageError, logger log.Logger) Transport { + return &TCPTransport{ + baseTransport: newBaseTransport(config, handleMessage, handleError, logger), + } +} + +// Run implements SyslogTransport +func (t *TCPTransport) Run() error { + l, err := net.Listen(protocolTCP, t.config.ListenAddress) + l = conntrack.NewListener(l, conntrack.TrackWithName("syslog_target/"+t.config.ListenAddress)) + if err != nil { + return fmt.Errorf("error setting up syslog target: %w", err) + } + + tlsEnabled := t.config.TLSConfig.CertFile != "" || t.config.TLSConfig.KeyFile != "" || t.config.TLSConfig.CAFile != "" + if tlsEnabled { + tlsConfig, err := newTLSConfig(t.config.TLSConfig.CertFile, t.config.TLSConfig.KeyFile, t.config.TLSConfig.CAFile) + if err != nil { + return fmt.Errorf("error setting up syslog target: %w", err) + } + l = tls.NewListener(l, tlsConfig) + } + + t.listener = l + level.Info(t.logger).Log("msg", "syslog listening on address", "address", t.Addr().String(), "protocol", protocolTCP, "tls", tlsEnabled) + + t.openConnections.Add(1) + go t.acceptConnections() + + return nil +} + +func newTLSConfig(certFile string, keyFile string, caFile string) (*tls.Config, error) { + if certFile == "" || keyFile == "" { + return nil, fmt.Errorf("certificate and key files are required") + } + + certs, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, fmt.Errorf("unable to load server certificate or key: %w", err) + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{certs}, + } + + if caFile != "" { + caCert, err := ioutil.ReadFile(caFile) + if err != nil { + return nil, fmt.Errorf("unable to load client CA certificate: %w", err) + } + + caCertPool := x509.NewCertPool() + if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { + return nil, fmt.Errorf("unable to parse client CA certificate") + } + + tlsConfig.ClientCAs = caCertPool + tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert + } + + return tlsConfig, nil +} + +func (t *TCPTransport) acceptConnections() { + defer t.openConnections.Done() + + l := log.With(t.logger, "address", t.listener.Addr().String()) + + backoff := backoff.New(t.ctx, backoff.Config{ + MinBackoff: 5 * time.Millisecond, + MaxBackoff: 1 * time.Second, + }) + + for { + c, err := t.listener.Accept() + if err != nil { + if !t.Ready() { + level.Info(l).Log("msg", "syslog server shutting down", "protocol", protocolTCP, "err", t.ctx.Err()) + return + } + + if ne, ok := err.(net.Error); ok && ne.Temporary() { + level.Warn(l).Log("msg", "failed to accept syslog connection", "err", err, "num_retries", backoff.NumRetries()) + backoff.Wait() + continue + } + + level.Error(l).Log("msg", "failed to accept syslog connection. quiting", "err", err) + return + } + backoff.Reset() + + t.openConnections.Add(1) + go t.handleConnection(c) + } + +} + +func (t *TCPTransport) handleConnection(cn net.Conn) { + defer t.openConnections.Done() + + c := &idleTimeoutConn{cn, t.idleTimeout()} + + handlerCtx, cancel := context.WithCancel(t.ctx) + defer cancel() + go func() { + <-handlerCtx.Done() + _ = c.Close() + }() + + lbs := t.connectionLabels(ipFromConn(c).String()) + + err := syslogparser.ParseStream(c, func(result *syslog.Result) { + if err := result.Error; err != nil { + t.handleMessageError(err) + return + } + t.handleMessage(lbs.Copy(), result.Message) + }, t.maxMessageLength()) + + if err != nil { + level.Warn(t.logger).Log("msg", "error initializing syslog stream", "err", err) + } +} + +// Close implements SyslogTransport +func (t *TCPTransport) Close() error { + t.baseTransport.close() + return t.listener.Close() +} + +// Wait implements SyslogTransport +func (t *TCPTransport) Wait() { + t.openConnections.Wait() +} + +// Addr implements SyslogTransport +func (t *TCPTransport) Addr() net.Addr { + return t.listener.Addr() +} + +type UDPTransport struct { + *baseTransport + udpConn *net.UDPConn +} + +func NewSyslogUDPTransport(config *scrapeconfig.SyslogTargetConfig, handleMessage handleMessage, handleError handleMessageError, logger log.Logger) Transport { + return &UDPTransport{ + baseTransport: newBaseTransport(config, handleMessage, handleError, logger), + } +} + +// Run implements SyslogTransport +func (t *UDPTransport) Run() error { + var err error + addr, err := net.ResolveUDPAddr(protocolUDP, t.config.ListenAddress) + if err != nil { + return fmt.Errorf("error resolving UDP address: %w", err) + } + t.udpConn, err = net.ListenUDP(protocolUDP, addr) + if err != nil { + return fmt.Errorf("error setting up syslog target: %w", err) + } + _ = t.udpConn.SetReadBuffer(1024 * 1024) + level.Info(t.logger).Log("msg", "syslog listening on address", "address", t.Addr().String(), "protocol", protocolUDP) + + t.openConnections.Add(1) + go t.acceptPackets() + return nil +} + +// Close implements SyslogTransport +func (t *UDPTransport) Close() error { + t.baseTransport.close() + return t.udpConn.Close() +} + +func (t *UDPTransport) acceptPackets() { + defer t.openConnections.Done() + + var ( + n int + addr net.Addr + err error + ) + streams := make(map[string]*ConnPipe) + buf := make([]byte, t.maxMessageLength()) + + for { + if !t.Ready() { + level.Info(t.logger).Log("msg", "syslog server shutting down", "protocol", protocolUDP, "err", t.ctx.Err()) + for _, stream := range streams { + if err = stream.Close(); err != nil { + level.Error(t.logger).Log("msg", "failed to close pipe", "err", err) + } + } + return + } + n, addr, err = t.udpConn.ReadFrom(buf) + if n <= 0 && err != nil { + level.Warn(t.logger).Log("msg", "failed to read packets", "addr", addr, "err", err) + continue + } + + stream, ok := streams[addr.String()] + if !ok { + stream = NewConnPipe(addr) + streams[addr.String()] = stream + t.openConnections.Add(1) + go t.handleRcv(stream) + } + if _, err := stream.Write(buf[:n]); err != nil { + level.Warn(t.logger).Log("msg", "failed to write to stream", "addr", addr, "err", err) + } + } +} + +func (t *UDPTransport) handleRcv(c *ConnPipe) { + defer t.openConnections.Done() + + lbs := t.connectionLabels(c.addr.String()) + err := syslogparser.ParseStream(c, func(result *syslog.Result) { + if err := result.Error; err != nil { + t.handleMessageError(err) + } else { + t.handleMessage(lbs.Copy(), result.Message) + } + }, t.maxMessageLength()) + + if err != nil { + level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err) + } +} + +// Wait implements SyslogTransport +func (t *UDPTransport) Wait() { + t.openConnections.Wait() +} + +// Addr implements SyslogTransport +func (t *UDPTransport) Addr() net.Addr { + return t.udpConn.LocalAddr() +} diff --git a/docs/sources/clients/promtail/scraping.md b/docs/sources/clients/promtail/scraping.md index 6be07a852de7..c530a57869b3 100644 --- a/docs/sources/clients/promtail/scraping.md +++ b/docs/sources/clients/promtail/scraping.md @@ -226,7 +226,7 @@ When Promtail receives GCP logs, various internal labels are made available for ## Syslog Receiver Promtail supports receiving [IETF Syslog (RFC5424)](https://tools.ietf.org/html/rfc5424) -messages from a tcp stream. Receiving syslog messages is defined in a `syslog` +messages from a TCP or UDP stream. Receiving syslog messages is defined in a `syslog` stanza: ```yaml @@ -234,6 +234,7 @@ scrape_configs: - job_name: syslog syslog: listen_address: 0.0.0.0:1514 + listen_protocol: tcp idle_timeout: 60s label_structured_data: yes labels: @@ -244,8 +245,11 @@ scrape_configs: ``` The only required field in the syslog section is the `listen_address` field, -where a valid network address should be provided. The `idle_timeout` can help -with cleaning up stale syslog connections. If `label_structured_data` is set, +where a valid network address must be provided. The default protocol for +receiving messages is TCP. To change the protocol, the `listen_protocol` field +can be changed to `udp`. Note, that UDP does not support TLS. +The `idle_timeout` can help with cleaning up stale syslog connections. +If `label_structured_data` is set, [structured data](https://tools.ietf.org/html/rfc5424#section-6.3) in the syslog header will be translated to internal labels in the form of `__syslog_message_sd__`. @@ -275,10 +279,17 @@ destination d_loki { ### Rsyslog Output Configuration +For sending messages via TCP: + ``` -action(type="omfwd" protocol="tcp" port="" Template="RSYSLOG_SyslogProtocol23Format" TCP_Framing="octet-counted") +*.* action(type="omfwd" protocol="tcp" target="" port="" Template="RSYSLOG_SyslogProtocol23Format" TCP_Framing="octet-counted" KeepAlive="on") ``` +For sending messages via UDP: + +``` +*.* action(type="omfwd" protocol="udp" target="" port="" Template="RSYSLOG_SyslogProtocol23Format") +``` ## Kafka