Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock when bundle and decision logging enabled #279

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 40 additions & 30 deletions envoyauth/evaluation.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,47 +29,57 @@ type EvalContext interface {

//Eval - Evaluates an input against a provided EvalContext and yields result
func Eval(ctx context.Context, evalContext EvalContext, input ast.Value, result *EvalResult, opts ...func(*rego.Rego)) error {
var err error

err := storage.Txn(ctx, evalContext.Store(), storage.TransactionParams{}, func(txn storage.Transaction) error {
err := getRevision(ctx, evalContext.Store(), txn, result)
if result.Txn == nil {
var txn storage.Transaction
var txnClose TransactionCloser
txn, txnClose, err = result.GetTxn(ctx, evalContext.Store())
if err != nil {
logrus.WithField("err", err).Error("Unable to start new storage transaction.")
return err
}
defer txnClose(ctx, err)
result.Txn = txn
}

result.TxnID = txn.ID()
err = getRevision(ctx, evalContext.Store(), result.Txn, result)
if err != nil {
return err
}

logrus.WithFields(logrus.Fields{
"input": input,
"query": evalContext.ParsedQuery().String(),
"txn": result.TxnID,
}).Debug("Executing policy query.")
result.TxnID = result.Txn.ID()

err = constructPreparedQuery(evalContext, txn, result.Metrics, opts)
if err != nil {
return err
}
logrus.WithFields(logrus.Fields{
"input": input,
"query": evalContext.ParsedQuery().String(),
"txn": result.TxnID,
}).Debug("Executing policy query.")

rs, err := evalContext.PreparedQuery().Eval(
ctx,
rego.EvalParsedInput(input),
rego.EvalTransaction(txn),
rego.EvalMetrics(result.Metrics),
rego.EvalInterQueryBuiltinCache(evalContext.InterQueryBuiltinCache()),
)
err = constructPreparedQuery(evalContext, result.Txn, result.Metrics, opts)
if err != nil {
return err
}

if err != nil {
return err
} else if len(rs) == 0 {
return fmt.Errorf("undefined decision")
} else if len(rs) > 1 {
return fmt.Errorf("multiple evaluation results")
}
var rs rego.ResultSet
rs, err = evalContext.PreparedQuery().Eval(
ctx,
rego.EvalParsedInput(input),
rego.EvalTransaction(result.Txn),
rego.EvalMetrics(result.Metrics),
rego.EvalInterQueryBuiltinCache(evalContext.InterQueryBuiltinCache()),
)

result.Decision = rs[0].Expressions[0].Value
return nil
})
if err != nil {
return err
} else if len(rs) == 0 {
return fmt.Errorf("undefined decision")
} else if len(rs) > 1 {
return fmt.Errorf("multiple evaluation results")
}

return err
result.Decision = rs[0].Expressions[0].Value
return nil
}

func constructPreparedQuery(evalContext EvalContext, txn storage.Transaction, m metrics.Metrics, opts []func(*rego.Rego)) error {
Expand Down
172 changes: 172 additions & 0 deletions envoyauth/evaluation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,19 @@ package envoyauth
import (
"context"
"reflect"
"strings"
"sync"
"testing"

"github.com/open-policy-agent/opa/plugins/logs"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/bundle"
"github.com/open-policy-agent/opa/plugins"
"github.com/open-policy-agent/opa/rego"
"github.com/open-policy-agent/opa/storage"
"github.com/open-policy-agent/opa/storage/inmem"
iCache "github.com/open-policy-agent/opa/topdown/cache"
)

func TestGetRevisionLegacy(t *testing.T) {
Expand Down Expand Up @@ -102,3 +110,167 @@ func TestGetRevisionMulti(t *testing.T) {
}

}

func TestEval(t *testing.T) {
ctx := context.Background()
server, err := testAuthzServer()
if err != nil {
t.Fatal(err)
}

parsedBody := make(map[string]interface{})
parsedBody["firstname"] = "foo"
parsedBody["lastname"] = "bar"

input := make(map[string]interface{})
input["parsed_body"] = parsedBody

inputValue, err := ast.InterfaceToValue(input)
if err != nil {
t.Fatal(err)
}

err = Eval(ctx, server, inputValue, &EvalResult{})
if err != nil {
t.Fatal(err)
}

// include transaction in the result object
er := &EvalResult{}
var txn storage.Transaction
var txnClose TransactionCloser

txn, txnClose, err = er.GetTxn(ctx, server.Store())
if err != nil {
t.Fatal(err)
}

defer txnClose(ctx, err)
er.Txn = txn

err = Eval(ctx, server, inputValue, er)
if err != nil {
t.Fatal(err)
}
}

func testAuthzServer() (*mockExtAuthzGrpcServer, error) {

module := `
package envoy.authz

default allow = false

allow {
input.parsed_body.firstname == "foo"
input.parsed_body.lastname == "bar"
}`

ctx := context.Background()
store := inmem.New()
txn := storage.NewTransactionOrDie(ctx, store, storage.WriteParams)
store.UpsertPolicy(ctx, txn, "example.rego", []byte(module))
store.Commit(ctx, txn)

m, err := plugins.New([]byte{}, "test", store)
if err != nil {
return nil, err
}

m.Register("test_plugin", &testPlugin{})
config, err := logs.ParseConfig([]byte(`{"plugin": "test_plugin"}`), nil, []string{"test_plugin"})
if err != nil {
return nil, err
}

plugin := logs.New(config, m)
m.Register(logs.Name, plugin)

if err := m.Start(ctx); err != nil {
return nil, err
}

path := "envoy/authz/allow"
query := "data." + strings.Replace(path, "/", ".", -1)
parsedQuery, err := ast.ParseBody(query)
if err != nil {
return nil, err
}

cfg := Config{
Addr: ":0",
Path: path,
parsedQuery: parsedQuery,
}

return &mockExtAuthzGrpcServer{
cfg: cfg,
manager: m,
preparedQueryDoOnce: new(sync.Once),
// interQueryBuiltinCache: iCache.NewInterQueryCache(m.InterQueryBuiltinCacheConfig()),
}, nil
}

type Config struct {
Addr string `json:"addr"`
Path string `json:"path"`
parsedQuery ast.Body
}

type mockExtAuthzGrpcServer struct {
cfg Config
manager *plugins.Manager
preparedQuery *rego.PreparedEvalQuery
preparedQueryDoOnce *sync.Once
}

func (m *mockExtAuthzGrpcServer) ParsedQuery() ast.Body {
return m.cfg.parsedQuery
}

func (m *mockExtAuthzGrpcServer) Store() storage.Store {
return m.manager.Store
}

func (m *mockExtAuthzGrpcServer) Compiler() *ast.Compiler {
return m.manager.GetCompiler()
}

func (m *mockExtAuthzGrpcServer) Runtime() *ast.Term {
return m.manager.Info
}

func (m *mockExtAuthzGrpcServer) PreparedQueryDoOnce() *sync.Once {
return m.preparedQueryDoOnce
}

func (m *mockExtAuthzGrpcServer) InterQueryBuiltinCache() iCache.InterQueryCache {
return nil
}

func (m *mockExtAuthzGrpcServer) PreparedQuery() *rego.PreparedEvalQuery {
return m.preparedQuery
}

func (m *mockExtAuthzGrpcServer) SetPreparedQuery(pq *rego.PreparedEvalQuery) {
m.preparedQuery = pq
}

type testPlugin struct {
events []logs.EventV1
}

func (p *testPlugin) Start(context.Context) error {
return nil
}

func (p *testPlugin) Stop(context.Context) {
}

func (p *testPlugin) Reconfigure(context.Context, interface{}) {
}

func (p *testPlugin) Log(_ context.Context, event logs.EventV1) error {
p.events = append(p.events, event)
return nil
}
33 changes: 31 additions & 2 deletions envoyauth/response.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package envoyauth

import (
"context"
"encoding/json"
"fmt"
"net/http"
Expand All @@ -9,6 +10,7 @@ import (
ext_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/open-policy-agent/opa-envoy-plugin/internal/util"
"github.com/open-policy-agent/opa/metrics"
"github.com/open-policy-agent/opa/storage"
)

// EvalResult - Captures the result from evaluating a query against an input
Expand All @@ -19,11 +21,15 @@ type EvalResult struct {
TxnID uint64
Decision interface{}
Metrics metrics.Metrics
Txn storage.Transaction
}

// StopFunc should be called as soon as the evaluation is finished
type StopFunc = func()

// TransactionCloser should be called to abort the transaction
type TransactionCloser func(ctx context.Context, err error) error

// NewEvalResult creates a new EvalResult and a StopFunc that is used to stop the timer for metrics
func NewEvalResult() (*EvalResult, StopFunc, error) {
var err error
Expand All @@ -34,18 +40,41 @@ func NewEvalResult() (*EvalResult, StopFunc, error) {
er.DecisionID, err = util.UUID4()

if err != nil {
return nil, func() {}, err
return nil, nil, err
}

er.Metrics.Timer(metrics.ServerHandler).Start()

stop := func() {
er.Metrics.Timer(metrics.ServerHandler).Stop()
_ = er.Metrics.Timer(metrics.ServerHandler).Stop()
}

return &er, stop, nil
}

// GetTxn creates a read transaction suitable for the configured EvalResult object
func (result *EvalResult) GetTxn(ctx context.Context, store storage.Store) (storage.Transaction, TransactionCloser, error) {
params := storage.TransactionParams{}

noopCloser := func(ctx context.Context, err error) error {
return nil // no-op default
}

txn, err := store.NewTransaction(ctx, params)
if err != nil {
return nil, noopCloser, err
}

// Setup a closer function that will abort the transaction.
closer := func(ctx context.Context, txnErr error) error {
store.Abort(ctx, txn)
result.Txn = nil
return nil
}

return txn, closer, nil
}

func (result *EvalResult) invalidDecisionErr() error {
return fmt.Errorf("illegal value for policy evaluation result: %T", result.Decision)
}
Expand Down
13 changes: 13 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,23 @@ func (p *envoyExtAuthzGrpcServer) Check(ctx context.Context, req *ext_authz_v3.C

func (p *envoyExtAuthzGrpcServer) check(ctx context.Context, req interface{}) (*ext_authz_v3.CheckResponse, func() *rpc_status.Status, error) {
var err error
var evalErr error
start := time.Now()

result, stopeval, err := envoyauth.NewEvalResult()
if err != nil {
logrus.WithField("err", err).Error("Unable to start new evaluation.")
return nil, func() *rpc_status.Status { return nil }, err
}

txn, txnClose, err := result.GetTxn(ctx, p.Store())
if err != nil {
logrus.WithField("err", err).Error("Unable to start new storage transaction.")
return nil, func() *rpc_status.Status { return nil }, err
}

result.Txn = txn

logEntry := logrus.WithField("decision-id", result.DecisionID)

var input map[string]interface{}
Expand All @@ -281,11 +291,13 @@ func (p *envoyExtAuthzGrpcServer) check(ctx context.Context, req interface{}) (*
stopeval()
logErr := p.log(ctx, input, result, err)
if logErr != nil {
_ = txnClose(ctx, logErr) // Ignore error
return &rpc_status.Status{
Code: int32(code.Code_UNKNOWN),
Message: logErr.Error(),
}
}
_ = txnClose(ctx, evalErr) // Ignore error
return nil
}

Expand All @@ -306,6 +318,7 @@ func (p *envoyExtAuthzGrpcServer) check(ctx context.Context, req interface{}) (*

err = envoyauth.Eval(ctx, p, inputValue, result)
if err != nil {
evalErr = err
return nil, stop, err
}

Expand Down
Loading