Skip to content

Commit

Permalink
[receiver/filelog] Implement specifying top n files to track when ord…
Browse files Browse the repository at this point in the history
…ering (#27844)

**Description:**
* Add a new `ordering_criteria.top_n` option, which allows a user to
specify the number of files to track after ordering.
  * Default is 1, which was the existing behavior.

**Link to tracking Issue:** #23788

**Testing:**
Unit tests added.

**Documentation:**
Added new parameter to existing documentation.
  • Loading branch information
BinaryFissionGames authored Oct 18, 2023
1 parent c2f343b commit c44ad3c
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 3 deletions.
22 changes: 22 additions & 0 deletions .chloggen/feat_top_n_file_sorting.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add a new "top_n" option to specify the number of files to track when using ordering criteria

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23788]

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: ["user"]
10 changes: 10 additions & 0 deletions pkg/stanza/fileconsumer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,16 @@ func TestUnmarshal(t *testing.T) {
return newMockOperatorConfig(cfg)
}(),
},
{
Name: "ordering_criteria_top_n",
Expect: func() *mockOperatorConfig {
cfg := NewConfig()
cfg.OrderingCriteria = matcher.OrderingCriteria{
TopN: 10,
}
return newMockOperatorConfig(cfg)
}(),
},
},
}.Run(t)
}
Expand Down
23 changes: 20 additions & 3 deletions pkg/stanza/fileconsumer/matcher/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ const (
sortTypeAlphabetical = "alphabetical"
)

const (
defaultOrderingCriteriaTopN = 1
)

type Criteria struct {
Include []string `mapstructure:"include,omitempty"`
Exclude []string `mapstructure:"exclude,omitempty"`
Expand All @@ -26,6 +30,7 @@ type Criteria struct {

type OrderingCriteria struct {
Regex string `mapstructure:"regex,omitempty"`
TopN int `mapstructure:"top_n,omitempty"`
SortBy []Sort `mapstructure:"sort_by,omitempty"`
}

Expand Down Expand Up @@ -62,6 +67,14 @@ func New(c Criteria) (*Matcher, error) {
return nil, fmt.Errorf("'regex' must be specified when 'sort_by' is specified")
}

if c.OrderingCriteria.TopN < 0 {
return nil, fmt.Errorf("'top_n' must be a positive integer")
}

if c.OrderingCriteria.TopN == 0 {
c.OrderingCriteria.TopN = defaultOrderingCriteriaTopN
}

regex, err := regexp.Compile(c.OrderingCriteria.Regex)
if err != nil {
return nil, fmt.Errorf("compile regex: %w", err)
Expand Down Expand Up @@ -97,6 +110,7 @@ func New(c Criteria) (*Matcher, error) {
include: c.Include,
exclude: c.Exclude,
regex: regex,
topN: c.OrderingCriteria.TopN,
filterOpts: filterOpts,
}, nil
}
Expand All @@ -105,6 +119,7 @@ type Matcher struct {
include []string
exclude []string
regex *regexp.Regexp
topN int
filterOpts []filter.Option
}

Expand All @@ -127,7 +142,9 @@ func (m Matcher) MatchFiles() ([]string, error) {
return result, errors.Join(err, errs)
}

// Return only the first item.
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/23788
return result[:1], errors.Join(err, errs)
if len(result) <= m.topN {
return result, errors.Join(err, errs)
}

return result[:m.topN], errors.Join(err, errs)
}
114 changes: 114 additions & 0 deletions pkg/stanza/fileconsumer/matcher/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,23 @@ func TestNew(t *testing.T) {
},
expectedErr: "compile regex: error parsing regexp: missing closing ]: `[a-z`",
},
{
name: "TopN is negative",
criteria: Criteria{
Include: []string{"*.log"},
OrderingCriteria: OrderingCriteria{
Regex: "[a-z]",
TopN: -1,
SortBy: []Sort{
{
SortType: "numeric",
RegexKey: "key",
},
},
},
},
expectedErr: "'top_n' must be a positive integer",
},
{
name: "SortTypeEmpty",
criteria: Criteria{
Expand Down Expand Up @@ -249,6 +266,46 @@ func TestMatcher(t *testing.T) {
},
expected: []string{"err.2023020612.log"},
},
{
name: "TopN > number of files",
files: []string{"err.2023020611.log", "err.2023020612.log"},
include: []string{"err.*.log"},
exclude: []string{},
filterCriteria: OrderingCriteria{
Regex: `err\.(?P<value>\d{4}\d{2}\d{2}\d{2}).*log`,
TopN: 3,
SortBy: []Sort{
{
SortType: sortTypeTimestamp,
RegexKey: "value",
Ascending: false,
Location: "UTC",
Layout: `%Y%m%d%H`,
},
},
},
expected: []string{"err.2023020612.log", "err.2023020611.log"},
},
{
name: "TopN == number of files",
files: []string{"err.2023020611.log", "err.2023020612.log"},
include: []string{"err.*.log"},
exclude: []string{},
filterCriteria: OrderingCriteria{
Regex: `err\.(?P<value>\d{4}\d{2}\d{2}\d{2}).*log`,
TopN: 2,
SortBy: []Sort{
{
SortType: sortTypeTimestamp,
RegexKey: "value",
Ascending: false,
Location: "UTC",
Layout: `%Y%m%d%H`,
},
},
},
expected: []string{"err.2023020612.log", "err.2023020611.log"},
},
{
name: "Timestamp Sorting Ascending",
files: []string{"err.2023020612.log", "err.2023020611.log", "err.2023020609.log", "err.2023020610.log"},
Expand Down Expand Up @@ -319,6 +376,24 @@ func TestMatcher(t *testing.T) {
},
expected: []string{"err.d.log"},
},
{
name: "Alphabetical Sorting - Top 2",
files: []string{"err.a.log", "err.d.log", "err.b.log", "err.c.log"},
include: []string{"err.*.log"},
exclude: []string{},
filterCriteria: OrderingCriteria{
Regex: `err\.(?P<value>[a-zA-Z]+).*log`,
TopN: 2,
SortBy: []Sort{
{
SortType: sortTypeAlphabetical,
RegexKey: "value",
Ascending: false,
},
},
},
expected: []string{"err.d.log", "err.c.log"},
},
{
name: "Alphabetical Sorting Ascending",
files: []string{"err.b.log", "err.a.log", "err.c.log", "err.d.log"},
Expand All @@ -336,6 +411,45 @@ func TestMatcher(t *testing.T) {
},
expected: []string{"err.a.log"},
},
{
name: "Multiple Sorting - timestamp priority sort - Top 4",
files: []string{
"err.b.1.2023020601.log",
"err.b.2.2023020601.log",
"err.a.1.2023020601.log",
"err.a.2.2023020601.log",
"err.b.1.2023020602.log",
"err.a.2.2023020602.log",
"err.b.2.2023020602.log",
"err.a.1.2023020602.log",
},
include: []string{"err.*.log"},
exclude: []string{},
filterCriteria: OrderingCriteria{
Regex: `err\.(?P<alpha>[a-zA-Z])\.(?P<number>\d+)\.(?P<time>\d{10})\.log`,
TopN: 4,
SortBy: []Sort{
{
SortType: sortTypeAlphabetical,
RegexKey: "alpha",
Ascending: false,
},
{
SortType: sortTypeNumeric,
RegexKey: "number",
Ascending: false,
},
{
SortType: sortTypeTimestamp,
RegexKey: "time",
Ascending: false,
Location: "UTC",
Layout: `%Y%m%d%H`,
},
},
},
expected: []string{"err.b.2.2023020602.log", "err.a.2.2023020602.log", "err.b.1.2023020602.log", "err.a.1.2023020602.log"},
},
{
name: "Multiple Sorting - timestamp priority sort",
files: []string{
Expand Down
4 changes: 4 additions & 0 deletions pkg/stanza/fileconsumer/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,7 @@ header_config:
pattern: "^#"
metadata_operators:
- type: "regex_parser"
ordering_criteria_top_n:
type: mock
ordering_criteria:
top_n: 10
1 change: 1 addition & 0 deletions receiver/filelogreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Tails and parses logs from files.
| `retry_on_failure.max_interval` | `30s` | Upper bound on retry backoff [interval](#time-parameters). Once this value is reached the delay between consecutive retries will remain constant at the specified value. |
| `retry_on_failure.max_elapsed_time` | `5m` | Maximum amount of [time](#time-parameters) (including retries) spent trying to send a logs batch to a downstream consumer. Once this value is reached, the data is discarded. Retrying never stops if set to `0`.
| `ordering_criteria.regex` | | Regular expression used for sorting, should contain a named capture groups that are to be used in `regex_key`. |
| `ordering_criteria.top_n` | 1 | The number of files to track when using file ordering. The top N files are tracked after applying the ordering criteria. |
| `ordering_criteria.sort_by.sort_type` | | Type of sorting to be performed (e.g., `numeric`, `alphabetical`, `timestamp`) |
| `ordering_criteria.sort_by.location` | | Relevant if `sort_type` is set to `timestamp`. Defines the location of the timestamp of the file. |
| `ordering_criteria.sort_by.format` | | Relevant if `sort_type` is set to `timestamp`. Defines the strptime format of the timestamp being sorted. |
Expand Down

0 comments on commit c44ad3c

Please sign in to comment.