Skip to content

Commit

Permalink
Fix/reduced decision log locking (#6859)
Browse files Browse the repository at this point in the history
Reducing amount of work performed inside global lock in decision log plugin.

Signed-off-by: Johan Fylling <johan.dev@fylling.se>
  • Loading branch information
johanfylling authored Jul 9, 2024
1 parent a9cadd9 commit 3209910
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
4 changes: 3 additions & 1 deletion plugins/logs/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ func (enc *chunkEncoder) Write(event EventV1) (result [][]byte, err error) {
return nil, err
}

bs := buf.Bytes()
return enc.WriteBytes(buf.Bytes())
}

func (enc *chunkEncoder) WriteBytes(bs []byte) (result [][]byte, err error) {
if len(bs) == 0 {
return nil, nil
} else if int64(len(bs)+2) > enc.limit {
Expand Down
32 changes: 20 additions & 12 deletions plugins/logs/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package logs

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -420,6 +421,7 @@ type Plugin struct {
buffer *logBuffer
enc *chunkEncoder
mtx sync.Mutex
statusMtx sync.Mutex
stop chan chan struct{}
reconfig chan reconfigure
preparedMask prepareOnce
Expand Down Expand Up @@ -714,9 +716,7 @@ func (p *Plugin) Log(ctx context.Context, decision *server.Info) error {
}

if p.config.Service != "" {
p.mtx.Lock()
p.encodeAndBufferEvent(event)
p.mtx.Unlock()
}

if p.config.Plugin != nil {
Expand Down Expand Up @@ -831,13 +831,11 @@ func (p *Plugin) loop() {
func (p *Plugin) doOneShot(ctx context.Context) error {
uploaded, err := p.oneShot(ctx)

// Make a local copy of the plugins's status. This is needed as locking the status for
// the status upload duration will block policy evaluation and result in
// increased latency for OPA clients
p.mtx.Lock()
// Make a local copy of the plugins's status.
p.statusMtx.Lock()
p.status.SetError(err)
oldStatus := p.status
p.mtx.Unlock()
p.statusMtx.Unlock()

if s := status.Lookup(p.manager); s != nil {
s.UpdateDecisionLogsStatus(*oldStatus)
Expand Down Expand Up @@ -892,12 +890,9 @@ func (p *Plugin) oneShot(ctx context.Context) (ok bool, err error) {
continue
}

p.mtx.Lock()
for _, event := range events {
p.encodeAndBufferEvent(event)
}
p.mtx.Unlock()

} else {
// requeue the chunk
p.mtx.Lock()
Expand Down Expand Up @@ -938,7 +933,7 @@ func (p *Plugin) encodeAndBufferEvent(event EventV1) {
}
}

result, err := p.enc.Write(event)
result, err := p.encodeEvent(event)
if err != nil {
// If there's no ND builtins cache in the event, then we don't
// need to retry encoding anything.
Expand All @@ -958,7 +953,7 @@ func (p *Plugin) encodeAndBufferEvent(event EventV1) {
newEvent := event
newEvent.NDBuiltinCache = nil

result, err = p.enc.Write(newEvent)
result, err = p.encodeEvent(newEvent)
if err != nil {
if p.metrics != nil {
p.metrics.Counter(logEncodingFailureCounterName).Incr()
Expand All @@ -972,11 +967,24 @@ func (p *Plugin) encodeAndBufferEvent(event EventV1) {
p.metrics.Counter(logNDBDropCounterName).Incr()
}

p.mtx.Lock()
defer p.mtx.Unlock()
for _, chunk := range result {
p.bufferChunk(p.buffer, chunk)
}
}

func (p *Plugin) encodeEvent(event EventV1) ([][]byte, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(event); err != nil {
return nil, err
}

p.mtx.Lock()
defer p.mtx.Unlock()
return p.enc.WriteBytes(buf.Bytes())
}

func (p *Plugin) bufferChunk(buffer *logBuffer, bs []byte) {
dropped := buffer.Push(bs)
if dropped > 0 {
Expand Down

0 comments on commit 3209910

Please sign in to comment.