Skip to content

Commit

Permalink
plugins: Reduce locks during decision logging (open-policy-agent#6797)
Browse files Browse the repository at this point in the history
Signed-off-by: Magnus Jungsbluth <magnus.jungsbluth@zalando.de>
Co-authored-by: Johan Fylling <johan.dev@fylling.se>
  • Loading branch information
mjungsbluth and johanfylling authored Jun 12, 2024
1 parent 612b93a commit b463d30
Show file tree
Hide file tree
Showing 2 changed files with 238 additions and 94 deletions.
186 changes: 92 additions & 94 deletions plugins/logs/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,21 +415,42 @@ func (c *Config) validateAndInjectDefaults(services []string, pluginsList []stri

// Plugin implements decision log buffering and uploading.
type Plugin struct {
manager *plugins.Manager
config Config
buffer *logBuffer
enc *chunkEncoder
mtx sync.Mutex
stop chan chan struct{}
reconfig chan reconfigure
mask *rego.PreparedEvalQuery
maskMutex sync.Mutex
drop *rego.PreparedEvalQuery
dropMutex sync.Mutex
limiter *rate.Limiter
metrics metrics.Metrics
logger logging.Logger
status *lstat.Status
manager *plugins.Manager
config Config
buffer *logBuffer
enc *chunkEncoder
mtx sync.Mutex
stop chan chan struct{}
reconfig chan reconfigure
preparedMask prepareOnce
preparedDrop prepareOnce
limiter *rate.Limiter
metrics metrics.Metrics
logger logging.Logger
status *lstat.Status
}

type prepareOnce struct {
once *sync.Once
preparedQuery *rego.PreparedEvalQuery
err error
}

func newPrepareOnce() *prepareOnce {
return &prepareOnce{
once: new(sync.Once),
}
}

func (po *prepareOnce) drop() {
po.once = new(sync.Once)
}

func (po *prepareOnce) prepareOnce(f func() (*rego.PreparedEvalQuery, error)) (*rego.PreparedEvalQuery, error) {
po.once.Do(func() {
po.preparedQuery, po.err = f()
})
return po.preparedQuery, po.err
}

type reconfigure struct {
Expand Down Expand Up @@ -513,14 +534,16 @@ func (b *ConfigBuilder) Parse() (*Config, error) {
func New(parsedConfig *Config, manager *plugins.Manager) *Plugin {

plugin := &Plugin{
manager: manager,
config: *parsedConfig,
stop: make(chan chan struct{}),
buffer: newLogBuffer(*parsedConfig.Reporting.BufferSizeLimitBytes),
enc: newChunkEncoder(*parsedConfig.Reporting.UploadSizeLimitBytes),
reconfig: make(chan reconfigure),
logger: manager.Logger().WithFields(map[string]interface{}{"plugin": Name}),
status: &lstat.Status{},
manager: manager,
config: *parsedConfig,
stop: make(chan chan struct{}),
buffer: newLogBuffer(*parsedConfig.Reporting.BufferSizeLimitBytes),
enc: newChunkEncoder(*parsedConfig.Reporting.UploadSizeLimitBytes),
reconfig: make(chan reconfigure),
logger: manager.Logger().WithFields(map[string]interface{}{"plugin": Name}),
status: &lstat.Status{},
preparedDrop: *newPrepareOnce(),
preparedMask: *newPrepareOnce(),
}

if parsedConfig.Reporting.MaxDecisionsPerSecond != nil {
Expand Down Expand Up @@ -713,13 +736,8 @@ func (p *Plugin) Reconfigure(_ context.Context, config interface{}) {
done := make(chan struct{})
p.reconfig <- reconfigure{config: config, done: done}

p.maskMutex.Lock()
defer p.maskMutex.Unlock()
p.mask = nil

p.dropMutex.Lock()
defer p.dropMutex.Unlock()
p.drop = nil
p.preparedMask.drop()
p.preparedDrop.drop()

<-done
}
Expand Down Expand Up @@ -753,13 +771,8 @@ func (p *Plugin) Trigger(ctx context.Context) error {
// fires. This indicates a new compiler instance is available. The decision
// logger needs to prepare a new masking query.
func (p *Plugin) compilerUpdated(storage.Transaction) {
p.maskMutex.Lock()
defer p.maskMutex.Unlock()
p.mask = nil

p.dropMutex.Lock()
defer p.dropMutex.Unlock()
p.drop = nil
p.preparedMask.drop()
p.preparedDrop.drop()
}

func (p *Plugin) loop() {
Expand Down Expand Up @@ -975,42 +988,33 @@ func (p *Plugin) bufferChunk(buffer *logBuffer, bs []byte) {
}

func (p *Plugin) maskEvent(ctx context.Context, txn storage.Transaction, input ast.Value, event *EventV1) error {

mask, err := func() (rego.PreparedEvalQuery, error) {

p.maskMutex.Lock()
defer p.maskMutex.Unlock()

if p.mask == nil {

query := ast.NewBody(ast.NewExpr(ast.NewTerm(p.config.maskDecisionRef)))

r := rego.New(
rego.ParsedQuery(query),
rego.Compiler(p.manager.GetCompiler()),
rego.Store(p.manager.Store),
rego.Transaction(txn),
rego.Runtime(p.manager.Info),
rego.EnablePrintStatements(p.manager.EnablePrintStatements()),
rego.PrintHook(p.manager.PrintHook()),
)

pq, err := r.PrepareForEval(context.Background())
if err != nil {
return rego.PreparedEvalQuery{}, err
}

p.mask = &pq
pq, err := p.preparedMask.prepareOnce(func() (*rego.PreparedEvalQuery, error) {
var pq rego.PreparedEvalQuery

query := ast.NewBody(ast.NewExpr(ast.NewTerm(p.config.maskDecisionRef)))

r := rego.New(
rego.ParsedQuery(query),
rego.Compiler(p.manager.GetCompiler()),
rego.Store(p.manager.Store),
rego.Transaction(txn),
rego.Runtime(p.manager.Info),
rego.EnablePrintStatements(p.manager.EnablePrintStatements()),
rego.PrintHook(p.manager.PrintHook()),
)

pq, err := r.PrepareForEval(context.Background())
if err != nil {
return nil, err
}

return *p.mask, nil
}()
return &pq, nil
})

if err != nil {
return err
}

rs, err := mask.Eval(
rs, err := pq.Eval(
ctx,
rego.EvalParsedInput(input),
rego.EvalTransaction(txn),
Expand Down Expand Up @@ -1038,40 +1042,34 @@ func (p *Plugin) maskEvent(ctx context.Context, txn storage.Transaction, input a
}

func (p *Plugin) dropEvent(ctx context.Context, txn storage.Transaction, input ast.Value) (bool, error) {
var err error

drop, err := func() (rego.PreparedEvalQuery, error) {

p.dropMutex.Lock()
defer p.dropMutex.Unlock()

if p.drop == nil {
query := ast.NewBody(ast.NewExpr(ast.NewTerm(p.config.dropDecisionRef)))
r := rego.New(
rego.ParsedQuery(query),
rego.Compiler(p.manager.GetCompiler()),
rego.Store(p.manager.Store),
rego.Transaction(txn),
rego.Runtime(p.manager.Info),
rego.EnablePrintStatements(p.manager.EnablePrintStatements()),
rego.PrintHook(p.manager.PrintHook()),
)

pq, err := r.PrepareForEval(context.Background())
if err != nil {
return rego.PreparedEvalQuery{}, err
}

p.drop = &pq
pq, err := p.preparedDrop.prepareOnce(func() (*rego.PreparedEvalQuery, error) {
var pq rego.PreparedEvalQuery

query := ast.NewBody(ast.NewExpr(ast.NewTerm(p.config.dropDecisionRef)))
r := rego.New(
rego.ParsedQuery(query),
rego.Compiler(p.manager.GetCompiler()),
rego.Store(p.manager.Store),
rego.Transaction(txn),
rego.Runtime(p.manager.Info),
rego.EnablePrintStatements(p.manager.EnablePrintStatements()),
rego.PrintHook(p.manager.PrintHook()),
)

pq, err := r.PrepareForEval(context.Background())
if err != nil {
return nil, err
}

return *p.drop, nil
}()
return &pq, nil
})

if err != nil {
return false, err
}

rs, err := drop.Eval(
rs, err := pq.Eval(
ctx,
rego.EvalParsedInput(input),
rego.EvalTransaction(txn),
Expand Down
146 changes: 146 additions & 0 deletions plugins/logs/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2350,6 +2350,152 @@ func TestPluginDrop(t *testing.T) {
}
}

func TestPluginMaskErrorHandling(t *testing.T) {
rawPolicy := []byte(`
package system.log
drop {
endswith(input.path, "bar")
}`)
event := &EventV1{Path: "foo/bar"}

// Setup fixture. Populate store with simple drop policy.
ctx := context.Background()
store := inmem.New()

//checks if raw policy is valid and stores policy in store
err := storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error {
if err := store.UpsertPolicy(ctx, txn, "test.rego", rawPolicy); err != nil {
return err
}
return nil
})
if err != nil {
t.Fatal(err)
}

var output []string

// Create and start manager. Start is required so that stored policies
// get compiled and made available to the plugin.
manager, err := plugins.New(
nil,
"test",
store,
plugins.EnablePrintStatements(true),
plugins.PrintHook(appendingPrintHook{printed: &output}),
)
if err != nil {
t.Fatal(err)
}
if err := manager.Start(ctx); err != nil {
t.Fatal(err)
}

// Instantiate the plugin.
cfg := &Config{Service: "svc"}
trigger := plugins.DefaultTriggerMode
cfg.validateAndInjectDefaults([]string{"svc"}, nil, &trigger)

plugin := New(cfg, manager)

if err := plugin.Start(ctx); err != nil {
t.Fatal(err)
}
input, err := event.AST()
if err != nil {
t.Fatal(err)
}

type badTransaction struct {
storage.Transaction
}

expErr := "storage_invalid_txn_error: unexpected transaction type *logs.badTransaction"
err = plugin.maskEvent(ctx, &badTransaction{}, input, event)
if err.Error() != expErr {
t.Fatalf("Expected error %v got %v", expErr, err)
}

// We expect the same error on a second call, even though the mask query failed to prepare and won't be prepared again.
err = plugin.maskEvent(ctx, nil, input, event)
if err.Error() != expErr {
t.Fatalf("Expected error %v got %v", expErr, err)
}
}

func TestPluginDropErrorHandling(t *testing.T) {
rawPolicy := []byte(`
package system.log
drop {
endswith(input.path, "bar")
}`)
event := &EventV1{Path: "foo/bar"}

// Setup fixture. Populate store with simple drop policy.
ctx := context.Background()
store := inmem.New()

//checks if raw policy is valid and stores policy in store
err := storage.Txn(ctx, store, storage.WriteParams, func(txn storage.Transaction) error {
if err := store.UpsertPolicy(ctx, txn, "test.rego", rawPolicy); err != nil {
return err
}
return nil
})
if err != nil {
t.Fatal(err)
}

var output []string

// Create and start manager. Start is required so that stored policies
// get compiled and made available to the plugin.
manager, err := plugins.New(
nil,
"test",
store,
plugins.EnablePrintStatements(true),
plugins.PrintHook(appendingPrintHook{printed: &output}),
)
if err != nil {
t.Fatal(err)
}
if err := manager.Start(ctx); err != nil {
t.Fatal(err)
}

// Instantiate the plugin.
cfg := &Config{Service: "svc"}
trigger := plugins.DefaultTriggerMode
cfg.validateAndInjectDefaults([]string{"svc"}, nil, &trigger)

plugin := New(cfg, manager)

if err := plugin.Start(ctx); err != nil {
t.Fatal(err)
}
input, err := event.AST()
if err != nil {
t.Fatal(err)
}

type badTransaction struct {
storage.Transaction
}

expErr := "storage_invalid_txn_error: unexpected transaction type *logs.badTransaction"
_, err = plugin.dropEvent(ctx, &badTransaction{}, input)
if err.Error() != expErr {
t.Fatalf("Expected error %v got %v", expErr, err)
}

// We expect the same error on a second call, even though the drop query failed to prepare and won't be prepared again.
_, err = plugin.dropEvent(ctx, nil, input)
if err.Error() != expErr {
t.Fatalf("Expected error %v got %v", expErr, err)
}
}

type testFixtureOptions struct {
ConsoleLogger *test.Logger
ReportingUploadSizeLimitBytes int64
Expand Down

0 comments on commit b463d30

Please sign in to comment.