Skip to content

Commit

Permalink
Make loki syncer thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
dongxuny committed Jan 30, 2022
1 parent 5c841b5 commit ef62949
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 53 deletions.
117 changes: 76 additions & 41 deletions syncer_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func NewLokiSyncer(opts ...LokiSyncerOption) *LokiSyncer {
maxBatchWaitMs: 3000 * time.Millisecond,
maxBatchSize: 1000,
quitChannel: make(chan struct{}),
logChannel: make(chan *lokiValue),
buffer: newAtomicSlice(),
}

for i := range opts {
Expand Down Expand Up @@ -156,24 +156,29 @@ func (syncer *LokiSyncer) initBasicAuth() {

// LokiSyncer which will periodically send logs to Loki
type LokiSyncer struct {
addr string `yaml:"addr" json:"addr"`
path string `yaml:"path" json:"path"`
username string `yaml:"username" json:"username"`
password string `yaml:"-" json:"-"`
basicAuthHeader string `yaml:"-" json:"-"`
tlsConfig *tls.Config `yaml:"-" json:"-"`
maxBatchWaitMs time.Duration `yaml:"maxBatchWaitMs" json:"maxBatchWaitMs"`
maxBatchSize int `yaml:"maxBatchSize" json:"maxBatchSize"`
labels *atomicMap `yaml:"-" json:"-"`
logChannel chan *lokiValue `yaml:"-" json:"-"`
quitChannel chan struct{} `yaml:"-" json:"-"`
waitGroup sync.WaitGroup `yaml:"-" json:"-"`
httpClient *http.Client `yaml:"-" json:"-"`
addr string `yaml:"addr" json:"addr"`
path string `yaml:"path" json:"path"`
username string `yaml:"username" json:"username"`
password string `yaml:"-" json:"-"`
basicAuthHeader string `yaml:"-" json:"-"`
tlsConfig *tls.Config `yaml:"-" json:"-"`
maxBatchWaitMs time.Duration `yaml:"maxBatchWaitMs" json:"maxBatchWaitMs"`
maxBatchSize int `yaml:"maxBatchSize" json:"maxBatchSize"`
labels *atomicMap `yaml:"-" json:"-"`
buffer *atomicSlice `yaml:"-" json:"-"`
quitChannel chan struct{} `yaml:"-" json:"-"`
waitGroup sync.WaitGroup `yaml:"-" json:"-"`
httpClient *http.Client `yaml:"-" json:"-"`
}

// Send message to remote loki server
func (syncer *LokiSyncer) send(entries []*lokiValue) {
streams := syncer.newLokiStreamList(entries)
func (syncer *LokiSyncer) send() {
values := syncer.buffer.snapshotAndClear()
if len(values) < 1 {
return
}

streams := syncer.newLokiStreamList(values)

req, _ := http.NewRequest(http.MethodPost, syncer.addr+syncer.path, bytes.NewBuffer(streams))
req.Header.Set("Content-Type", "application/json")
Expand All @@ -199,38 +204,25 @@ func (syncer *LokiSyncer) send(entries []*lokiValue) {
// Bootstrap run periodic jobs
func (syncer *LokiSyncer) Bootstrap(context.Context) {
go func() {
var batch []*lokiValue
batchSize := 0
waitChannel := time.NewTimer(syncer.maxBatchWaitMs)

defer func() {
if batchSize > 0 {
syncer.send(batch)
}

syncer.send()
syncer.waitGroup.Done()
}()

for {
select {
case <-syncer.quitChannel:
return
case entry := <-syncer.logChannel:
batch = append(batch, entry)
batchSize++
if batchSize >= syncer.maxBatchSize {
syncer.send(batch)
batch = []*lokiValue{}
batchSize = 0
waitChannel.Reset(syncer.maxBatchWaitMs)
}
case <-waitChannel.C:
if batchSize > 0 {
syncer.send(batch)
batch = []*lokiValue{}
batchSize = 0
}
syncer.send()
waitChannel.Reset(syncer.maxBatchWaitMs)
default:
if syncer.buffer.len() >= syncer.maxBatchSize {
syncer.send()
waitChannel.Reset(syncer.maxBatchWaitMs)
}
}
}
}()
Expand Down Expand Up @@ -292,11 +284,10 @@ type lokiStreamList struct {

// Write to logChannel
func (syncer *LokiSyncer) Write(p []byte) (n int, err error) {
go func() {
syncer.logChannel <- &lokiValue{
Values: []string{fmt.Sprintf("%d", time.Now().UnixNano()), string(p)},
}
}()
syncer.buffer.add(&lokiValue{
Values: []string{fmt.Sprintf("%d", time.Now().UnixNano()), string(p)},
})

return len(p), nil
}

Expand All @@ -305,6 +296,50 @@ func (syncer *LokiSyncer) Sync() error {
return nil
}

func newAtomicSlice() *atomicSlice {
return &atomicSlice{
buf: make([]*lokiValue, 0),
mutex: sync.Mutex{},
}
}

type atomicSlice struct {
buf []*lokiValue
mutex sync.Mutex
}

func (a *atomicSlice) add(item *lokiValue) {
if item == nil {
return
}

a.mutex.Lock()
defer a.mutex.Unlock()

a.buf = append(a.buf, item)
}

func (a *atomicSlice) snapshotAndClear() []*lokiValue {
a.mutex.Lock()
defer a.mutex.Unlock()

res := make([]*lokiValue, 0)
for i := range a.buf {
res = append(res, a.buf[i])
}

a.buf = make([]*lokiValue, 0)

return res
}

func (a *atomicSlice) len() int {
a.mutex.Lock()
defer a.mutex.Unlock()

return len(a.buf)
}

func newAtomicMap() *atomicMap {
return &atomicMap{
mutex: sync.Mutex{},
Expand Down
18 changes: 6 additions & 12 deletions syncer_loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestNewLokiSyncer(t *testing.T) {
assert.Equal(t, 3000*time.Millisecond, syncer.maxBatchWaitMs)
assert.Equal(t, 1000, syncer.maxBatchSize)
assert.NotNil(t, syncer.quitChannel)
assert.NotNil(t, syncer.logChannel)
assert.NotNil(t, syncer.buffer)
syncer.Bootstrap(context.TODO())
syncer.Interrupt(context.TODO())

Expand All @@ -48,7 +48,7 @@ func TestNewLokiSyncer(t *testing.T) {
assert.Equal(t, time.Second, syncer.maxBatchWaitMs)
assert.Equal(t, 10, syncer.maxBatchSize)
assert.NotNil(t, syncer.quitChannel)
assert.NotNil(t, syncer.logChannel)
assert.NotNil(t, syncer.buffer)

syncer.Bootstrap(context.TODO())
syncer.Interrupt(context.TODO())
Expand All @@ -60,25 +60,19 @@ func TestLokiSyncer_send(t *testing.T) {
syncer := LokiSyncer{
httpClient: http.DefaultClient,
basicAuthHeader: "Basic xxx",
buffer: newAtomicSlice(),
}

entries := make([]*lokiValue, 0)

syncer.send(entries)
syncer.send()
}

func TestLokiSyncer_Write(t *testing.T) {
defer assertNotPanic(t)

syncer := &LokiSyncer{
logChannel: make(chan *lokiValue),
}

go func() {
<-syncer.logChannel
}()
syncer := NewLokiSyncer()

syncer.Write([]byte("ut"))
assert.Equal(t, syncer.buffer.len(), 1)
}

func TestLokiSyncer_Sync(t *testing.T) {
Expand Down

0 comments on commit ef62949

Please sign in to comment.