From 2973d4a414a6fb0267ae3e1732b51b94846785f4 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Thu, 26 Oct 2023 12:10:25 -0400 Subject: [PATCH] loki.source.podlogs: support clustering Add support for loki.source.podlogs to distribute targets using clustering. --- CHANGELOG.md | 2 + component/loki/source/podlogs/podlogs.go | 34 ++++++++++- component/loki/source/podlogs/reconciler.go | 60 +++++++++++++++++-- .../components/loki.source.kubernetes.md | 2 +- .../components/loki.source.podlogs.md | 17 ++++++ 5 files changed, 106 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 73b8021e0580..e1a59dec5a93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -82,6 +82,8 @@ Main (unreleased) - Support clustering in `loki.source.kubernetes` (@slim-bean). +- Support clustering in `loki.source.podlogs` (@rfratto). + v0.37.3 (2023-10-26) ----------------- diff --git a/component/loki/source/podlogs/podlogs.go b/component/loki/source/podlogs/podlogs.go index f051b84856ed..5a748942faf9 100644 --- a/component/loki/source/podlogs/podlogs.go +++ b/component/loki/source/podlogs/podlogs.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/agent/component/loki/source/kubernetes" "github.com/grafana/agent/component/loki/source/kubernetes/kubetail" "github.com/grafana/agent/pkg/flow/logging/level" + "github.com/grafana/agent/service/cluster" "github.com/oklog/run" kubeclient "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -25,8 +26,9 @@ import ( func init() { component.Register(component.Registration{ - Name: "loki.source.podlogs", - Args: Arguments{}, + Name: "loki.source.podlogs", + Args: Arguments{}, + NeedsServices: []string{cluster.ServiceName}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { return New(opts, args.(Arguments)) @@ -44,6 +46,14 @@ type Arguments struct { Selector config.LabelSelector `river:"selector,block,optional"` NamespaceSelector config.LabelSelector 
`river:"namespace_selector,block,optional"` + + Clustering Clustering `river:"clustering,block,optional"` +} + +// Clustering holds values that configure clustering-specific behavior. +type Clustering struct { + // TODO(@tpaschalis) Move this block to a shared place for all components using clustering. + Enabled bool `river:"enabled,attr"` } // DefaultArguments holds default settings for loki.source.kubernetes. @@ -80,6 +90,7 @@ type Component struct { var ( _ component.Component = (*Component)(nil) _ component.DebugComponent = (*Component)(nil) + _ cluster.Component = (*Component)(nil) ) // New creates a new loki.source.podlogs component. @@ -96,9 +107,14 @@ func New(o component.Options, args Arguments) (*Component, error) { return nil, err } + data, err := o.GetServiceData(cluster.ServiceName) + if err != nil { + return nil, err + } + var ( tailer = kubetail.NewManager(o.Logger, nil) - reconciler = newReconciler(o.Logger, tailer) + reconciler = newReconciler(o.Logger, tailer, data.(cluster.Cluster)) controller = newController(o.Logger, reconciler) ) @@ -202,6 +218,17 @@ func (c *Component) Update(args component.Arguments) error { return nil } +// NotifyClusterChange implements cluster.Component. +func (c *Component) NotifyClusterChange() { + c.mut.Lock() + defer c.mut.Unlock() + + if !c.args.Clustering.Enabled { + return + } + c.controller.RequestReconcile() +} + // updateTailer updates the state of the tailer. mut must be held when calling. func (c *Component) updateTailer(args Arguments) error { if reflect.DeepEqual(c.args.Client, args.Client) && c.lastOptions != nil { @@ -254,6 +281,7 @@ func (c *Component) updateReconciler(args Arguments) error { } c.reconciler.UpdateSelectors(sel, nsSel) + c.reconciler.SetDistribute(args.Clustering.Enabled) // Request a reconcile so the new selectors get applied. 
c.controller.RequestReconcile() diff --git a/component/loki/source/podlogs/reconciler.go b/component/loki/source/podlogs/reconciler.go index fd6db061fa21..cc25022dc28a 100644 --- a/component/loki/source/podlogs/reconciler.go +++ b/component/loki/source/podlogs/reconciler.go @@ -12,6 +12,8 @@ import ( "github.com/grafana/agent/component/loki/source/kubernetes/kubetail" monitoringv1alpha2 "github.com/grafana/agent/component/loki/source/podlogs/internal/apis/monitoring/v1alpha2" "github.com/grafana/agent/pkg/flow/logging/level" + "github.com/grafana/agent/service/cluster" + "github.com/grafana/ckit/shard" "github.com/prometheus/common/model" promlabels "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" @@ -25,12 +27,14 @@ import ( // The reconciler reconciles the state of PodLogs on Kubernetes with targets to // collect logs from. type reconciler struct { - log log.Logger - tailer *kubetail.Manager + log log.Logger + tailer *kubetail.Manager + cluster cluster.Cluster reconcileMut sync.RWMutex podLogsSelector labels.Selector podLogsNamespaceSelector labels.Selector + shouldDistribute bool debugMut sync.RWMutex debugInfo []DiscoveredPodLogs @@ -38,10 +42,11 @@ type reconciler struct { // newReconciler creates a new reconciler which synchronizes targets with the // provided tailer whenever Reconcile is called. -func newReconciler(l log.Logger, tailer *kubetail.Manager) *reconciler { +func newReconciler(l log.Logger, tailer *kubetail.Manager, cluster cluster.Cluster) *reconciler { return &reconciler{ - log: l, - tailer: tailer, + log: l, + tailer: tailer, + cluster: cluster, podLogsSelector: labels.Everything(), podLogsNamespaceSelector: labels.Everything(), @@ -57,6 +62,21 @@ func (r *reconciler) UpdateSelectors(podLogs, namespace labels.Selector) { r.podLogsNamespaceSelector = namespace } +// SetDistribute configures whether targets are distributed amongst the cluster. 
+func (r *reconciler) SetDistribute(distribute bool) { + r.reconcileMut.Lock() + defer r.reconcileMut.Unlock() + + r.shouldDistribute = distribute +} + +func (r *reconciler) getShouldDistribute() bool { + r.reconcileMut.RLock() + defer r.reconcileMut.RUnlock() + + return r.shouldDistribute +} + // Reconcile synchronizes the set of running kubetail targets with the set of // discovered PodLogs. func (r *reconciler) Reconcile(ctx context.Context, cli client.Client) error { @@ -90,6 +110,11 @@ func (r *reconciler) Reconcile(ctx context.Context, cli client.Client) error { newDebugInfo = append(newDebugInfo, discoveredPodLogs) } + // Distribute targets if clustering is enabled. + if r.getShouldDistribute() { + newTasks = distributeTargets(r.cluster, newTasks) + } + if err := r.tailer.SyncTargets(ctx, newTasks); err != nil { level.Error(r.log).Log("msg", "failed to apply new tailers to run", "err", err) } @@ -101,6 +126,41 @@ func (r *reconciler) Reconcile(ctx context.Context, cli client.Client) error { return nil } +// distributeTargets returns the targets for which the local node is the first +// owner reported by c.Lookup. A nil cluster returns targets unchanged. +func distributeTargets(c cluster.Cluster, targets []*kubetail.Target) []*kubetail.Target { + if c == nil { + return targets + } + + // Integer division by zero panics, so never divide by an empty peer list. + peerCount := len(c.Peers()) + if peerCount == 0 { + peerCount = 1 + } + res := make([]*kubetail.Target, 0, (len(targets)+1)/peerCount) + + // TODO(@tpaschalis): Make sure OpReadWrite is the correct operation; + // eg. this determines how clustering behaves when nodes are shutting down. + for _, target := range targets { + peers, err := c.Lookup(shard.StringKey(target.Labels().String()), 1, shard.OpReadWrite) + if err != nil || len(peers) == 0 { + // This can only fail in case we ask for more owners than the + // available peers. This will never happen, but in any case we fall + // back to owning the target ourselves. 
+ res = append(res, target) + // The target is already claimed and peers is unusable, so skip the + // ownership check; indexing peers here could panic or add a duplicate. + continue + } + if peers[0].Self { + res = append(res, target) + } + } + + return res +} + func (r *reconciler) reconcilePodLogs(ctx context.Context, cli client.Client, podLogs *monitoringv1alpha2.PodLogs) ([]*kubetail.Target, DiscoveredPodLogs) { var targets []*kubetail.Target diff --git a/docs/sources/flow/reference/components/loki.source.kubernetes.md b/docs/sources/flow/reference/components/loki.source.kubernetes.md index c64ed4a72f21..48db37ab6e70 100644 --- a/docs/sources/flow/reference/components/loki.source.kubernetes.md +++ b/docs/sources/flow/reference/components/loki.source.kubernetes.md @@ -141,7 +141,7 @@ Name | Type | Description | Default | Required Name | Type | Description | Default | Required ---- | ---- | ----------- | ------- | -------- -`enabled` | `bool` | Enables sharing targets with other cluster nodes. | `false` | yes +`enabled` | `bool` | Distribute log collection with other cluster nodes. | | yes When the agent is [using clustering][], and `enabled` is set to true, then this `loki.source.kubernetes` component instance opts-in to participating in the diff --git a/docs/sources/flow/reference/components/loki.source.podlogs.md b/docs/sources/flow/reference/components/loki.source.podlogs.md index 24411d84c1b8..f5f1b00fbd73 100644 --- a/docs/sources/flow/reference/components/loki.source.podlogs.md +++ b/docs/sources/flow/reference/components/loki.source.podlogs.md @@ -143,6 +143,7 @@ selector | [selector][] | Label selector for which `PodLogs` to discover. | no selector > match_expression | [match_expression][] | Label selector expression for which `PodLogs` to discover. | no namespace_selector | [selector][] | Label selector for which namespaces to discover `PodLogs` in. | no namespace_selector > match_expression | [match_expression][] | Label selector expression for which namespaces to discover `PodLogs` in. | no +clustering | [clustering][] | Configure the component for when the Agent is running in clustered mode.
| no The `>` symbol indicates deeper levels of nesting. For example, `client > basic_auth` refers to a `basic_auth` block defined @@ -155,6 +156,7 @@ inside a `client` block. [tls_config]: #tls_config-block [selector]: #selector-block [match_expression]: #match_expression-block +[clustering]: #clustering-beta ### client block @@ -233,6 +235,21 @@ The `operator` argument must be one of the following strings: Both `selector` and `namespace_selector` can make use of multiple `match_expression` inner blocks which are treated as AND clauses. +### clustering (beta) + +Name | Type | Description | Default | Required +---- | ---- | ----------- | ------- | -------- +`enabled` | `bool` | Distribute log collection with other cluster nodes. | | yes + +When the agent is [using clustering][], and `enabled` is set to true, then this +`loki.source.podlogs` component instance opts-in to participating in the +cluster to distribute the load of log collection between all cluster nodes. + +If the agent is _not_ running in clustered mode, then the block is a no-op and +`loki.source.podlogs` collects logs based on every PodLogs resource discovered. + +[using clustering]: {{< relref "../../concepts/clustering.md" >}} + ## Exported fields `loki.source.podlogs` does not export any fields.