Skip to content

Commit

Permalink
Merge branch 'master' into opcua-subs
Browse files Browse the repository at this point in the history
  • Loading branch information
LarsStegman committed Sep 28, 2022
2 parents f7e93c7 + fc8a300 commit 7397478
Show file tree
Hide file tree
Showing 265 changed files with 2,889 additions and 1,008 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ commands:
equal: [ windows, << parameters.os >> ]
steps:
- run: rm -rf /c/Go
- run: choco feature enable -n allowGlobalConfirmation
- restore_cache:
key: windows-go-<< parameters.cache_version >>-{{ checksum "go.sum" }}
- run: 'sh ./scripts/installgo_windows.sh'
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ on:
schedule:
# Trigger every day at 16:00 UTC
- cron: '0 16 * * *'
permissions:
contents: read # to fetch code (actions/checkout)
pull-requests: read # to fetch pull requests (golangci/golangci-lint-action)

jobs:
golangci-pr:
if: github.ref != 'refs/heads/master'
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@ on:
###############
# Set the Job #
###############
permissions: {}

jobs:
build:
# Name the Job
permissions:
contents: read # to fetch code (actions/checkout)
statuses: write # to mark status of each linter run (github/super-linter)

name: Lint Code Base
# Set the agent to run on
runs-on: ubuntu-latest
Expand Down
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,32 @@
<!-- markdownlint-disable MD024 -->
# Changelog

## v1.24.1 [2022-09-19]

### Bugfixes

- [#11787](https://github.com/influxdata/telegraf/pull/11787) Clear error message when provided config is not a text file
- [#11835](https://github.com/influxdata/telegraf/pull/11835) Enable global confirmation for installing mingw
- [#10797](https://github.com/influxdata/telegraf/pull/10797) `inputs.ceph` Modernize Ceph input plugin metrics
- [#11785](https://github.com/influxdata/telegraf/pull/11785) `inputs.modbus` Do not fail if a single slave reports errors
- [#11827](https://github.com/influxdata/telegraf/pull/11827) `inputs.ntpq` Handle pools with &#34;-&#34; when
- [#11825](https://github.com/influxdata/telegraf/pull/11825) `parsers.csv` Remove direct checks for the parser type
- [#11781](https://github.com/influxdata/telegraf/pull/11781) `parsers.xpath` Add array index when expanding names.
- [#11815](https://github.com/influxdata/telegraf/pull/11815) `parsers` Memory leak for plugins using ParserFunc.
- [#11826](https://github.com/influxdata/telegraf/pull/11826) `parsers` Unwrap parser and remove some special handling

### Features

- [#11228](https://github.com/influxdata/telegraf/pull/11228) `processors.parser` Add option to parse tags

### Dependency Updates

- [#11788](https://github.com/influxdata/telegraf/pull/11788) `deps` Bump cloud.google.com/go/pubsub from 1.24.0 to 1.25.1
- [#11794](https://github.com/influxdata/telegraf/pull/11794) `deps` Bump github.com/urfave/cli/v2 from 2.14.1 to 2.16.3
- [#11789](https://github.com/influxdata/telegraf/pull/11789) `deps` Bump github.com/aws/aws-sdk-go-v2/service/ec2
- [#11799](https://github.com/influxdata/telegraf/pull/11799) `deps` Bump github.com/wavefronthq/wavefront-sdk-go
- [#11796](https://github.com/influxdata/telegraf/pull/11796) `deps` Bump cloud.google.com/go/bigquery from 1.33.0 to 1.40.0

## v1.24.0 [2022-09-12]

### Bugfixes
Expand Down
7 changes: 0 additions & 7 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,6 @@ func (a *Agent) initPlugins() error {
input.LogName(), err)
}
}
for _, parser := range a.Config.Parsers {
err := parser.Init()
if err != nil {
return fmt.Errorf("could not initialize parser %s::%s: %v",
parser.Config.DataFormat, parser.Config.Parent, err)
}
}
for _, processor := range a.Config.Processors {
err := processor.Init()
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion agent/agent_posix.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//go:build !windows
// +build !windows

package agent

Expand Down
1 change: 0 additions & 1 deletion agent/agent_windows.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//go:build windows
// +build windows

package agent

Expand Down
1 change: 0 additions & 1 deletion cmd/telegraf/main_win_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//go:build windows
// +build windows

package main

Expand Down
1 change: 0 additions & 1 deletion cmd/telegraf/telegraf_posix.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//go:build !windows
// +build !windows

package main

Expand Down
1 change: 0 additions & 1 deletion cmd/telegraf/telegraf_windows.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//go:build windows
// +build windows

//go:generate goversioninfo -icon=../../assets/windows/tiger.ico

Expand Down
130 changes: 73 additions & 57 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,11 @@ type Config struct {
Inputs []*models.RunningInput
Outputs []*models.RunningOutput
Aggregators []*models.RunningAggregator
Parsers []*models.RunningParser
// Processors have a slice wrapper type because they need to be sorted
Processors models.RunningProcessors
AggProcessors models.RunningProcessors
// Parsers are created by their inputs during gather. Config doesn't keep track of them
// like the other plugins because they need to be garbage collected (See issue #11809)

Deprecations map[string][]int64
version *semver.Version
Expand All @@ -98,7 +99,6 @@ func NewConfig() *Config {
Tags: make(map[string]string),
Inputs: make([]*models.RunningInput, 0),
Outputs: make([]*models.RunningOutput, 0),
Parsers: make([]*models.RunningParser, 0),
Processors: make([]*models.RunningProcessor, 0),
AggProcessors: make([]*models.RunningProcessor, 0),
InputFilters: make([]string, 0),
Expand Down Expand Up @@ -242,15 +242,6 @@ func (c *Config) AggregatorNames() []string {
return PluginNameCounts(name)
}

// ParserNames returns a list of strings of the configured parsers.
func (c *Config) ParserNames() []string {
var name []string
for _, parser := range c.Parsers {
name = append(name, parser.Config.DataFormat)
}
return PluginNameCounts(name)
}

// ProcessorNames returns a list of strings of the configured processors.
func (c *Config) ProcessorNames() []string {
var name []string
Expand Down Expand Up @@ -696,18 +687,23 @@ func (c *Config) probeParser(table *ast.Table) bool {
return ok
}

func (c *Config) addParser(parentname string, table *ast.Table) (*models.RunningParser, error) {
func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) (*models.RunningParser, error) {
var dataformat string
c.getFieldString(table, "data_format", &dataformat)

if dataformat == "" {
if parentname == "exec" {
if parentcategory == "inputs" && parentname == "exec" {
// Legacy support, exec plugin originally parsed JSON by default.
dataformat = "json"
} else {
dataformat = "influx"
}
}
var influxParserType string
c.getFieldString(table, "influx_parser_type", &influxParserType)
if dataformat == "influx" && influxParserType == "upstream" {
dataformat = "influx_upstream"
}

creator, ok := parsers.Parsers[dataformat]
if !ok {
Expand All @@ -725,9 +721,8 @@ func (c *Config) addParser(parentname string, table *ast.Table) (*models.Running
}

running := models.NewRunningParser(parser, conf)
c.Parsers = append(c.Parsers, running)

return running, nil
err = running.Init()
return running, err
}

func (c *Config) addProcessor(name string, table *ast.Table) error {
Expand All @@ -740,51 +735,82 @@ func (c *Config) addProcessor(name string, table *ast.Table) error {
}
return fmt.Errorf("Undefined but requested processor: %s", name)
}
streamingProcessor := creator()

// For processors with parsers we need to compute the set of
// options that is not covered by both, the parser and the processor.
// We achieve this by keeping a local book of missing entries
// that counts the number of misses. In case we have a parser
// for the input both need to miss the entry. We count the
// missing entries at the end.
missCount := make(map[string]int)
c.setLocalMissingTomlFieldTracker(missCount)
defer c.resetMissingTomlFieldTracker()

processorConfig, err := c.buildProcessor(name, table)
if err != nil {
return err
}

rf, err := c.newRunningProcessor(creator, processorConfig, table)
if err != nil {
var processor interface{}
processor = streamingProcessor
if p, ok := streamingProcessor.(unwrappable); ok {
processor = p.Unwrap()
}

// If the (underlying) processor has a SetParser or SetParserFunc function,
// it can accept arbitrary data-formats, so build the requested parser and
// set it.
if t, ok := processor.(telegraf.ParserPlugin); ok {
parser, err := c.addParser("processors", name, table)
if err != nil {
return fmt.Errorf("adding parser failed: %w", err)
}
t.SetParser(parser)
}

if t, ok := processor.(telegraf.ParserFuncPlugin); ok {
if !c.probeParser(table) {
return errors.New("parser not found")
}
t.SetParserFunc(func() (telegraf.Parser, error) {
return c.addParser("processors", name, table)
})
}

// Setup the processor
if err := c.setupProcessorOptions(processorConfig.Name, streamingProcessor, table); err != nil {
return err
}

rf := models.NewRunningProcessor(streamingProcessor, processorConfig)
c.Processors = append(c.Processors, rf)

// save a copy for the aggregator
rf, err = c.newRunningProcessor(creator, processorConfig, table)
if err != nil {
// Save a copy for the aggregator
if err := c.setupProcessorOptions(processorConfig.Name, streamingProcessor, table); err != nil {
return err
}

rf = models.NewRunningProcessor(streamingProcessor, processorConfig)
c.AggProcessors = append(c.AggProcessors, rf)

return nil
}

func (c *Config) newRunningProcessor(
creator processors.StreamingCreator,
processorConfig *models.ProcessorConfig,
table *ast.Table,
) (*models.RunningProcessor, error) {
processor := creator()

func (c *Config) setupProcessorOptions(name string, processor telegraf.StreamingProcessor, table *ast.Table) error {
if p, ok := processor.(unwrappable); ok {
if err := c.toml.UnmarshalTable(table, p.Unwrap()); err != nil {
return nil, err
}
} else {
if err := c.toml.UnmarshalTable(table, processor); err != nil {
return nil, err
unwrapped := p.Unwrap()
if err := c.toml.UnmarshalTable(table, unwrapped); err != nil {
return fmt.Errorf("unmarshalling unwrappable failed: %w", err)
}
return c.printUserDeprecation("processors", name, unwrapped)
}

if err := c.printUserDeprecation("processors", processorConfig.Name, processor); err != nil {
return nil, err
if err := c.toml.UnmarshalTable(table, processor); err != nil {
return fmt.Errorf("unmarshalling failed: %w", err)
}

rf := models.NewRunningProcessor(processor, processorConfig)
return rf, nil
return c.printUserDeprecation("processors", name, processor)
}

func (c *Config) addOutput(name string, table *ast.Table) error {
Expand Down Expand Up @@ -865,8 +891,8 @@ func (c *Config) addInput(name string, table *ast.Table) error {

// If the input has a SetParser or SetParserFunc function, it can accept
// arbitrary data-formats, so build the requested parser and set it.
if t, ok := input.(telegraf.ParserInput); ok {
parser, err := c.addParser(name, table)
if t, ok := input.(telegraf.ParserPlugin); ok {
parser, err := c.addParser("inputs", name, table)
if err != nil {
return fmt.Errorf("adding parser failed: %w", err)
}
Expand All @@ -875,40 +901,30 @@ func (c *Config) addInput(name string, table *ast.Table) error {

// Keep the old interface for backward compatibility
if t, ok := input.(parsers.ParserInput); ok {
// DEPRECATED: Please switch your plugin to telegraf.ParserInput.
parser, err := c.addParser(name, table)
// DEPRECATED: Please switch your plugin to telegraf.ParserPlugin.
parser, err := c.addParser("inputs", name, table)
if err != nil {
return fmt.Errorf("adding parser failed: %w", err)
}
t.SetParser(parser)
}

if t, ok := input.(telegraf.ParserFuncInput); ok {
if t, ok := input.(telegraf.ParserFuncPlugin); ok {
if !c.probeParser(table) {
return errors.New("parser not found")
}
t.SetParserFunc(func() (telegraf.Parser, error) {
parser, err := c.addParser(name, table)
if err != nil {
return nil, err
}
err = parser.Init()
return parser, err
return c.addParser("inputs", name, table)
})
}

if t, ok := input.(parsers.ParserFuncInput); ok {
// DEPRECATED: Please switch your plugin to telegraf.ParserFuncInput.
// DEPRECATED: Please switch your plugin to telegraf.ParserFuncPlugin.
if !c.probeParser(table) {
return errors.New("parser not found")
}
t.SetParserFunc(func() (parsers.Parser, error) {
parser, err := c.addParser(name, table)
if err != nil {
return nil, err
}
err = parser.Init()
return parser, err
return c.addParser("inputs", name, table)
})
}

Expand Down Expand Up @@ -1233,7 +1249,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
"tagdrop", "tagexclude", "taginclude", "tagpass", "tags":

// Parser options to ignore
case "data_type":
case "data_type", "influx_parser_type":

// Serializer options to ignore
case "prefix", "template", "templates",
Expand Down
Loading

0 comments on commit 7397478

Please sign in to comment.