Skip to content

Commit

Permalink
Fix deadlock when bundle and decision logging enabled
Browse files Browse the repository at this point in the history
This commit attempts to fix the deadlock that happens
when bundle and decision logging are both enabled.

The opa-envoy plugin creates a new transaction during
query evaluation and closes it once evaluation is complete.
When it then attempts to log the decision, the decision
log plugin grabs the mask mutex and calls the PrepareForEval
function in the rego package, which tries to open a new
read transaction on the store because the log plugin does
not provide one. This call blocks if the bundle plugin
concurrently has a write transaction open on the store.
That write invokes the decision log plugin's callback, which
tries to grab the mask mutex. This second call also blocks because
the decision log plugin is already holding the mutex for the mask query.

To avoid this, we keep the transaction open in the opa-envoy plugin
till we log the decision.

Fixes: open-policy-agent/opa#3722

Signed-off-by: Ashutosh Narkar <anarkar4387@gmail.com>
  • Loading branch information
ashutosh-narkar committed Sep 14, 2021
1 parent c39751f commit 67911b9
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 32 deletions.
70 changes: 40 additions & 30 deletions envoyauth/evaluation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,47 +29,57 @@ type EvalContext interface {

//Eval - Evaluates an input against a provided EvalContext and yields result
func Eval(ctx context.Context, evalContext EvalContext, input ast.Value, result *EvalResult, opts ...func(*rego.Rego)) error {
var err error

err := storage.Txn(ctx, evalContext.Store(), storage.TransactionParams{}, func(txn storage.Transaction) error {
err := getRevision(ctx, evalContext.Store(), txn, result)
if result.Txn == nil {
var txn storage.Transaction
var txnClose TransactionCloser
txn, txnClose, err = result.GetTxn(ctx, evalContext.Store())
if err != nil {
logrus.WithField("err", err).Error("Unable to start new storage transaction.")
return err
}
defer txnClose(ctx, err)
result.Txn = txn
}

result.TxnID = txn.ID()
err = getRevision(ctx, evalContext.Store(), result.Txn, result)
if err != nil {
return err
}

logrus.WithFields(logrus.Fields{
"input": input,
"query": evalContext.ParsedQuery().String(),
"txn": result.TxnID,
}).Debug("Executing policy query.")
result.TxnID = result.Txn.ID()

err = constructPreparedQuery(evalContext, txn, result.Metrics, opts)
if err != nil {
return err
}
logrus.WithFields(logrus.Fields{
"input": input,
"query": evalContext.ParsedQuery().String(),
"txn": result.TxnID,
}).Debug("Executing policy query.")

rs, err := evalContext.PreparedQuery().Eval(
ctx,
rego.EvalParsedInput(input),
rego.EvalTransaction(txn),
rego.EvalMetrics(result.Metrics),
rego.EvalInterQueryBuiltinCache(evalContext.InterQueryBuiltinCache()),
)
err = constructPreparedQuery(evalContext, result.Txn, result.Metrics, opts)
if err != nil {
return err
}

if err != nil {
return err
} else if len(rs) == 0 {
return fmt.Errorf("undefined decision")
} else if len(rs) > 1 {
return fmt.Errorf("multiple evaluation results")
}
var rs rego.ResultSet
rs, err = evalContext.PreparedQuery().Eval(
ctx,
rego.EvalParsedInput(input),
rego.EvalTransaction(result.Txn),
rego.EvalMetrics(result.Metrics),
rego.EvalInterQueryBuiltinCache(evalContext.InterQueryBuiltinCache()),
)

result.Decision = rs[0].Expressions[0].Value
return nil
})
if err != nil {
return err
} else if len(rs) == 0 {
return fmt.Errorf("undefined decision")
} else if len(rs) > 1 {
return fmt.Errorf("multiple evaluation results")
}

return err
result.Decision = rs[0].Expressions[0].Value
return nil
}

func constructPreparedQuery(evalContext EvalContext, txn storage.Transaction, m metrics.Metrics, opts []func(*rego.Rego)) error {
Expand Down
172 changes: 172 additions & 0 deletions envoyauth/evaluation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,19 @@ package envoyauth
import (
"context"
"reflect"
"strings"
"sync"
"testing"

"github.com/open-policy-agent/opa/plugins/logs"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/bundle"
"github.com/open-policy-agent/opa/plugins"
"github.com/open-policy-agent/opa/rego"
"github.com/open-policy-agent/opa/storage"
"github.com/open-policy-agent/opa/storage/inmem"
iCache "github.com/open-policy-agent/opa/topdown/cache"
)

func TestGetRevisionLegacy(t *testing.T) {
Expand Down Expand Up @@ -102,3 +110,167 @@ func TestGetRevisionMulti(t *testing.T) {
}

}

// TestEval runs Eval against the mock authz server in two configurations:
// first with an empty EvalResult, so Eval has to open its own transaction,
// and then with an EvalResult whose Txn was opened up front by the caller.
// Both calls are expected to return without error.
func TestEval(t *testing.T) {
	ctx := context.Background()

	server, err := testAuthzServer()
	if err != nil {
		t.Fatal(err)
	}

	// Input shaped to satisfy the `allow` rule loaded by testAuthzServer.
	input := map[string]interface{}{
		"parsed_body": map[string]interface{}{
			"firstname": "foo",
			"lastname":  "bar",
		},
	}

	inputValue, err := ast.InterfaceToValue(input)
	if err != nil {
		t.Fatal(err)
	}

	// No transaction on the result object.
	if err = Eval(ctx, server, inputValue, &EvalResult{}); err != nil {
		t.Fatal(err)
	}

	// include transaction in the result object
	er := &EvalResult{}

	txn, txnClose, err := er.GetTxn(ctx, server.Store())
	if err != nil {
		t.Fatal(err)
	}

	// The arguments are evaluated here, so the closer receives the (nil) err
	// from GetTxn rather than any later failure.
	defer txnClose(ctx, err)
	er.Txn = txn

	if err = Eval(ctx, server, inputValue, er); err != nil {
		t.Fatal(err)
	}
}

// testAuthzServer builds the mockExtAuthzGrpcServer used by the tests in this
// file. It
//
//   - loads a small `envoy.authz` policy into a fresh in-memory store,
//   - creates a plugin manager on top of that store,
//   - registers a testPlugin as the decision log sink together with the
//     decision log plugin itself, and starts the manager,
//   - parses the query `data.envoy.authz.allow` derived from the configured path.
//
// Any failure along the way is returned to the caller instead of being
// dropped or turned into a panic.
func testAuthzServer() (*mockExtAuthzGrpcServer, error) {

	module := `
package envoy.authz
default allow = false
allow {
input.parsed_body.firstname == "foo"
input.parsed_body.lastname == "bar"
}`

	ctx := context.Background()
	store := inmem.New()

	// Seed the store with the policy. Every step can fail, so surface the
	// error rather than continuing with an empty store.
	txn, err := store.NewTransaction(ctx, storage.WriteParams)
	if err != nil {
		return nil, err
	}

	if err := store.UpsertPolicy(ctx, txn, "example.rego", []byte(module)); err != nil {
		// Release the write transaction so it does not stay open on the store.
		store.Abort(ctx, txn)
		return nil, err
	}

	if err := store.Commit(ctx, txn); err != nil {
		return nil, err
	}

	m, err := plugins.New([]byte{}, "test", store)
	if err != nil {
		return nil, err
	}

	// The decision log plugin is configured to forward events to "test_plugin".
	m.Register("test_plugin", &testPlugin{})
	config, err := logs.ParseConfig([]byte(`{"plugin": "test_plugin"}`), nil, []string{"test_plugin"})
	if err != nil {
		return nil, err
	}

	plugin := logs.New(config, m)
	m.Register(logs.Name, plugin)

	if err := m.Start(ctx); err != nil {
		return nil, err
	}

	path := "envoy/authz/allow"
	query := "data." + strings.ReplaceAll(path, "/", ".")
	parsedQuery, err := ast.ParseBody(query)
	if err != nil {
		return nil, err
	}

	cfg := Config{
		Addr:        ":0",
		Path:        path,
		parsedQuery: parsedQuery,
	}

	// No inter-query builtin cache is wired in; the mock's
	// InterQueryBuiltinCache method returns nil.
	return &mockExtAuthzGrpcServer{
		cfg:                 cfg,
		manager:             m,
		preparedQueryDoOnce: new(sync.Once),
	}, nil
}

// Config is the minimal plugin configuration used by the mock authz server in
// these tests.
type Config struct {
// Addr is the listen address; testAuthzServer sets it to ":0".
Addr string `json:"addr"`
// Path is the slash-separated policy path, e.g. "envoy/authz/allow".
Path string `json:"path"`
// parsedQuery is the query body parsed from "data." plus the dotted form of
// Path. It is unexported and carries no JSON tag.
parsedQuery ast.Body
}

// mockExtAuthzGrpcServer is a test double that is passed to Eval as its
// EvalContext. It is backed by a plugin manager and keeps the lazily
// prepared query between calls.
type mockExtAuthzGrpcServer struct {
	cfg                 Config
	manager             *plugins.Manager
	preparedQuery       *rego.PreparedEvalQuery
	preparedQueryDoOnce *sync.Once
}

// ParsedQuery returns the query body stored in the server's configuration.
func (s *mockExtAuthzGrpcServer) ParsedQuery() ast.Body {
	return s.cfg.parsedQuery
}

// Store returns the plugin manager's store.
func (s *mockExtAuthzGrpcServer) Store() storage.Store {
	return s.manager.Store
}

// Compiler returns the compiler currently held by the plugin manager.
func (s *mockExtAuthzGrpcServer) Compiler() *ast.Compiler {
	return s.manager.GetCompiler()
}

// Runtime returns the plugin manager's Info term.
func (s *mockExtAuthzGrpcServer) Runtime() *ast.Term {
	return s.manager.Info
}

// PreparedQueryDoOnce returns the sync.Once held alongside the prepared query.
func (s *mockExtAuthzGrpcServer) PreparedQueryDoOnce() *sync.Once {
	return s.preparedQueryDoOnce
}

// InterQueryBuiltinCache always returns nil; the mock has no inter-query cache.
func (s *mockExtAuthzGrpcServer) InterQueryBuiltinCache() iCache.InterQueryCache {
	return nil
}

// PreparedQuery returns the stored prepared query, or nil if none has been set.
func (s *mockExtAuthzGrpcServer) PreparedQuery() *rego.PreparedEvalQuery {
	return s.preparedQuery
}

// SetPreparedQuery stores pq for later retrieval through PreparedQuery.
func (s *mockExtAuthzGrpcServer) SetPreparedQuery(pq *rego.PreparedEvalQuery) {
	s.preparedQuery = pq
}

// testPlugin is the decision log sink registered as "test_plugin" by
// testAuthzServer. It records every event passed to Log in memory.
type testPlugin struct {
	events []logs.EventV1
}

// Start does nothing and reports success.
func (tp *testPlugin) Start(_ context.Context) error { return nil }

// Stop does nothing.
func (tp *testPlugin) Stop(_ context.Context) {}

// Reconfigure does nothing; the supplied configuration is ignored.
func (tp *testPlugin) Reconfigure(_ context.Context, _ interface{}) {}

// Log appends the event to the plugin's in-memory list and returns nil.
func (tp *testPlugin) Log(_ context.Context, ev logs.EventV1) error {
	tp.events = append(tp.events, ev)
	return nil
}
33 changes: 31 additions & 2 deletions envoyauth/response.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package envoyauth

import (
"context"
"encoding/json"
"fmt"
"net/http"
Expand All @@ -9,6 +10,7 @@ import (
ext_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/open-policy-agent/opa-envoy-plugin/internal/util"
"github.com/open-policy-agent/opa/metrics"
"github.com/open-policy-agent/opa/storage"
)

// EvalResult - Captures the result from evaluating a query against an input
Expand All @@ -19,11 +21,15 @@ type EvalResult struct {
TxnID uint64
Decision interface{}
Metrics metrics.Metrics
Txn storage.Transaction
}

// StopFunc should be called as soon as the evaluation is finished.
// The StopFunc handed out by NewEvalResult stops the metrics.ServerHandler
// timer that NewEvalResult started on the result's Metrics.
type StopFunc = func()

// TransactionCloser should be called to abort the transaction.
// It takes the context to use for the abort and the error (if any) that the
// caller ended up with, and returns an error of its own. The closers built by
// EvalResult.GetTxn ignore the error argument and always return nil.
type TransactionCloser func(ctx context.Context, err error) error

// NewEvalResult creates a new EvalResult and a StopFunc that is used to stop the timer for metrics
func NewEvalResult() (*EvalResult, StopFunc, error) {
var err error
Expand All @@ -34,18 +40,41 @@ func NewEvalResult() (*EvalResult, StopFunc, error) {
er.DecisionID, err = util.UUID4()

if err != nil {
return nil, func() {}, err
return nil, nil, err
}

er.Metrics.Timer(metrics.ServerHandler).Start()

stop := func() {
er.Metrics.Timer(metrics.ServerHandler).Stop()
_ = er.Metrics.Timer(metrics.ServerHandler).Stop()
}

return &er, stop, nil
}

// GetTxn opens a transaction on store using default (zero-value)
// TransactionParams and returns it together with a TransactionCloser.
//
// On success the closer aborts the transaction on store and resets
// result.Txn to nil; it ignores the error passed to it and always returns
// nil. GetTxn itself does not assign result.Txn; callers do that.
//
// If the transaction cannot be opened, the returned transaction is nil, the
// closer is a no-op that is still safe to call, and the store's error is
// returned.
func (result *EvalResult) GetTxn(ctx context.Context, store storage.Store) (storage.Transaction, TransactionCloser, error) {
	txn, err := store.NewTransaction(ctx, storage.TransactionParams{})
	if err != nil {
		// Nothing was opened, so there is nothing to abort.
		return nil, func(context.Context, error) error { return nil }, err
	}

	abort := func(ctx context.Context, _ error) error {
		store.Abort(ctx, txn)
		result.Txn = nil
		return nil
	}

	return txn, abort, nil
}

// invalidDecisionErr builds the error reported when result.Decision holds a
// value of an unsupported kind; the message includes the Go type of Decision.
func (result *EvalResult) invalidDecisionErr() error {
	const format = "illegal value for policy evaluation result: %T"
	return fmt.Errorf(format, result.Decision)
}
Expand Down
13 changes: 13 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,23 @@ func (p *envoyExtAuthzGrpcServer) Check(ctx context.Context, req *ext_authz_v3.C

func (p *envoyExtAuthzGrpcServer) check(ctx context.Context, req interface{}) (*ext_authz_v3.CheckResponse, func() *rpc_status.Status, error) {
var err error
var evalErr error
start := time.Now()

result, stopeval, err := envoyauth.NewEvalResult()
if err != nil {
logrus.WithField("err", err).Error("Unable to start new evaluation.")
return nil, func() *rpc_status.Status { return nil }, err
}

txn, txnClose, err := result.GetTxn(ctx, p.Store())
if err != nil {
logrus.WithField("err", err).Error("Unable to start new storage transaction.")
return nil, func() *rpc_status.Status { return nil }, err
}

result.Txn = txn

logEntry := logrus.WithField("decision-id", result.DecisionID)

var input map[string]interface{}
Expand All @@ -281,11 +291,13 @@ func (p *envoyExtAuthzGrpcServer) check(ctx context.Context, req interface{}) (*
stopeval()
logErr := p.log(ctx, input, result, err)
if logErr != nil {
_ = txnClose(ctx, logErr) // Ignore error
return &rpc_status.Status{
Code: int32(code.Code_UNKNOWN),
Message: logErr.Error(),
}
}
_ = txnClose(ctx, evalErr) // Ignore error
return nil
}

Expand All @@ -306,6 +318,7 @@ func (p *envoyExtAuthzGrpcServer) check(ctx context.Context, req interface{}) (*

err = envoyauth.Eval(ctx, p, inputValue, result)
if err != nil {
evalErr = err
return nil, stop, err
}

Expand Down
Loading

0 comments on commit 67911b9

Please sign in to comment.