Skip to content

Commit

Permalink
Fix deadlock when bundle and decision logging enabled
Browse files Browse the repository at this point in the history
This commit attempts to fix the deadlock that happens
when bundle and decision logging are both enabled.

The opa-envoy plugin creates a new transaction during
query evaluation and closes it once eval is complete.
Then, when it attempts to log the decision, the decision
log plugin grabs the mask mutex and calls the PrepareForEval
function in the rego package, which tries to open a new
read transaction on the store because the log plugin does
not provide one. This call blocks
if the bundle plugin concurrently has a write transaction
open on the store. That write in turn invokes the decision log plugin's callback,
which tries to grab the mask mutex. That call also blocks because
the decision log plugin is already holding the mutex for the mask query,
so neither side can make progress.

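To avoid this, we keep the transaction open in the opa-envoy plugin
until the decision has been logged, and pass that transaction along to the
decision logger so that it does not need to open one of its own.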

Fixes: open-policy-agent/opa#3722

Signed-off-by: Ashutosh Narkar <anarkar4387@gmail.com>
  • Loading branch information
ashutosh-narkar committed Aug 30, 2021
1 parent c39751f commit f3b5dc6
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 36 deletions.
64 changes: 30 additions & 34 deletions envoyauth/evaluation.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,46 +30,42 @@ type EvalContext interface {
//Eval - Evaluates an input against a provided EvalContext and yields result
func Eval(ctx context.Context, evalContext EvalContext, input ast.Value, result *EvalResult, opts ...func(*rego.Rego)) error {

err := storage.Txn(ctx, evalContext.Store(), storage.TransactionParams{}, func(txn storage.Transaction) error {
err := getRevision(ctx, evalContext.Store(), txn, result)
if err != nil {
return err
}

result.TxnID = txn.ID()
err := getRevision(ctx, evalContext.Store(), result.Txn, result)
if err != nil {
return err
}

logrus.WithFields(logrus.Fields{
"input": input,
"query": evalContext.ParsedQuery().String(),
"txn": result.TxnID,
}).Debug("Executing policy query.")
result.TxnID = result.Txn.ID()

err = constructPreparedQuery(evalContext, txn, result.Metrics, opts)
if err != nil {
return err
}
logrus.WithFields(logrus.Fields{
"input": input,
"query": evalContext.ParsedQuery().String(),
"txn": result.TxnID,
}).Debug("Executing policy query.")

rs, err := evalContext.PreparedQuery().Eval(
ctx,
rego.EvalParsedInput(input),
rego.EvalTransaction(txn),
rego.EvalMetrics(result.Metrics),
rego.EvalInterQueryBuiltinCache(evalContext.InterQueryBuiltinCache()),
)
err = constructPreparedQuery(evalContext, result.Txn, result.Metrics, opts)
if err != nil {
return err
}

if err != nil {
return err
} else if len(rs) == 0 {
return fmt.Errorf("undefined decision")
} else if len(rs) > 1 {
return fmt.Errorf("multiple evaluation results")
}
rs, err := evalContext.PreparedQuery().Eval(
ctx,
rego.EvalParsedInput(input),
rego.EvalTransaction(result.Txn),
rego.EvalMetrics(result.Metrics),
rego.EvalInterQueryBuiltinCache(evalContext.InterQueryBuiltinCache()),
)

result.Decision = rs[0].Expressions[0].Value
return nil
})
if err != nil {
return err
} else if len(rs) == 0 {
return fmt.Errorf("undefined decision")
} else if len(rs) > 1 {
return fmt.Errorf("multiple evaluation results")
}

return err
result.Decision = rs[0].Expressions[0].Value
return nil
}

func constructPreparedQuery(evalContext EvalContext, txn storage.Transaction, m metrics.Metrics, opts []func(*rego.Rego)) error {
Expand Down
11 changes: 9 additions & 2 deletions envoyauth/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
ext_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/open-policy-agent/opa-envoy-plugin/internal/util"
"github.com/open-policy-agent/opa/metrics"
"github.com/open-policy-agent/opa/storage"
)

// EvalResult - Captures the result from evaluating a query against an input
Expand All @@ -19,6 +20,7 @@ type EvalResult struct {
TxnID uint64
Decision interface{}
Metrics metrics.Metrics
Txn storage.Transaction
}

// StopFunc should be called as soon as the evaluation is finished
Expand All @@ -34,18 +36,23 @@ func NewEvalResult() (*EvalResult, StopFunc, error) {
er.DecisionID, err = util.UUID4()

if err != nil {
return nil, func() {}, err
return nil, nil, err
}

er.Metrics.Timer(metrics.ServerHandler).Start()

stop := func() {
er.Metrics.Timer(metrics.ServerHandler).Stop()
_ = er.Metrics.Timer(metrics.ServerHandler).Stop()
}

return &er, stop, nil
}

// WithTxn sets the storage transaction carried by the evaluation result.
// It overwrites whatever value the Txn field held before; passing nil
// clears the field.
func (result *EvalResult) WithTxn(txn storage.Transaction) {
result.Txn = txn
}

// invalidDecisionErr builds the error reported when the decision stored on
// the result has an unexpected type. The message includes the dynamic Go
// type of result.Decision.
func (result *EvalResult) invalidDecisionErr() error {
	const format = "illegal value for policy evaluation result: %T"

	return fmt.Errorf(format, result.Decision)
}
Expand Down
45 changes: 45 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,38 @@ func (p *envoyExtAuthzGrpcServer) listen() {
p.manager.UpdatePluginStatus(PluginName, &plugins.Status{State: plugins.StateNotReady})
}

// transactionCloser finishes a storage transaction that was opened for a
// request. It is given the evaluation result that carried the transaction and
// the error, if any, that determines how the transaction is finished; it
// returns any error produced while finishing the transaction.
type transactionCloser func(ctx context.Context, result *envoyauth.EvalResult, err error) error

// getTxn opens a new transaction on the plugin's store using default
// transaction parameters and returns it together with a closer that finishes
// it.
//
// When the transaction cannot be opened, the returned transaction is nil, the
// error is returned as-is, and the closer is a no-op that always returns nil.
//
// Otherwise the closer aborts the transaction if it is called with a non-nil
// error and commits it if not. In both cases it then clears the transaction
// on the supplied result, and returns the commit error (nil after an abort).
func (p *envoyExtAuthzGrpcServer) getTxn(ctx context.Context) (storage.Transaction, transactionCloser, error) {
	txn, openErr := p.Store().NewTransaction(ctx, storage.TransactionParams{})
	if openErr != nil {
		// Nothing was opened, so there is nothing to finish.
		nop := func(context.Context, *envoyauth.EvalResult, error) error { return nil }
		return nil, nop, openErr
	}

	finish := func(ctx context.Context, result *envoyauth.EvalResult, txnErr error) error {
		var commitErr error

		switch {
		case txnErr != nil:
			p.Store().Abort(ctx, txn)
		default:
			commitErr = p.Store().Commit(ctx, txn)
		}

		// The transaction is finished; make sure the result no longer refers to it.
		result.WithTxn(nil)

		return commitErr
	}

	return txn, finish, nil
}

// Check is envoy.service.auth.v3.Authorization/Check
func (p *envoyExtAuthzGrpcServer) Check(ctx context.Context, req *ext_authz_v3.CheckRequest) (*ext_authz_v3.CheckResponse, error) {
resp, stop, err := p.check(ctx, req)
Expand All @@ -266,13 +298,23 @@ func (p *envoyExtAuthzGrpcServer) Check(ctx context.Context, req *ext_authz_v3.C

func (p *envoyExtAuthzGrpcServer) check(ctx context.Context, req interface{}) (*ext_authz_v3.CheckResponse, func() *rpc_status.Status, error) {
var err error
var evalErr error
start := time.Now()

result, stopeval, err := envoyauth.NewEvalResult()
if err != nil {
logrus.WithField("err", err).Error("Unable to start new evaluation.")
return nil, func() *rpc_status.Status { return nil }, err
}

txn, txnClose, err := p.getTxn(ctx)
if err != nil {
logrus.WithField("err", err).Error("Unable to start new storage transaction.")
return nil, func() *rpc_status.Status { return nil }, err
}

result.WithTxn(txn)

logEntry := logrus.WithField("decision-id", result.DecisionID)

var input map[string]interface{}
Expand All @@ -281,11 +323,13 @@ func (p *envoyExtAuthzGrpcServer) check(ctx context.Context, req interface{}) (*
stopeval()
logErr := p.log(ctx, input, result, err)
if logErr != nil {
_ = txnClose(ctx, result, logErr) // Ignore error
return &rpc_status.Status{
Code: int32(code.Code_UNKNOWN),
Message: logErr.Error(),
}
}
_ = txnClose(ctx, result, evalErr) // Ignore error
return nil
}

Expand All @@ -306,6 +350,7 @@ func (p *envoyExtAuthzGrpcServer) check(ctx context.Context, req interface{}) (*

err = envoyauth.Eval(ctx, p, inputValue, result)
if err != nil {
evalErr = err
return nil, stop, err
}

Expand Down
1 change: 1 addition & 0 deletions opa/decisionlog/decision_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func LogDecision(ctx context.Context, manager *plugins.Manager, info *server.Inf

info.DecisionID = result.DecisionID
info.Metrics = result.Metrics
info.Txn = result.Txn

if err != nil {
switch err.(type) {
Expand Down

0 comments on commit f3b5dc6

Please sign in to comment.