Skip to content

Commit

Permalink
builtin input add syslog (#24)
Browse files Browse the repository at this point in the history
* add syslog input operator
  • Loading branch information
wph95 authored Feb 25, 2021
1 parent 4954610 commit 10cb4e4
Show file tree
Hide file tree
Showing 5 changed files with 393 additions and 172 deletions.
44 changes: 44 additions & 0 deletions docs/operators/syslog_input.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
## `syslog_input` operator

The `syslog_input` operator listens for syslog format logs from UDP/TCP packages.

### Configuration Fields

| Field | Default | Description |
| ---------- | ---------------- | ------------------------------------------------------------ |
| `id` | `syslog_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `tcp` | {} | A [tcp_input config](./tcp_input.md#configuration-fields) to defined syslog_parser operator. |
| `udp` | {} | A [udp_input config](./udp_input.md#configuration-fields) to defined syslog_parser operator. |
| `syslog` | required | A [syslog parser config](./syslog_parser.md#configuration-fields) to defined syslog_parser operator. |
| `labels` | {} | A map of `key: value` labels to add to the entry's labels |
| `resource` | {} | A map of `key: value` labels to add to the entry's resource |





### Example Configurations

#### Simple

TCP Configuration:
```yaml
- type: syslog_input
tcp:
listen_adress: "0.0.0.0:54526"
syslog:
protocol: rfc5424
```
UDP Configuration:
```yaml
- type: syslog_input
udp:
listen_adress: "0.0.0.0:54526"
syslog:
protocol: rfc3164
location: UTC
```
61 changes: 61 additions & 0 deletions operator/builtin/input/syslog/syslog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package syslog

import (
"fmt"
"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/input/tcp"
"github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/input/udp"
"github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/parser/syslog"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
)

func init() {
operator.Register("syslog_input", func() operator.Builder { return NewSyslogInputConfig("") })
}
func NewSyslogInputConfig(operatorID string) *SyslogInputConfig {
return &SyslogInputConfig{
InputConfig: helper.NewInputConfig(operatorID, "syslog_input"),
}
}

type SyslogInputConfig struct {
helper.InputConfig `yaml:",inline"`
Tcp *tcp.TCPInputConfig `json:"tcp" yaml:"tcp"`
Udp *udp.UDPInputConfig `json:"udp" yaml:"udp"`
Syslog *syslog.SyslogParserConfig `json:"syslog" yaml:"syslog"`
}

func (c SyslogInputConfig) Build(context operator.BuildContext) ([]operator.Operator, error) {
if c.Syslog == nil {
return nil, fmt.Errorf("need syslog config")
}
if c.Tcp == nil && c.Udp == nil {
return nil, fmt.Errorf("need tcp config or udp config")
}

c.Syslog.OutputIDs = c.OutputIDs
ops, err := c.Syslog.Build(context)
if err != nil {
return nil, fmt.Errorf("failed to resolve syslog config: %s", err)
}

if c.Tcp != nil {
c.Tcp.OutputIDs = []string{ops[0].ID()}
inputOps, err := c.Tcp.Build(context)
if err != nil {
return nil, fmt.Errorf("failed to resolve tcp config: %s", err)
}
ops = append(ops, inputOps...)
}

if c.Udp != nil {
c.Udp.OutputIDs = []string{ops[0].ID()}
inputOps, err := c.Udp.Build(context)
if err != nil {
return nil, fmt.Errorf("failed to resolve upd config: %s", err)
}
ops = append(ops, inputOps...)
}

return ops, nil
}
98 changes: 98 additions & 0 deletions operator/builtin/input/syslog/syslog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package syslog

import (
"fmt"
"github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/input/tcp"
"github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/input/udp"
"github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/parser/syslog"
"github.com/open-telemetry/opentelemetry-log-collection/pipeline"
"github.com/open-telemetry/opentelemetry-log-collection/testutil"
"github.com/stretchr/testify/require"
"net"
"testing"
"time"
)

func TestSyslogInput(t *testing.T) {
basicConfig := func() *syslog.SyslogParserConfig {
cfg := syslog.NewSyslogParserConfig("test_syslog_parser")
return cfg
}

cases, err := syslog.CreateCases(basicConfig)
require.NoError(t, err)

for _, tc := range cases {
t.Run(fmt.Sprintf("TCP-%s", tc.Name), func(t *testing.T) {
SyslogInputTest(t, NewSyslogInputConfigWithTcp(tc.Config), tc)
})
t.Run(fmt.Sprintf("UDP-%s", tc.Name), func(t *testing.T) {
SyslogInputTest(t, NewSyslogInputConfigWithUdp(tc.Config), tc)
})
}
}

func SyslogInputTest(t *testing.T, cfg *SyslogInputConfig, tc syslog.Case) {
ops, err := cfg.Build(testutil.NewBuildContext(t))
require.NoError(t, err)

fake := testutil.NewFakeOutput(t)
ops = append(ops, fake)
p, err := pipeline.NewDirectedPipeline(ops)
require.NoError(t, err)

err = p.Start()
require.NoError(t, err)

var conn net.Conn
if cfg.Tcp != nil {
conn, err = net.Dial("tcp", cfg.Tcp.ListenAddress)
require.NoError(t, err)
}
if cfg.Udp != nil {
conn, err = net.Dial("udp", cfg.Udp.ListenAddress)
require.NoError(t, err)
}


switch tc.InputRecord.(type) {
case string:
_, err = conn.Write([]byte(tc.InputRecord.(string)))
case []byte:
_, err = conn.Write(tc.InputRecord.([]byte))
}

conn.Close()
require.NoError(t, err)

select {
case e := <-fake.Received:
// close pipeline to avoid data race
p.Stop()
require.Equal(t, tc.ExpectedRecord, e.Record)
require.Equal(t, tc.ExpectedTimestamp, e.Timestamp)
require.Equal(t, tc.ExpectedSeverity, e.Severity)
require.Equal(t, tc.ExpectedSeverityText, e.SeverityText)
case <-time.After(time.Second):
p.Stop()
require.FailNow(t, "Timed out waiting for entry to be processed")
}
}

func NewSyslogInputConfigWithTcp(syslogCfg *syslog.SyslogParserConfig) *SyslogInputConfig {
cfg := NewSyslogInputConfig("test_syslog")
cfg.Tcp = tcp.NewTCPInputConfig("test_syslog_tcp")
cfg.Tcp.ListenAddress = ":14201"
cfg.OutputIDs = []string{"fake"}
cfg.Syslog = syslogCfg
return cfg
}

func NewSyslogInputConfigWithUdp(syslogCfg *syslog.SyslogParserConfig) *SyslogInputConfig {
cfg := NewSyslogInputConfig("test_syslog")
cfg.Udp = udp.NewUDPInputConfig("test_syslog_udp")
cfg.Udp.ListenAddress = ":12032"
cfg.OutputIDs = []string{"fake"}
cfg.Syslog = syslogCfg
return cfg
}
182 changes: 182 additions & 0 deletions operator/builtin/parser/syslog/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package syslog

import (
"github.com/open-telemetry/opentelemetry-log-collection/entry"
"time"
)

type Case struct {
Name string
Config *SyslogParserConfig
InputRecord interface{}
ExpectedTimestamp time.Time
ExpectedRecord interface{}
ExpectedSeverity entry.Severity
ExpectedSeverityText string
}

func testLocations() (map[string]*time.Location, error) {
locations := map[string]string{
"utc": "UTC",
"detroit": "America/Detroit",
"athens": "Europe/Athens",
}

l := make(map[string]*time.Location)
for k, v := range locations {
var err error
if l[k], err = time.LoadLocation(v); err != nil {
return nil, err
}
}
return l, nil
}

func CreateCases(basicConfig func() *SyslogParserConfig) ([]Case, error) {
location, err := testLocations()
if err != nil {
return nil, err
}

var cases = []Case{
{
"RFC3164",
func() *SyslogParserConfig {
cfg := basicConfig()
cfg.Protocol = "rfc3164"
cfg.Location = location["utc"].String()
return cfg
}(),
"<34>Jan 12 06:30:00 1.2.3.4 apache_server: test message",
time.Date(time.Now().Year(), 1, 12, 6, 30, 0, 0, location["utc"]),
map[string]interface{}{
"appname": "apache_server",
"facility": 4,
"hostname": "1.2.3.4",
"message": "test message",
"priority": 34,
},
entry.Critical,
"crit",
},
{
"RFC3164Detroit",
func() *SyslogParserConfig {
cfg := basicConfig()
cfg.Protocol = "rfc3164"
cfg.Location = location["detroit"].String()
return cfg
}(),
"<34>Jan 12 06:30:00 1.2.3.4 apache_server: test message",
time.Date(time.Now().Year(), 1, 12, 6, 30, 0, 0, location["detroit"]),
map[string]interface{}{
"appname": "apache_server",
"facility": 4,
"hostname": "1.2.3.4",
"message": "test message",
"priority": 34,
},
entry.Critical,
"crit",
},
{
"RFC3164Athens",
func() *SyslogParserConfig {
cfg := basicConfig()
cfg.Protocol = "rfc3164"
cfg.Location = location["athens"].String()
return cfg
}(),
"<34>Jan 12 06:30:00 1.2.3.4 apache_server: test message",
time.Date(time.Now().Year(), 1, 12, 6, 30, 0, 0, location["athens"]),
map[string]interface{}{
"appname": "apache_server",
"facility": 4,
"hostname": "1.2.3.4",
"message": "test message",
"priority": 34,
},
entry.Critical,
"crit",
},
{
"RFC3164Bytes",
func() *SyslogParserConfig {
cfg := basicConfig()
cfg.Protocol = "rfc3164"
return cfg
}(),
[]byte("<34>Jan 12 06:30:00 1.2.3.4 apache_server: test message"),
time.Date(time.Now().Year(), 1, 12, 6, 30, 0, 0, time.UTC),
map[string]interface{}{
"appname": "apache_server",
"facility": 4,
"hostname": "1.2.3.4",
"message": "test message",
"priority": 34,
},
entry.Critical,
"crit",
},
{
"RFC5424",
func() *SyslogParserConfig {
cfg := basicConfig()
cfg.Protocol = "rfc5424"
return cfg
}(),
`<86>1 2015-08-05T21:58:59.693Z 192.168.2.132 SecureAuth0 23108 ID52020 [SecureAuth@27389 UserHostAddress="192.168.2.132" Realm="SecureAuth0" UserID="Tester2" PEN="27389"] Found the user for retrieving user's profile`,
time.Date(2015, 8, 5, 21, 58, 59, 693000000, time.UTC),
map[string]interface{}{
"appname": "SecureAuth0",
"facility": 10,
"hostname": "192.168.2.132",
"message": "Found the user for retrieving user's profile",
"msg_id": "ID52020",
"priority": 86,
"proc_id": "23108",
"structured_data": map[string]map[string]string{
"SecureAuth@27389": {
"PEN": "27389",
"Realm": "SecureAuth0",
"UserHostAddress": "192.168.2.132",
"UserID": "Tester2",
},
},
"version": 1,
},
entry.Info,
"info",
},
{
"RFC5424LongSDName",
func() *SyslogParserConfig {
cfg := basicConfig()
cfg.Protocol = "rfc5424"
return cfg
}(),
`<86>1 2015-08-05T21:58:59.693Z 192.168.2.132 SecureAuth0 23108 ID52020 [verylongsdnamethatisgreaterthan32bytes@12345 UserHostAddress="192.168.2.132"] my message`,
time.Date(2015, 8, 5, 21, 58, 59, 693000000, time.UTC),
map[string]interface{}{
"appname": "SecureAuth0",
"facility": 10,
"hostname": "192.168.2.132",
"message": "my message",
"msg_id": "ID52020",
"priority": 86,
"proc_id": "23108",
"structured_data": map[string]map[string]string{
"verylongsdnamethatisgreaterthan32bytes@12345": {
"UserHostAddress": "192.168.2.132",
},
},
"version": 1,
},
entry.Info,
"info",
},
}

return cases, nil

}
Loading

0 comments on commit 10cb4e4

Please sign in to comment.