Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UDP transport option to Promtail's syslog target #5790

Merged
merged 1 commit into from
May 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Main

* [5790](https://github.com/grafana/loki/pull/5790) **chaudum**: Add UDP support for Promtail's syslog target.
* [5984](https://github.com/grafana/loki/pull/5984) **dannykopping** and **salvacorts**: Querier: prevent unnecessary calls to ingesters.
* [5943](https://github.com/grafana/loki/pull/5943) **tpaschalis**: Add support for exclusion patterns in Promtail's static_config
* [5879](https://github.com/grafana/loki/pull/5879) **MichelHollands**: Remove lines matching delete request expression when using "filter-and-delete" deletion mode.
Expand Down
4 changes: 4 additions & 0 deletions clients/pkg/promtail/scrapeconfig/scrapeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ type SyslogTargetConfig struct {
// ListenAddress is the address to listen on for syslog messages.
ListenAddress string `yaml:"listen_address"`

// ListenProtocol is the protocol used to listen for syslog messages.
// Must be either `tcp` (default) or `udp`
ListenProtocol string `yaml:"listen_protocol"`

// IdleTimeout is the idle timeout for tcp connections.
IdleTimeout time.Duration `yaml:"idle_timeout"`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ import (
// detects octet counting.
// The function returns on EOF or unrecoverable errors.
func ParseStream(r io.Reader, callback func(res *syslog.Result), maxMessageLength int) error {
buf := bufio.NewReader(r)
buf := bufio.NewReaderSize(r, 1<<10)

firstByte, err := buf.Peek(1)
b, err := buf.ReadByte()
if err != nil {
return err
}
_ = buf.UnreadByte()

b := firstByte[0]
if b == '<' {
nontransparent.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
} else if b >= '0' && b <= '9' {
octetcounting.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
} else {
return fmt.Errorf("invalid or unsupported framing. first byte: '%s'", firstByte)
return fmt.Errorf("invalid or unsupported framing. first byte: '%s'", string(b))
}

return nil
Expand Down
240 changes: 39 additions & 201 deletions clients/pkg/promtail/targets/syslog/syslogtarget.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,22 @@
package syslog

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"net"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/influxdata/go-syslog/v3"
"github.com/influxdata/go-syslog/v3/rfc5424"
"github.com/mwitkow/go-conntrack"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"

"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/syslog/syslogparser"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"

"github.com/grafana/loki/pkg/logproto"
Expand All @@ -33,6 +25,7 @@ import (
var (
defaultIdleTimeout = 120 * time.Second
defaultMaxMessageLength = 8192
defaultProtocol = protocolTCP
)

// SyslogTarget listens to syslog messages.
Expand All @@ -44,12 +37,10 @@ type SyslogTarget struct {
config *scrapeconfig.SyslogTargetConfig
relabelConfig []*relabel.Config

listener net.Listener
messages chan message
transport Transport

ctx context.Context
ctxCancel context.CancelFunc
openConnections *sync.WaitGroup
messages chan message
messagesDone chan struct{}
}

type message struct {
Expand All @@ -67,144 +58,42 @@ func NewSyslogTarget(
config *scrapeconfig.SyslogTargetConfig,
) (*SyslogTarget, error) {

ctx, cancel := context.WithCancel(context.Background())

t := &SyslogTarget{
metrics: metrics,
logger: logger,
handler: handler,
config: config,
relabelConfig: relabel,

ctx: ctx,
ctxCancel: cancel,
openConnections: new(sync.WaitGroup),
messagesDone: make(chan struct{}),
}

switch t.transportProtocol() {
case protocolTCP:
t.transport = NewSyslogTCPTransport(
config,
t.handleMessage,
t.handleMessageError,
logger,
)
case protocolUDP:
t.transport = NewSyslogUDPTransport(
config,
t.handleMessage,
t.handleMessageError,
logger,
)
default:
return nil, fmt.Errorf("invalid transport protocol. expected 'tcp' or 'udp', got '%s'", t.transportProtocol())
}

t.messages = make(chan message)
go t.messageSender(handler.Chan())

err := t.run()
return t, err
}

func (t *SyslogTarget) run() error {
l, err := net.Listen("tcp", t.config.ListenAddress)
l = conntrack.NewListener(l, conntrack.TrackWithName("syslog_target/"+t.config.ListenAddress))
err := t.transport.Run()
if err != nil {
return fmt.Errorf("error setting up syslog target: %w", err)
}

tlsEnabled := t.config.TLSConfig.CertFile != "" || t.config.TLSConfig.KeyFile != "" || t.config.TLSConfig.CAFile != ""
if tlsEnabled {
tlsConfig, err := newTLSConfig(t.config.TLSConfig.CertFile, t.config.TLSConfig.KeyFile, t.config.TLSConfig.CAFile)
if err != nil {
return fmt.Errorf("error setting up syslog target: %w", err)
}
l = tls.NewListener(l, tlsConfig)
}

t.listener = l
level.Info(t.logger).Log("msg", "syslog listening on address", "address", t.ListenAddress().String(), "tls", tlsEnabled)

t.openConnections.Add(1)
go t.acceptConnections()

return nil
}

func newTLSConfig(certFile string, keyFile string, caFile string) (*tls.Config, error) {
if certFile == "" || keyFile == "" {
return nil, fmt.Errorf("certificate and key files are required")
}

certs, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, fmt.Errorf("unable to load server certificate or key: %w", err)
}

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{certs},
}

if caFile != "" {
caCert, err := ioutil.ReadFile(caFile)
if err != nil {
return nil, fmt.Errorf("unable to load client CA certificate: %w", err)
}

caCertPool := x509.NewCertPool()
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
return nil, fmt.Errorf("unable to parse client CA certificate")
}

tlsConfig.ClientCAs = caCertPool
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
}

return tlsConfig, nil
}

func (t *SyslogTarget) acceptConnections() {
defer t.openConnections.Done()

l := log.With(t.logger, "address", t.listener.Addr().String())

backoff := backoff.New(t.ctx, backoff.Config{
MinBackoff: 5 * time.Millisecond,
MaxBackoff: 1 * time.Second,
})

for {
c, err := t.listener.Accept()
if err != nil {
if t.ctx.Err() != nil {
level.Info(l).Log("msg", "syslog server shutting down")
return
}

if ne, ok := err.(net.Error); ok && ne.Temporary() {
level.Warn(l).Log("msg", "failed to accept syslog connection", "err", err, "num_retries", backoff.NumRetries())
backoff.Wait()
continue
}

level.Error(l).Log("msg", "failed to accept syslog connection. quiting", "err", err)
return
}
backoff.Reset()

t.openConnections.Add(1)
go t.handleConnection(c)
}

}

func (t *SyslogTarget) handleConnection(cn net.Conn) {
defer t.openConnections.Done()

c := &idleTimeoutConn{cn, t.idleTimeout()}

handlerCtx, cancel := context.WithCancel(t.ctx)
defer cancel()
go func() {
<-handlerCtx.Done()
_ = c.Close()
}()

connLabels := t.connectionLabels(c)

err := syslogparser.ParseStream(c, func(result *syslog.Result) {
if err := result.Error; err != nil {
t.handleMessageError(err)
return
}
t.handleMessage(connLabels.Copy(), result.Message)
}, t.maxMessageLength())

if err != nil {
level.Warn(t.logger).Log("msg", "error initializing syslog stream", "err", err)
return nil, err
}
return t, nil
}

func (t *SyslogTarget) handleMessageError(err error) {
Expand Down Expand Up @@ -247,7 +136,7 @@ func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Messag

if t.config.LabelStructuredData && rfc5424Msg.StructuredData != nil {
for id, params := range *rfc5424Msg.StructuredData {
id = strings.Replace(id, "@", "_", -1)
id = strings.ReplaceAll(id, "@", "_")
for name, value := range params {
key := "__syslog_message_sd_" + id + "_" + name
lb.Set(key, value)
Expand Down Expand Up @@ -295,33 +184,7 @@ func (t *SyslogTarget) messageSender(entries chan<- api.Entry) {
}
t.metrics.syslogEntries.Inc()
}
}

func (t *SyslogTarget) connectionLabels(c net.Conn) labels.Labels {
lb := labels.NewBuilder(nil)
for k, v := range t.config.Labels {
lb.Set(string(k), string(v))
}

ip := ipFromConn(c).String()
lb.Set("__syslog_connection_ip_address", ip)
lb.Set("__syslog_connection_hostname", lookupAddr(ip))

return lb.Labels()
}

func ipFromConn(c net.Conn) net.IP {
switch addr := c.RemoteAddr().(type) {
case *net.TCPAddr:
return addr.IP
}

return nil
}

func lookupAddr(addr string) string {
names, _ := net.LookupAddr(addr)
return strings.Join(names, ",")
t.messagesDone <- struct{}{}
}

// Type returns SyslogTargetType.
Expand All @@ -331,7 +194,7 @@ func (t *SyslogTarget) Type() target.TargetType {

// Ready indicates whether or not the syslog target is ready to be read from.
func (t *SyslogTarget) Ready() bool {
return true
return t.transport.Ready()
}

// DiscoveredLabels returns the set of labels discovered by the syslog target, which
Expand All @@ -353,48 +216,23 @@ func (t *SyslogTarget) Details() interface{} {

// Stop shuts down the SyslogTarget.
func (t *SyslogTarget) Stop() error {
t.ctxCancel()
err := t.listener.Close()
t.openConnections.Wait()
err := t.transport.Close()
t.transport.Wait()
close(t.messages)
// wait for all pending messages to be processed and sent to handler
<-t.messagesDone
t.handler.Stop()
return err
}

// ListenAddress returns the address SyslogTarget is listening on.
func (t *SyslogTarget) ListenAddress() net.Addr {
return t.listener.Addr()
}

func (t *SyslogTarget) idleTimeout() time.Duration {
if t.config.IdleTimeout != 0 {
return t.config.IdleTimeout
}
return defaultIdleTimeout
return t.transport.Addr()
}

func (t *SyslogTarget) maxMessageLength() int {
if t.config.MaxMessageLength != 0 {
return t.config.MaxMessageLength
func (t *SyslogTarget) transportProtocol() string {
if t.config.ListenProtocol != "" {
return t.config.ListenProtocol
}
return defaultMaxMessageLength
}

type idleTimeoutConn struct {
net.Conn
idleTimeout time.Duration
}

func (c *idleTimeoutConn) Write(p []byte) (int, error) {
c.setDeadline()
return c.Conn.Write(p)
}

func (c *idleTimeoutConn) Read(b []byte) (int, error) {
c.setDeadline()
return c.Conn.Read(b)
}

func (c *idleTimeoutConn) setDeadline() {
_ = c.Conn.SetDeadline(time.Now().Add(c.idleTimeout))
return defaultProtocol
}
Loading