Skip to content

Commit

Permalink
x-pack/filebeat/input/cel: add evaluation state dump debugging feature (
Browse files Browse the repository at this point in the history
#41335)

This allows a program to conditionally dump a complete set of evaluation states
from a program in the case that it fails, allowing a trace of execution leading
to the failure.
  • Loading branch information
efd6 authored Dec 3, 2024
1 parent 444b8e4 commit 0631d8b
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Update CEL mito extensions to v1.16.0. {pull}41727[41727]
- Add evaluation state dump debugging option to CEL input. {pull}41335[41335]

*Auditbeat*

Expand Down
20 changes: 20 additions & 0 deletions x-pack/filebeat/docs/inputs/input-cel.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,26 @@ This specifies fields in the `state` to be redacted prior to debug logging. Fiel

This specifies whether fields should be replaced with a `*` or deleted entirely from messages sent to debug logs. If delete is `true`, fields will be deleted rather than replaced.

[float]
==== `failure_dump.enabled`

It is possible to log CEL program evaluation failures to a local file-system for debugging configurations.
This option is enabled by setting `failure_dump.enabled` to true and setting the `failure_dump.filename` value.
To delete existing failure dumps, set `failure_dump.enabled` to false without unsetting the filename option.

Enabling this option compromises security and should only be used for debugging.

[float]
==== `failure_dump.filename`

This specifies a directory path to write failure dumps to. If it is not empty and a CEL program evaluation fails,
the complete set of states for the CEL program's evaluation will be written as a JSON file, along with the error
that was reported. This option should only be used when debugging a failure as it imposes a significant performance
impact on the input and may potentially use large quantities of memory to hold the full set of states. If a failure
dump is configured, it is recommended that data input sizes be reduced to avoid excessive memory consumption, and
making dumps that are intractable to analysis. To delete existing failure dumps, set `failure_dump.enabled` to
false without unsetting the filename option.

[float]
=== Metrics

Expand Down
19 changes: 18 additions & 1 deletion x-pack/filebeat/input/cel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type config struct {
// Resource is the configuration for establishing an
// HTTP request or for locating a local resource.
Resource *ResourceConfig `config:"resource" validate:"required"`

// FailureDump configures failure dump behaviour.
FailureDump *dumpConfig `config:"failure_dump"`
}

type redact struct {
Expand All @@ -69,6 +72,19 @@ type redact struct {
Delete bool `config:"delete"`
}

// dumpConfig configures the CEL program to retain
// the full evaluation state using the cel.OptTrackState
// option. The state is written to a file in the path if
// the evaluation fails.
type dumpConfig struct {
Enabled *bool `config:"enabled"`
Filename string `config:"filename"`
}

func (t *dumpConfig) enabled() bool {
return t != nil && (t.Enabled == nil || *t.Enabled)
}

func (c config) Validate() error {
if c.Redact == nil {
logp.L().Named("input.cel").Warn("missing recommended 'redact' configuration: " +
Expand All @@ -89,7 +105,8 @@ func (c config) Validate() error {
if len(c.Regexps) != 0 {
patterns = map[string]*regexp.Regexp{".": nil}
}
_, _, err = newProgram(context.Background(), c.Program, root, nil, &http.Client{}, nil, nil, patterns, c.XSDs, logp.L().Named("input.cel"), nil)
wantDump := c.FailureDump.enabled() && c.FailureDump.Filename != ""
_, _, err = newProgram(context.Background(), c.Program, root, nil, &http.Client{}, nil, nil, patterns, c.XSDs, logp.L().Named("input.cel"), nil, wantDump)
if err != nil {
return fmt.Errorf("failed to check program: %w", err)
}
Expand Down
84 changes: 78 additions & 6 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package cel
import (
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -166,7 +167,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
Password: cfg.Auth.Basic.Password,
}
}
prg, ast, err := newProgram(ctx, cfg.Program, root, getEnv(cfg.AllowedEnvironment), client, limiter, auth, patterns, cfg.XSDs, log, trace)
wantDump := cfg.FailureDump.enabled() && cfg.FailureDump.Filename != ""
prg, ast, err := newProgram(ctx, cfg.Program, root, getEnv(cfg.AllowedEnvironment), client, limiter, auth, patterns, cfg.XSDs, log, trace, wantDump)
if err != nil {
return err
}
Expand Down Expand Up @@ -251,12 +253,25 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
metrics.executions.Add(1)
start := i.now().In(time.UTC)
state, err = evalWith(ctx, prg, ast, state, start)
state, err = evalWith(ctx, prg, ast, state, start, wantDump)
log.Debugw("response state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
if err != nil {
var dump dumpError
switch {
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
return err
case errors.As(err, &dump):
path := strings.ReplaceAll(cfg.FailureDump.Filename, "*", sanitizeFileName(env.IDWithoutName))
dir := filepath.Dir(path)
base := filepath.Base(path)
ext := filepath.Ext(base)
prefix := strings.TrimSuffix(base, ext)
path = filepath.Join(dir, prefix+"-"+i.now().In(time.UTC).Format("2006-01-02T15-04-05.000")+ext)
log.Debugw("writing failure dump file", "path", path)
err := dump.writeToFile(path)
if err != nil {
log.Errorw("failed to write failure dump", "path", path, "error", err)
}
}
log.Errorw("failed evaluation", "error", err)
env.UpdateStatus(status.Degraded, "failed evaluation: "+err.Error())
Expand Down Expand Up @@ -785,6 +800,26 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitorin
}
}
}
if !cfg.FailureDump.enabled() && cfg.FailureDump != nil && cfg.FailureDump.Filename != "" {
// We have a fail-dump name, but we are not enabled,
// so remove all dumps we own.
err = os.Remove(cfg.FailureDump.Filename)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
log.Errorw("failed to remove request trace log", "path", cfg.FailureDump.Filename, "error", err)
}
ext := filepath.Ext(cfg.FailureDump.Filename)
base := strings.TrimSuffix(cfg.FailureDump.Filename, ext)
paths, err := filepath.Glob(base + "-" + lumberjackTimestamp + ext)
if err != nil {
log.Errorw("failed to collect request trace log path names", "error", err)
}
for _, p := range paths {
err = os.Remove(p)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
log.Errorw("failed to remove request trace log", "path", p, "error", err)
}
}
}

if reg != nil {
c.Transport = httpmon.NewMetricsRoundTripper(c.Transport, reg)
Expand Down Expand Up @@ -1004,7 +1039,7 @@ func getEnv(allowed []string) map[string]string {
return env
}

func newProgram(ctx context.Context, src, root string, vars map[string]string, client *http.Client, limiter *rate.Limiter, auth *lib.BasicAuth, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger, trace *httplog.LoggingRoundTripper) (cel.Program, *cel.Ast, error) {
func newProgram(ctx context.Context, src, root string, vars map[string]string, client *http.Client, limiter *rate.Limiter, auth *lib.BasicAuth, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger, trace *httplog.LoggingRoundTripper, details bool) (cel.Program, *cel.Ast, error) {
xml, err := lib.XML(nil, xsd)
if err != nil {
return nil, nil, fmt.Errorf("failed to build xml type hints: %w", err)
Expand Down Expand Up @@ -1043,7 +1078,11 @@ func newProgram(ctx context.Context, src, root string, vars map[string]string, c
return nil, nil, fmt.Errorf("failed compilation: %w", iss.Err())
}

prg, err := env.Program(ast)
var progOpts []cel.ProgramOption
if details {
progOpts = []cel.ProgramOption{cel.EvalOptions(cel.OptTrackState)}
}
prg, err := env.Program(ast, progOpts...)
if err != nil {
return nil, nil, fmt.Errorf("failed program instantiation: %w", err)
}
Expand All @@ -1065,8 +1104,8 @@ func debug(log *logp.Logger, trace *httplog.LoggingRoundTripper) func(string, an
}
}

func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[string]interface{}, now time.Time) (map[string]interface{}, error) {
out, _, err := prg.ContextEval(ctx, map[string]interface{}{
func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[string]interface{}, now time.Time, details bool) (map[string]interface{}, error) {
out, det, err := prg.ContextEval(ctx, map[string]interface{}{
// Replace global program "now" with current time. This is necessary
// as the lib.Time now global is static at program instantiation time
// which will persist over multiple evaluations. The lib.Time behaviour
Expand All @@ -1081,6 +1120,9 @@ func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[stri
})
if err != nil {
err = lib.DecoratedError{AST: ast, Err: err}
if details {
err = dumpError{error: err, dump: lib.NewDump(ast, det)}
}
}
if e := ctx.Err(); e != nil {
err = e
Expand Down Expand Up @@ -1109,6 +1151,36 @@ func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[stri
}
}

// dumpError is an evaluation state dump associated with an error.
type dumpError struct {
error
dump *lib.Dump
}

func (e dumpError) writeToFile(path string) (err error) {
err = os.MkdirAll(filepath.Dir(path), 0o700)
if err != nil {
return err
}
f, err := os.Create(path)
if err != nil {
return err
}
defer func() {
err = errors.Join(err, f.Sync(), f.Close())
}()
enc := json.NewEncoder(f)
enc.SetEscapeHTML(false)
type dump struct {
Error string `json:"error"`
State []lib.NodeValue `json:"state"`
}
return enc.Encode(dump{
Error: e.Error(),
State: e.dump.NodeValues(),
})
}

// clearWantMore sets the state to not request additional work in a periodic evaluation.
// It leaves state intact if there is no "want_more" element, and sets the element to false
// if there is. This is necessary instead of just doing delete(state, "want_more") as
Expand Down
123 changes: 109 additions & 14 deletions x-pack/filebeat/input/cel/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var inputTests = []struct {
want []map[string]interface{}
wantCursor []map[string]interface{}
wantErr error
prepare func() error
wantFile string
wantNoFile string
}{
Expand Down Expand Up @@ -1685,6 +1686,88 @@ var inputTests = []struct {
},
}},
},
{
name: "dump_no_error",
config: map[string]interface{}{
"interval": 1,
"program": `{"events":[{"message":{"value": try(debug("divide by zero", 0/0))}}]}`,
"state": nil,
"resource": map[string]interface{}{
"url": "",
},
"failure_dump": map[string]interface{}{
"enabled": true,
"filename": "failure_dumps/dump.json",
},
},
time: func() time.Time { return time.Date(2010, 2, 8, 0, 0, 0, 0, time.UTC) },
wantNoFile: filepath.Join("failure_dumps", "dump-2010-02-08T00-00-00.000.json"),
want: []map[string]interface{}{{
"message": map[string]interface{}{
"value": "division by zero",
},
}},
},
{
name: "dump_error",
config: map[string]interface{}{
"interval": 1,
"program": `{"events":[{"message":{"value": debug("divide by zero", 0/0)}}]}`,
"state": nil,
"resource": map[string]interface{}{
"url": "",
},
"failure_dump": map[string]interface{}{
"enabled": true,
"filename": "failure_dumps/dump.json",
},
},
time: func() time.Time { return time.Date(2010, 2, 9, 0, 0, 0, 0, time.UTC) },
wantFile: filepath.Join("failure_dumps", "dump-2010-02-09T00-00-00.000.json"), // One day after the no dump case.
want: []map[string]interface{}{
{
"error": map[string]interface{}{
"message": `failed eval: ERROR: <input>:1:58: division by zero
| {"events":[{"message":{"value": debug("divide by zero", 0/0)}}]}
| .........................................................^`,
},
},
},
},
{
name: "dump_error_delete",
config: map[string]interface{}{
"interval": 1,
"program": `{"events":[{"message":{"value": debug("divide by zero", 0/0)}}]}`,
"state": nil,
"resource": map[string]interface{}{
"url": "",
},
"failure_dump": map[string]interface{}{
"enabled": false, // We have a name but are disabled, so delete.
"filename": "failure_dumps/dump.json",
},
},
time: func() time.Time { return time.Date(2010, 2, 9, 0, 0, 0, 0, time.UTC) },
prepare: func() error {
// Make a file that the configuration should delete.
err := os.MkdirAll("failure_dumps", 0o700)
if err != nil {
return err
}
return os.WriteFile(filepath.Join("failure_dumps", "dump-2010-02-09T00-00-00.000.json"), nil, 0o600)
},
wantNoFile: filepath.Join("failure_dumps", "dump-2010-02-09T00-00-00.000.json"), // One day after the no dump case.
want: []map[string]interface{}{
{
"error": map[string]interface{}{
"message": `failed eval: ERROR: <input>:1:58: division by zero
| {"events":[{"message":{"value": debug("divide by zero", 0/0)}}]}
| .........................................................^`,
},
},
},
},

// not yet done from httpjson (some are redundant since they are compositional products).
//
Expand All @@ -1708,6 +1791,11 @@ func TestInput(t *testing.T) {
os.Setenv("CELTESTENVVAR", "TESTVALUE")
os.Setenv("DISALLOWEDCELTESTENVVAR", "DISALLOWEDTESTVALUE")

err := os.RemoveAll("failure_dumps")
if err != nil {
t.Fatalf("failed to remove failure_dumps directory: %v", err)
}

logp.TestingSetup()
for _, test := range inputTests {
t.Run(test.name, func(t *testing.T) {
Expand All @@ -1718,6 +1806,13 @@ func TestInput(t *testing.T) {
t.Skip("skipping remote endpoint test")
}

if test.prepare != nil {
err := test.prepare()
if err != nil {
t.Fatalf("unexpected from prepare(): %v", err)
}
}

if test.server != nil {
test.server(t, test.handler, test.config)
}
Expand Down Expand Up @@ -1770,6 +1865,20 @@ func TestInput(t *testing.T) {
if fmt.Sprint(err) != fmt.Sprint(test.wantErr) {
t.Errorf("unexpected error from running input: got:%v want:%v", err, test.wantErr)
}
if test.wantFile != "" {
if _, err := os.Stat(filepath.Join(tempDir, test.wantFile)); err != nil {
t.Errorf("expected log file not found: %v", err)
}
}
if test.wantNoFile != "" {
paths, err := filepath.Glob(filepath.Join(tempDir, test.wantNoFile))
if err != nil {
t.Fatalf("unexpected error calling filepath.Glob(%q): %v", test.wantNoFile, err)
}
if len(paths) != 0 {
t.Errorf("unexpected files found: %v", paths)
}
}
if test.wantErr != nil {
return
}
Expand Down Expand Up @@ -1802,20 +1911,6 @@ func TestInput(t *testing.T) {
t.Errorf("unexpected cursor for event %d: got:- want:+\n%s", i, cmp.Diff(got, test.wantCursor[i]))
}
}
if test.wantFile != "" {
if _, err := os.Stat(filepath.Join(tempDir, test.wantFile)); err != nil {
t.Errorf("expected log file not found: %v", err)
}
}
if test.wantNoFile != "" {
paths, err := filepath.Glob(filepath.Join(tempDir, test.wantNoFile))
if err != nil {
t.Fatalf("unexpected error calling filepath.Glob(%q): %v", test.wantNoFile, err)
}
if len(paths) != 0 {
t.Errorf("unexpected files found: %v", paths)
}
}
})
}
}
Expand Down

0 comments on commit 0631d8b

Please sign in to comment.