Skip to content

Commit

Permalink
Add UDP protocol support to Promtail's syslog target
Browse files Browse the repository at this point in the history
Until now, the syslog target in Promtail did only support TCP. This PR
adds support for sending syslog messages to Promtail (via ng-syslog or
rsyslog) via UDP.

UPD for syslogs can be used if you prefer speed over guaranteed
delivery.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed May 2, 2022
1 parent a5b9a9a commit 9974849
Show file tree
Hide file tree
Showing 7 changed files with 705 additions and 328 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## Main
* [5790](https://github.com/grafana/loki/pull/5790) **chaudum**: Add UDP support for Promtail's syslog target.
* [5984](https://github.com/grafana/loki/pull/5984) **dannykopping** and **salvacorts**: Querier: prevent unnecessary calls to ingesters.
* [5879](https://github.com/grafana/loki/pull/5879) **MichelHollands**: Remove lines matching delete request expression when using "filter-and-delete" deletion mode.
* [5899](https://github.com/grafana/loki/pull/5899) **simonswine**: Update go image to 1.17.9.
Expand Down
4 changes: 4 additions & 0 deletions clients/pkg/promtail/scrapeconfig/scrapeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ type SyslogTargetConfig struct {
// ListenAddress is the address to listen on for syslog messages.
ListenAddress string `yaml:"listen_address"`

// ListenProtocol is the protocol used to listen for syslog messages.
// Must be either `tcp` (default) or `udp`
ListenProtocol string `yaml:"listen_protocol"`

// IdleTimeout is the idle timeout for tcp connections.
IdleTimeout time.Duration `yaml:"idle_timeout"`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ import (
// detects octet counting.
// The function returns on EOF or unrecoverable errors.
func ParseStream(r io.Reader, callback func(res *syslog.Result), maxMessageLength int) error {
buf := bufio.NewReader(r)
buf := bufio.NewReaderSize(r, 1<<10)

firstByte, err := buf.Peek(1)
b, err := buf.ReadByte()
if err != nil {
return err
}
_ = buf.UnreadByte()

b := firstByte[0]
if b == '<' {
nontransparent.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
} else if b >= '0' && b <= '9' {
octetcounting.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
} else {
return fmt.Errorf("invalid or unsupported framing. first byte: '%s'", firstByte)
return fmt.Errorf("invalid or unsupported framing. first byte: '%s'", string(b))
}

return nil
Expand Down
240 changes: 39 additions & 201 deletions clients/pkg/promtail/targets/syslog/syslogtarget.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,22 @@
package syslog

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"net"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/influxdata/go-syslog/v3"
"github.com/influxdata/go-syslog/v3/rfc5424"
"github.com/mwitkow/go-conntrack"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"

"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/syslog/syslogparser"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"

"github.com/grafana/loki/pkg/logproto"
Expand All @@ -33,6 +25,7 @@ import (
var (
defaultIdleTimeout = 120 * time.Second
defaultMaxMessageLength = 8192
defaultProtocol = protocolTCP
)

// SyslogTarget listens to syslog messages.
Expand All @@ -44,12 +37,10 @@ type SyslogTarget struct {
config *scrapeconfig.SyslogTargetConfig
relabelConfig []*relabel.Config

listener net.Listener
messages chan message
transport Transport

ctx context.Context
ctxCancel context.CancelFunc
openConnections *sync.WaitGroup
messages chan message
messagesDone chan struct{}
}

type message struct {
Expand All @@ -67,144 +58,42 @@ func NewSyslogTarget(
config *scrapeconfig.SyslogTargetConfig,
) (*SyslogTarget, error) {

ctx, cancel := context.WithCancel(context.Background())

t := &SyslogTarget{
metrics: metrics,
logger: logger,
handler: handler,
config: config,
relabelConfig: relabel,

ctx: ctx,
ctxCancel: cancel,
openConnections: new(sync.WaitGroup),
messagesDone: make(chan struct{}),
}

switch t.transportProtocol() {
case protocolTCP:
t.transport = NewSyslogTCPTransport(
config,
t.handleMessage,
t.handleMessageError,
logger,
)
case protocolUDP:
t.transport = NewSyslogUDPTransport(
config,
t.handleMessage,
t.handleMessageError,
logger,
)
default:
return nil, fmt.Errorf("invalid transport protocol. expected 'tcp' or 'udp', got '%s'", t.transportProtocol())
}

t.messages = make(chan message)
go t.messageSender(handler.Chan())

err := t.run()
return t, err
}

func (t *SyslogTarget) run() error {
l, err := net.Listen("tcp", t.config.ListenAddress)
l = conntrack.NewListener(l, conntrack.TrackWithName("syslog_target/"+t.config.ListenAddress))
err := t.transport.Run()
if err != nil {
return fmt.Errorf("error setting up syslog target: %w", err)
}

tlsEnabled := t.config.TLSConfig.CertFile != "" || t.config.TLSConfig.KeyFile != "" || t.config.TLSConfig.CAFile != ""
if tlsEnabled {
tlsConfig, err := newTLSConfig(t.config.TLSConfig.CertFile, t.config.TLSConfig.KeyFile, t.config.TLSConfig.CAFile)
if err != nil {
return fmt.Errorf("error setting up syslog target: %w", err)
}
l = tls.NewListener(l, tlsConfig)
}

t.listener = l
level.Info(t.logger).Log("msg", "syslog listening on address", "address", t.ListenAddress().String(), "tls", tlsEnabled)

t.openConnections.Add(1)
go t.acceptConnections()

return nil
}

func newTLSConfig(certFile string, keyFile string, caFile string) (*tls.Config, error) {
if certFile == "" || keyFile == "" {
return nil, fmt.Errorf("certificate and key files are required")
}

certs, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, fmt.Errorf("unable to load server certificate or key: %w", err)
}

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{certs},
}

if caFile != "" {
caCert, err := ioutil.ReadFile(caFile)
if err != nil {
return nil, fmt.Errorf("unable to load client CA certificate: %w", err)
}

caCertPool := x509.NewCertPool()
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
return nil, fmt.Errorf("unable to parse client CA certificate")
}

tlsConfig.ClientCAs = caCertPool
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
}

return tlsConfig, nil
}

func (t *SyslogTarget) acceptConnections() {
defer t.openConnections.Done()

l := log.With(t.logger, "address", t.listener.Addr().String())

backoff := backoff.New(t.ctx, backoff.Config{
MinBackoff: 5 * time.Millisecond,
MaxBackoff: 1 * time.Second,
})

for {
c, err := t.listener.Accept()
if err != nil {
if t.ctx.Err() != nil {
level.Info(l).Log("msg", "syslog server shutting down")
return
}

if ne, ok := err.(net.Error); ok && ne.Temporary() {
level.Warn(l).Log("msg", "failed to accept syslog connection", "err", err, "num_retries", backoff.NumRetries())
backoff.Wait()
continue
}

level.Error(l).Log("msg", "failed to accept syslog connection. quiting", "err", err)
return
}
backoff.Reset()

t.openConnections.Add(1)
go t.handleConnection(c)
}

}

func (t *SyslogTarget) handleConnection(cn net.Conn) {
defer t.openConnections.Done()

c := &idleTimeoutConn{cn, t.idleTimeout()}

handlerCtx, cancel := context.WithCancel(t.ctx)
defer cancel()
go func() {
<-handlerCtx.Done()
_ = c.Close()
}()

connLabels := t.connectionLabels(c)

err := syslogparser.ParseStream(c, func(result *syslog.Result) {
if err := result.Error; err != nil {
t.handleMessageError(err)
return
}
t.handleMessage(connLabels.Copy(), result.Message)
}, t.maxMessageLength())

if err != nil {
level.Warn(t.logger).Log("msg", "error initializing syslog stream", "err", err)
return nil, err
}
return t, nil
}

func (t *SyslogTarget) handleMessageError(err error) {
Expand Down Expand Up @@ -247,7 +136,7 @@ func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Messag

if t.config.LabelStructuredData && rfc5424Msg.StructuredData != nil {
for id, params := range *rfc5424Msg.StructuredData {
id = strings.Replace(id, "@", "_", -1)
id = strings.ReplaceAll(id, "@", "_")
for name, value := range params {
key := "__syslog_message_sd_" + id + "_" + name
lb.Set(key, value)
Expand Down Expand Up @@ -295,33 +184,7 @@ func (t *SyslogTarget) messageSender(entries chan<- api.Entry) {
}
t.metrics.syslogEntries.Inc()
}
}

func (t *SyslogTarget) connectionLabels(c net.Conn) labels.Labels {
lb := labels.NewBuilder(nil)
for k, v := range t.config.Labels {
lb.Set(string(k), string(v))
}

ip := ipFromConn(c).String()
lb.Set("__syslog_connection_ip_address", ip)
lb.Set("__syslog_connection_hostname", lookupAddr(ip))

return lb.Labels()
}

func ipFromConn(c net.Conn) net.IP {
switch addr := c.RemoteAddr().(type) {
case *net.TCPAddr:
return addr.IP
}

return nil
}

func lookupAddr(addr string) string {
names, _ := net.LookupAddr(addr)
return strings.Join(names, ",")
t.messagesDone <- struct{}{}
}

// Type returns SyslogTargetType.
Expand All @@ -331,7 +194,7 @@ func (t *SyslogTarget) Type() target.TargetType {

// Ready indicates whether or not the syslog target is ready to be read from.
func (t *SyslogTarget) Ready() bool {
return true
return t.transport.Ready()
}

// DiscoveredLabels returns the set of labels discovered by the syslog target, which
Expand All @@ -353,48 +216,23 @@ func (t *SyslogTarget) Details() interface{} {

// Stop shuts down the SyslogTarget.
func (t *SyslogTarget) Stop() error {
t.ctxCancel()
err := t.listener.Close()
t.openConnections.Wait()
err := t.transport.Close()
t.transport.Wait()
close(t.messages)
// wait for all pending messages to be processed and sent to handler
<-t.messagesDone
t.handler.Stop()
return err
}

// ListenAddress returns the address SyslogTarget is listening on.
func (t *SyslogTarget) ListenAddress() net.Addr {
return t.listener.Addr()
}

func (t *SyslogTarget) idleTimeout() time.Duration {
if t.config.IdleTimeout != 0 {
return t.config.IdleTimeout
}
return defaultIdleTimeout
return t.transport.Addr()
}

func (t *SyslogTarget) maxMessageLength() int {
if t.config.MaxMessageLength != 0 {
return t.config.MaxMessageLength
func (t *SyslogTarget) transportProtocol() string {
if t.config.ListenProtocol != "" {
return t.config.ListenProtocol
}
return defaultMaxMessageLength
}

type idleTimeoutConn struct {
net.Conn
idleTimeout time.Duration
}

func (c *idleTimeoutConn) Write(p []byte) (int, error) {
c.setDeadline()
return c.Conn.Write(p)
}

func (c *idleTimeoutConn) Read(b []byte) (int, error) {
c.setDeadline()
return c.Conn.Read(b)
}

func (c *idleTimeoutConn) setDeadline() {
_ = c.Conn.SetDeadline(time.Now().Add(c.idleTimeout))
return defaultProtocol
}
Loading

0 comments on commit 9974849

Please sign in to comment.