forked from rjocoleman/syslog-cloudwatch-bridge
-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
150 lines (124 loc) · 4.11 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"log"
"net/http"
"os"
"time"
"bytes"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/satori/go.uuid"
"gopkg.in/davaops/go-syslog.v3"
"gopkg.in/davaops/go-syslog.v3/format"
)
var port = os.Getenv("PORT")
var logGroupName = os.Getenv("LOG_GROUP_NAME")
var streamName, err = uuid.NewV4()
var sequenceToken = ""
var (
client *http.Client
pool *x509.CertPool
)
func init() {
pool = x509.NewCertPool()
pool.AppendCertsFromPEM(pemCerts)
}
func main() {
if logGroupName == "" {
log.Fatal("LOG_GROUP_NAME must be specified")
}
if port == "" {
port = "514"
}
address := fmt.Sprintf("0.0.0.0:%v", port)
log.Println("Starting syslog server on", address)
log.Println("Logging to group:", logGroupName)
initCloudWatchStream()
channel := make(syslog.LogPartsChannel)
handler := syslog.NewChannelHandler(channel)
server := syslog.NewServer()
server.SetFormat(syslog.Automatic)
server.SetHandler(handler)
server.ListenUDP(address)
server.ListenTCP(address)
server.Boot()
go func(channel syslog.LogPartsChannel) {
for logParts := range channel {
sendToCloudWatch(logParts)
}
}(channel)
server.Wait()
}
func sendToCloudWatch(logPart format.LogParts) {
// service is defined at run time to avoid session expiry in long running processes
var svc = cloudwatchlogs.New(session.New())
// set the AWS SDK to use our bundled certs for the minimal container (certs from CoreOS linux)
svc.Config.HTTPClient = &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{RootCAs: pool}}}
params := &cloudwatchlogs.PutLogEventsInput{
LogEvents: []*cloudwatchlogs.InputLogEvent{
{
Message: aws.String(formatMessageContent(logPart)),
Timestamp: aws.Int64(makeMilliTimestamp(logPart["timestamp"].(time.Time))),
},
},
LogGroupName: aws.String(logGroupName),
LogStreamName: aws.String(streamName.String()),
}
// first request has no SequenceToken - in all subsequent request we set it
if sequenceToken != "" {
params.SequenceToken = aws.String(sequenceToken)
}
resp, err := svc.PutLogEvents(params)
if err != nil {
log.Println(err)
}
sequenceToken = *resp.NextSequenceToken
}
func initCloudWatchStream() {
// service is defined at run time to avoid session expiry in long running processes
var svc = cloudwatchlogs.New(session.New())
// set the AWS SDK to use our bundled certs for the minimal container (certs from CoreOS linux)
svc.Config.HTTPClient = &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{RootCAs: pool}}}
_, err := svc.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
LogGroupName: aws.String(logGroupName),
LogStreamName: aws.String(streamName.String()),
})
if err != nil {
log.Fatal(err)
}
log.Println("Created CloudWatch Logs stream:", streamName)
}
func makeMilliTimestamp(input time.Time) int64 {
return input.UnixNano() / int64(time.Millisecond)
}
//Receives the logParts map and returns the string message in format <hostname> <tag/app_name> [<proc_id>]: <content>
func formatMessageContent(message format.LogParts) string {
var buffer bytes.Buffer
if message["hostname"] != nil && message["hostname"] != " " {
buffer.WriteString(message["hostname"].(string))
buffer.WriteString(" ")
}
if message["app_name"] != nil && message["app_name"] != " " {
buffer.WriteString(message["app_name"].(string))
} else {
buffer.WriteString("-")
}
if message["proc_id"] != nil && message["proc_id"] != " " && message["proc_id"] != "-" {
buffer.WriteString("[")
buffer.WriteString(message["proc_id"].(string))
buffer.WriteString("]")
} else if message["pid"] != nil && message["pid"] != " " && message["pid"] != "-" {
buffer.WriteString("[")
buffer.WriteString(message["pid"].(string))
buffer.WriteString("]")
}
buffer.WriteString(": ")
if message["message"] != nil && message["message"] != " " {
buffer.WriteString(message["message"].(string))
}
return buffer.String()
}