-
Notifications
You must be signed in to change notification settings - Fork 3.5k
/
dque.go
132 lines (109 loc) · 2.84 KB
/
dque.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package main
import (
"fmt"
"os"
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/promtail/client"
"github.com/joncrlsn/dque"
"github.com/prometheus/common/model"
)
type dqueConfig struct {
queueDir string
queueSegmentSize int
queueSync bool
queueName string
}
var defaultDqueConfig = dqueConfig{
queueDir: "/tmp/flb-storage/loki",
queueSegmentSize: 500,
queueSync: false,
queueName: "dque",
}
type dqueEntry struct {
Lbs model.LabelSet
Ts time.Time
Line string
}
func dqueEntryBuilder() interface{} {
return &dqueEntry{}
}
type dqueClient struct {
logger log.Logger
queue *dque.DQue
loki client.Client
quit chan struct{}
once sync.Once
wg sync.WaitGroup
}
// New makes a new dque loki client
func newDque(cfg *config, logger log.Logger) (client.Client, error) {
var err error
q := &dqueClient{
logger: log.With(logger, "component", "queue", "name", cfg.bufferConfig.dqueConfig.queueName),
quit: make(chan struct{}),
}
err = os.MkdirAll(cfg.bufferConfig.dqueConfig.queueDir, 0644)
if err != nil {
return nil, fmt.Errorf("cannot create queue directory: %s", err)
}
q.queue, err = dque.NewOrOpen(cfg.bufferConfig.dqueConfig.queueName, cfg.bufferConfig.dqueConfig.queueDir, cfg.bufferConfig.dqueConfig.queueSegmentSize, dqueEntryBuilder)
if err != nil {
return nil, err
}
if !cfg.bufferConfig.dqueConfig.queueSync {
q.queue.TurboOn()
}
q.loki, err = client.New(cfg.clientConfig, logger)
if err != nil {
return nil, err
}
q.wg.Add(1)
go q.dequeuer()
return q, nil
}
func (c *dqueClient) dequeuer() {
defer func() {
if err := c.queue.Close(); err != nil {
level.Error(c.logger).Log("msg", "error closing queue", "err", err)
}
c.wg.Done()
}()
for {
select {
case <-c.quit:
return
default:
}
// Dequeue the next item in the queue
entry, err := c.queue.DequeueBlock()
if err != nil {
level.Error(c.logger).Log("msg", "error dequeuing record", "error", err)
continue
}
// Assert type of the response to an Item pointer so we can work with it
record, ok := entry.(*dqueEntry)
if !ok {
level.Error(c.logger).Log("msg", "error dequeued record is not an valid type", "error")
continue
}
if err := c.loki.Handle(record.Lbs, record.Ts, record.Line); err != nil {
level.Error(c.logger).Log("msg", "error sending record to Loki", "error", err)
}
}
}
// Stop the client
func (c *dqueClient) Stop() {
c.once.Do(func() { close(c.quit) })
c.loki.Stop()
c.wg.Wait()
}
// Handle implement EntryHandler; adds a new line to the next batch; send is async.
func (c *dqueClient) Handle(ls model.LabelSet, t time.Time, s string) error {
if err := c.queue.Enqueue(&dqueEntry{ls, t, s}); err != nil {
return fmt.Errorf("cannot enqueue record %s: %s", s, err)
}
return nil
}