-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathhook.go
173 lines (151 loc) · 4.26 KB
/
hook.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package logrus_firehose
import (
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/sirupsen/logrus"
)
var defaultLevels = []logrus.Level{
logrus.PanicLevel,
logrus.FatalLevel,
logrus.ErrorLevel,
logrus.WarnLevel,
logrus.InfoLevel,
}
// FirehoseHook is logrus hook for AWS Firehose.
// Amazon Kinesis Firehose is a fully-managed service that delivers real-time
// streaming data to destinations such as Amazon Simple Storage Service (Amazon
// S3), Amazon Elasticsearch Service (Amazon ES), and Amazon Redshift.
type FirehoseHook struct {
client *firehose.Firehose
defaultStreamName string
defaultPartitionKey string
async bool
levels []logrus.Level
ignoreFields map[string]struct{}
filters map[string]func(interface{}) interface{}
addNewline bool
}
// New returns initialized logrus hook for Firehose with persistent Firehose logger.
func New(name string, conf Config) (*FirehoseHook, error) {
sess, err := session.NewSession(conf.AWSConfig())
if err != nil {
return nil, err
}
svc := firehose.New(sess)
return &FirehoseHook{
client: svc,
defaultStreamName: name,
levels: defaultLevels,
ignoreFields: make(map[string]struct{}),
filters: make(map[string]func(interface{}) interface{}),
}, nil
}
// NewWithConfig returns initialized logrus hook for Firehose with persistent Firehose logger.
func NewWithAWSConfig(name string, conf *aws.Config) (*FirehoseHook, error) {
sess, err := session.NewSession(conf)
if err != nil {
return nil, err
}
svc := firehose.New(sess)
return &FirehoseHook{
client: svc,
defaultStreamName: name,
levels: defaultLevels,
ignoreFields: make(map[string]struct{}),
filters: make(map[string]func(interface{}) interface{}),
}, nil
}
// Levels returns logging level to fire this hook.
func (h *FirehoseHook) Levels() []logrus.Level {
return h.levels
}
// SetLevels sets logging level to fire this hook.
func (h *FirehoseHook) SetLevels(levels []logrus.Level) {
h.levels = levels
}
// Async sets async flag and send log asynchroniously.
// If use this option, Fire() does not return error.
func (h *FirehoseHook) Async() {
h.async = true
}
// AddIgnore adds field name to ignore.
func (h *FirehoseHook) AddIgnore(name string) {
h.ignoreFields[name] = struct{}{}
}
// AddFilter adds a custom filter function.
func (h *FirehoseHook) AddFilter(name string, fn func(interface{}) interface{}) {
h.filters[name] = fn
}
// AddNewline sets if a newline is added to each message.
func (h *FirehoseHook) AddNewLine(b bool) {
h.addNewline = b
}
// Fire is invoked by logrus and sends log to Firehose.
func (h *FirehoseHook) Fire(entry *logrus.Entry) error {
if !h.async {
return h.fire(entry)
}
// send log asynchroniously and return no error.
go h.fire(entry)
return nil
}
// Fire is invoked by logrus and sends log to Firehose.
func (h *FirehoseHook) fire(entry *logrus.Entry) error {
in := &firehose.PutRecordInput{
DeliveryStreamName: stringPtr(h.getStreamName(entry)),
Record: &firehose.Record{
Data: h.getData(entry),
},
}
_, err := h.client.PutRecord(in)
return err
}
func (h *FirehoseHook) getStreamName(entry *logrus.Entry) string {
if name, ok := entry.Data["stream_name"].(string); ok {
return name
}
return h.defaultStreamName
}
func (h *FirehoseHook) getData(entry *logrus.Entry) []byte {
data := make(logrus.Fields)
entry.Data["message"] = entry.Message
for k, v := range entry.Data {
if _, ok := h.ignoreFields[k]; ok {
continue
}
if fn, ok := h.filters[k]; ok {
v = fn(v) // apply custom filter
} else {
v = formatData(v) // use default formatter
}
data[k] = v
}
bytes, err := json.Marshal(data)
if err != nil {
return nil
}
if h.addNewline {
n := []byte("\n")
bytes = append(bytes, n...)
}
return bytes
}
// formatData returns value as a suitable format.
func formatData(value interface{}) (formatted interface{}) {
switch value := value.(type) {
case json.Marshaler:
return value
case error:
return value.Error()
case fmt.Stringer:
return value.String()
default:
return value
}
}
func stringPtr(str string) *string {
return &str
}