Skip to content

Commit

Permalink
SDK: support partial eval (#4240)
Browse files Browse the repository at this point in the history
Adding the ability to partially evaluate when using the the SDK as a go library.
This allows for utilizing the existing OPA configuration (e.g. bundle, decisions,
etc) when partially evaluating.

Signed-off-by: Kurt Roekle <kroekle@gmail.com>
  • Loading branch information
kroekle authored Mar 11, 2022
1 parent 6aa34bd commit f42b2db
Show file tree
Hide file tree
Showing 5 changed files with 323 additions and 80 deletions.
61 changes: 36 additions & 25 deletions plugins/logs/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,21 @@ type Logger interface {
// the struct. Any changes here MUST be reflected in the AST()
// implementation below.
type EventV1 struct {
Labels map[string]string `json:"labels"`
DecisionID string `json:"decision_id"`
Revision string `json:"revision,omitempty"` // Deprecated: Use Bundles instead
Bundles map[string]BundleInfoV1 `json:"bundles,omitempty"`
Path string `json:"path,omitempty"`
Query string `json:"query,omitempty"`
Input *interface{} `json:"input,omitempty"`
Result *interface{} `json:"result,omitempty"`
Erased []string `json:"erased,omitempty"`
Masked []string `json:"masked,omitempty"`
Error error `json:"error,omitempty"`
RequestedBy string `json:"requested_by,omitempty"`
Timestamp time.Time `json:"timestamp"`
Metrics map[string]interface{} `json:"metrics,omitempty"`
Labels map[string]string `json:"labels"`
DecisionID string `json:"decision_id"`
Revision string `json:"revision,omitempty"` // Deprecated: Use Bundles instead
Bundles map[string]BundleInfoV1 `json:"bundles,omitempty"`
Path string `json:"path,omitempty"`
Query string `json:"query,omitempty"`
Input *interface{} `json:"input,omitempty"`
Result *interface{} `json:"result,omitempty"`
MappedResult *interface{} `json:"mapped_result,omitempty"`
Erased []string `json:"erased,omitempty"`
Masked []string `json:"masked,omitempty"`
Error error `json:"error,omitempty"`
RequestedBy string `json:"requested_by,omitempty"`
Timestamp time.Time `json:"timestamp"`
Metrics map[string]interface{} `json:"metrics,omitempty"`

inputAST ast.Value
}
Expand Down Expand Up @@ -85,6 +86,7 @@ var pathKey = ast.StringTerm("path")
var queryKey = ast.StringTerm("query")
var inputKey = ast.StringTerm("input")
var resultKey = ast.StringTerm("result")
var mappedResultKey = ast.StringTerm("mapped_result")
var erasedKey = ast.StringTerm("erased")
var maskedKey = ast.StringTerm("masked")
var errorKey = ast.StringTerm("error")
Expand Down Expand Up @@ -149,6 +151,14 @@ func (e *EventV1) AST() (ast.Value, error) {
event.Insert(resultKey, ast.NewTerm(results))
}

if e.MappedResult != nil {
mResults, err := roundtripJSONToAST(e.MappedResult)
if err != nil {
return nil, err
}
event.Insert(mappedResultKey, ast.NewTerm(mResults))
}

if len(e.Erased) > 0 {
erased := make([]*ast.Term, len(e.Erased))
for i, v := range e.Erased {
Expand Down Expand Up @@ -547,17 +557,18 @@ func (p *Plugin) Log(ctx context.Context, decision *server.Info) error {
}

event := EventV1{
Labels: p.manager.Labels(),
DecisionID: decision.DecisionID,
Revision: decision.Revision,
Bundles: bundles,
Path: decision.Path,
Query: decision.Query,
Input: decision.Input,
Result: decision.Results,
RequestedBy: decision.RemoteAddr,
Timestamp: decision.Timestamp,
inputAST: decision.InputAST,
Labels: p.manager.Labels(),
DecisionID: decision.DecisionID,
Revision: decision.Revision,
Bundles: bundles,
Path: decision.Path,
Query: decision.Query,
Input: decision.Input,
Result: decision.Results,
MappedResult: decision.MappedResults,
RequestedBy: decision.RemoteAddr,
Timestamp: decision.Timestamp,
inputAST: decision.InputAST,
}

if decision.Metrics != nil {
Expand Down
17 changes: 17 additions & 0 deletions sdk/RawMapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package sdk

import (
"github.com/open-policy-agent/opa/rego"
)

type RawMapper struct {
}

func (e *RawMapper) MapResults(pq *rego.PartialQueries) (interface{}, error) {

return pq, nil
}

func (e *RawMapper) ResultToJSON(results interface{}) (interface{}, error) {
return results, nil
}
217 changes: 176 additions & 41 deletions sdk/opa.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,64 @@ func (opa *OPA) Stop(ctx context.Context) {
// Decision returns a named decision. This function is threadsafe.
func (opa *OPA) Decision(ctx context.Context, options DecisionOptions) (*DecisionResult, error) {

record := server.Info{
Timestamp: options.Now,
Path: options.Path,
Input: &options.Input,
}

result, err := opa.executeTransaction(
ctx,
&record,
func(result *DecisionResult) {
result.Result, record.InputAST, record.Bundles, record.Error = evaluate(ctx, evalArgs{
runtime: opa.state.manager.Info,
printHook: opa.state.manager.PrintHook(),
compiler: opa.state.manager.GetCompiler(),
store: opa.state.manager.Store,
txn: record.Txn,
queryCache: opa.state.queryCache,
interQueryCache: opa.state.interQueryBuiltinCache,
now: record.Timestamp,
path: record.Path,
input: *record.Input,
m: record.Metrics,
})
if record.Error == nil {
record.Results = &result.Result
}
},
)
if err != nil {
return nil, err
}

return result, record.Error
}

// DecisionOptions contains parameters for query evaluation.
type DecisionOptions struct {
Now time.Time // specifies wallclock time used for time.now_ns(), decision log timestamp, etc.
Path string // specifies name of policy decision to evaluate (e.g., example/allow)
Input interface{} // specifies value of the input document to evaluate policy with
}

// DecisionResult contains the output of query evaluation.
type DecisionResult struct {
ID string // provides a globally unique identifier for this decision (which is included in the decision log.)
Result interface{} // provides the output of query evaluation.
}

func newDecisionResult() (*DecisionResult, error) {
id, err := uuid.New(rand.Reader)
if err != nil {
return nil, err
}
result := &DecisionResult{ID: id}
return result, nil
}

func (opa *OPA) executeTransaction(ctx context.Context, record *server.Info, work func(result *DecisionResult)) (*DecisionResult, error) {
m := metrics.New()
m.Timer(metrics.SDKDecisionEval).Start()

Expand All @@ -227,13 +285,8 @@ func (opa *OPA) Decision(ctx context.Context, options DecisionOptions) (*Decisio
s := *opa.state
opa.mtx.Unlock()

record := server.Info{
DecisionID: result.ID,
Timestamp: options.Now,
Path: options.Path,
Input: &options.Input,
Metrics: m,
}
record.DecisionID = result.ID
record.Metrics = m

if record.Timestamp.IsZero() {
record.Timestamp = time.Now().UTC()
Expand All @@ -247,55 +300,93 @@ func (opa *OPA) Decision(ctx context.Context, options DecisionOptions) (*Decisio

if record.Error == nil {
defer s.manager.Store.Abort(ctx, record.Txn)
result.Result, record.InputAST, record.Bundles, record.Error = evaluate(ctx, evalArgs{
runtime: s.manager.Info,
printHook: s.manager.PrintHook(),
compiler: s.manager.GetCompiler(),
store: s.manager.Store,
txn: record.Txn,
queryCache: s.queryCache,
interQueryCache: s.interQueryBuiltinCache,
now: record.Timestamp,
path: record.Path,
input: *record.Input,
m: record.Metrics,
})
if record.Error == nil {
record.Results = &result.Result
}
work(result)
}

m.Timer(metrics.SDKDecisionEval).Stop()

if logger := logs.Lookup(s.manager); logger != nil {
if err := logger.Log(ctx, &record); err != nil {
if err := logger.Log(ctx, record); err != nil {
return result, fmt.Errorf("decision log: %w", err)
}
}

return result, record.Error
return result, nil
}

// DecisionOptions contains parameters for query evaluation.
type DecisionOptions struct {
Now time.Time // specifies wallclock time used for time.now_ns(), decision log timestamp, etc.
Path string // specifies name of policy decision to evaluate (e.g., example/allow)
Input interface{} // specifies value of the input document to evaluate policy with
}
// Partial returns a named decision. This function is threadsafe.
func (opa *OPA) Partial(ctx context.Context, options PartialOptions) (*PartialResult, error) {

// DecisionResult contains the output of query evaluation.
type DecisionResult struct {
ID string // provides a globally unique identifier for this decision (which is included in the decision log.)
Result interface{} // provides the output of query evaluation.
}
if options.Mapper == nil {
options.Mapper = &RawMapper{}
}

func newDecisionResult() (*DecisionResult, error) {
id, err := uuid.New(rand.Reader)
record := server.Info{
Timestamp: options.Now,
Input: &options.Input,
Query: options.Query,
}

var pq *rego.PartialQueries
decision, err := opa.executeTransaction(
ctx,
&record,
func(result *DecisionResult) {
pq, record.InputAST, record.Bundles, record.Error = partial(ctx, partialEvalArgs{
runtime: opa.state.manager.Info,
printHook: opa.state.manager.PrintHook(),
compiler: opa.state.manager.GetCompiler(),
store: opa.state.manager.Store,
txn: record.Txn,
now: record.Timestamp,
query: record.Query,
unknowns: options.Unknowns,
input: *record.Input,
m: record.Metrics,
})
if record.Error == nil {
result.Result, record.Error = options.Mapper.MapResults(pq)
var pqAst interface{}
if record.Error == nil {
var mappedResults interface{}
mappedResults, record.Error = options.Mapper.ResultToJSON(result.Result)
record.MappedResults = &mappedResults
pqAst = pq
record.Results = &pqAst
}
}
},
)
if err != nil {
return nil, err
}
result := &DecisionResult{ID: id}
return result, nil

return &PartialResult{
ID: decision.ID,
Result: decision.Result,
AST: pq,
}, record.Error
}

type PartialQueryMapper interface {
// The first interface being returned is the type that will be used for further processing
MapResults(pq *rego.PartialQueries) (interface{}, error)
// This should be able to take the Result object from MapResults and return a type that can be logged as JSON
ResultToJSON(result interface{}) (interface{}, error)
}

// PartialOptions contains parameters for partial query evaluation.
type PartialOptions struct {
Now time.Time // specifies wallclock time used for time.now_ns(), decision log timestamp, etc.
Input interface{} // specifies value of the input document to evaluate policy with
Query string // specifies the query to be partially evaluated
Unknowns []string // specifies the unknown elements of the policy
Mapper PartialQueryMapper // specifies the mapper to use when processing results
}

type PartialResult struct {
ID string // decision ID
Result interface{} // mapped result
AST *rego.PartialQueries // raw result
}

// Error represents an internal error in the SDK.
Expand Down Expand Up @@ -393,6 +484,50 @@ func evaluate(ctx context.Context, args evalArgs) (interface{}, ast.Value, map[s
return rs[0].Expressions[0].Value, inputAST, bundles, nil
}

type partialEvalArgs struct {
runtime *ast.Term
compiler *ast.Compiler
printHook print.Hook
store storage.Store
txn storage.Transaction
unknowns []string
query string
now time.Time
input interface{}
m metrics.Metrics
}

func partial(ctx context.Context, args partialEvalArgs) (*rego.PartialQueries, ast.Value, map[string]server.BundleInfo, error) {

bundles, err := bundles(ctx, args.store, args.txn)
if err != nil {
return nil, nil, nil, err
}

inputAST, err := ast.InterfaceToValue(args.input)
if err != nil {
return nil, nil, bundles, err
}
re := rego.New(
rego.Time(args.now),
rego.Metrics(args.m),
rego.Store(args.store),
rego.Compiler(args.compiler),
rego.Transaction(args.txn),
rego.Runtime(args.runtime),
rego.Input(args.input),
rego.Query(args.query),
rego.Unknowns(args.unknowns),
rego.PrintHook(args.printHook),
)

pq, err := re.Partial(ctx)
if err != nil {
return nil, nil, bundles, err
}
return pq, inputAST, bundles, err
}

type queryCache struct {
sync.Mutex
cache map[string]*rego.PreparedEvalQuery
Expand Down
Loading

0 comments on commit f42b2db

Please sign in to comment.