diff --git a/bridge_test.go b/bridge_test.go index c986c489..7816e7d1 100644 --- a/bridge_test.go +++ b/bridge_test.go @@ -20,6 +20,9 @@ import ( "testing" "time" + "github.com/prometheus/statsd_exporter/pkg/parser" + "github.com/stretchr/testify/require" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" @@ -531,67 +534,77 @@ func TestHandlePacket(t *testing.T) { }, } - parser := line.NewParser() - parser.EnableDogstatsdParsing() - parser.EnableInfluxdbParsing() - parser.EnableLibratoParsing() - parser.EnableSignalFXParsing() + testParser := line.NewParser() + testParser.EnableDogstatsdParsing() + testParser.EnableInfluxdbParsing() + testParser.EnableLibratoParsing() + testParser.EnableSignalFXParsing() for k, l := range []statsDPacketHandler{&listener.StatsDUDPListener{ - Conn: nil, - EventHandler: nil, - Logger: log.NewNopLogger(), - LineParser: parser, - UDPPackets: udpPackets, - LinesReceived: linesReceived, - EventsFlushed: eventsFlushed, - SampleErrors: *sampleErrors, - SamplesReceived: samplesReceived, - TagErrors: tagErrors, - TagsReceived: tagsReceived, + Conn: nil, + Logger: log.NewNopLogger(), + UDPPackets: udpPackets, }, &mockStatsDTCPListener{listener.StatsDTCPListener{ - Conn: nil, - EventHandler: nil, - Logger: log.NewNopLogger(), - LineParser: parser, - LinesReceived: linesReceived, - EventsFlushed: eventsFlushed, - SampleErrors: *sampleErrors, - SamplesReceived: samplesReceived, - TagErrors: tagErrors, - TagsReceived: tagsReceived, - TCPConnections: tcpConnections, - TCPErrors: tcpErrors, - TCPLineTooLong: tcpLineTooLong, + Conn: nil, + Logger: log.NewNopLogger(), + TCPConnections: tcpConnections, + TCPErrors: tcpErrors, + TCPLineTooLong: tcpLineTooLong, }, log.NewNopLogger()}} { + packets := make(chan string, 32) events := make(chan event.Events, 32) - l.SetEventHandler(&event.UnbufferedEventHandler{C: events}) + workers := make([]*parser.Worker, 1) + for i := range workers { + workers[i] = parser.NewWorker( + log.NewNopLogger(), + &event.UnbufferedEventHandler{C: events}, + testParser, + nil, + linesReceived, + *sampleErrors, + samplesReceived, + tagErrors, + tagsReceived, + ) + } + go workers[0].Consume(packets) + l.SetPacketBuffer(packets) for i, scenario := range scenarios { - l.HandlePacket([]byte(scenario.in)) - - le := len(events) - // Flatten actual events. - actual := event.Events{} - for j := 0; j < le; j++ { - actual = append(actual, <-events...) - } - - if len(actual) != len(scenario.out) { - t.Fatalf("%d.%d. Expected %d events, got %d in scenario '%s'", k, i, len(scenario.out), len(actual), scenario.name) - } - - for j, expected := range scenario.out { - if !reflect.DeepEqual(&expected, &actual[j]) { - t.Fatalf("%d.%d.%d. Expected %#v, got %#v in scenario '%s'", k, i, j, expected, actual[j], scenario.name) + t.Run(scenario.name, func(t *testing.T) { + l.HandlePacket([]byte(scenario.in)) + + le := 0 + if len(scenario.out) != 0 { + // wait until workers produce events + require.Eventually(t, func() bool { + le = len(events) + return le > 0 + }, time.Second, time.Millisecond*10) + } + + // Flatten actual events. + actual := event.Events{} + for j := 0; j < le; j++ { + actual = append(actual, <-events...) + } + + if len(actual) != len(scenario.out) { + t.Fatalf("%d.%d. Expected %d events, got %d in scenario '%s'", k, i, len(scenario.out), len(actual), scenario.name) + } + + for j, expected := range scenario.out { + if !reflect.DeepEqual(&expected, &actual[j]) { + t.Fatalf("%d.%d.%d. Expected %#v, got %#v in scenario '%s'", k, i, j, expected, actual[j], scenario.name) + } } - } + }) } } } type statsDPacketHandler interface { HandlePacket(packet []byte) - SetEventHandler(eh event.EventHandler) + SetPacketBuffer(pb chan string) } type mockStatsDTCPListener struct { diff --git a/exporter_benchmark_test.go b/exporter_benchmark_test.go index 9a26760c..e18bd78f 100644 --- a/exporter_benchmark_test.go +++ b/exporter_benchmark_test.go @@ -17,6 +17,8 @@ import ( "fmt" "testing" + "github.com/prometheus/statsd_exporter/pkg/parser" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -50,11 +52,11 @@ func benchmarkUDPListener(times int, b *testing.B) { } } - parser := line.NewParser() - parser.EnableDogstatsdParsing() - parser.EnableInfluxdbParsing() - parser.EnableLibratoParsing() - parser.EnableSignalFXParsing() + testParser := line.NewParser() + testParser.EnableDogstatsdParsing() + testParser.EnableInfluxdbParsing() + testParser.EnableLibratoParsing() + testParser.EnableSignalFXParsing() // reset benchmark timer to not measure startup costs b.ResetTimer() @@ -65,25 +67,39 @@ func benchmarkUDPListener(times int, b *testing.B) { // there are more events than input lines, need bigger buffer events := make(chan event.Events, len(bytesInput)*times*2) + packets := make(chan string, len(input)*times*2) l := listener.StatsDUDPListener{ - EventHandler: &event.UnbufferedEventHandler{C: events}, - Logger: logger, - LineParser: parser, - UDPPackets: udpPackets, - LinesReceived: linesReceived, - SamplesReceived: samplesReceived, - TagsReceived: tagsReceived, + Logger: logger, + UDPPackets: udpPackets, + PacketBuffer: packets, + } + + workers := make([]*parser.Worker, 2) + for i := 0; i < len(workers); i++ { + workers[i] = parser.NewWorker( + logger, + &event.UnbufferedEventHandler{C: events}, + testParser, + nil, + linesReceived, + *sampleErrors, + samplesReceived, + tagErrors, + tagsReceived, + ) + go workers[i].Consume(packets) } // resume benchmark timer b.StartTimer() for i := 0; i < times; i++ { - for _, line := range bytesInput { - l.HandlePacket([]byte(line)) + for _, li := range bytesInput { + l.HandlePacket([]byte(li)) } } + close(packets) } } diff --git a/go.mod b/go.mod index 1637a300..7e6bccd5 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/prometheus/client_golang v1.12.2 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.37.0 + github.com/stretchr/testify v1.4.0 github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807 gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/yaml.v2 v2.4.0 @@ -18,9 +19,11 @@ require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect golang.org/x/sys v0.0.0-20220708085239-5a0f0661e09d // indirect google.golang.org/protobuf v1.28.0 // indirect diff --git a/main.go b/main.go index fdee20ef..a07d8de3 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,8 @@ import ( "strconv" "syscall" + "github.com/prometheus/statsd_exporter/pkg/parser" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -249,6 +251,8 @@ func main() { readBuffer = kingpin.Flag("statsd.read-buffer", "Size (in bytes) of the operating system's transmit read buffer associated with the UDP or Unixgram connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.").Int() cacheSize = kingpin.Flag("statsd.cache-size", "Maximum size of your metric mapping cache. Relies on least recently used replacement policy if max size is reached.").Default("1000").Int() cacheType = kingpin.Flag("statsd.cache-type", "Metric mapping cache type. Valid options are \"lru\" and \"random\"").Default("lru").Enum("lru", "random") + parserWorkerPool = kingpin.Flag("statsd.parser-worker-pool-size", "How many workers will process raw statsd packets simultaneously.").Default("1").Uint() + packetBufferSize = kingpin.Flag("statsd.packet-buffer-size", "Size of buffer that holds raw statsd packets for parsing.").Default("5000").Uint() eventQueueSize = kingpin.Flag("statsd.event-queue-size", "Size of internal queue for processing events.").Default("10000").Uint() eventFlushThreshold = kingpin.Flag("statsd.event-flush-threshold", "Number of events to hold in queue before flushing.").Default("1000").Int() eventFlushInterval = kingpin.Flag("statsd.event-flush-interval", "Maximum time between event queue flushes.").Default("200ms").Duration() @@ -274,23 +278,26 @@ func main() { } prometheus.MustRegister(version.NewCollector("statsd_exporter")) - parser := line.NewParser() + lineParser := line.NewParser() if *dogstatsdTagsEnabled { - parser.EnableDogstatsdParsing() + lineParser.EnableDogstatsdParsing() } if *influxdbTagsEnabled { - parser.EnableInfluxdbParsing() + lineParser.EnableInfluxdbParsing() } if *libratoTagsEnabled { - parser.EnableLibratoParsing() + lineParser.EnableLibratoParsing() } if *signalFXTagsEnabled { - parser.EnableSignalFXParsing() + lineParser.EnableSignalFXParsing() } level.Info(logger).Log("msg", "Starting StatsD -> Prometheus Exporter", "version", version.Info()) level.Info(logger).Log("msg", "Build context", "context", version.BuildContext()) + packets := make(chan string, *packetBufferSize) + defer close(packets) + events := make(chan event.Events, *eventQueueSize) defer close(events) eventQueue := event.NewEventQueue(events, *eventFlushThreshold, *eventFlushInterval, eventsFlushed) @@ -321,7 +328,17 @@ func main() { } } - exporter := exporter.NewExporter(prometheus.DefaultRegisterer, thisMapper, logger, eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) + exporterInstance := exporter.NewExporter( + prometheus.DefaultRegisterer, + thisMapper, + logger, + eventsActions, + eventsUnmapped, + errorEventStats, + eventStats, + conflictingEventStats, + metricsCount, + ) if *checkConfig { level.Info(logger).Log("msg", "Configuration check successful, exiting") @@ -367,18 +384,11 @@ func main() { } ul := &listener.StatsDUDPListener{ - Conn: uconn, - EventHandler: eventQueue, - Logger: logger, - LineParser: parser, - UDPPackets: udpPackets, - LinesReceived: linesReceived, - EventsFlushed: eventsFlushed, - Relay: relayTarget, - SampleErrors: *sampleErrors, - SamplesReceived: samplesReceived, - TagErrors: tagErrors, - TagsReceived: tagsReceived, + Conn: uconn, + Logger: logger, + PacketBuffer: packets, + + UDPPackets: udpPackets, } go ul.Listen() @@ -398,20 +408,13 @@ func main() { defer tconn.Close() tl := &listener.StatsDTCPListener{ - Conn: tconn, - EventHandler: eventQueue, - Logger: logger, - LineParser: parser, - LinesReceived: linesReceived, - EventsFlushed: eventsFlushed, - Relay: relayTarget, - SampleErrors: *sampleErrors, - SamplesReceived: samplesReceived, - TagErrors: tagErrors, - TagsReceived: tagsReceived, - TCPConnections: tcpConnections, - TCPErrors: tcpErrors, - TCPLineTooLong: tcpLineTooLong, + Conn: tconn, + Logger: logger, + PacketBuffer: packets, + + TCPConnections: tcpConnections, + TCPErrors: tcpErrors, + TCPLineTooLong: tcpLineTooLong, } go tl.Listen() @@ -443,18 +446,11 @@ func main() { } ul := &listener.StatsDUnixgramListener{ - Conn: uxgconn, - EventHandler: eventQueue, - Logger: logger, - LineParser: parser, + Conn: uxgconn, + Logger: logger, + UnixgramPackets: unixgramPackets, - LinesReceived: linesReceived, - EventsFlushed: eventsFlushed, - Relay: relayTarget, - SampleErrors: *sampleErrors, - SamplesReceived: samplesReceived, - TagErrors: tagErrors, - TagsReceived: tagsReceived, + PacketBuffer: packets, } go ul.Listen() @@ -477,6 +473,21 @@ func main() { } } + for i := 0; i < int(*parserWorkerPool); i++ { + worker := parser.NewWorker( + logger, + eventQueue, + lineParser, + relayTarget, + linesReceived, + *sampleErrors, + samplesReceived, + tagErrors, + tagsReceived, + ) + go worker.Consume(packets) + } + mux := http.DefaultServeMux mux.Handle(*metricsEndpoint, promhttp.Handler()) mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -530,7 +541,7 @@ func main() { go serveHTTP(mux, *listenAddress, logger) go sighupConfigReloader(*mappingConfig, thisMapper, logger) - go exporter.Listen(events) + go exporterInstance.Listen(events) signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt, syscall.SIGTERM) diff --git a/pkg/event/event.go b/pkg/event/event.go index d5e65cef..2fe652fe 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -118,7 +118,7 @@ func (eq *EventQueue) Flush() { func (eq *EventQueue) FlushUnlocked() { eq.C <- eq.q - eq.q = make([]Event, 0, cap(eq.q)) + eq.q = make([]Event, 0, eq.flushThreshold) eq.eventsFlushed.Inc() } diff --git a/pkg/exporter/exporter_test.go b/pkg/exporter/exporter_test.go index ef7836f7..41c06695 100644 --- a/pkg/exporter/exporter_test.go +++ b/pkg/exporter/exporter_test.go @@ -19,6 +19,9 @@ import ( "testing" "time" + "github.com/prometheus/statsd_exporter/pkg/parser" + "github.com/stretchr/testify/require" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" @@ -39,12 +42,6 @@ var ( }, []string{"type"}, ) - eventsFlushed = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "statsd_exporter_event_queue_flushed_total", - Help: "Number of times events were flushed to exporter", - }, - ) eventsUnmapped = prometheus.NewCounter( prometheus.CounterOpts{ Name: "statsd_exporter_events_unmapped_total", @@ -596,51 +593,58 @@ func TestInvalidUtf8InDatadogTagValue(t *testing.T) { } }() - events := make(chan event.Events) + events := make(chan event.Events, 10) + packets := make(chan string, 10) ueh := &event.UnbufferedEventHandler{C: events} - parser := line.NewParser() - parser.EnableDogstatsdParsing() - parser.EnableInfluxdbParsing() - parser.EnableLibratoParsing() - parser.EnableSignalFXParsing() + testParser := line.NewParser() + testParser.EnableDogstatsdParsing() + testParser.EnableInfluxdbParsing() + testParser.EnableLibratoParsing() + testParser.EnableSignalFXParsing() + + workers := make([]*parser.Worker, 1) + for i := range workers { + workers[i] = parser.NewWorker( + log.NewNopLogger(), + ueh, + testParser, + nil, + linesReceived, + *sampleErrors, + samplesReceived, + tagErrors, + tagsReceived, + ) + } + go workers[0].Consume(packets) go func() { for _, l := range []statsDPacketHandler{&listener.StatsDUDPListener{ - Conn: nil, - EventHandler: nil, - Logger: log.NewNopLogger(), - LineParser: parser, - UDPPackets: udpPackets, - LinesReceived: linesReceived, - EventsFlushed: eventsFlushed, - SampleErrors: *sampleErrors, - SamplesReceived: samplesReceived, - TagErrors: tagErrors, - TagsReceived: tagsReceived, + Conn: nil, + Logger: log.NewNopLogger(), + PacketBuffer: packets, + UDPPackets: udpPackets, }, &mockStatsDTCPListener{listener.StatsDTCPListener{ - Conn: nil, - EventHandler: nil, - Logger: log.NewNopLogger(), - LineParser: parser, - LinesReceived: linesReceived, - EventsFlushed: eventsFlushed, - SampleErrors: *sampleErrors, - SamplesReceived: samplesReceived, - TagErrors: tagErrors, - TagsReceived: tagsReceived, - TCPConnections: tcpConnections, - TCPErrors: tcpErrors, - TCPLineTooLong: tcpLineTooLong, + Conn: nil, + Logger: log.NewNopLogger(), + TCPConnections: tcpConnections, + TCPErrors: tcpErrors, + PacketBuffer: packets, + TCPLineTooLong: tcpLineTooLong, }, log.NewNopLogger()}} { - l.SetEventHandler(ueh) + l.SetPacketBuffer(packets) l.HandlePacket([]byte("bar:200|c|#tag:value\nbar:200|c|#tag:\xc3\x28invalid")) } - close(events) }() testMapper := mapper.MetricMapper{} + require.Eventually(t, func() bool { + return len(events) > 0 + }, time.Second, time.Millisecond*10) + close(events) + close(packets) ex := NewExporter(prometheus.DefaultRegisterer, &testMapper, log.NewNopLogger(), eventsActions, eventsUnmapped, errorEventStats, eventStats, conflictingEventStats, metricsCount) ex.Listen(events) } @@ -776,7 +780,7 @@ func TestCounterIncrement(t *testing.T) { type statsDPacketHandler interface { HandlePacket(packet []byte) - SetEventHandler(eh event.EventHandler) + SetPacketBuffer(pb chan string) } type mockStatsDTCPListener struct { diff --git a/pkg/listener/listener.go b/pkg/listener/listener.go index 09f9dbfe..d8137693 100644 --- a/pkg/listener/listener.go +++ b/pkg/listener/listener.go @@ -25,7 +25,6 @@ import ( "github.com/prometheus/statsd_exporter/pkg/event" "github.com/prometheus/statsd_exporter/pkg/level" - "github.com/prometheus/statsd_exporter/pkg/relay" ) type Parser interface { @@ -33,22 +32,14 @@ type Parser interface { } type StatsDUDPListener struct { - Conn *net.UDPConn - EventHandler event.EventHandler - Logger log.Logger - LineParser Parser - UDPPackets prometheus.Counter - LinesReceived prometheus.Counter - EventsFlushed prometheus.Counter - Relay *relay.Relay - SampleErrors prometheus.CounterVec - SamplesReceived prometheus.Counter - TagErrors prometheus.Counter - TagsReceived prometheus.Counter + Conn *net.UDPConn + Logger log.Logger + UDPPackets prometheus.Counter + PacketBuffer chan string } -func (l *StatsDUDPListener) SetEventHandler(eh event.EventHandler) { - l.EventHandler = eh +func (l *StatsDUDPListener) SetPacketBuffer(pb chan string) { + l.PacketBuffer = pb } func (l *StatsDUDPListener) Listen() { @@ -70,36 +61,21 @@ func (l *StatsDUDPListener) Listen() { func (l *StatsDUDPListener) HandlePacket(packet []byte) { l.UDPPackets.Inc() - lines := strings.Split(string(packet), "\n") - for _, line := range lines { - level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "udp", "line", line) - l.LinesReceived.Inc() - if l.Relay != nil && len(line) > 0 { - l.Relay.RelayLine(line) - } - l.EventHandler.Queue(l.LineParser.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) - } + l.PacketBuffer <- string(packet) } type StatsDTCPListener struct { - Conn *net.TCPListener - EventHandler event.EventHandler - Logger log.Logger - LineParser Parser - LinesReceived prometheus.Counter - EventsFlushed prometheus.Counter - Relay *relay.Relay - SampleErrors prometheus.CounterVec - SamplesReceived prometheus.Counter - TagErrors prometheus.Counter - TagsReceived prometheus.Counter - TCPConnections prometheus.Counter - TCPErrors prometheus.Counter - TCPLineTooLong prometheus.Counter + Conn *net.TCPListener + PacketBuffer chan string + Logger log.Logger + + TCPConnections prometheus.Counter + TCPErrors prometheus.Counter + TCPLineTooLong prometheus.Counter } -func (l *StatsDTCPListener) SetEventHandler(eh event.EventHandler) { - l.EventHandler = eh +func (l *StatsDTCPListener) SetPacketBuffer(pb chan string) { + l.PacketBuffer = pb } func (l *StatsDTCPListener) Listen() { @@ -139,31 +115,21 @@ func (l *StatsDTCPListener) HandleConn(c *net.TCPConn) { level.Debug(l.Logger).Log("msg", "Read failed: line too long", "addr", c.RemoteAddr()) break } - l.LinesReceived.Inc() - if l.Relay != nil && len(line) > 0 { - l.Relay.RelayLine(string(line)) - } - l.EventHandler.Queue(l.LineParser.LineToEvents(string(line), l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) + + l.PacketBuffer <- string(line) } } type StatsDUnixgramListener struct { - Conn *net.UnixConn - EventHandler event.EventHandler - Logger log.Logger - LineParser Parser + Conn *net.UnixConn + PacketBuffer chan string + Logger log.Logger + UnixgramPackets prometheus.Counter - LinesReceived prometheus.Counter - EventsFlushed prometheus.Counter - Relay *relay.Relay - SampleErrors prometheus.CounterVec - SamplesReceived prometheus.Counter - TagErrors prometheus.Counter - TagsReceived prometheus.Counter } -func (l *StatsDUnixgramListener) SetEventHandler(eh event.EventHandler) { - l.EventHandler = eh +func (l *StatsDUnixgramListener) SetPacketBuffer(pb chan string) { + l.PacketBuffer = pb } func (l *StatsDUnixgramListener) Listen() { @@ -185,13 +151,5 @@ func (l *StatsDUnixgramListener) Listen() { func (l *StatsDUnixgramListener) HandlePacket(packet []byte) { l.UnixgramPackets.Inc() - lines := strings.Split(string(packet), "\n") - for _, line := range lines { - level.Debug(l.Logger).Log("msg", "Incoming line", "proto", "unixgram", "line", line) - l.LinesReceived.Inc() - if l.Relay != nil && len(line) > 0 { - l.Relay.RelayLine(line) - } - l.EventHandler.Queue(l.LineParser.LineToEvents(line, l.SampleErrors, l.SamplesReceived, l.TagErrors, l.TagsReceived, l.Logger)) - } + l.PacketBuffer <- string(packet) } diff --git a/pkg/parser/worker.go b/pkg/parser/worker.go new file mode 100644 index 00000000..6f702ac2 --- /dev/null +++ b/pkg/parser/worker.go @@ -0,0 +1,86 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parser + +import ( + "strings" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/statsd_exporter/pkg/event" + "github.com/prometheus/statsd_exporter/pkg/line" + "github.com/prometheus/statsd_exporter/pkg/relay" +) + +type Worker struct { + EventHandler event.EventHandler + Logger log.Logger + LineParser *line.Parser + Relay *relay.Relay + + LinesReceived prometheus.Counter + SampleErrors prometheus.CounterVec + SamplesReceived prometheus.Counter + TagErrors prometheus.Counter + TagsReceived prometheus.Counter +} + +func NewWorker( + logger log.Logger, + eventHandler event.EventHandler, + lineParser *line.Parser, + relay *relay.Relay, + linesReceived prometheus.Counter, + sampleErrors prometheus.CounterVec, + samplesReceived prometheus.Counter, + tagErrors prometheus.Counter, + tagsReceived prometheus.Counter, +) *Worker { + return &Worker{ + EventHandler: eventHandler, + Logger: logger, + LineParser: lineParser, + Relay: relay, + LinesReceived: linesReceived, + SampleErrors: sampleErrors, + SamplesReceived: samplesReceived, + TagErrors: tagErrors, + TagsReceived: tagsReceived, + } +} + +func (w *Worker) Consume(c <-chan string) { + for { + bytes, ok := <-c + + if !ok { + level.Debug(w.Logger).Log("msg", "channel closed, exiting consume loop") + return + } + w.handle(bytes) + } +} + +func (w *Worker) handle(packet string) { + lines := strings.Split(packet, "\n") + for _, l := range lines { + level.Debug(w.Logger).Log("msg", "Incoming line", "sample", l) + w.LinesReceived.Inc() + if w.Relay != nil && len(l) > 0 { + w.Relay.RelayLine(l) + } + w.EventHandler.Queue(w.LineParser.LineToEvents(l, w.SampleErrors, w.SamplesReceived, w.TagErrors, w.TagsReceived, w.Logger)) + } +}