[processor/logdeduplicationprocessor] Add logdedupprocessor (#34465)

**Description:** Starts the donation of the [logdedupprocessor](https://github.com/observIQ/bindplane-agent/tree/release/v1.58.0/processor/logdeduplicationprocessor) from observIQ's Bindplane agent, on behalf of @BinaryFissionGames.

**Link to tracking issue:** Closes #34118

**Testing:** Includes unit tests.

Co-authored-by: Brandon Johnson <binaryfissiongames@gmail.com>
Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
1 parent afb845a · commit b92eb43 · 26 changed files with 2,174 additions and 0 deletions.
```yaml
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: logdedupeprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add new logdedupeprocessor processor that deduplicates log entries.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34118]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
```
Makefile:

```makefile
include ../../Makefile.Common
```
# Log Deduplication Processor

This processor deduplicates logs by detecting identical logs over a range of time and emitting a single log with the count of logs that were deduplicated.

## Supported pipelines

- Logs

## How It Works

1. The user configures the log deduplication processor in the desired logs pipeline.
2. All logs sent to the processor are aggregated over the configured `interval`. Logs are considered identical if they have the same body, resource attributes, severity, and log attributes.
3. After the interval, the processor emits a single log with the count of logs that were deduplicated. The emitted log has the same body, resource attributes, severity, and log attributes as the original log, plus the following new attributes:

   - `log_count`: The count of logs that were deduplicated over the interval. The name of the attribute is configurable via the `log_count_attribute` parameter.
   - `first_observed_timestamp`: The timestamp of the first log observed during the aggregation interval.
   - `last_observed_timestamp`: The timestamp of the last log observed during the aggregation interval.

**Note**: The `ObservedTimestamp` and `Timestamp` of the emitted log are the time the aggregated log was emitted, and will not match the `ObservedTimestamp` and `Timestamp` of the original logs.
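The duplicate check described above can be sketched as follows. This is an illustrative standalone snippet, not the processor's actual implementation (which operates on pdata values); the `logRecord` type and both function names are hypothetical:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// logRecord is a hypothetical, simplified stand-in for an OTLP log record.
type logRecord struct {
	Severity string
	Body     string
	Attrs    map[string]string
}

// dedupKey builds a canonical identity string for a record. Records with
// equal keys are treated as duplicates and counted together.
func dedupKey(l logRecord) string {
	keys := make([]string, 0, len(l.Attrs))
	for k := range l.Attrs {
		keys = append(keys, k)
	}
	sort.Strings(keys) // canonical order: map iteration order must not matter

	var b strings.Builder
	b.WriteString(l.Severity)
	b.WriteByte('|')
	b.WriteString(l.Body)
	for _, k := range keys {
		b.WriteString("|" + k + "=" + l.Attrs[k])
	}
	return b.String()
}

// countDuplicates aggregates records by identity, the way the processor
// does over one interval.
func countDuplicates(logs []logRecord) map[string]int {
	counts := make(map[string]int)
	for _, l := range logs {
		counts[dedupKey(l)]++
	}
	return counts
}

func main() {
	logs := []logRecord{
		{"ERROR", "connection refused", map[string]string{"host.name": "a"}},
		{"ERROR", "connection refused", map[string]string{"host.name": "a"}},
		{"ERROR", "connection refused", map[string]string{"host.name": "b"}},
	}
	for key, n := range countDuplicates(logs) {
		fmt.Printf("%s log_count=%d\n", key, n)
	}
}
```

The first two records share a key and would be emitted as one log with a count of 2; the third differs in `host.name` and is counted separately.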
## Configuration

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| interval | duration | `10s` | The interval at which logs are aggregated. The counter resets after each interval. |
| log_count_attribute | string | `log_count` | The name of the count attribute added to the emitted aggregated log. |
| timezone | string | `UTC` | The timezone of the `first_observed_timestamp` and `last_observed_timestamp` timestamps on the emitted aggregated log. The available locations depend on the local IANA Time Zone database. [This page](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones) contains many examples, such as `America/New_York`. |
| exclude_fields | []string | `[]` | Fields to exclude from duplicate matching. Fields can be excluded from the log `body` or `attributes`, and will not be present in the emitted aggregated log. Nested fields must be `.` delimited. If a field name contains a `.`, it can be escaped with a `\`; see the [example config](#example-config-with-excluded-fields).<br><br>**Note**: The entire `body` cannot be excluded. If the body is a map, fields within it can be excluded. |
### Example Config

The following example configures the processor with an aggregation interval of `60s`, a timezone of `America/Los_Angeles`, and a log count attribute of `dedup_count`. No fields are excluded.

```yaml
receivers:
  filelog:
    include: [./example/*.log]

processors:
  logdedup:
    interval: 60s
    log_count_attribute: dedup_count
    timezone: 'America/Los_Angeles'

exporters:
  googlecloud:

service:
  pipelines:
    logs:
      receivers: [filelog]
      processors: [logdedup]
      exporters: [googlecloud]
```
### Example Config with Excluded Fields

The following example excludes these fields from duplicate matching:

- the `timestamp` field from the body
- the `host.name` field from attributes
- the `ip` field nested inside a map attribute named `src`

```yaml
receivers:
  filelog:
    include: [./example/*.log]

processors:
  logdedup:
    exclude_fields:
      - body.timestamp
      - attributes.host\.name
      - attributes.src.ip

exporters:
  googlecloud:

service:
  pipelines:
    logs:
      receivers: [filelog]
      processors: [logdedup]
      exporters: [googlecloud]
```
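Conceptually, `exclude_fields` strips the named paths from each record before duplicate matching. The sketch below removes a dot-delimited path from a nested map; it is a simplified illustration (hypothetical helper, no handling of escaped `\.` dots), not the processor's code:

```go
package main

import "fmt"

// removeField deletes a dot-delimited path such as ["timestamp"] or
// ["src", "ip"] from a nested map, loosely mirroring how excluded
// fields are dropped before duplicate matching.
func removeField(m map[string]any, path []string) {
	if len(m) == 0 || len(path) == 0 {
		return
	}
	if len(path) == 1 {
		delete(m, path[0])
		return
	}
	// Descend only if the intermediate value is itself a map.
	if child, ok := m[path[0]].(map[string]any); ok {
		removeField(child, path[1:])
	}
}

func main() {
	body := map[string]any{
		"timestamp": "2024-08-06T10:00:00Z",
		"message":   "connection refused",
		"src":       map[string]any{"ip": "10.0.0.1", "port": 443},
	}
	removeField(body, []string{"timestamp"})
	removeField(body, []string{"src", "ip"})
	fmt.Println(body) // timestamp and src.ip are gone; message and src.port remain
}
```

With `timestamp` and `src.ip` removed, two logs that differ only in those values produce identical remainders and are counted as duplicates.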
```go
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package logdeduplicationprocessor provides a processor that deduplicates logs.
package logdeduplicationprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/logdeduplicationprocessor"

import (
	"errors"
	"fmt"
	"strings"
	"time"

	"go.opentelemetry.io/collector/component"
)

// Config defaults
const (
	// defaultInterval is the default export interval.
	defaultInterval = 10 * time.Second

	// defaultLogCountAttribute is the default log count attribute.
	defaultLogCountAttribute = "log_count"

	// defaultTimezone is the default timezone.
	defaultTimezone = "UTC"

	// bodyField is the name of the body field.
	bodyField = "body"

	// attributeField is the name of the attribute field.
	attributeField = "attributes"
)

// Config errors
var (
	errInvalidLogCountAttribute = errors.New("log_count_attribute must be set")
	errInvalidInterval          = errors.New("interval must be greater than 0")
	errCannotExcludeBody        = errors.New("cannot exclude the entire body")
)

// Config is the config of the processor.
type Config struct {
	LogCountAttribute string        `mapstructure:"log_count_attribute"`
	Interval          time.Duration `mapstructure:"interval"`
	Timezone          string        `mapstructure:"timezone"`
	ExcludeFields     []string      `mapstructure:"exclude_fields"`
}

// createDefaultConfig returns the default config for the processor.
func createDefaultConfig() component.Config {
	return &Config{
		LogCountAttribute: defaultLogCountAttribute,
		Interval:          defaultInterval,
		Timezone:          defaultTimezone,
		ExcludeFields:     []string{},
	}
}

// Validate validates the configuration.
func (c Config) Validate() error {
	if c.Interval <= 0 {
		return errInvalidInterval
	}

	if c.LogCountAttribute == "" {
		return errInvalidLogCountAttribute
	}

	_, err := time.LoadLocation(c.Timezone)
	if err != nil {
		return fmt.Errorf("timezone is invalid: %w", err)
	}

	return c.validateExcludeFields()
}

// validateExcludeFields validates that all exclude fields are well formed and unique.
func (c Config) validateExcludeFields() error {
	knownExcludeFields := make(map[string]struct{})

	for _, field := range c.ExcludeFields {
		// Special check to make sure the entire body is not excluded
		if field == bodyField {
			return errCannotExcludeBody
		}

		// Split and ensure the field starts with `body` or `attributes`
		// (fieldDelimiter is defined elsewhere in the package; nested
		// fields are "."-delimited).
		parts := strings.Split(field, fieldDelimiter)
		if parts[0] != bodyField && parts[0] != attributeField {
			return fmt.Errorf("an excludefield must start with %s or %s", bodyField, attributeField)
		}

		// If a field is valid make sure we haven't already seen it
		if _, ok := knownExcludeFields[field]; ok {
			return fmt.Errorf("duplicate exclude_field %s", field)
		}

		knownExcludeFields[field] = struct{}{}
	}

	return nil
}
```
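The `timezone` check in `Validate` relies entirely on the standard library: `time.LoadLocation` accepts IANA zone names and rejects other strings. A small standalone demonstration (note that named zones resolve only where the host's tz database is available; `"UTC"` and `"Local"` always work):

```go
package main

import (
	"fmt"
	"time"
)

// checkTimezone mirrors the timezone validation performed in Config.Validate.
func checkTimezone(tz string) error {
	if _, err := time.LoadLocation(tz); err != nil {
		return fmt.Errorf("timezone is invalid: %w", err)
	}
	return nil
}

func main() {
	fmt.Println(checkTimezone("UTC"))            // valid: prints <nil>
	fmt.Println(checkTimezone("not a timezone")) // invalid: prints a wrapped error
}
```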
```go
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package logdeduplicationprocessor

import (
	"errors"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestCreateDefaultProcessorConfig(t *testing.T) {
	cfg := createDefaultConfig().(*Config)
	require.Equal(t, defaultInterval, cfg.Interval)
	require.Equal(t, defaultLogCountAttribute, cfg.LogCountAttribute)
	require.Equal(t, defaultTimezone, cfg.Timezone)
	require.Equal(t, []string{}, cfg.ExcludeFields)
}

func TestValidateConfig(t *testing.T) {
	testCases := []struct {
		desc        string
		cfg         *Config
		expectedErr error
	}{
		{
			desc: "invalid LogCountAttribute config",
			cfg: &Config{
				LogCountAttribute: "",
				Interval:          defaultInterval,
				Timezone:          defaultTimezone,
				ExcludeFields:     []string{},
			},
			expectedErr: errInvalidLogCountAttribute,
		},
		{
			desc: "invalid Interval config",
			cfg: &Config{
				LogCountAttribute: defaultLogCountAttribute,
				Interval:          -1,
				Timezone:          defaultTimezone,
				ExcludeFields:     []string{},
			},
			expectedErr: errInvalidInterval,
		},
		{
			desc: "invalid Timezone config",
			cfg: &Config{
				LogCountAttribute: defaultLogCountAttribute,
				Interval:          defaultInterval,
				Timezone:          "not a timezone",
				ExcludeFields:     []string{},
			},
			expectedErr: errors.New("timezone is invalid"),
		},
		{
			desc: "invalid exclude entire body",
			cfg: &Config{
				LogCountAttribute: defaultLogCountAttribute,
				Interval:          defaultInterval,
				Timezone:          defaultTimezone,
				ExcludeFields:     []string{bodyField},
			},
			expectedErr: errCannotExcludeBody,
		},
		{
			desc: "invalid exclude field body",
			cfg: &Config{
				LogCountAttribute: defaultLogCountAttribute,
				Interval:          defaultInterval,
				Timezone:          defaultTimezone,
				ExcludeFields:     []string{"not.value"},
			},
			expectedErr: errors.New("an excludefield must start with"),
		},
		{
			desc: "invalid duplicate exclude field",
			cfg: &Config{
				LogCountAttribute: defaultLogCountAttribute,
				Interval:          defaultInterval,
				Timezone:          defaultTimezone,
				ExcludeFields:     []string{"body.thing", "body.thing"},
			},
			expectedErr: errors.New("duplicate exclude_field"),
		},
		{
			desc: "valid config",
			cfg: &Config{
				LogCountAttribute: defaultLogCountAttribute,
				Interval:          defaultInterval,
				Timezone:          defaultTimezone,
				ExcludeFields:     []string{"body.thing", "attributes.otherthing"},
			},
			expectedErr: nil,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			err := tc.cfg.Validate()
			if tc.expectedErr != nil {
				require.ErrorContains(t, err, tc.expectedErr.Error())
			} else {
				require.NoError(t, err)
			}
		})
	}
}
```