Flow: Improve Kubernetes log collection #5623

Merged · 6 commits · Oct 30, 2023
15 changes: 11 additions & 4 deletions CHANGELOG.md
@@ -59,7 +59,7 @@ Main (unreleased)
- Fix validation issue with ServiceMonitors when scrape timeout is greater than interval. (@captncraig)

- Static mode's spanmetrics processor will now prune histograms when the dimension cache is pruned.
Dimension cache was always pruned but histograms were not being pruned. This caused metric series
created by the spanmetrics processor to grow unbounded. Only static mode has this issue. Flow mode's
`otelcol.connector.spanmetrics` does not have this bug. (@nijave)

@@ -74,9 +74,16 @@ Main (unreleased)
- The `loki.source.docker` component now allows connecting to Docker daemons
over HTTP(S) and setting up TLS credentials. (@tpaschalis)

- Added an `add_metric_suffixes` option to `otelcol.exporter.prometheus` in flow mode,
which configures whether to add type and unit suffixes to metrics names. (@mar4uk)

- Improve detection of rolled log files in `loki.source.kubernetes` and
`loki.source.podlogs` (@slim-bean).

- Support clustering in `loki.source.kubernetes` (@slim-bean).

- Support clustering in `loki.source.podlogs` (@rfratto).

v0.37.3 (2023-10-26)
-----------------

@@ -98,9 +105,9 @@ v0.37.3 (2023-10-26)

- Upgrade OpenTelemetry Collector packages to version 0.87 (@ptodev):
- `otelcol.receiver.kafka` has a new `header_extraction` block to extract headers from Kafka records.
- `otelcol.receiver.kafka` has a new `version` argument to change the version of
the SASL Protocol for SASL authentication.

v0.37.2 (2023-10-16)
-----------------

2 changes: 0 additions & 2 deletions component/discovery/discovery.go
@@ -45,8 +45,6 @@ func (t *DistributedTargets) Get() []Target {

res := make([]Target, 0, (len(t.targets)+1)/len(t.cluster.Peers()))

// TODO(@tpaschalis): Make sure OpReadWrite is the correct operation;
// eg. this determines how clustering behaves when nodes are shutting down.
for _, tgt := range t.targets {
peers, err := t.cluster.Lookup(shard.StringKey(tgt.NonMetaLabels().String()), 1, shard.OpReadWrite)
if err != nil {
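The clustering support added below builds on this `DistributedTargets` helper: each target's label set is hashed onto the cluster's ring, and only the peer that owns the hash keeps the target, so every agent tails a disjoint share of the pods. A minimal, self-contained sketch of the idea — the modulo hash and the string-based target here are simplifications standing in for the agent's `shard`/`Lookup` machinery, not its actual API:

package main

import (
	"fmt"
	"hash/fnv"
)

// target stands in for a discovered Kubernetes target; in the agent this is a
// full label set rather than a single string.
type target struct{ labels string }

// owner hashes a target's labels and maps them onto the list of cluster peers.
// The agent uses its shard/Lookup machinery for this; the modulo hash below is
// only an illustration of the same idea.
func owner(t target, peers []string) string {
	h := fnv.New64a()
	h.Write([]byte(t.labels))
	return peers[h.Sum64()%uint64(len(peers))]
}

func main() {
	peers := []string{"agent-0", "agent-1", "agent-2"}
	self := "agent-1"

	targets := []target{
		{labels: `{namespace="default", pod="nginx-abc", container="nginx"}`},
		{labels: `{namespace="kube-system", pod="coredns-xyz", container="coredns"}`},
	}

	// Each agent keeps only the targets it owns; across all peers the union
	// still covers every target exactly once.
	for _, t := range targets {
		if owner(t, peers) == self {
			fmt.Println("tailing", t.labels)
		}
	}
}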
47 changes: 37 additions & 10 deletions component/loki/source/kubernetes/kubernetes.go
@@ -18,13 +18,15 @@ import (
"github.com/grafana/agent/component/discovery"
"github.com/grafana/agent/component/loki/source/kubernetes/kubetail"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/grafana/agent/service/cluster"
"k8s.io/client-go/kubernetes"
)

func init() {
component.Register(component.Registration{
Name: "loki.source.kubernetes",
Args: Arguments{},
NeedsServices: []string{cluster.ServiceName},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))
@@ -40,6 +42,8 @@ type Arguments struct {

// Client settings to connect to Kubernetes.
Client commonk8s.ClientArguments `river:"client,block,optional"`

Clustering cluster.ComponentBlock `river:"clustering,block,optional"`
}

// DefaultArguments holds default settings for loki.source.kubernetes.
@@ -57,6 +61,7 @@ type Component struct {
log log.Logger
opts component.Options
positions positions.Positions
cluster cluster.Cluster

mut sync.Mutex
args Arguments
@@ -72,6 +77,7 @@ type Component struct {
var (
_ component.Component = (*Component)(nil)
_ component.DebugComponent = (*Component)(nil)
_ cluster.Component = (*Component)(nil)
)

// New creates a new loki.source.kubernetes component.
@@ -89,7 +95,13 @@ func New(o component.Options, args Arguments) (*Component, error) {
return nil, err
}

data, err := o.GetServiceData(cluster.ServiceName)
if err != nil {
return nil, err
}

c := &Component{
cluster: data.(cluster.Cluster),
log: o.Logger,
opts: o,
handler: loki.NewLogsReceiver(),
@@ -168,28 +180,43 @@ func (c *Component) Update(args component.Arguments) error {
// No-op: manager already exists and options didn't change.
}

// Convert input targets into targets to give to tailer.
targets := make([]*kubetail.Target, 0, len(newArgs.Targets))
c.resyncTargets(newArgs.Targets)
c.args = newArgs
return nil
}

func (c *Component) resyncTargets(targets []discovery.Target) {
distTargets := discovery.NewDistributedTargets(c.args.Clustering.Enabled, c.cluster, targets)
targets = distTargets.Get()

for _, inTarget := range newArgs.Targets {
lset := inTarget.Labels()
tailTargets := make([]*kubetail.Target, 0, len(targets))
for _, target := range targets {
lset := target.Labels()
processed, err := kubetail.PrepareLabels(lset, c.opts.ID)
if err != nil {
// TODO(rfratto): should this set the health of the component?
Contributor: In what situations will PrepareLabels error? Is there a case where we discard a target and increment a counter or something?

Member Author: Looks like it fails based on validations:

// Validation of lset fails if there is no label indicating the pod namespace,
// pod name, or container name.

In this particular case, the component would've been given a bad target.
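For illustration only, a minimal sketch of the kind of check described in that reply: a target is skipped unless its labels identify the namespace, pod, and container. The exact label keys kubetail requires are not shown in this diff, so the ones below are placeholders:

package main

import (
	"errors"
	"fmt"
)

// validateTarget is an illustrative stand-in for the validation PrepareLabels
// performs: a target is only accepted when its labels identify the pod
// namespace, pod name, and container name. The exact label keys kubetail uses
// are not shown in this diff; the ones below are placeholders.
func validateTarget(labels map[string]string) error {
	for _, required := range []string{"namespace", "pod", "container"} {
		if labels[required] == "" {
			return errors.New("missing required label: " + required)
		}
	}
	return nil
}

func main() {
	bad := map[string]string{"namespace": "default", "pod": "nginx-abc"} // no container label
	if err := validateTarget(bad); err != nil {
		// Mirrors the component's behaviour: log the bad target and skip it.
		fmt.Println("failed to process input target:", err)
	}
}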

level.Error(c.log).Log("msg", "failed to process input target", "target", lset.String(), "err", err)
continue
}
targets = append(targets, kubetail.NewTarget(lset, processed))
tailTargets = append(tailTargets, kubetail.NewTarget(lset, processed))
}

// This will never fail because it only fails if the context gets canceled.
Contributor: Since this is called while holding the component mutex, I'd say yes, we should have a timeout. Even if we can only log it.

Member Author: I'm nervous about that; maybe it should be detached from the mutex instead.

If it times out and fails, that means that the targets won't ever converge unless there's another discovery update or the cluster updates again. While discovery updates are common for us, it might not be common in more stable environments.

So, the risk of a timeout causing targets to never converge is worrying me. WDYT?

//
// TODO(rfratto): should we have a generous update timeout to prevent this
// from potentially hanging forever?
_ = c.tailer.SyncTargets(context.Background(), targets)
_ = c.tailer.SyncTargets(context.Background(), tailTargets)
}

c.args = newArgs
return nil
// NotifyClusterChange implements cluster.Component.
func (c *Component) NotifyClusterChange() {
c.mut.Lock()
defer c.mut.Unlock()

if !c.args.Clustering.Enabled {
return
}
c.resyncTargets(c.args.Targets)
}

// getTailerOptions gets tailer options from arguments. If args hasn't changed
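For reference, the bounded-call alternative raised in the second review thread above would look roughly like the sketch below: wrap the sync in a context with a deadline and at least log a failure. This only illustrates the suggestion, not the merged behaviour — the merged code keeps `context.Background()` because, as the author notes, a timed-out sync could leave targets unconverged until the next discovery or cluster update. The `syncTargets` helper here is a stand-in for the real tailer's `SyncTargets`:

package main

import (
	"context"
	"errors"
	"log"
	"time"
)

// syncTargets is a stand-in for tailer.SyncTargets: it blocks until the tailer
// has converged on the new target set or the context is done.
func syncTargets(ctx context.Context, targets []string) error {
	select {
	case <-time.After(50 * time.Millisecond): // pretend convergence takes a moment
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	targets := []string{"default/nginx-abc/nginx", "kube-system/coredns-xyz/coredns"}

	// The reviewer's suggestion: bound the call and at least log a failure.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	if err := syncTargets(ctx, targets); err != nil && errors.Is(err, context.DeadlineExceeded) {
		// The author's counterpoint: if this times out, the target set may not
		// converge again until the next discovery or cluster update.
		log.Println("timed out syncing tailer targets:", err)
	}
}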
89 changes: 89 additions & 0 deletions component/loki/source/kubernetes/kubetail/tail_utils.go
@@ -0,0 +1,89 @@
package kubetail

import (
"sync"
"time"
)

// rollingAverageCalculator calculates a rolling average between points in
// time.
//
// rollingAverageCalculator stores a circular buffer where the differences
// between successive timestamps are kept.
type rollingAverageCalculator struct {
mtx sync.Mutex
window []time.Duration
windowSize int

minEntries int
minDuration time.Duration
defaultDuration time.Duration

currentIndex int
prevTimestamp time.Time
}

func newRollingAverageCalculator(windowSize, minEntries int, minDuration, defaultDuration time.Duration) *rollingAverageCalculator {
return &rollingAverageCalculator{
windowSize: windowSize,
window: make([]time.Duration, windowSize),
minEntries: minEntries,
minDuration: minDuration,
defaultDuration: defaultDuration,
currentIndex: -1,
}
}

// AddTimestamp adds a new timestamp to the rollingAverageCalculator. If there
// is a previous timestamp, the difference between timestamps is calculated and
// stored in the window.
func (r *rollingAverageCalculator) AddTimestamp(timestamp time.Time) {
r.mtx.Lock()
defer func() {
r.prevTimestamp = timestamp
r.mtx.Unlock()
}()

// First timestamp
if r.currentIndex == -1 && r.prevTimestamp.Equal(time.Time{}) {
return
}

r.currentIndex++
if r.currentIndex >= r.windowSize {
r.currentIndex = 0
}

r.window[r.currentIndex] = timestamp.Sub(r.prevTimestamp)
}

// GetAverage calculates the average of all the durations in the window.
func (r *rollingAverageCalculator) GetAverage() time.Duration {
r.mtx.Lock()
defer r.mtx.Unlock()

var total time.Duration
count := 0
for _, v := range r.window {
if v != 0 {
total += v
count++
}
}
if count == 0 || count < r.minEntries {
return r.defaultDuration
}
d := total / time.Duration(count)
if d < r.minDuration {
return r.minDuration
}
return d
}

// GetLast gets the last timestamp added to the rollingAverageCalculator.
func (r *rollingAverageCalculator) GetLast() time.Time {
r.mtx.Lock()
defer r.mtx.Unlock()

return r.prevTimestamp
}
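A short usage sketch for the calculator above, written as it might appear in a test file in the same package (the type is unexported). The parameter values and the "quiet for much longer than the average gap, so assume the file rolled" threshold are assumptions for illustration, not something this file establishes:

package kubetail

import (
	"fmt"
	"time"
)

// Example_rollingAverage feeds the calculator the timestamp of each log line
// read from a container, then compares the time since the last line against
// the rolling average to decide whether the stream has gone quiet for
// suspiciously long (e.g. because the underlying file was rolled).
func Example_rollingAverage() {
	calc := newRollingAverageCalculator(
		10,             // windowSize: keep the last 10 gaps
		3,              // minEntries: need at least 3 gaps before trusting the average
		1*time.Second,  // minDuration: never report an average below this
		30*time.Second, // defaultDuration: used until enough samples exist
	)

	base := time.Unix(0, 0)
	for _, offset := range []time.Duration{0, 2 * time.Second, 4 * time.Second, 6 * time.Second, 8 * time.Second} {
		calc.AddTimestamp(base.Add(offset))
	}

	avg := calc.GetAverage() // 2s: the average gap between the samples above
	sinceLast := base.Add(20 * time.Second).Sub(calc.GetLast())

	// Hypothetical threshold: if we have waited much longer than the average
	// gap, assume the file may have rolled and re-open the stream.
	fmt.Println(avg, sinceLast > 3*avg)
	// Output: 2s true
}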
86 changes: 86 additions & 0 deletions component/loki/source/kubernetes/kubetail/tail_utils_test.go
@@ -0,0 +1,86 @@
package kubetail

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func Test(t *testing.T) {
tests := []struct {
name string
windowSize int
minEntries int
input []int64
expected time.Duration
}{
{
name: "empty, expect default",
windowSize: 10,
input: []int64{},
expected: time.Duration(10),
},
{
name: "one sample, not enough, expect default",
windowSize: 5,
input: []int64{10},
expected: time.Duration(10),
},
{
name: "partially full",
windowSize: 10,
input: []int64{10, 20, 30, 40, 50},
expected: time.Duration(10),
},
{
name: "completely full",
windowSize: 5,
input: []int64{10, 20, 30, 40, 50, 60},
expected: time.Duration(10),
},
{
name: "rollover simple",
windowSize: 5,
input: []int64{10, 20, 30, 40, 50, 60},
expected: time.Duration(10),
},
{
name: "rollover complex: make sure first value is ignored",
windowSize: 5,
input: []int64{0, 40, 50, 60, 70, 80, 90},
expected: time.Duration(10),
},
{
name: "complex",
windowSize: 5,
// 40 +1 +4 +45 +5 = 95, 95/5 = 19
input: []int64{10, 50, 51, 55, 100, 105},
expected: time.Duration(19),
},
{
name: "complex 2",
windowSize: 10,
// outside of window |
// 40 +1 +4 +45 +|5 +5 +90 +100 +150 +300 +5 +45 +50 +149 = 899
input: []int64{10, 50, 51, 55, 100, 105, 110, 200, 300, 450, 750, 755, 800, 850, 999},
expected: time.Duration(89), // Integer math result is truncated not rounded.
},
{
name: "below min duration",
windowSize: 5,
input: []int64{0, 1, 2, 3, 4, 5, 6},
expected: time.Duration(2),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := newRollingAverageCalculator(test.windowSize, test.minEntries, time.Duration(2), time.Duration(10))
for _, v := range test.input {
c.AddTimestamp(time.Unix(0, v))
}
avg := c.GetAverage()
assert.Equal(t, test.expected, avg)
})
}
}