Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix matching of leaf fields for objects #2054

Merged
merged 10 commits into from
Sep 4, 2024
3 changes: 3 additions & 0 deletions internal/fields/testdata/fields/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,6 @@
- array
- name: user.group.id
type: keyword
- name: attributes
type: object
object_type: keyword
4 changes: 4 additions & 0 deletions internal/fields/testdata/no-subobjects.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"attributes.id": "foo",
"attributes.status": "ok"
}
6 changes: 6 additions & 0 deletions internal/fields/testdata/subobjects.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"attributes": {
"id": "42",
"status": "ok"
}
}
53 changes: 41 additions & 12 deletions internal/fields/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,19 +828,13 @@ func isFieldTypeFlattened(key string, fieldDefinitions []FieldDefinition) bool {
}

func couldBeMultifield(key string, fieldDefinitions []FieldDefinition) bool {
lastDotIndex := strings.LastIndex(key, ".")
if lastDotIndex < 0 {
// Field at the root level cannot be a multifield.
return false
}
parentKey := key[:lastDotIndex]
parent := FindElementDefinition(parentKey, fieldDefinitions)
parent := findParentElementDefinition(key, fieldDefinitions)
if parent == nil {
// Parent is not defined, so not sure what this can be.
return false
}
switch parent.Type {
case "", "group", "nested", "group-nested", "object":
case "", "group", "nested", "object":
// Objects cannot have multifields.
return false
}
Expand All @@ -861,8 +855,8 @@ func isArrayOfObjects(val any) bool {
return false
}

func findElementDefinitionForRoot(root, searchedKey string, FieldDefinitions []FieldDefinition) *FieldDefinition {
for _, def := range FieldDefinitions {
func findElementDefinitionForRoot(root, searchedKey string, fieldDefinitions []FieldDefinition) *FieldDefinition {
for _, def := range fieldDefinitions {
key := strings.TrimLeft(root+"."+def.Name, ".")
if compareKeys(key, def, searchedKey) {
return &def
Expand All @@ -878,6 +872,19 @@ func findElementDefinitionForRoot(root, searchedKey string, FieldDefinitions []F
return fd
}
}

if root == "" {
// No definition found, check if the parent is an object with object type.
parent := findParentElementDefinition(searchedKey, fieldDefinitions)
if parent != nil && parent.Type == "object" && parent.ObjectType != "" {
fd := *parent
fd.Name = searchedKey
fd.Type = parent.ObjectType
fd.ObjectType = ""
return &fd
}
}

return nil
}

Expand All @@ -886,6 +893,16 @@ func FindElementDefinition(searchedKey string, fieldDefinitions []FieldDefinitio
return findElementDefinitionForRoot("", searchedKey, fieldDefinitions)
}

func findParentElementDefinition(key string, fieldDefinitions []FieldDefinition) *FieldDefinition {
lastDotIndex := strings.LastIndex(key, ".")
if lastDotIndex < 0 {
// Field at the root level cannot be a multifield.
return nil
}
parentKey := key[:lastDotIndex]
return FindElementDefinition(parentKey, fieldDefinitions)
}

// compareKeys checks if `searchedKey` matches with the given `key`. `key` can contain
// wildcards (`*`), that match any sequence of characters in `searchedKey` different to dots.
func compareKeys(key string, def FieldDefinition, searchedKey string) bool {
Expand Down Expand Up @@ -1100,7 +1117,7 @@ func (v *Validator) parseSingleElementValue(key string, definition FieldDefiniti
return fmt.Errorf("the IP %q is not one of the allowed test IPs (see: https://github.com/elastic/elastic-package/blob/main/internal/fields/_static/allowed_geo_ips.txt)", valStr)
}
// Groups should only contain nested fields, not single values.
case "group", "nested":
case "group", "nested", "object":
switch val := val.(type) {
case map[string]any:
// This is probably an element from an array of objects,
Expand All @@ -1124,7 +1141,19 @@ func (v *Validator) parseSingleElementValue(key string, definition FieldDefiniti
// The document contains a null, let's consider this like an empty array.
return nil
default:
return fmt.Errorf("field %q is a group of fields, it cannot store values", key)
switch {
case definition.Type == "object" && definition.ObjectType != "":
// This is the leaf element of an object without wildcards in the name, adapt the definition and try again.
definition.Name = definition.Name + ".*"
definition.Type = definition.ObjectType
definition.ObjectType = ""
return v.parseSingleElementValue(key, definition, val, doc)
case definition.Type == "object" && definition.ObjectType == "":
// Legacy mapping, ambiguous definition not allowed by recent versions of the spec, ignore it.
return nil
}

return fmt.Errorf("field %q is a group of fields of type %s, it cannot store values", key, definition.Type)
}
// Numbers should have been parsed as float64, otherwise they are not numbers.
case "float", "long", "double":
Expand Down
19 changes: 19 additions & 0 deletions internal/fields/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,25 @@ func TestValidate_WithFlattenedFields(t *testing.T) {
require.Empty(t, errs)
}

func TestValidate_ObjectTypeWithoutWildcard(t *testing.T) {
validator, err := CreateValidatorForDirectory("testdata",
WithDisabledDependencyManagement())
require.NoError(t, err)
require.NotNil(t, validator)

t.Run("subobjects", func(t *testing.T) {
e := readSampleEvent(t, "testdata/subobjects.json")
errs := validator.ValidateDocumentBody(e)
require.Empty(t, errs)
})

t.Run("no-subobjects", func(t *testing.T) {
e := readSampleEvent(t, "testdata/no-subobjects.json")
errs := validator.ValidateDocumentBody(e)
require.Empty(t, errs)
})
}

func TestValidate_WithNumericKeywordFields(t *testing.T) {
validator, err := CreateValidatorForDirectory("testdata",
WithNumericKeywordFields([]string{
Expand Down