Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #22849 to 7.x: Add 'expand_keys' option to JSON input/processor #23104

Merged
merged 1 commit into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support for ephemeral containers in kubernetes autodiscover and `add_kubernetes_metadata`. {pull}22389[22389] {pull}22439[22439]
- Added support for wildcard fields and keyword fallback in beats setup commands. {pull}22521[22521]
- Fix polling node when it is not ready and monitor by hostname {pull}22666[22666]
- Add `expand_keys` option to `decode_json_fields` processor and `json` input, to recusively de-dot and expand json keys into hierarchical object structures {pull}22849[22849]
- Update k8s client and release k8s leader lock gracefully {pull}22919[22919]
- Add tini as init system in docker images {pull}22137[22137]
- Added "detect_mime_type" processor for detecting mime types {pull}22940[22940]
Expand Down
5 changes: 5 additions & 0 deletions filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ filebeat.inputs:
# in case of conflicts.
#json.overwrite_keys: false

# If this setting is enabled, then keys in the decoded JSON object will be recursively
# de-dotted, and expanded into a hierarchical object structure.
# For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`.
#json.expand_keys: false

# If this setting is enabled, Filebeat adds a "error.message" and "error.key: json" key in case of JSON
# unmarshaling errors or when a text key is defined in the configuration but cannot
# be used.
Expand Down
7 changes: 6 additions & 1 deletion filebeat/docs/inputs/input-common-harvester-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ level in the output document. The default is false.
values from the decoded JSON object overwrite the fields that {beatname_uc}
normally adds (type, source, offset, etc.) in case of conflicts.

*`expand_keys`*:: If this setting is enabled, {beatname_uc} will recursively
de-dot keys in the decoded JSON, and expand them into a hierarchical object
structure. For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`.
This setting should be enabled when the input is produced by an
https://github.com/elastic/ecs-logging[ECS logger].

*`add_error_key`*:: If this setting is enabled, {beatname_uc} adds a
"error.message" and "error.type: json" key in case of JSON unmarshalling errors
or when a `message_key` is defined in the configuration but cannot be used.
Expand All @@ -206,4 +212,3 @@ Options that control how {beatname_uc} deals with log messages that span
multiple lines. See <<multiline-examples>> for more information about
configuring multiline options.


5 changes: 5 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,11 @@ filebeat.inputs:
# in case of conflicts.
#json.overwrite_keys: false

# If this setting is enabled, then keys in the decoded JSON object will be recursively
# de-dotted, and expanded into a hierarchical object structure.
# For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`.
#json.expand_keys: false

# If this setting is enabled, Filebeat adds a "error.message" and "error.key: json" key in case of JSON
# unmarshaling errors or when a text key is defined in the configuration but cannot
# be used.
Expand Down
114 changes: 114 additions & 0 deletions libbeat/common/jsontransform/expand.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package jsontransform

import (
"fmt"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
)

// expandFields de-dots the keys in m by expanding them in-place into a
// nested object structure, merging objects as necessary. If there are any
// conflicts (i.e. a common prefix where one field is an object and another
// is a non-object), an error will be returned.
//
// Note that expandFields is destructive, and in the case of an error the
// map may be left in a semi-expanded state.
func expandFields(m common.MapStr) error {
for k, v := range m {
newMap, newIsMap := getMap(v)
if newIsMap {
if err := expandFields(newMap); err != nil {
return errors.Wrapf(err, "error expanding %q", k)
}
}
if dot := strings.IndexRune(k, '.'); dot < 0 {
continue
}

// Delete the dotted key.
delete(m, k)

// Put expands k, returning the original value if any.
//
// If v is a map then we will merge with an existing map if any,
// otherwise there must not be an existing value.
old, err := m.Put(k, v)
if err != nil {
// Put will return an error if we attempt to insert into a non-object value.
return fmt.Errorf("cannot expand %q: found conflicting key", k)
}
if old == nil {
continue
}
if !newIsMap {
return fmt.Errorf("cannot expand %q: found existing (%T) value", k, old)
} else {
oldMap, oldIsMap := getMap(old)
if !oldIsMap {
return fmt.Errorf("cannot expand %q: found conflicting key", k)
}
if err := mergeObjects(newMap, oldMap); err != nil {
return errors.Wrapf(err, "cannot expand %q", k)
}
}
}
return nil
}

// mergeObjects deep merges the elements of rhs into lhs.
//
// mergeObjects will recursively combine the entries of
// objects with the same key in each object. If there exist
// two entries with the same key in each object which
// are not both objects, then an error will result.
func mergeObjects(lhs, rhs common.MapStr) error {
for k, rhsValue := range rhs {
lhsValue, ok := lhs[k]
if !ok {
lhs[k] = rhsValue
continue
}
lhsMap, ok := getMap(lhsValue)
if !ok {
return fmt.Errorf("cannot merge %q: found (%T) value", k, lhsValue)
}
rhsMap, ok := getMap(rhsValue)
if !ok {
return fmt.Errorf("cannot merge %q: found (%T) value", k, rhsValue)
}
if err := mergeObjects(lhsMap, rhsMap); err != nil {
return errors.Wrapf(err, "cannot merge %q", k)
}
}
return nil
}

func getMap(v interface{}) (map[string]interface{}, bool) {
switch v := v.(type) {
case map[string]interface{}:
return v, true
case common.MapStr:
return v, true
}
return nil, false
}
133 changes: 133 additions & 0 deletions libbeat/common/jsontransform/expand_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package jsontransform

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
)

func TestExpand(t *testing.T) {
type data struct {
Event common.MapStr
Expected common.MapStr
Err string
}
tests := []data{
{
Event: common.MapStr{
"hello.world": 15,
},
Expected: common.MapStr{
"hello": common.MapStr{
"world": 15,
},
},
},
{
Event: common.MapStr{
"test": 15,
},
Expected: common.MapStr{
"test": 15,
},
},
{
Event: common.MapStr{
"test": 15,
"hello.there": 1,
"hello.world.ok": "test",
"elastic.for": "search",
},
Expected: common.MapStr{
"test": 15,
"hello": common.MapStr{
"there": 1,
"world": common.MapStr{
"ok": "test",
},
},
"elastic": common.MapStr{
"for": "search",
},
},
},
{
Event: common.MapStr{
"root": common.MapStr{
"ok": 1,
},
"root.shared": "yes",
"root.one.two.three": 4,
},
Expected: common.MapStr{
"root": common.MapStr{
"ok": 1,
"shared": "yes",
"one": common.MapStr{"two": common.MapStr{"three": 4}},
},
},
},
{
Event: common.MapStr{
"root": common.MapStr{
"seven": 1,
},
"root.seven.eight": 2,
},
Err: `cannot expand .*`,
},
{
Event: common.MapStr{
"a.b": 1,
"a": common.MapStr{
"b": 2,
},
},
Err: `cannot expand .*`,
},
{
Event: common.MapStr{
"a.b": common.MapStr{
"c": common.MapStr{
"d": 1,
},
},
"a.b.c": common.MapStr{
"d": 2,
},
},
Err: `cannot expand .*`,
},
}

for _, test := range tests {
err := expandFields(test.Event)
if test.Err != "" {
require.Error(t, err)
assert.Regexp(t, test.Err, err.Error())
continue
}
require.NoError(t, err)
assert.Equal(t, test.Expected, test.Event)
}
}
9 changes: 8 additions & 1 deletion libbeat/common/jsontransform/jsonhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,15 @@ import (
)

// WriteJSONKeys writes the json keys to the given event based on the overwriteKeys option and the addErrKey
func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys bool, addErrKey bool) {
func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, overwriteKeys, addErrKey bool) {
logger := logp.NewLogger("jsonhelper")
if expandKeys {
if err := expandFields(keys); err != nil {
logger.Errorf("JSON: failed to expand fields: %s", err)
event.SetErrorWithOption(createJSONError(err.Error()), addErrKey)
return
}
}
if !overwriteKeys {
// @timestamp and @metadata fields are root-level fields. We remove them so they
// don't become part of event.Fields.
Expand Down
42 changes: 41 additions & 1 deletion libbeat/common/jsontransform/jsonhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestWriteJSONKeys(t *testing.T) {

tests := map[string]struct {
keys map[string]interface{}
expandKeys bool
overwriteKeys bool
expectedMetadata common.MapStr
expectedTimestamp time.Time
Expand Down Expand Up @@ -117,6 +118,45 @@ func TestWriteJSONKeys(t *testing.T) {
"top_c": "COMPLETELY_NEW_c",
},
},
"expand_true": {
expandKeys: true,
overwriteKeys: true,
keys: map[string]interface{}{
"top_b": map[string]interface{}{
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
expectedMetadata: eventMetadata.Clone(),
expectedTimestamp: eventTimestamp,
expectedFields: common.MapStr{
"top_a": 23,
"top_b": common.MapStr{
"inner_c": "see",
"inner_d": common.MapStr{
"inner_e": "COMPLETELY_NEW_e",
},
},
},
},
"expand_false": {
expandKeys: false,
overwriteKeys: true,
keys: map[string]interface{}{
"top_b": map[string]interface{}{
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
expectedMetadata: eventMetadata.Clone(),
expectedTimestamp: eventTimestamp,
expectedFields: common.MapStr{
"top_a": 23,
"top_b": common.MapStr{
"inner_c": "see",
"inner_d": "dee",
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
},
}

for name, test := range tests {
Expand All @@ -127,7 +167,7 @@ func TestWriteJSONKeys(t *testing.T) {
Fields: eventFields.Clone(),
}

WriteJSONKeys(event, test.keys, test.overwriteKeys, false)
WriteJSONKeys(event, test.keys, test.expandKeys, test.overwriteKeys, false)
require.Equal(t, test.expectedMetadata, event.Meta)
require.Equal(t, test.expectedTimestamp.UnixNano(), event.Timestamp.UnixNano())
require.Equal(t, test.expectedFields, event.Fields)
Expand Down
Loading