Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding partial eval to SDK #4240

Merged
merged 3 commits into from
Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 36 additions & 25 deletions plugins/logs/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,21 @@ type Logger interface {
// the struct. Any changes here MUST be reflected in the AST()
// implementation below.
type EventV1 struct {
Labels map[string]string `json:"labels"`
DecisionID string `json:"decision_id"`
Revision string `json:"revision,omitempty"` // Deprecated: Use Bundles instead
Bundles map[string]BundleInfoV1 `json:"bundles,omitempty"`
Path string `json:"path,omitempty"`
Query string `json:"query,omitempty"`
Input *interface{} `json:"input,omitempty"`
Result *interface{} `json:"result,omitempty"`
Erased []string `json:"erased,omitempty"`
Masked []string `json:"masked,omitempty"`
Error error `json:"error,omitempty"`
RequestedBy string `json:"requested_by,omitempty"`
Timestamp time.Time `json:"timestamp"`
Metrics map[string]interface{} `json:"metrics,omitempty"`
Labels map[string]string `json:"labels"`
DecisionID string `json:"decision_id"`
Revision string `json:"revision,omitempty"` // Deprecated: Use Bundles instead
Bundles map[string]BundleInfoV1 `json:"bundles,omitempty"`
Path string `json:"path,omitempty"`
Query string `json:"query,omitempty"`
Input *interface{} `json:"input,omitempty"`
Result *interface{} `json:"result,omitempty"`
MappedResult *interface{} `json:"mapped_result,omitempty"`
Erased []string `json:"erased,omitempty"`
Masked []string `json:"masked,omitempty"`
Error error `json:"error,omitempty"`
RequestedBy string `json:"requested_by,omitempty"`
Timestamp time.Time `json:"timestamp"`
Metrics map[string]interface{} `json:"metrics,omitempty"`

inputAST ast.Value
}
Expand Down Expand Up @@ -85,6 +86,7 @@ var pathKey = ast.StringTerm("path")
var queryKey = ast.StringTerm("query")
var inputKey = ast.StringTerm("input")
var resultKey = ast.StringTerm("result")
var mappedResultKey = ast.StringTerm("mapped_result")
var erasedKey = ast.StringTerm("erased")
var maskedKey = ast.StringTerm("masked")
var errorKey = ast.StringTerm("error")
Expand Down Expand Up @@ -149,6 +151,14 @@ func (e *EventV1) AST() (ast.Value, error) {
event.Insert(resultKey, ast.NewTerm(results))
}

if e.MappedResult != nil {
mResults, err := roundtripJSONToAST(e.MappedResult)
if err != nil {
return nil, err
}
event.Insert(mappedResultKey, ast.NewTerm(mResults))
}

if len(e.Erased) > 0 {
erased := make([]*ast.Term, len(e.Erased))
for i, v := range e.Erased {
Expand Down Expand Up @@ -547,17 +557,18 @@ func (p *Plugin) Log(ctx context.Context, decision *server.Info) error {
}

event := EventV1{
Labels: p.manager.Labels(),
DecisionID: decision.DecisionID,
Revision: decision.Revision,
Bundles: bundles,
Path: decision.Path,
Query: decision.Query,
Input: decision.Input,
Result: decision.Results,
RequestedBy: decision.RemoteAddr,
Timestamp: decision.Timestamp,
inputAST: decision.InputAST,
Labels: p.manager.Labels(),
DecisionID: decision.DecisionID,
Revision: decision.Revision,
Bundles: bundles,
Path: decision.Path,
Query: decision.Query,
Input: decision.Input,
Result: decision.Results,
MappedResult: decision.MappedResults,
RequestedBy: decision.RemoteAddr,
Timestamp: decision.Timestamp,
inputAST: decision.InputAST,
}

if decision.Metrics != nil {
Expand Down
17 changes: 17 additions & 0 deletions sdk/RawMapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package sdk

import (
"github.com/open-policy-agent/opa/rego"
)

type RawMapper struct {
}

func (e *RawMapper) MapResults(pq *rego.PartialQueries) (interface{}, error) {

return pq, nil
}

func (e *RawMapper) ResultToJSON(results interface{}) (interface{}, error) {
return results, nil
}
217 changes: 176 additions & 41 deletions sdk/opa.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,64 @@ func (opa *OPA) Stop(ctx context.Context) {
// Decision returns a named decision. This function is threadsafe.
func (opa *OPA) Decision(ctx context.Context, options DecisionOptions) (*DecisionResult, error) {

record := server.Info{
Timestamp: options.Now,
Path: options.Path,
Input: &options.Input,
}

result, err := opa.executeTransaction(
ctx,
&record,
func(result *DecisionResult) {
result.Result, record.InputAST, record.Bundles, record.Error = evaluate(ctx, evalArgs{
runtime: opa.state.manager.Info,
printHook: opa.state.manager.PrintHook(),
compiler: opa.state.manager.GetCompiler(),
store: opa.state.manager.Store,
txn: record.Txn,
queryCache: opa.state.queryCache,
interQueryCache: opa.state.interQueryBuiltinCache,
now: record.Timestamp,
path: record.Path,
input: *record.Input,
m: record.Metrics,
})
if record.Error == nil {
record.Results = &result.Result
}
},
)
if err != nil {
return nil, err
}

return result, record.Error
}

// DecisionOptions contains parameters for query evaluation.
type DecisionOptions struct {
Now time.Time // specifies wallclock time used for time.now_ns(), decision log timestamp, etc.
Path string // specifies name of policy decision to evaluate (e.g., example/allow)
Input interface{} // specifies value of the input document to evaluate policy with
}

// DecisionResult contains the output of query evaluation.
type DecisionResult struct {
ID string // provides a globally unique identifier for this decision (which is included in the decision log.)
Result interface{} // provides the output of query evaluation.
}

func newDecisionResult() (*DecisionResult, error) {
id, err := uuid.New(rand.Reader)
if err != nil {
return nil, err
}
result := &DecisionResult{ID: id}
return result, nil
}

func (opa *OPA) executeTransaction(ctx context.Context, record *server.Info, work func(result *DecisionResult)) (*DecisionResult, error) {
m := metrics.New()
m.Timer(metrics.SDKDecisionEval).Start()

Expand All @@ -227,13 +285,8 @@ func (opa *OPA) Decision(ctx context.Context, options DecisionOptions) (*Decisio
s := *opa.state
opa.mtx.Unlock()

record := server.Info{
DecisionID: result.ID,
Timestamp: options.Now,
Path: options.Path,
Input: &options.Input,
Metrics: m,
}
record.DecisionID = result.ID
record.Metrics = m

if record.Timestamp.IsZero() {
record.Timestamp = time.Now().UTC()
Expand All @@ -247,55 +300,93 @@ func (opa *OPA) Decision(ctx context.Context, options DecisionOptions) (*Decisio

if record.Error == nil {
defer s.manager.Store.Abort(ctx, record.Txn)
result.Result, record.InputAST, record.Bundles, record.Error = evaluate(ctx, evalArgs{
runtime: s.manager.Info,
printHook: s.manager.PrintHook(),
compiler: s.manager.GetCompiler(),
store: s.manager.Store,
txn: record.Txn,
queryCache: s.queryCache,
interQueryCache: s.interQueryBuiltinCache,
now: record.Timestamp,
path: record.Path,
input: *record.Input,
m: record.Metrics,
})
if record.Error == nil {
record.Results = &result.Result
}
work(result)
}

m.Timer(metrics.SDKDecisionEval).Stop()

if logger := logs.Lookup(s.manager); logger != nil {
if err := logger.Log(ctx, &record); err != nil {
if err := logger.Log(ctx, record); err != nil {
return result, fmt.Errorf("decision log: %w", err)
}
}

return result, record.Error
return result, nil
}

// DecisionOptions contains parameters for query evaluation.
type DecisionOptions struct {
Now time.Time // specifies wallclock time used for time.now_ns(), decision log timestamp, etc.
Path string // specifies name of policy decision to evaluate (e.g., example/allow)
Input interface{} // specifies value of the input document to evaluate policy with
}
// Partial returns a named decision. This function is threadsafe.
func (opa *OPA) Partial(ctx context.Context, options PartialOptions) (*PartialResult, error) {

// DecisionResult contains the output of query evaluation.
type DecisionResult struct {
ID string // provides a globally unique identifier for this decision (which is included in the decision log.)
Result interface{} // provides the output of query evaluation.
}
if options.Mapper == nil {
options.Mapper = &RawMapper{}
}

func newDecisionResult() (*DecisionResult, error) {
id, err := uuid.New(rand.Reader)
record := server.Info{
Timestamp: options.Now,
Input: &options.Input,
Query: options.Query,
}

var pq *rego.PartialQueries
decision, err := opa.executeTransaction(
ctx,
&record,
func(result *DecisionResult) {
pq, record.InputAST, record.Bundles, record.Error = partial(ctx, partialEvalArgs{
runtime: opa.state.manager.Info,
printHook: opa.state.manager.PrintHook(),
compiler: opa.state.manager.GetCompiler(),
store: opa.state.manager.Store,
txn: record.Txn,
now: record.Timestamp,
query: record.Query,
unknowns: options.Unknowns,
input: *record.Input,
m: record.Metrics,
})
if record.Error == nil {
result.Result, record.Error = options.Mapper.MapResults(pq)
var pqAst interface{}
if record.Error == nil {
var mappedResults interface{}
mappedResults, record.Error = options.Mapper.ResultToJSON(result.Result)
record.MappedResults = &mappedResults
pqAst = pq
record.Results = &pqAst
}
}
},
)
if err != nil {
return nil, err
}
result := &DecisionResult{ID: id}
return result, nil

return &PartialResult{
ID: decision.ID,
Result: decision.Result,
AST: pq,
}, record.Error
}

type PartialQueryMapper interface {
// The first interface being returned is the type that will be used for further processing
MapResults(pq *rego.PartialQueries) (interface{}, error)
// This should be able to take the Result object from MapResults and return a type that can be logged as JSON
ResultToJSON(result interface{}) (interface{}, error)
}

// PartialOptions contains parameters for partial query evaluation.
type PartialOptions struct {
Now time.Time // specifies wallclock time used for time.now_ns(), decision log timestamp, etc.
Input interface{} // specifies value of the input document to evaluate policy with
Query string // specifies the query to be partially evaluated
Unknowns []string // specifies the unknown elements of the policy
Mapper PartialQueryMapper // specifies the mapper to use when processing results
}

type PartialResult struct {
ID string // decision ID
Result interface{} // mapped result
AST *rego.PartialQueries // raw result
}

// Error represents an internal error in the SDK.
Expand Down Expand Up @@ -393,6 +484,50 @@ func evaluate(ctx context.Context, args evalArgs) (interface{}, ast.Value, map[s
return rs[0].Expressions[0].Value, inputAST, bundles, nil
}

type partialEvalArgs struct {
runtime *ast.Term
compiler *ast.Compiler
printHook print.Hook
store storage.Store
txn storage.Transaction
unknowns []string
query string
now time.Time
input interface{}
m metrics.Metrics
}

func partial(ctx context.Context, args partialEvalArgs) (*rego.PartialQueries, ast.Value, map[string]server.BundleInfo, error) {

bundles, err := bundles(ctx, args.store, args.txn)
if err != nil {
return nil, nil, nil, err
}

inputAST, err := ast.InterfaceToValue(args.input)
if err != nil {
return nil, nil, bundles, err
}
re := rego.New(
kroekle marked this conversation as resolved.
Show resolved Hide resolved
rego.Time(args.now),
rego.Metrics(args.m),
rego.Store(args.store),
rego.Compiler(args.compiler),
rego.Transaction(args.txn),
rego.Runtime(args.runtime),
rego.Input(args.input),
rego.Query(args.query),
rego.Unknowns(args.unknowns),
rego.PrintHook(args.printHook),
)

pq, err := re.Partial(ctx)
if err != nil {
return nil, nil, bundles, err
}
return pq, inputAST, bundles, err
}

type queryCache struct {
sync.Mutex
cache map[string]*rego.PreparedEvalQuery
Expand Down
Loading