Flow: Improve Kubernetes log collection (#5623)

* component/prometheus: fix panic in interceptor when child isn't set

This commit fixes a panic in prometheus.Interceptor: an interceptor
that doesn't forward samples to another appendable would panic when
appending data.

Co-authored-by: Edward Welch <edward.welch@grafana.com>

* loki.source.kubernetes: improve detection of rolled log files

Versions of Kubernetes that do not contain kubernetes/kubernetes#115702
will fail to detect rolled log files, causing the API to stop sending
logs to the agent for processing.

To work around this, this commit introduces a rolling average calculator
to determine the average delta between log entries per target. If 3x the
normal delta time has elapsed since the last entry, the tailer is
restarted. For example, if a target normally emits a log line every 5
seconds, the tailer is restarted after roughly 15 seconds of silence.

False positives here are acceptable, but false negatives mean that log
lines may not appear for an extended period of time until the rolling
detection succeeds.

Closes #5040

Co-authored-by: Edward Welch <edward.welch@grafana.com>

* loki.source.kubernetes: support clustering

Add support for loki.source.kubernetes to distribute targets using
clustering.

Closes #4502

Co-authored-by: Edward Welch <edward.welch@grafana.com>

* loki.source.podlogs: support clustering

Add support for loki.source.podlogs to distribute targets using
clustering.

* service/cluster: add common block for clustering arguments

* remove irrelevant TODO comment

#5623 (comment)

---------

Co-authored-by: Edward Welch <edward.welch@grafana.com>
rfratto and slim-bean authored Oct 30, 2023
1 parent c9c27e9 commit 81c4b05
Showing 16 changed files with 406 additions and 39 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -98,6 +98,13 @@ Main (unreleased)
 - Added an `add_metric_suffixes` option to `otelcol.exporter.prometheus` in flow mode,
   which configures whether to add type and unit suffixes to metrics names. (@mar4uk)
 
+- Improve detection of rolled log files in `loki.source.kubernetes` and
+  `loki.source.podlogs` (@slim-bean).
+
+- Support clustering in `loki.source.kubernetes` (@slim-bean).
+
+- Support clustering in `loki.source.podlogs` (@rfratto).
+
 v0.37.3 (2023-10-26)
 -----------------
2 changes: 0 additions & 2 deletions component/discovery/discovery.go
@@ -45,8 +45,6 @@ func (t *DistributedTargets) Get() []Target {
 
 	res := make([]Target, 0, (len(t.targets)+1)/len(t.cluster.Peers()))
 
-	// TODO(@tpaschalis): Make sure OpReadWrite is the correct operation;
-	// eg. this determines how clustering behaves when nodes are shutting down.
 	for _, tgt := range t.targets {
 		peers, err := t.cluster.Lookup(shard.StringKey(tgt.NonMetaLabels().String()), 1, shard.OpReadWrite)
 		if err != nil {
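The `Lookup` call above is what pins each target to a single peer. For intuition, here is a self-contained sketch of the idea: every node hashes each target's label set and keeps only the targets that map to itself. The modulo hashing and all names below are illustrative stand-ins for the real `shard`/`cluster` machinery, which uses consistent hashing:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ownedTargets mimics the shape of DistributedTargets.Get: every node sees
// the full target list, but each target is tailed only by the peer its hash
// maps to. The real implementation resolves ownership via cluster.Lookup.
func ownedTargets(self string, peers []string, targets []string) []string {
	var owned []string
	for _, target := range targets {
		h := fnv.New64a()
		h.Write([]byte(target)) // stand-in for hashing the non-meta label set
		if peers[h.Sum64()%uint64(len(peers))] == self {
			owned = append(owned, target)
		}
	}
	return owned
}

func main() {
	peers := []string{"agent-0", "agent-1", "agent-2"}
	targets := []string{"default/pod-a:app", "default/pod-b:app", "default/pod-c:db"}

	// Each agent runs the same computation and arrives at a disjoint subset.
	for _, self := range peers {
		fmt.Println(self, "tails", ownedTargets(self, peers, targets))
	}
}
```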
47 changes: 37 additions & 10 deletions component/loki/source/kubernetes/kubernetes.go
@@ -18,13 +18,15 @@ import (
 	"github.com/grafana/agent/component/discovery"
 	"github.com/grafana/agent/component/loki/source/kubernetes/kubetail"
 	"github.com/grafana/agent/pkg/flow/logging/level"
+	"github.com/grafana/agent/service/cluster"
 	"k8s.io/client-go/kubernetes"
 )
 
 func init() {
 	component.Register(component.Registration{
-		Name: "loki.source.kubernetes",
-		Args: Arguments{},
+		Name:          "loki.source.kubernetes",
+		Args:          Arguments{},
+		NeedsServices: []string{cluster.ServiceName},
 
 		Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
 			return New(opts, args.(Arguments))
@@ -40,6 +42,8 @@ type Arguments struct {
 
 	// Client settings to connect to Kubernetes.
 	Client commonk8s.ClientArguments `river:"client,block,optional"`
+
+	Clustering cluster.ComponentBlock `river:"clustering,block,optional"`
 }

// DefaultArguments holds default settings for loki.source.kubernetes.
@@ -57,6 +61,7 @@ type Component struct {
 	log       log.Logger
 	opts      component.Options
 	positions positions.Positions
+	cluster   cluster.Cluster
 
 	mut  sync.Mutex
 	args Arguments
@@ -72,6 +77,7 @@ type Component struct {
 var (
 	_ component.Component      = (*Component)(nil)
 	_ component.DebugComponent = (*Component)(nil)
+	_ cluster.Component        = (*Component)(nil)
 )

// New creates a new loki.source.kubernetes component.
@@ -89,7 +95,13 @@ func New(o component.Options, args Arguments) (*Component, error) {
 		return nil, err
 	}
 
+	data, err := o.GetServiceData(cluster.ServiceName)
+	if err != nil {
+		return nil, err
+	}
+
 	c := &Component{
+		cluster:   data.(cluster.Cluster),
 		log:       o.Logger,
 		opts:      o,
 		handler:   loki.NewLogsReceiver(),
@@ -168,28 +180,43 @@ func (c *Component) Update(args component.Arguments) error {
 		// No-op: manager already exists and options didn't change.
 	}
 
-	// Convert input targets into targets to give to tailer.
-	targets := make([]*kubetail.Target, 0, len(newArgs.Targets))
+	c.resyncTargets(newArgs.Targets)
+	c.args = newArgs
+	return nil
+}
 
-	for _, inTarget := range newArgs.Targets {
-		lset := inTarget.Labels()
+func (c *Component) resyncTargets(targets []discovery.Target) {
+	distTargets := discovery.NewDistributedTargets(c.args.Clustering.Enabled, c.cluster, targets)
+	targets = distTargets.Get()
+
+	tailTargets := make([]*kubetail.Target, 0, len(targets))
+	for _, target := range targets {
+		lset := target.Labels()
 		processed, err := kubetail.PrepareLabels(lset, c.opts.ID)
 		if err != nil {
 			// TODO(rfratto): should this set the health of the component?
 			level.Error(c.log).Log("msg", "failed to process input target", "target", lset.String(), "err", err)
 			continue
 		}
-		targets = append(targets, kubetail.NewTarget(lset, processed))
+		tailTargets = append(tailTargets, kubetail.NewTarget(lset, processed))
 	}
 
 	// This will never fail because it only fails if the context gets canceled.
 	//
 	// TODO(rfratto): should we have a generous update timeout to prevent this
 	// from potentially hanging forever?
-	_ = c.tailer.SyncTargets(context.Background(), targets)
+	_ = c.tailer.SyncTargets(context.Background(), tailTargets)
+}
 
-	c.args = newArgs
-	return nil
+// NotifyClusterChange implements cluster.Component.
+func (c *Component) NotifyClusterChange() {
+	c.mut.Lock()
+	defer c.mut.Unlock()
+
+	if !c.args.Clustering.Enabled {
+		return
+	}
+	c.resyncTargets(c.args.Targets)
 }

// getTailerOptions gets tailer options from arguments. If args hasn't changed
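The `NotifyClusterChange` hook added above is the other half of the clustering contract: the cluster service invokes it whenever peers join or leave, and the component re-runs target distribution so tailers move to their new owners. A simplified, self-contained mirror of that shape (the interface and type names below are illustrative, not the agent's actual API):

```go
package main

import "fmt"

// clusterComponent stands in for the cluster.Component interface the commit
// implements: anything that shards work gets notified on membership changes.
type clusterComponent interface {
	NotifyClusterChange()
}

type logSource struct {
	clusteringEnabled bool
	targets           []string
}

// NotifyClusterChange mirrors the new method in kubernetes.go: do nothing
// unless clustering is on, otherwise re-distribute targets across peers.
func (s *logSource) NotifyClusterChange() {
	if !s.clusteringEnabled {
		return
	}
	fmt.Printf("re-syncing %d targets after peer change\n", len(s.targets))
}

func main() {
	var c clusterComponent = &logSource{
		clusteringEnabled: true,
		targets:           []string{"default/pod-a:app", "default/pod-b:app"},
	}
	c.NotifyClusterChange()
}
```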
89 changes: 89 additions & 0 deletions component/loki/source/kubernetes/kubetail/tail_utils.go
@@ -0,0 +1,89 @@
package kubetail

import (
	"sync"
	"time"
)

// rollingAverageCalculator calculates a rolling average between points in
// time.
//
// rollingAverageCalculator stores a circular buffer where the differences
// between timestamps are kept.
type rollingAverageCalculator struct {
	mtx        sync.Mutex
	window     []time.Duration
	windowSize int

	minEntries      int
	minDuration     time.Duration
	defaultDuration time.Duration

	currentIndex  int
	prevTimestamp time.Time
}

func newRollingAverageCalculator(windowSize, minEntries int, minDuration, defaultDuration time.Duration) *rollingAverageCalculator {
	return &rollingAverageCalculator{
		windowSize:      windowSize,
		window:          make([]time.Duration, windowSize),
		minEntries:      minEntries,
		minDuration:     minDuration,
		defaultDuration: defaultDuration,
		currentIndex:    -1,
	}
}

// AddTimestamp adds a new timestamp to the rollingAverageCalculator. If there
// is a previous timestamp, the difference between timestamps is calculated
// and stored in the window.
func (r *rollingAverageCalculator) AddTimestamp(timestamp time.Time) {
	r.mtx.Lock()
	defer func() {
		r.prevTimestamp = timestamp
		r.mtx.Unlock()
	}()

	// First timestamp.
	if r.currentIndex == -1 && r.prevTimestamp.Equal(time.Time{}) {
		return
	}

	r.currentIndex++
	if r.currentIndex >= r.windowSize {
		r.currentIndex = 0
	}

	r.window[r.currentIndex] = timestamp.Sub(r.prevTimestamp)
}

// GetAverage calculates the average of all the durations in the window.
func (r *rollingAverageCalculator) GetAverage() time.Duration {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	var total time.Duration
	count := 0
	for _, v := range r.window {
		if v != 0 {
			total += v
			count++
		}
	}
	if count == 0 || count < r.minEntries {
		return r.defaultDuration
	}
	d := total / time.Duration(count)
	if d < r.minDuration {
		return r.minDuration
	}
	return d
}

// GetLast gets the last timestamp added to the rollingAverageCalculator.
func (r *rollingAverageCalculator) GetLast() time.Time {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	return r.prevTimestamp
}
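A minimal usage sketch of the calculator, written as if it lived alongside the code above in the kubetail package (the function itself is illustrative, not part of the commit):

```go
package kubetail

import (
	"fmt"
	"time"
)

// exampleRollingAverage feeds evenly spaced timestamps into the calculator
// and prints the resulting average delta between entries.
func exampleRollingAverage() {
	// Window of 5 deltas, at least 1 delta required, 2s minimum,
	// 1h default while the window is still too empty.
	calc := newRollingAverageCalculator(5, 1, 2*time.Second, time.Hour)

	base := time.Unix(0, 0)
	for i := 0; i < 6; i++ {
		// One simulated log entry every 10 seconds; the first call only
		// records prevTimestamp, the remaining five each store a 10s delta.
		calc.AddTimestamp(base.Add(time.Duration(i*10) * time.Second))
	}

	fmt.Println(calc.GetAverage()) // 10s
	fmt.Println(calc.GetLast())    // the 50s timestamp, i.e. the last entry
}
```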
86 changes: 86 additions & 0 deletions component/loki/source/kubernetes/kubetail/tail_utils_test.go
@@ -0,0 +1,86 @@
package kubetail

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func Test(t *testing.T) {
	tests := []struct {
		name       string
		windowSize int
		minEntries int
		input      []int64
		expected   time.Duration
	}{
		{
			name:       "empty, expect default",
			windowSize: 10,
			input:      []int64{},
			expected:   time.Duration(10),
		},
		{
			name:       "one sample, not enough, expect default",
			windowSize: 5,
			input:      []int64{10},
			expected:   time.Duration(10),
		},
		{
			name:       "partially full",
			windowSize: 10,
			input:      []int64{10, 20, 30, 40, 50},
			expected:   time.Duration(10),
		},
		{
			name:       "completely full",
			windowSize: 5,
			input:      []int64{10, 20, 30, 40, 50, 60},
			expected:   time.Duration(10),
		},
		{
			name:       "rollover simple",
			windowSize: 5,
			input:      []int64{10, 20, 30, 40, 50, 60},
			expected:   time.Duration(10),
		},
		{
			name:       "rollover complex: make sure first value is ignored",
			windowSize: 5,
			input:      []int64{0, 40, 50, 60, 70, 80, 90},
			expected:   time.Duration(10),
		},
		{
			name:       "complex",
			windowSize: 5,
			// 40 +1 +4 +45 +5 = 95, 95/5 = 19
			input:    []int64{10, 50, 51, 55, 100, 105},
			expected: time.Duration(19),
		},
		{
			name:       "complex 2",
			windowSize: 10,
			// outside of window |
			// 40 +1 +4 +45 +|5 +5 +90 +100 +150 +300 +5 +45 +50 +149 = 899
			input:    []int64{10, 50, 51, 55, 100, 105, 110, 200, 300, 450, 750, 755, 800, 850, 999},
			expected: time.Duration(89), // Integer math result is truncated, not rounded.
		},
		{
			name:       "below min duration",
			windowSize: 5,
			input:      []int64{0, 1, 2, 3, 4, 5, 6},
			expected:   time.Duration(2),
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			c := newRollingAverageCalculator(test.windowSize, test.minEntries, time.Duration(2), time.Duration(10))
			for _, v := range test.input {
				c.AddTimestamp(time.Unix(0, v))
			}
			avg := c.GetAverage()
			assert.Equal(t, test.expected, avg)
		})
	}
}
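The expectation in "complex 2" relies on Go's integer division for time.Duration values (899 / 10 = 89, not 90). A quick standalone check of that truncation, for illustration only:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	total := time.Duration(899) // sum of the last 10 deltas, in nanoseconds
	fmt.Println(total / 10)     // prints "89ns": truncated, not rounded
}
```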
52 changes: 49 additions & 3 deletions component/loki/source/kubernetes/kubetail/tailer.go
@@ -30,6 +30,8 @@ type tailerTask struct {
 
 var _ runner.Task = (*tailerTask)(nil)
 
+const maxTailerLifetime = 1 * time.Hour
+
 func (tt *tailerTask) Hash() uint64 { return tt.Target.Hash() }
 
 func (tt *tailerTask) Equals(other runner.Task) bool {
@@ -127,7 +129,7 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
 	// Set a maximum lifetime of the tail to ensure that connections are
 	// reestablished. This avoids an issue where the Kubernetes API server stops
 	// responding with new logs while the connection is kept open.
-	ctx, cancel := context.WithTimeout(ctx, 1*time.Hour)
+	ctx, cancel := context.WithTimeout(ctx, maxTailerLifetime)
 	defer cancel()
 
 	var (
@@ -167,9 +169,51 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
 	if err != nil {
 		return err
 	}
+
+	// Create a new rolling average calculator to determine the average delta
+	// time between log entries.
+	//
+	// Here, we track the most recent 10,000 delta times to compute a fairly
+	// accurate average. If there are fewer than 100 deltas stored, the average
+	// time defaults to 1h.
+	//
+	// The computed average will never be less than the minimum of 2s.
+	calc := newRollingAverageCalculator(10000, 100, 2*time.Second, maxTailerLifetime)
+
 	go func() {
-		<-ctx.Done()
-		_ = stream.Close()
+		rolledFileTicker := time.NewTicker(1 * time.Second)
+		defer func() {
+			rolledFileTicker.Stop()
+			_ = stream.Close()
+		}()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-rolledFileTicker.C:
+				// Versions of Kubernetes which do not contain
+				// kubernetes/kubernetes#115702 will fail to detect rolled log files
+				// and stop sending logs to us.
+				//
+				// To work around this, we use a rolling average to determine how
+				// frequently we usually expect to see entries. If 3x the normal
+				// delta has elapsed, we'll restart the tailer.
+				//
+				// False positives here are acceptable, but false negatives mean that
+				// we'll have a larger spike of missing logs until we detect a rolled
+				// file.
+				avg := calc.GetAverage()
+				last := calc.GetLast()
+				if last.IsZero() {
+					continue
+				}
+				s := time.Since(last)
+				if s > avg*3 {
+					level.Info(t.log).Log("msg", "have not seen a log line in 3x average time between lines, closing and re-opening tailer", "rolling_average", avg, "time_since_last", s)
+					return
+				}
+			}
+		}
 	}()
 
 	level.Info(t.log).Log("msg", "opened log stream", "start time", lastReadTime)
@@ -183,6 +227,8 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
 		// Try processing the line before handling the error, since data may still
 		// be returned alongside an EOF.
 		if len(line) != 0 {
+			calc.AddTimestamp(time.Now())
+
 			entryTimestamp, entryLine := parseKubernetesLog(line)
 			if !entryTimestamp.After(lastReadTime) {
 				continue
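Pulled out of the diff, the new goroutine's logic reduces to a ticker plus a staleness check. A self-contained sketch of that shape, with a fixed expected interval standing in for the rolling average (all names here are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watchStaleness polls once per second and calls restart when no activity
// has been recorded for 3x the expected interval — the same shape as the
// rolled-file watchdog in tailer.go.
func watchStaleness(ctx context.Context, last func() time.Time, expected time.Duration, restart func()) {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if l := last(); !l.IsZero() && time.Since(l) > 3*expected {
				restart()
				return
			}
		}
	}
}

func main() {
	lastEntry := time.Now() // pretend the last log line arrived just now
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// With entries expected every second, silence triggers after ~3 seconds.
	watchStaleness(ctx, func() time.Time { return lastEntry }, time.Second, func() {
		fmt.Println("no entries for 3x the expected interval; reopening tailer")
	})
}
```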