Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/reduced decision log locking #6859

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion plugins/logs/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ func (enc *chunkEncoder) Write(event EventV1) (result [][]byte, err error) {
return nil, err
}

bs := buf.Bytes()
return enc.WriteBytes(buf.Bytes())
}

func (enc *chunkEncoder) WriteBytes(bs []byte) (result [][]byte, err error) {
if len(bs) == 0 {
return nil, nil
} else if int64(len(bs)+2) > enc.limit {
Expand Down
32 changes: 20 additions & 12 deletions plugins/logs/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package logs

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -420,6 +421,7 @@ type Plugin struct {
buffer *logBuffer
enc *chunkEncoder
mtx sync.Mutex
statusMtx sync.Mutex
stop chan chan struct{}
reconfig chan reconfigure
preparedMask prepareOnce
Expand Down Expand Up @@ -714,9 +716,7 @@ func (p *Plugin) Log(ctx context.Context, decision *server.Info) error {
}

if p.config.Service != "" {
p.mtx.Lock()
p.encodeAndBufferEvent(event)
p.mtx.Unlock()
}

if p.config.Plugin != nil {
Expand Down Expand Up @@ -831,13 +831,11 @@ func (p *Plugin) loop() {
func (p *Plugin) doOneShot(ctx context.Context) error {
uploaded, err := p.oneShot(ctx)

// Make a local copy of the plugins's status. This is needed as locking the status for
// the status upload duration will block policy evaluation and result in
// increased latency for OPA clients
p.mtx.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is still valid, isn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The status update is no longer protected by the same mutex that is used for logging, so I don't think policy evaluation is affected anymore.

// Make a local copy of the plugins's status.
p.statusMtx.Lock()
p.status.SetError(err)
oldStatus := p.status
p.mtx.Unlock()
p.statusMtx.Unlock()

if s := status.Lookup(p.manager); s != nil {
s.UpdateDecisionLogsStatus(*oldStatus)
Expand Down Expand Up @@ -892,12 +890,9 @@ func (p *Plugin) oneShot(ctx context.Context) (ok bool, err error) {
continue
}

p.mtx.Lock()
for _, event := range events {
p.encodeAndBufferEvent(event)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

encodeAndBufferEvent is called in the Log method as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. encodeAndBufferEvent now does it's own locking, which is why we shouldn't also lock here.

}
p.mtx.Unlock()

} else {
// requeue the chunk
p.mtx.Lock()
Expand Down Expand Up @@ -938,7 +933,7 @@ func (p *Plugin) encodeAndBufferEvent(event EventV1) {
}
}

result, err := p.enc.Write(event)
result, err := p.encodeEvent(event)
if err != nil {
// If there's no ND builtins cache in the event, then we don't
// need to retry encoding anything.
Expand All @@ -958,7 +953,7 @@ func (p *Plugin) encodeAndBufferEvent(event EventV1) {
newEvent := event
newEvent.NDBuiltinCache = nil

result, err = p.enc.Write(newEvent)
result, err = p.encodeEvent(newEvent)
if err != nil {
if p.metrics != nil {
p.metrics.Counter(logEncodingFailureCounterName).Incr()
Expand All @@ -972,11 +967,24 @@ func (p *Plugin) encodeAndBufferEvent(event EventV1) {
p.metrics.Counter(logNDBDropCounterName).Incr()
}

p.mtx.Lock()
defer p.mtx.Unlock()
for _, chunk := range result {
p.bufferChunk(p.buffer, chunk)
}
}

func (p *Plugin) encodeEvent(event EventV1) ([][]byte, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(event); err != nil {
return nil, err
}

p.mtx.Lock()
defer p.mtx.Unlock()
return p.enc.WriteBytes(buf.Bytes())
}

func (p *Plugin) bufferChunk(buffer *logBuffer, bs []byte) {
dropped := buffer.Push(bs)
if dropped > 0 {
Expand Down