diff --git a/go.mod b/go.mod index 261c013..0325b77 100644 --- a/go.mod +++ b/go.mod @@ -15,4 +15,5 @@ require ( github.com/prometheus/client_golang v1.4.1 github.com/prometheus/client_model v0.2.0 github.com/stretchr/testify v1.4.0 + gopkg.in/mcuadros/go-syslog.v2 v2.3.0 // indirect ) diff --git a/go.sum b/go.sum index 17fc388..239889e 100644 --- a/go.sum +++ b/go.sum @@ -118,6 +118,8 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/mcuadros/go-syslog.v2 v2.3.0 h1:kcsiS+WsTKyIEPABJBJtoG0KkOS6yzvJ+/eZlhD79kk= +gopkg.in/mcuadros/go-syslog.v2 v2.3.0/go.mod h1:l5LPIyOOyIdQquNg+oU6Z3524YwrcqEm0aKH+5zpt2U= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/logsource_syslog.go b/logsource_syslog.go new file mode 100644 index 0000000..79633ce --- /dev/null +++ b/logsource_syslog.go @@ -0,0 +1,94 @@ +package main + +import ( + "context" + "errors" + "fmt" + "io" + + "github.com/alecthomas/kingpin" + "gopkg.in/mcuadros/go-syslog.v2" +) + +type syslogLogSource struct { + network string + listen string + server *syslog.Server + lines chan string + channel syslog.LogPartsChannel +} + +func NewSyslogLogSource(network string, listen string) (*syslogLogSource, error) { + channel := make(syslog.LogPartsChannel) + handler := syslog.NewChannelHandler(channel) + + server := syslog.NewServer() + server.SetFormat(syslog.Automatic) + server.SetHandler(handler) + if network == "udp" { + server.ListenUDP(listen) + } else { + server.ListenTCP(listen) + } + server.Boot() + lines := make(chan string, 1024) + + go func() { + for logParts := range channel { + lines <- fmt.Sprintf("%s[0]: %s", logParts["tag"], logParts["content"]) + } + }() + + return &syslogLogSource{network, listen, server, lines, channel}, nil +} + +func (s *syslogLogSource) Close() error { + defer close(s.channel) + return s.server.Kill() +} + +func (s *syslogLogSource) Path() string { + return fmt.Sprintf("%s:%s", s.network, s.listen) +} + +func (s *syslogLogSource) Read(ctx context.Context) (string, error) { + select { + case line, ok := <-s.lines: + if !ok { + return "", io.EOF + } + return line, nil + case <-ctx.Done(): + return "", ctx.Err() + } +} + +// A syslogLogSourceFactory is a factory than can create log sources +// from command line flags. +type syslogLogSourceFactory struct { + network string + listen string + enable bool +} + +func (f *syslogLogSourceFactory) Init(app *kingpin.Application) { + app.Flag("syslog.listen", "Host and port to listen on for syslog messages.").Default("0.0.0.0:514").StringVar(&f.listen) + app.Flag("syslog.network", "Network protocol to use (tcp/udp).").Default("udp").StringVar(&f.network) + app.Flag("syslog.enable", "Enable the syslog server.").Default("true").BoolVar(&f.enable) +} + +func (f *syslogLogSourceFactory) New(ctx context.Context) (LogSourceCloser, error) { + if !f.enable { + return nil, nil + } + + if !(f.network == "tcp" || f.network == "udp") { + return nil, errors.New(fmt.Sprintf("Unknown network protocol %s", f.network)) + } + + return NewSyslogLogSource(f.network, f.listen) +} + +func init() { + RegisterLogSourceFactory(&syslogLogSourceFactory{}) +} diff --git a/logsource_syslog_test.go b/logsource_syslog_test.go new file mode 100644 index 0000000..f2cc9a8 --- /dev/null +++ b/logsource_syslog_test.go @@ -0,0 +1,50 @@ +package main + +import ( + "context" + "fmt" + "log/syslog" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewSyslogLogSource(t *testing.T) { + src, err := NewSyslogLogSource("udp", "localhost:8514") + if err != nil { + t.Fatalf("NewSyslogLogSource failed: %v", err) + } + + if err := src.Close(); err != nil { + t.Fatalf("Close failed: %v", err) + } +} + +func TestSyslogLogSource_Path(t *testing.T) { + src, err := NewSyslogLogSource("udp", "localhost:8514") + if err != nil { + t.Fatalf("NewSyslogLogSource failed: %v", err) + } + defer src.Close() + + assert.Equal(t, "udp:localhost:8514", src.Path(), "Path should be set by New.") +} + +func TestSyslogLogSource_Read(t *testing.T) { + ctx := context.Background() + + src, err := NewSyslogLogSource("udp", "localhost:8514") + if err != nil { + t.Fatalf("NewSyslogLogSource failed: %v", err) + } + defer src.Close() + + sysLog, err := syslog.Dial("udp", "localhost:8514", syslog.LOG_MAIL|syslog.LOG_INFO, "tag") + fmt.Fprint(sysLog, "a line") + + s, err := src.Read(ctx) + if err != nil { + t.Fatalf("Read failed: %v", err) + } + assert.Contains(t, s, "a line") +}