Skip to content

Commit

Permalink
x-pack/filebeat/input/cel: allow users to redact state fields in logs (
Browse files Browse the repository at this point in the history
…#34302)

This add logic, configuration and documentation for redacting parts of pre- and
post-processing CEL state when logging state to debug logs. The configuration
allows users to specify fields that should not be entered into logs and whether
these fields should be masked or deleted from the logged object.
  • Loading branch information
efd6 authored Jan 29, 2023
1 parent 48d3f04 commit 7925345
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add metrics for unix socket packet processing. {pull}34335[34335]
- Add beta `take over` mode for `filestream` for simple migration from `log` inputs {pull}34292[34292]
- Add pagination support for Salesforce module. {issue}34057[34057] {pull}34065[34065]
- Allow users to redact sensitive data from CEL input debug logs. {pull}34302[34302]

*Auditbeat*

Expand Down
10 changes: 10 additions & 0 deletions x-pack/filebeat/docs/inputs/input-cel.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,16 @@ filebeat.inputs:
})
----

[float]
==== `redact.fields`

This specifies fields in the `state` to be redacted prior to debug logging. Fields listed in this array will be either replaced with a `*` or deleted entirely from messages sent to debug logs.

[float]
==== `redact.delete`

This specifies whether fields should be replaced with a `*` or deleted entirely from messages sent to debug logs. If delete is `true`, fields will be deleted rather than replaced.

[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

Expand Down
11 changes: 11 additions & 0 deletions x-pack/filebeat/input/cel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type config struct {
// be overwritten by any stored cursor, but will be
// available if no stored cursor exists.
State map[string]interface{} `config:"state"`
// Redact is the debug log state redaction configuration.
Redact redact `config:"redact"`

// Auth is the authentication config for connection to an HTTP
// API endpoint.
Expand All @@ -43,6 +45,15 @@ type config struct {
Resource *ResourceConfig `config:"resource" validate:"required"`
}

type redact struct {
// Fields indicates which fields to apply redaction to prior
// to logging.
Fields []string `config:"fields"`
// Delete indicates that fields should be completely deleted
// before logging rather than redaction with a "*".
Delete bool `config:"delete"`
}

func (c config) Validate() error {
if c.Interval <= 0 {
return errors.New("interval must be greater than 0")
Expand Down
105 changes: 103 additions & 2 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,11 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub
}

// Process a set of event requests.
log.Debugw("request state", logp.Namespace("cel"), "state", state)
log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, mask: cfg.Redact.Fields, delete: cfg.Redact.Delete})
metrics.executions.Add(1)
start := time.Now()
state, err = evalWith(ctx, prg, state)
log.Debugw("response state", logp.Namespace("cel"), "state", state)
log.Debugw("response state", logp.Namespace("cel"), "state", redactor{state: state, mask: cfg.Redact.Fields, delete: cfg.Redact.Delete})
if err != nil {
switch {
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
Expand Down Expand Up @@ -950,3 +950,104 @@ func newInputMetrics(id string) *inputMetrics {
func (m *inputMetrics) Close() {
m.unregister()
}

// redactor implements lazy field redaction of sets of a mapstr.M.
type redactor struct {
state mapstr.M
mask []string // mask is the set of dotted paths to redact from state.
delete bool // if delete is true, delete redacted fields instead of showing a redaction.
}

// String renders the JSON corresponding to r.state after applying redaction
// operations.
func (r redactor) String() string {
if len(r.mask) == 0 {
return r.state.String()
}
c := make(mapstr.M, len(r.state))
cloneMap(c, r.state)
for _, mask := range r.mask {
if r.delete {
walkMap(c, mask, func(parent mapstr.M, key string) {
delete(parent, key)
})
continue
}
walkMap(c, mask, func(parent mapstr.M, key string) {
parent[key] = "*"
})
}
return c.String()
}

// cloneMap is an enhanced version of mapstr.M.Clone that handles cloning arrays
// within objects. Nested arrays are not handled.
func cloneMap(dst, src mapstr.M) {
for k, v := range src {
switch v := v.(type) {
case mapstr.M:
d := make(mapstr.M, len(v))
dst[k] = d
cloneMap(d, v)
case map[string]interface{}:
d := make(map[string]interface{}, len(v))
dst[k] = d
cloneMap(d, v)
case []mapstr.M:
a := make([]mapstr.M, 0, len(v))
for _, m := range v {
d := make(mapstr.M, len(m))
cloneMap(d, m)
a = append(a, d)
}
dst[k] = a
case []map[string]interface{}:
a := make([]map[string]interface{}, 0, len(v))
for _, m := range v {
d := make(map[string]interface{}, len(m))
cloneMap(d, m)
a = append(a, d)
}
dst[k] = a
default:
dst[k] = v
}
}
}

// walkMap walks to all ends of the provided path in m and applies fn to the
// final element of each walk. Nested arrays are not handled.
func walkMap(m mapstr.M, path string, fn func(parent mapstr.M, key string)) {
key, rest, more := strings.Cut(path, ".")
v, ok := m[key]
if !ok {
return
}
if !more {
fn(m, key)
return
}
switch v := v.(type) {
case mapstr.M:
walkMap(v, rest, fn)
case map[string]interface{}:
walkMap(v, rest, fn)
case []mapstr.M:
for _, m := range v {
walkMap(m, rest, fn)
}
case []map[string]interface{}:
for _, m := range v {
walkMap(m, rest, fn)
}
case []interface{}:
for _, v := range v {
switch m := v.(type) {
case mapstr.M:
walkMap(m, rest, fn)
case map[string]interface{}:
walkMap(m, rest, fn)
}
}
}
}
110 changes: 110 additions & 0 deletions x-pack/filebeat/input/cel/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1358,3 +1358,113 @@ func paginationArrayHandler() http.HandlerFunc {
count++
}
}

var redactorTests = []struct {
name string
state mapstr.M
mask []string
delete bool

wantOrig string
wantRedact string
}{
{
name: "auth_no_delete",
state: mapstr.M{
"auth": mapstr.M{
"user": "fred",
"pass": "top_secret",
},
"other": "data",
},
mask: []string{"auth"},
delete: false,
wantOrig: `{"auth":{"pass":"top_secret","user":"fred"},"other":"data"}`,
wantRedact: `{"auth":"*","other":"data"}`,
},
{
name: "auth_delete",
state: mapstr.M{
"auth": mapstr.M{
"user": "fred",
"pass": "top_secret",
},
"other": "data",
},
mask: []string{"auth"},
delete: true,
wantOrig: `{"auth":{"pass":"top_secret","user":"fred"},"other":"data"}`,
wantRedact: `{"other":"data"}`,
},
{
name: "pass_no_delete",
state: mapstr.M{
"auth": mapstr.M{
"user": "fred",
"pass": "top_secret",
},
"other": "data",
},
mask: []string{"auth.pass"},
delete: false,
wantOrig: `{"auth":{"pass":"top_secret","user":"fred"},"other":"data"}`,
wantRedact: `{"auth":{"pass":"*","user":"fred"},"other":"data"}`,
},
{
name: "pass_delete",
state: mapstr.M{
"auth": mapstr.M{
"user": "fred",
"pass": "top_secret",
},
"other": "data",
},
mask: []string{"auth.pass"},
delete: true,
wantOrig: `{"auth":{"pass":"top_secret","user":"fred"},"other":"data"}`,
wantRedact: `{"auth":{"user":"fred"},"other":"data"}`,
},
{
name: "multi_cursor_no_delete",
state: mapstr.M{
"cursor": []mapstr.M{
{"key": "val_one", "other": "data"},
{"key": "val_two", "other": "data"},
},
"other": "data",
},
mask: []string{"cursor.key"},
delete: false,
wantOrig: `{"cursor":[{"key":"val_one","other":"data"},{"key":"val_two","other":"data"}],"other":"data"}`,
wantRedact: `{"cursor":[{"key":"*","other":"data"},{"key":"*","other":"data"}],"other":"data"}`,
},
{
name: "multi_cursor_delete",
state: mapstr.M{
"cursor": []mapstr.M{
{"key": "val_one", "other": "data"},
{"key": "val_two", "other": "data"},
},
"other": "data",
},
mask: []string{"cursor.key"},
delete: true,
wantOrig: `{"cursor":[{"key":"val_one","other":"data"},{"key":"val_two","other":"data"}],"other":"data"}`,
wantRedact: `{"cursor":[{"other":"data"},{"other":"data"}],"other":"data"}`,
},
}

func TestRedactor(t *testing.T) {
for _, test := range redactorTests {
t.Run(test.name, func(t *testing.T) {
got := fmt.Sprint(redactor{state: test.state, mask: test.mask, delete: test.delete})
orig := fmt.Sprint(test.state)
if orig != test.wantOrig {
t.Errorf("unexpected original state after redaction:\n--- got\n--- want\n%s", cmp.Diff(orig, test.wantOrig))
}
if got != test.wantRedact {
t.Errorf("unexpected redaction:\n--- got\n--- want\n%s", cmp.Diff(got, test.wantRedact))
}
})
}
}

0 comments on commit 7925345

Please sign in to comment.