Skip to content

Commit

Permalink
fixup! Add benchmark for UDP transport
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed Apr 12, 2022
1 parent 2c1b752 commit 4c0b3f3
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 103 deletions.
3 changes: 3 additions & 0 deletions clients/pkg/promtail/client/fake/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package fake

import (
"fmt"
"os"
"sync"

"github.com/grafana/loki/clients/pkg/promtail/api"
Expand All @@ -26,6 +28,7 @@ func New(stop func()) *Client {
defer c.wg.Done()
for e := range c.entries {
c.mtx.Lock()
fmt.Fprintf(os.Stderr, "received=%v\n", e)
c.received = append(c.received, e)
c.mtx.Unlock()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ import (
// detects octet counting.
// The function returns on EOF or unrecoverable errors.
func ParseStream(r io.Reader, callback func(res *syslog.Result), maxMessageLength int) error {
buf := bufio.NewReader(r)
buf := bufio.NewReaderSize(r, 1<<10)

firstByte, err := buf.Peek(1)
b, err := buf.ReadByte()
if err != nil {
return err
}
_ = buf.UnreadByte()

b := firstByte[0]
if b == '<' {
nontransparent.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
} else if b >= '0' && b <= '9' {
octetcounting.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
} else {
return fmt.Errorf("invalid or unsupported framing. first byte: '%s'", firstByte)
return fmt.Errorf("invalid or unsupported framing. first byte: '%v'", b)
}

return nil
Expand Down
11 changes: 10 additions & 1 deletion clients/pkg/promtail/targets/syslog/syslogtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ type SyslogTarget struct {
relabelConfig []*relabel.Config

transport Transport
messages chan message

messages chan message
messagesDone chan struct{}
}

type message struct {
Expand All @@ -62,6 +64,7 @@ func NewSyslogTarget(
handler: handler,
config: config,
relabelConfig: relabel,
messagesDone: make(chan struct{}),
}

switch t.transportProtocol() {
Expand Down Expand Up @@ -171,6 +174,7 @@ func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Messag
}

func (t *SyslogTarget) messageSender(entries chan<- api.Entry) {
count := 0
for msg := range t.messages {
entries <- api.Entry{
Labels: msg.labels,
Expand All @@ -179,8 +183,11 @@ func (t *SyslogTarget) messageSender(entries chan<- api.Entry) {
Line: msg.message,
},
}
count++
t.metrics.syslogEntries.Inc()
}
level.Warn(t.logger).Log("count", count)
t.messagesDone <- struct{}{}
}

// Type returns SyslogTargetType.
Expand Down Expand Up @@ -215,6 +222,8 @@ func (t *SyslogTarget) Stop() error {
err := t.transport.Close()
t.transport.Wait()
close(t.messages)
// wait for all pending messages to be processed and sent to handler
<-t.messagesDone
t.handler.Stop()
return err
}
Expand Down
139 changes: 85 additions & 54 deletions clients/pkg/promtail/targets/syslog/syslogtarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"unicode/utf8"

"github.com/go-kit/log"
"github.com/influxdata/go-syslog/v3"
promconfig "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
Expand All @@ -21,6 +22,7 @@ import (

"github.com/grafana/loki/clients/pkg/promtail/client/fake"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/syslog/syslogparser"
)

var (
Expand Down Expand Up @@ -239,23 +241,22 @@ var (
)

func Benchmark_SyslogTarget(b *testing.B) {
for _, testdata := range []struct {
for _, tt := range []struct {
name string
protocol string
formatFunc formatFunc
}{
{"tcp", protocolTCP, fmtOctetCounting},
{"udp", protocolUDP, fmtOctetCounting},
} {
b.Run(testdata.name, func(b *testing.B) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
tt := tt
b.Run(tt.name, func(b *testing.B) {
client := fake.New(func() {})

metrics := NewMetrics(nil)
tgt, _ := NewSyslogTarget(metrics, logger, client, []*relabel.Config{}, &scrapeconfig.SyslogTargetConfig{
tgt, _ := NewSyslogTarget(metrics, log.NewNopLogger(), client, []*relabel.Config{}, &scrapeconfig.SyslogTargetConfig{
ListenAddress: "127.0.0.1:0",
ListenProtocol: testdata.protocol,
ListenProtocol: tt.protocol,
LabelStructuredData: true,
Labels: model.LabelSet{
"test": "syslog_target",
Expand Down Expand Up @@ -284,10 +285,11 @@ func Benchmark_SyslogTarget(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()

c, _ := net.Dial(testdata.protocol, addr)
c, _ := net.Dial(tt.protocol, addr)
for n := 0; n < b.N; n++ {
require.NoError(b, writeMessagesToStream(c, messages, testdata.formatFunc))
writeMessagesToStream(c, messages, tt.formatFunc)
}
c.Close()

require.Eventuallyf(b, func() bool {
return len(client.Received()) == len(messages)*b.N
Expand All @@ -308,13 +310,15 @@ func TestSyslogTarget(t *testing.T) {
{"udp newline separated", protocolUDP, fmtNewline},
{"udp octetcounting", protocolUDP, fmtOctetCounting},
} {
tt := tt
t.Run(tt.name, func(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
client := fake.New(func() {})

metrics := NewMetrics(nil)
tgt, err := NewSyslogTarget(metrics, logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{
MaxMessageLength: 1 << 12, // explicitly not use default value
ListenAddress: "127.0.0.1:0",
ListenProtocol: tt.protocol,
LabelStructuredData: true,
Expand All @@ -323,10 +327,9 @@ func TestSyslogTarget(t *testing.T) {
},
})
require.NoError(t, err)
defer tgt.Stop()

require.Eventually(t, tgt.Ready, time.Second, 10*time.Millisecond)
defer func() {
require.NoError(t, tgt.Stop())
}()

addr := tgt.ListenAddress().String()
c, err := net.Dial(tt.protocol, addr)
Expand All @@ -344,7 +347,7 @@ func TestSyslogTarget(t *testing.T) {

require.Eventuallyf(t, func() bool {
return len(client.Received()) == len(messages)
}, time.Second, time.Millisecond, "Expected to receive %d messages, got %d.", len(messages), len(client.Received()))
}, time.Second, 10*time.Millisecond, "Expected to receive %d messages.", len(messages))

labels := make([]model.LabelSet, 0, len(messages))
for _, entry := range client.Received() {
Expand Down Expand Up @@ -416,52 +419,54 @@ func TestSyslogTarget_RFC5424Messages(t *testing.T) {
{"udp newline separated", protocolUDP, fmtNewline},
{"udp octetcounting", protocolUDP, fmtOctetCounting},
} {
t.Run(tt.name, func(t *testing.T) {})
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
client := fake.New(func() {})

metrics := NewMetrics(nil)
tgt, err := NewSyslogTarget(metrics, logger, client, []*relabel.Config{}, &scrapeconfig.SyslogTargetConfig{
ListenAddress: "127.0.0.1:0",
ListenProtocol: tt.protocol,
LabelStructuredData: true,
Labels: model.LabelSet{
"test": "syslog_target",
},
UseRFC5424Message: true,
})
require.NoError(t, err)
require.Eventually(t, tgt.Ready, time.Second, 10*time.Millisecond)
defer func() {
require.NoError(t, tgt.Stop())
}()
tt := tt
t.Run(tt.name, func(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
client := fake.New(func() {})

addr := tgt.ListenAddress().String()
c, err := net.Dial(tt.protocol, addr)
require.NoError(t, err)
metrics := NewMetrics(nil)
tgt, err := NewSyslogTarget(metrics, logger, client, []*relabel.Config{}, &scrapeconfig.SyslogTargetConfig{
ListenAddress: "127.0.0.1:0",
ListenProtocol: tt.protocol,
LabelStructuredData: true,
Labels: model.LabelSet{
"test": "syslog_target",
},
UseRFC5424Message: true,
})
require.NoError(t, err)
require.Eventually(t, tgt.Ready, time.Second, 10*time.Millisecond)
defer func() {
require.NoError(t, tgt.Stop())
}()

messages := []string{
`<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey="1"] An application event log entry...`,
`<165>1 2018-10-11T22:14:15.005Z host5 e - id2 [custom@32473 exkey="2"] An application event log entry...`,
`<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey="3"] An application event log entry...`,
}
addr := tgt.ListenAddress().String()
c, err := net.Dial(tt.protocol, addr)
require.NoError(t, err)

err = writeMessagesToStream(c, messages, tt.fmtFunc)
require.NoError(t, err)
require.NoError(t, c.Close())
messages := []string{
`<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey="1"] An application event log entry...`,
`<165>1 2018-10-11T22:14:15.005Z host5 e - id2 [custom@32473 exkey="2"] An application event log entry...`,
`<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey="3"] An application event log entry...`,
}

require.Eventuallyf(t, func() bool {
return len(client.Received()) == len(messages)
}, time.Second, time.Millisecond, "Expected to receive %d messages, got %d.", len(messages), len(client.Received()))
err = writeMessagesToStream(c, messages, tt.fmtFunc)
require.NoError(t, err)
require.NoError(t, c.Close())

for i := range messages {
require.Equal(t, model.LabelSet{
"test": "syslog_target",
}, client.Received()[i].Labels)
require.Contains(t, messages, client.Received()[i].Line)
require.NotZero(t, client.Received()[i].Timestamp)
}
require.Eventuallyf(t, func() bool {
return len(client.Received()) == len(messages)
}, time.Second, time.Millisecond, "Expected to receive %d messages, got %d.", len(messages), len(client.Received()))

for i := range messages {
require.Equal(t, model.LabelSet{
"test": "syslog_target",
}, client.Received()[i].Labels)
require.Contains(t, messages, client.Received()[i].Line)
require.NotZero(t, client.Received()[i].Timestamp)
}
})
}
}

Expand Down Expand Up @@ -826,4 +831,30 @@ func TestSyslogTarget_IdleTimeout(t *testing.T) {
buf := make([]byte, 1)
_, err = c.Read(buf)
require.EqualError(t, err, "EOF")
}
}

func TestParseStream_WithAsyncPipe(t *testing.T) {
lines := [3]string{
"<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey=\"1\"] An application event log entry...\n",
"<165>1 2018-10-11T22:14:15.005Z host5 e - id2 [custom@32473 exkey=\"2\"] An application event log entry...\n",
"<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey=\"3\"] An application event log entry...\n",
}

addr := &net.UDPAddr{IP: net.IP{127, 0, 0, 1}, Port: 1514}
pipe := NewAsycPipe(addr, 1024)
go func() {
for _, line := range lines {
pipe.Write([]byte(line))
}
pipe.Close()
}()

results := make([]*syslog.Result, 0)
cb := func(res *syslog.Result) {
results = append(results, res)
}

err := syslogparser.ParseStream(pipe, cb, defaultMaxMessageLength)
require.NoError(t, err)
require.Equal(t, 3, len(results))
}
Loading

0 comments on commit 4c0b3f3

Please sign in to comment.