diff --git a/CHANGELOG.md b/CHANGELOG.md index 13aacdf3da41..f29ae0462fe0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -98,6 +98,13 @@ Main (unreleased) - Added an `add_metric_suffixes` option to `otelcol.exporter.prometheus` in flow mode, which configures whether to add type and unit suffixes to metrics names. (@mar4uk) +- Improve detection of rolled log files in `loki.source.kubernetes` and + `loki.source.podlogs` (@slim-bean). + +- Support clustering in `loki.source.kubernetes` (@slim-bean). + +- Support clustering in `loki.source.podlogs` (@rfratto). + v0.37.3 (2023-10-26) ----------------- diff --git a/component/discovery/discovery.go b/component/discovery/discovery.go index 7e366c1dd26e..cacd9559070d 100644 --- a/component/discovery/discovery.go +++ b/component/discovery/discovery.go @@ -45,8 +45,6 @@ func (t *DistributedTargets) Get() []Target { res := make([]Target, 0, (len(t.targets)+1)/len(t.cluster.Peers())) - // TODO(@tpaschalis): Make sure OpReadWrite is the correct operation; - // eg. this determines how clustering behaves when nodes are shutting down. for _, tgt := range t.targets { peers, err := t.cluster.Lookup(shard.StringKey(tgt.NonMetaLabels().String()), 1, shard.OpReadWrite) if err != nil { diff --git a/component/loki/source/kubernetes/kubernetes.go b/component/loki/source/kubernetes/kubernetes.go index 859710dbc3e8..ce11017c6f87 100644 --- a/component/loki/source/kubernetes/kubernetes.go +++ b/component/loki/source/kubernetes/kubernetes.go @@ -18,13 +18,15 @@ import ( "github.com/grafana/agent/component/discovery" "github.com/grafana/agent/component/loki/source/kubernetes/kubetail" "github.com/grafana/agent/pkg/flow/logging/level" + "github.com/grafana/agent/service/cluster" "k8s.io/client-go/kubernetes" ) func init() { component.Register(component.Registration{ - Name: "loki.source.kubernetes", - Args: Arguments{}, + Name: "loki.source.kubernetes", + Args: Arguments{}, + NeedsServices: []string{cluster.ServiceName}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { return New(opts, args.(Arguments)) @@ -40,6 +42,8 @@ type Arguments struct { // Client settings to connect to Kubernetes. Client commonk8s.ClientArguments `river:"client,block,optional"` + + Clustering cluster.ComponentBlock `river:"clustering,block,optional"` } // DefaultArguments holds default settings for loki.source.kubernetes. @@ -57,6 +61,7 @@ type Component struct { log log.Logger opts component.Options positions positions.Positions + cluster cluster.Cluster mut sync.Mutex args Arguments @@ -72,6 +77,7 @@ type Component struct { var ( _ component.Component = (*Component)(nil) _ component.DebugComponent = (*Component)(nil) + _ cluster.Component = (*Component)(nil) ) // New creates a new loki.source.kubernetes component. @@ -89,7 +95,13 @@ func New(o component.Options, args Arguments) (*Component, error) { return nil, err } + data, err := o.GetServiceData(cluster.ServiceName) + if err != nil { + return nil, err + } + c := &Component{ + cluster: data.(cluster.Cluster), log: o.Logger, opts: o, handler: loki.NewLogsReceiver(), @@ -168,28 +180,43 @@ func (c *Component) Update(args component.Arguments) error { // No-op: manager already exists and options didn't change. } - // Convert input targets into targets to give to tailer. 
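For orientation before the rewritten `Update` below: the new code path delegates target sharding to `discovery.NewDistributedTargets(...).Get()`, whose behavior is shown at the top of this diff. As a minimal, hypothetical sketch of that ownership filter (the `localTargets` helper and the `example` package are illustrative, not part of this change):

```go
package example

import (
	"github.com/grafana/agent/component/discovery"
	"github.com/grafana/agent/service/cluster"
	"github.com/grafana/ckit/shard"
)

// localTargets keeps only the targets whose consistent-hashing owner is the
// local node, mirroring discovery.DistributedTargets.Get above.
func localTargets(c cluster.Cluster, targets []discovery.Target) []discovery.Target {
	if c == nil || len(c.Peers()) == 0 {
		// Without a cluster view, this node owns every target.
		return targets
	}

	owned := make([]discovery.Target, 0, (len(targets)+1)/len(c.Peers()))
	for _, tgt := range targets {
		// Ask the cluster for the single owner of this target's label set.
		peers, err := c.Lookup(shard.StringKey(tgt.NonMetaLabels().String()), 1, shard.OpReadWrite)
		if err != nil || len(peers) == 0 {
			// Lookup only fails when more owners are requested than peers
			// exist; fall back to keeping the target locally.
			owned = append(owned, tgt)
			continue
		}
		if peers[0].Self {
			owned = append(owned, tgt)
		}
	}
	return owned
}
```

Targets whose owner lookup fails are kept locally rather than dropped, matching the fallback in `DistributedTargets.Get`.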
- targets := make([]*kubetail.Target, 0, len(newArgs.Targets)) + c.resyncTargets(newArgs.Targets) + c.args = newArgs + return nil +} + +func (c *Component) resyncTargets(targets []discovery.Target) { + distTargets := discovery.NewDistributedTargets(c.args.Clustering.Enabled, c.cluster, targets) + targets = distTargets.Get() - for _, inTarget := range newArgs.Targets { - lset := inTarget.Labels() + tailTargets := make([]*kubetail.Target, 0, len(targets)) + for _, target := range targets { + lset := target.Labels() processed, err := kubetail.PrepareLabels(lset, c.opts.ID) if err != nil { // TODO(rfratto): should this set the health of the component? level.Error(c.log).Log("msg", "failed to process input target", "target", lset.String(), "err", err) continue } - targets = append(targets, kubetail.NewTarget(lset, processed)) + tailTargets = append(tailTargets, kubetail.NewTarget(lset, processed)) } // This will never fail because it only fails if the context gets canceled. // // TODO(rfratto): should we have a generous update timeout to prevent this // from potentially hanging forever? - _ = c.tailer.SyncTargets(context.Background(), targets) + _ = c.tailer.SyncTargets(context.Background(), tailTargets) +} - c.args = newArgs - return nil +// NotifyClusterChange implements cluster.Component. +func (c *Component) NotifyClusterChange() { + c.mut.Lock() + defer c.mut.Unlock() + + if !c.args.Clustering.Enabled { + return + } + c.resyncTargets(c.args.Targets) } // getTailerOptions gets tailer options from arguments. If args hasn't changed diff --git a/component/loki/source/kubernetes/kubetail/tail_utils.go b/component/loki/source/kubernetes/kubetail/tail_utils.go new file mode 100644 index 000000000000..57b3a5bd4d6a --- /dev/null +++ b/component/loki/source/kubernetes/kubetail/tail_utils.go @@ -0,0 +1,89 @@ +package kubetail + +import ( + "sync" + "time" +) + +// rollingAverageCalculator calculates a rolling average between points in +// time. +// +// rollingAverageCalculator stores a circular buffer where the difference +// between timestamps are kept. +type rollingAverageCalculator struct { + mtx sync.Mutex + window []time.Duration + windowSize int + + minEntries int + minDuration time.Duration + defaultDuration time.Duration + + currentIndex int + prevTimestamp time.Time +} + +func newRollingAverageCalculator(windowSize, minEntries int, minDuration, defaultDuration time.Duration) *rollingAverageCalculator { + return &rollingAverageCalculator{ + windowSize: windowSize, + window: make([]time.Duration, windowSize), + minEntries: minEntries, + minDuration: minDuration, + defaultDuration: defaultDuration, + currentIndex: -1, + } +} + +// AddTimestamp adds a new timestamp to the rollingAverageCalculator. If there +// is a previous timestamp, the difference between timestamps is calculated and +// stored in the window. +func (r *rollingAverageCalculator) AddTimestamp(timestamp time.Time) { + r.mtx.Lock() + defer func() { + r.prevTimestamp = timestamp + r.mtx.Unlock() + }() + + // First timestamp + if r.currentIndex == -1 && r.prevTimestamp.Equal(time.Time{}) { + return + } + + r.currentIndex++ + if r.currentIndex >= r.windowSize { + r.currentIndex = 0 + } + + r.window[r.currentIndex] = timestamp.Sub(r.prevTimestamp) +} + +// GetAverage calculates the average of all the durations in the window. 
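+// If fewer than minEntries deltas have been recorded, GetAverage returns
+// defaultDuration; otherwise the result is clamped so that it never drops
+// below minDuration.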
+func (r *rollingAverageCalculator) GetAverage() time.Duration { + r.mtx.Lock() + defer r.mtx.Unlock() + + var total time.Duration + count := 0 + for _, v := range r.window { + if v != 0 { + total += v + count++ + } + } + if count == 0 || count < r.minEntries { + return r.defaultDuration + } + d := total / time.Duration(count) + if d < r.minDuration { + return r.minDuration + } + return d +} + +// GetLast gets the last timestamp added to the rollingAverageCalculator. +func (r *rollingAverageCalculator) GetLast() time.Time { + r.mtx.Lock() + defer r.mtx.Unlock() + + return r.prevTimestamp +} diff --git a/component/loki/source/kubernetes/kubetail/tail_utils_test.go b/component/loki/source/kubernetes/kubetail/tail_utils_test.go new file mode 100644 index 000000000000..0913090b07f1 --- /dev/null +++ b/component/loki/source/kubernetes/kubetail/tail_utils_test.go @@ -0,0 +1,86 @@ +package kubetail + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func Test(t *testing.T) { + tests := []struct { + name string + windowSize int + minEntries int + input []int64 + expected time.Duration + }{ + { + name: "empty, expect default", + windowSize: 10, + input: []int64{}, + expected: time.Duration(10), + }, + { + name: "one sample, not enough, expect default", + windowSize: 5, + input: []int64{10}, + expected: time.Duration(10), + }, + { + name: "partially full", + windowSize: 10, + input: []int64{10, 20, 30, 40, 50}, + expected: time.Duration(10), + }, + { + name: "completely full", + windowSize: 5, + input: []int64{10, 20, 30, 40, 50, 60}, + expected: time.Duration(10), + }, + { + name: "rollover simple", + windowSize: 5, + input: []int64{10, 20, 30, 40, 50, 60}, + expected: time.Duration(10), + }, + { + name: "rollover complex: make sure first value is ignored", + windowSize: 5, + input: []int64{0, 40, 50, 60, 70, 80, 90}, + expected: time.Duration(10), + }, + { + name: "complex", + windowSize: 5, + // 40 +1 +4 +45 +5 = 95, 95/5 = 19 + input: []int64{10, 50, 51, 55, 100, 105}, + expected: time.Duration(19), + }, + { + name: "complex 2", + windowSize: 10, + // outside of window | + // 40 +1 +4 +45 +|5 +5 +90 +100 +150 +300 +5 +45 +50 +149 = 899 + input: []int64{10, 50, 51, 55, 100, 105, 110, 200, 300, 450, 750, 755, 800, 850, 999}, + expected: time.Duration(89), // Integer math result is truncated not rounded. + }, + { + name: "below min duration", + windowSize: 5, + input: []int64{0, 1, 2, 3, 4, 5, 6}, + expected: time.Duration(2), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + c := newRollingAverageCalculator(test.windowSize, test.minEntries, time.Duration(2), time.Duration(10)) + for _, v := range test.input { + c.AddTimestamp(time.Unix(0, v)) + } + avg := c.GetAverage() + assert.Equal(t, test.expected, avg) + }) + } +} diff --git a/component/loki/source/kubernetes/kubetail/tailer.go b/component/loki/source/kubernetes/kubetail/tailer.go index 7b447db751e5..6ea2a2d9b3fb 100644 --- a/component/loki/source/kubernetes/kubetail/tailer.go +++ b/component/loki/source/kubernetes/kubetail/tailer.go @@ -30,6 +30,8 @@ type tailerTask struct { var _ runner.Task = (*tailerTask)(nil) +const maxTailerLifetime = 1 * time.Hour + func (tt *tailerTask) Hash() uint64 { return tt.Target.Hash() } func (tt *tailerTask) Equals(other runner.Task) bool { @@ -127,7 +129,7 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error { // Set a maximum lifetime of the tail to ensure that connections are // reestablished. 
This avoids an issue where the Kubernetes API server stops // responding with new logs while the connection is kept open. - ctx, cancel := context.WithTimeout(ctx, 1*time.Hour) + ctx, cancel := context.WithTimeout(ctx, maxTailerLifetime) defer cancel() var ( @@ -167,9 +169,51 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error { if err != nil { return err } + + // Create a new rolling average calculator to determine the average delta + // time between log entries. + // + // Here, we track the most recent 10,000 delta times to compute a fairly + // accurate average. If there are less than 100 deltas stored, the average + // time defaults to 1h. + // + // The computed average will never be less than the minimum of 2s. + calc := newRollingAverageCalculator(10000, 100, 2*time.Second, maxTailerLifetime) + go func() { - <-ctx.Done() - _ = stream.Close() + rolledFileTicker := time.NewTicker(1 * time.Second) + defer func() { + rolledFileTicker.Stop() + _ = stream.Close() + }() + for { + select { + case <-ctx.Done(): + return + case <-rolledFileTicker.C: + // Versions of Kubernetes which do not contain + // kubernetes/kubernetes#115702 will fail to detect rolled log files + // and stop sending logs to us. + // + // To work around this, we use a rolling average to determine how + // frequent we usually expect to see entries. If 3x the normal delta has + // elapsed, we'll restart the tailer. + // + // False positives here are acceptable, but false negatives mean that + // we'll have a larger spike of missing logs until we detect a rolled + // file. + avg := calc.GetAverage() + last := calc.GetLast() + if last.IsZero() { + continue + } + s := time.Since(last) + if s > avg*3 { + level.Info(t.log).Log("msg", "have not seen a log line in 3x average time between lines, closing and re-opening tailer", "rolling_average", avg, "time_since_last", s) + return + } + } + } }() level.Info(t.log).Log("msg", "opened log stream", "start time", lastReadTime) @@ -183,6 +227,8 @@ func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error { // Try processing the line before handling the error, since data may still // be returned alongside an EOF. 
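As a reading aid, the restart heuristic implemented by the goroutine above boils down to a single comparison. This is an illustrative sketch only (`shouldRestartTail` is not a function added by this diff) and assumes nothing beyond the `rollingAverageCalculator` API introduced in `tail_utils.go`:

```go
package kubetail

import "time"

// shouldRestartTail reports whether the tail should be closed and reopened
// because no log entry has arrived for 3x the rolling-average gap between
// entries, which is taken as a likely sign that the log file was rolled.
func shouldRestartTail(calc *rollingAverageCalculator) bool {
	last := calc.GetLast()
	if last.IsZero() {
		// Nothing observed yet; there is nothing to compare against.
		return false
	}
	return time.Since(last) > 3*calc.GetAverage()
}
```

In the tailer itself this check runs on a one-second ticker, and returning from the goroutine closes the stream so that the tail is reestablished.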
if len(line) != 0 { + calc.AddTimestamp(time.Now()) + entryTimestamp, entryLine := parseKubernetesLog(line) if !entryTimestamp.After(lastReadTime) { continue diff --git a/component/loki/source/podlogs/podlogs.go b/component/loki/source/podlogs/podlogs.go index f051b84856ed..8fa4b48b96c0 100644 --- a/component/loki/source/podlogs/podlogs.go +++ b/component/loki/source/podlogs/podlogs.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/agent/component/loki/source/kubernetes" "github.com/grafana/agent/component/loki/source/kubernetes/kubetail" "github.com/grafana/agent/pkg/flow/logging/level" + "github.com/grafana/agent/service/cluster" "github.com/oklog/run" kubeclient "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -25,8 +26,9 @@ import ( func init() { component.Register(component.Registration{ - Name: "loki.source.podlogs", - Args: Arguments{}, + Name: "loki.source.podlogs", + Args: Arguments{}, + NeedsServices: []string{cluster.ServiceName}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { return New(opts, args.(Arguments)) @@ -44,6 +46,8 @@ type Arguments struct { Selector config.LabelSelector `river:"selector,block,optional"` NamespaceSelector config.LabelSelector `river:"namespace_selector,block,optional"` + + Clustering cluster.ComponentBlock `river:"clustering,block,optional"` } // DefaultArguments holds default settings for loki.source.kubernetes. @@ -80,6 +84,7 @@ type Component struct { var ( _ component.Component = (*Component)(nil) _ component.DebugComponent = (*Component)(nil) + _ cluster.Component = (*Component)(nil) ) // New creates a new loki.source.podlogs component. @@ -96,9 +101,14 @@ func New(o component.Options, args Arguments) (*Component, error) { return nil, err } + data, err := o.GetServiceData(cluster.ServiceName) + if err != nil { + return nil, err + } + var ( tailer = kubetail.NewManager(o.Logger, nil) - reconciler = newReconciler(o.Logger, tailer) + reconciler = newReconciler(o.Logger, tailer, data.(cluster.Cluster)) controller = newController(o.Logger, reconciler) ) @@ -202,6 +212,17 @@ func (c *Component) Update(args component.Arguments) error { return nil } +// NotifyClusterChange implements cluster.Component. +func (c *Component) NotifyClusterChange() { + c.mut.Lock() + defer c.mut.Unlock() + + if !c.args.Clustering.Enabled { + return + } + c.controller.RequestReconcile() +} + // updateTailer updates the state of the tailer. mut must be held when calling. func (c *Component) updateTailer(args Arguments) error { if reflect.DeepEqual(c.args.Client, args.Client) && c.lastOptions != nil { @@ -254,6 +275,7 @@ func (c *Component) updateReconciler(args Arguments) error { } c.reconciler.UpdateSelectors(sel, nsSel) + c.reconciler.SetDistribute(args.Clustering.Enabled) // Request a reconcile so the new selectors get applied. 
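Both components in this diff obtain the cluster the same way: they declare `NeedsServices: []string{cluster.ServiceName}` at registration time and then pull the service out of the component options. A rough sketch of that shared pattern (the `clusterFromOptions` helper is hypothetical and uses only calls that appear above):

```go
package example

import (
	"github.com/grafana/agent/component"
	"github.com/grafana/agent/service/cluster"
)

// clusterFromOptions retrieves the shared cluster service for a component
// that listed cluster.ServiceName in its registration's NeedsServices.
func clusterFromOptions(o component.Options) (cluster.Cluster, error) {
	data, err := o.GetServiceData(cluster.ServiceName)
	if err != nil {
		return nil, err
	}
	return data.(cluster.Cluster), nil
}
```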
c.controller.RequestReconcile() diff --git a/component/loki/source/podlogs/reconciler.go b/component/loki/source/podlogs/reconciler.go index fd6db061fa21..4d2ec87495f2 100644 --- a/component/loki/source/podlogs/reconciler.go +++ b/component/loki/source/podlogs/reconciler.go @@ -12,6 +12,8 @@ import ( "github.com/grafana/agent/component/loki/source/kubernetes/kubetail" monitoringv1alpha2 "github.com/grafana/agent/component/loki/source/podlogs/internal/apis/monitoring/v1alpha2" "github.com/grafana/agent/pkg/flow/logging/level" + "github.com/grafana/agent/service/cluster" + "github.com/grafana/ckit/shard" "github.com/prometheus/common/model" promlabels "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" @@ -25,12 +27,14 @@ import ( // The reconciler reconciles the state of PodLogs on Kubernetes with targets to // collect logs from. type reconciler struct { - log log.Logger - tailer *kubetail.Manager + log log.Logger + tailer *kubetail.Manager + cluster cluster.Cluster reconcileMut sync.RWMutex podLogsSelector labels.Selector podLogsNamespaceSelector labels.Selector + shouldDistribute bool debugMut sync.RWMutex debugInfo []DiscoveredPodLogs @@ -38,10 +42,11 @@ type reconciler struct { // newReconciler creates a new reconciler which synchronizes targets with the // provided tailer whenever Reconcile is called. -func newReconciler(l log.Logger, tailer *kubetail.Manager) *reconciler { +func newReconciler(l log.Logger, tailer *kubetail.Manager, cluster cluster.Cluster) *reconciler { return &reconciler{ - log: l, - tailer: tailer, + log: l, + tailer: tailer, + cluster: cluster, podLogsSelector: labels.Everything(), podLogsNamespaceSelector: labels.Everything(), @@ -57,6 +62,21 @@ func (r *reconciler) UpdateSelectors(podLogs, namespace labels.Selector) { r.podLogsNamespaceSelector = namespace } +// SetDistribute configures whether targets are distributed amongst the cluster. +func (r *reconciler) SetDistribute(distribute bool) { + r.reconcileMut.Lock() + defer r.reconcileMut.Unlock() + + r.shouldDistribute = distribute +} + +func (r *reconciler) getShouldDistribute() bool { + r.reconcileMut.RLock() + defer r.reconcileMut.RUnlock() + + return r.shouldDistribute +} + // Reconcile synchronizes the set of running kubetail targets with the set of // discovered PodLogs. func (r *reconciler) Reconcile(ctx context.Context, cli client.Client) error { @@ -90,6 +110,11 @@ func (r *reconciler) Reconcile(ctx context.Context, cli client.Client) error { newDebugInfo = append(newDebugInfo, discoveredPodLogs) } + // Distribute targets if clustering is enabled. + if r.getShouldDistribute() { + newTasks = distributeTargets(r.cluster, newTasks) + } + if err := r.tailer.SyncTargets(ctx, newTasks); err != nil { level.Error(r.log).Log("msg", "failed to apply new tailers to run", "err", err) } @@ -101,6 +126,29 @@ func (r *reconciler) Reconcile(ctx context.Context, cli client.Client) error { return nil } +func distributeTargets(c cluster.Cluster, targets []*kubetail.Target) []*kubetail.Target { + if c == nil { + return targets + } + + res := make([]*kubetail.Target, 0, (len(targets)+1)/len(c.Peers())) + + for _, target := range targets { + peers, err := c.Lookup(shard.StringKey(target.Labels().String()), 1, shard.OpReadWrite) + if err != nil { + // This can only fail in case we ask for more owners than the + // available peers. This will never happen, but in any case we fall + // back to owning the target ourselves. 
+ res = append(res, target) + } + if peers[0].Self { + res = append(res, target) + } + } + + return res +} + func (r *reconciler) reconcilePodLogs(ctx context.Context, cli client.Client, podLogs *monitoringv1alpha2.PodLogs) ([]*kubetail.Target, DiscoveredPodLogs) { var targets []*kubetail.Target diff --git a/component/prometheus/interceptor.go b/component/prometheus/interceptor.go index 222e608362f9..7fc10f06b8f3 100644 --- a/component/prometheus/interceptor.go +++ b/component/prometheus/interceptor.go @@ -105,6 +105,9 @@ func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int if a.interceptor.onAppend != nil { return a.interceptor.onAppend(ref, l, t, v, a.child) } + if a.child == nil { + return 0, nil + } return a.child.Append(ref, l, t, v) } @@ -138,6 +141,9 @@ func (a *interceptappender) AppendExemplar( if a.interceptor.onAppendExemplar != nil { return a.interceptor.onAppendExemplar(ref, l, e, a.child) } + if a.child == nil { + return 0, nil + } return a.child.AppendExemplar(ref, l, e) } @@ -155,6 +161,9 @@ func (a *interceptappender) UpdateMetadata( if a.interceptor.onUpdateMetadata != nil { return a.interceptor.onUpdateMetadata(ref, l, m, a.child) } + if a.child == nil { + return 0, nil + } return a.child.UpdateMetadata(ref, l, m) } @@ -173,5 +182,8 @@ func (a *interceptappender) AppendHistogram( if a.interceptor.onAppendHistogram != nil { return a.interceptor.onAppendHistogram(ref, l, t, h, fh, a.child) } + if a.child == nil { + return 0, nil + } return a.child.AppendHistogram(ref, l, t, h, fh) } diff --git a/component/prometheus/operator/types.go b/component/prometheus/operator/types.go index 9efb2b8611e8..c4273b4a8b88 100644 --- a/component/prometheus/operator/types.go +++ b/component/prometheus/operator/types.go @@ -7,6 +7,7 @@ import ( "github.com/grafana/agent/component/common/kubernetes" flow_relabel "github.com/grafana/agent/component/common/relabel" "github.com/grafana/agent/component/prometheus/scrape" + "github.com/grafana/agent/service/cluster" "github.com/prometheus/common/model" promconfig "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/storage" @@ -26,19 +27,13 @@ type Arguments struct { // LabelSelector allows filtering discovered monitor resources by labels LabelSelector *config.LabelSelector `river:"selector,block,optional"` - Clustering Clustering `river:"clustering,block,optional"` + Clustering cluster.ComponentBlock `river:"clustering,block,optional"` RelabelConfigs []*flow_relabel.Config `river:"rule,block,optional"` Scrape ScrapeOptions `river:"scrape,block,optional"` } -// Clustering holds values that configure clustering-specific behavior. -type Clustering struct { - // TODO(@tpaschalis) Move this block to a shared place for all components using clustering. - Enabled bool `river:"enabled,attr"` -} - // ScrapeOptions holds values that configure scraping behavior. type ScrapeOptions struct { // DefaultScrapeInterval is the default interval to scrape targets. diff --git a/component/prometheus/scrape/scrape.go b/component/prometheus/scrape/scrape.go index 9161c5915902..750f71bd9aa0 100644 --- a/component/prometheus/scrape/scrape.go +++ b/component/prometheus/scrape/scrape.go @@ -88,13 +88,7 @@ type Arguments struct { ExtraMetrics bool `river:"extra_metrics,attr,optional"` EnableProtobufNegotiation bool `river:"enable_protobuf_negotiation,attr,optional"` - Clustering Clustering `river:"clustering,block,optional"` -} - -// Clustering holds values that configure clustering-specific behavior. 
-type Clustering struct { - // TODO(@tpaschalis) Move this block to a shared place for all components using clustering. - Enabled bool `river:"enabled,attr"` + Clustering cluster.ComponentBlock `river:"clustering,block,optional"` } // SetToDefault implements river.Defaulter. diff --git a/component/pyroscope/scrape/scrape.go b/component/pyroscope/scrape/scrape.go index cd807fbb8b24..846fee049e5e 100644 --- a/component/pyroscope/scrape/scrape.go +++ b/component/pyroscope/scrape/scrape.go @@ -81,7 +81,7 @@ type Arguments struct { ProfilingConfig ProfilingConfig `river:"profiling_config,block,optional"` - Clustering scrape.Clustering `river:"clustering,block,optional"` + Clustering cluster.ComponentBlock `river:"clustering,block,optional"` } type ProfilingConfig struct { diff --git a/converter/internal/prometheusconvert/component/scrape.go b/converter/internal/prometheusconvert/component/scrape.go index 096276e36057..651e4e2c5bc1 100644 --- a/converter/internal/prometheusconvert/component/scrape.go +++ b/converter/internal/prometheusconvert/component/scrape.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/agent/converter/diag" "github.com/grafana/agent/converter/internal/common" "github.com/grafana/agent/converter/internal/prometheusconvert/build" + "github.com/grafana/agent/service/cluster" prom_config "github.com/prometheus/prometheus/config" prom_discovery "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/storage" @@ -61,7 +62,7 @@ func toScrapeArguments(scrapeConfig *prom_config.ScrapeConfig, forwardTo []stora HTTPClientConfig: *common.ToHttpClientConfig(&scrapeConfig.HTTPClientConfig), ExtraMetrics: false, EnableProtobufNegotiation: false, - Clustering: scrape.Clustering{Enabled: false}, + Clustering: cluster.ComponentBlock{Enabled: false}, } } diff --git a/docs/sources/flow/reference/components/loki.source.kubernetes.md b/docs/sources/flow/reference/components/loki.source.kubernetes.md index e845215fcded..48db37ab6e70 100644 --- a/docs/sources/flow/reference/components/loki.source.kubernetes.md +++ b/docs/sources/flow/reference/components/loki.source.kubernetes.md @@ -82,6 +82,7 @@ client > authorization | [authorization][] | Configure generic authorization to client > oauth2 | [oauth2][] | Configure OAuth2 for authenticating to the endpoint. | no client > oauth2 > tls_config | [tls_config][] | Configure TLS settings for connecting to the endpoint. | no client > tls_config | [tls_config][] | Configure TLS settings for connecting to the endpoint. | no +clustering | [clustering][] | Configure the component for when the Agent is running in clustered mode. | no The `>` symbol indicates deeper levels of nesting. For example, `client > basic_auth` refers to a `basic_auth` block defined @@ -92,6 +93,7 @@ inside a `client` block. [authorization]: #authorization-block [oauth2]: #oauth2-block [tls_config]: #tls_config-block +[clustering]: #clustering-beta ### client block @@ -135,6 +137,22 @@ Name | Type | Description | Default | Required {{< docs/shared lookup="flow/reference/components/tls-config-block.md" source="agent" version="" >}} +### clustering (beta) + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`enabled` | `bool` | Distribute log collection with other cluster nodes. 
| | yes + +When the agent is [using clustering][], and `enabled` is set to true, then this +`loki.source.kubernetes` component instance opts-in to participating in the +cluster to distribute the load of log collection between all cluster nodes. + +If the agent is _not_ running in clustered mode, then the block is a no-op and +`loki.source.kubernetes` collects logs from every target it receives in its +arguments. + +[using clustering]: {{< relref "../../concepts/clustering.md" >}} + ## Exported fields `loki.source.kubernetes` does not export any fields. diff --git a/docs/sources/flow/reference/components/loki.source.podlogs.md b/docs/sources/flow/reference/components/loki.source.podlogs.md index 24411d84c1b8..f5f1b00fbd73 100644 --- a/docs/sources/flow/reference/components/loki.source.podlogs.md +++ b/docs/sources/flow/reference/components/loki.source.podlogs.md @@ -143,6 +143,7 @@ selector | [selector][] | Label selector for which `PodLogs` to discover. | no selector > match_expression | [match_expression][] | Label selector expression for which `PodLogs` to discover. | no namespace_selector | [selector][] | Label selector for which namespaces to discover `PodLogs` in. | no namespace_selector > match_expression | [match_expression][] | Label selector expression for which namespaces to discover `PodLogs` in. | no +clustering | [clustering][] | Configure the component for when the Agent is running in clustered mode. | no The `>` symbol indicates deeper levels of nesting. For example, `client > basic_auth` refers to a `basic_auth` block defined @@ -155,6 +156,7 @@ inside a `client` block. [tls_config]: #tls_config-block [selector]: #selector-block [match_expression]: #match_expression-block +[clustering]: #clustering-beta ### client block @@ -233,6 +235,21 @@ The `operator` argument must be one of the following strings: Both `selector` and `namespace_selector` can make use of multiple `match_expression` inner blocks which are treated as AND clauses. +### clustering (beta) + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`enabled` | `bool` | Distribute log collection with other cluster nodes. | | yes + +When the agent is [using clustering][], and `enabled` is set to true, then this +`loki.source.podlogs` component instance opts-in to participating in the +cluster to distribute the load of log collection between all cluster nodes. + +If the agent is _not_ running in clustered mode, then the block is a no-op and +`loki.source.podlogs` collects logs based on every PodLogs resource discovered. + +[using clustering]: {{< relref "../../concepts/clustering.md" >}} + ## Exported fields `loki.source.podlogs` does not export any fields. diff --git a/service/cluster/cluster.go b/service/cluster/cluster.go index cf95165ea321..398a0db92979 100644 --- a/service/cluster/cluster.go +++ b/service/cluster/cluster.go @@ -355,6 +355,13 @@ type Component interface { NotifyClusterChange() } +// ComponentBlock holds common arguments for clustering settings within a +// component. ComponentBlock is intended to be exposed as a block called +// "clustering". +type ComponentBlock struct { + Enabled bool `river:"enabled,attr"` +} + // Cluster is a read-only view of a cluster. type Cluster interface { // Lookup determines the set of replicationFactor owners for a given key.
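Taken together, the `ComponentBlock` and `Component` definitions above and the component changes earlier in this diff follow one pattern. The following condensed sketch (the `clusteredComponent` type and `example` package are illustrative, not code from the agent) shows roughly what a component needs in order to opt in to clustered work distribution:

```go
package example

import (
	"sync"

	"github.com/grafana/agent/service/cluster"
)

// Arguments exposes the shared clustering block to River configuration.
type Arguments struct {
	Clustering cluster.ComponentBlock `river:"clustering,block,optional"`
}

type clusteredComponent struct {
	mut     sync.Mutex
	cluster cluster.Cluster // obtained from the cluster service at build time
	args    Arguments
}

// NotifyClusterChange is the method cluster.Component requires. It is called
// whenever the set of peers changes; an instance with clustering enabled
// redistributes its work, otherwise the notification is a no-op.
func (c *clusteredComponent) NotifyClusterChange() {
	c.mut.Lock()
	defer c.mut.Unlock()

	if !c.args.Clustering.Enabled {
		return
	}
	// Re-shard targets or request a reconcile here, as loki.source.kubernetes
	// and loki.source.podlogs do in this diff.
}
```

Whether the redistributed work is a target list (`loki.source.kubernetes`) or a reconcile request (`loki.source.podlogs`), the guard on `Clustering.Enabled` keeps the block a no-op for instances that have not opted in.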