Skip to content

Commit

Permalink
loki.source.podlogs: support clustering
Browse files Browse the repository at this point in the history
Add support for loki.source.podlogs to distribute targets using
clustering.
  • Loading branch information
rfratto committed Oct 26, 2023
1 parent 976cc58 commit 2973d4a
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 9 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ Main (unreleased)

- Support clustering in `loki.source.kubernetes` (@slim-bean).

- Support clustering in `loki.source.podlogs` (@rfratto).

v0.37.3 (2023-10-26)
-----------------

Expand Down
34 changes: 31 additions & 3 deletions component/loki/source/podlogs/podlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ import (
"github.com/grafana/agent/component/loki/source/kubernetes"
"github.com/grafana/agent/component/loki/source/kubernetes/kubetail"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/grafana/agent/service/cluster"
"github.com/oklog/run"
kubeclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

func init() {
component.Register(component.Registration{
Name: "loki.source.podlogs",
Args: Arguments{},
Name: "loki.source.podlogs",
Args: Arguments{},
NeedsServices: []string{cluster.ServiceName},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))
Expand All @@ -44,6 +46,14 @@ type Arguments struct {

Selector config.LabelSelector `river:"selector,block,optional"`
NamespaceSelector config.LabelSelector `river:"namespace_selector,block,optional"`

Clustering Clustering `river:"clustering,block,optional"`
}

// Clustering holds values that configure clustering-specific behavior.
type Clustering struct {
	// TODO(@tpaschalis) Move this block to a shared place for all components using clustering.

	// Enabled opts this component into distributing work across cluster
	// nodes when the agent runs in clustered mode.
	Enabled bool `river:"enabled,attr"`
}

// DefaultArguments holds default settings for loki.source.kubernetes.
Expand Down Expand Up @@ -80,6 +90,7 @@ type Component struct {
var (
_ component.Component = (*Component)(nil)
_ component.DebugComponent = (*Component)(nil)
_ cluster.Component = (*Component)(nil)
)

// New creates a new loki.source.podlogs component.
Expand All @@ -96,9 +107,14 @@ func New(o component.Options, args Arguments) (*Component, error) {
return nil, err
}

data, err := o.GetServiceData(cluster.ServiceName)
if err != nil {
return nil, err
}

var (
tailer = kubetail.NewManager(o.Logger, nil)
reconciler = newReconciler(o.Logger, tailer)
reconciler = newReconciler(o.Logger, tailer, data.(cluster.Cluster))
controller = newController(o.Logger, reconciler)
)

Expand Down Expand Up @@ -202,6 +218,17 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}

// NotifyClusterChange implements cluster.Component. When clustering is
// enabled for this component, a reconcile is requested so target ownership
// can be recalculated against the new cluster state; otherwise the
// notification is ignored.
func (c *Component) NotifyClusterChange() {
	c.mut.Lock()
	defer c.mut.Unlock()

	if c.args.Clustering.Enabled {
		c.controller.RequestReconcile()
	}
}

// updateTailer updates the state of the tailer. mut must be held when calling.
func (c *Component) updateTailer(args Arguments) error {
if reflect.DeepEqual(c.args.Client, args.Client) && c.lastOptions != nil {
Expand Down Expand Up @@ -254,6 +281,7 @@ func (c *Component) updateReconciler(args Arguments) error {
}

c.reconciler.UpdateSelectors(sel, nsSel)
c.reconciler.SetDistribute(args.Clustering.Enabled)

// Request a reconcile so the new selectors get applied.
c.controller.RequestReconcile()
Expand Down
60 changes: 55 additions & 5 deletions component/loki/source/podlogs/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/grafana/agent/component/loki/source/kubernetes/kubetail"
monitoringv1alpha2 "github.com/grafana/agent/component/loki/source/podlogs/internal/apis/monitoring/v1alpha2"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/grafana/agent/service/cluster"
"github.com/grafana/ckit/shard"
"github.com/prometheus/common/model"
promlabels "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
Expand All @@ -25,23 +27,26 @@ import (
// The reconciler reconciles the state of PodLogs on Kubernetes with targets to
// collect logs from.
type reconciler struct {
log log.Logger
tailer *kubetail.Manager
log log.Logger
tailer *kubetail.Manager
cluster cluster.Cluster

reconcileMut sync.RWMutex
podLogsSelector labels.Selector
podLogsNamespaceSelector labels.Selector
shouldDistribute bool

debugMut sync.RWMutex
debugInfo []DiscoveredPodLogs
}

// newReconciler creates a new reconciler which synchronizes targets with the
// provided tailer whenever Reconcile is called.
func newReconciler(l log.Logger, tailer *kubetail.Manager) *reconciler {
func newReconciler(l log.Logger, tailer *kubetail.Manager, cluster cluster.Cluster) *reconciler {
return &reconciler{
log: l,
tailer: tailer,
log: l,
tailer: tailer,
cluster: cluster,

podLogsSelector: labels.Everything(),
podLogsNamespaceSelector: labels.Everything(),
Expand All @@ -57,6 +62,21 @@ func (r *reconciler) UpdateSelectors(podLogs, namespace labels.Selector) {
r.podLogsNamespaceSelector = namespace
}

// SetDistribute configures whether targets are distributed amongst the cluster.
func (r *reconciler) SetDistribute(distribute bool) {
	r.reconcileMut.Lock()
	r.shouldDistribute = distribute
	r.reconcileMut.Unlock()
}

// getShouldDistribute reports whether targets should currently be
// distributed across the cluster.
func (r *reconciler) getShouldDistribute() bool {
	r.reconcileMut.RLock()
	distribute := r.shouldDistribute
	r.reconcileMut.RUnlock()
	return distribute
}

// Reconcile synchronizes the set of running kubetail targets with the set of
// discovered PodLogs.
func (r *reconciler) Reconcile(ctx context.Context, cli client.Client) error {
Expand Down Expand Up @@ -90,6 +110,11 @@ func (r *reconciler) Reconcile(ctx context.Context, cli client.Client) error {
newDebugInfo = append(newDebugInfo, discoveredPodLogs)
}

// Distribute targets if clustering is enabled.
if r.getShouldDistribute() {
newTasks = distributeTargets(r.cluster, newTasks)
}

if err := r.tailer.SyncTargets(ctx, newTasks); err != nil {
level.Error(r.log).Log("msg", "failed to apply new tailers to run", "err", err)
}
Expand All @@ -101,6 +126,31 @@ func (r *reconciler) Reconcile(ctx context.Context, cli client.Client) error {
return nil
}

// distributeTargets returns the subset of targets which the local node is
// responsible for collecting. If c is nil, clustering is unavailable and all
// targets are owned locally.
func distributeTargets(c cluster.Cluster, targets []*kubetail.Target) []*kubetail.Target {
	if c == nil {
		return targets
	}

	// Preallocate for an even share of targets per peer. Guard against a
	// division by zero when the peer list is empty.
	peerCount := len(c.Peers())
	if peerCount == 0 {
		peerCount = 1
	}
	res := make([]*kubetail.Target, 0, (len(targets)+1)/peerCount)

	// TODO(@tpaschalis): Make sure OpReadWrite is the correct operation;
	// eg. this determines how clustering behaves when nodes are shutting down.
	for _, target := range targets {
		peers, err := c.Lookup(shard.StringKey(target.Labels().String()), 1, shard.OpReadWrite)
		if err != nil {
			// This can only fail in case we ask for more owners than the
			// available peers. This will never happen, but in any case we fall
			// back to owning the target ourselves. The continue is required:
			// without it, peers may be empty and peers[0] would panic.
			res = append(res, target)
			continue
		}
		if peers[0].Self {
			res = append(res, target)
		}
	}

	return res
}

func (r *reconciler) reconcilePodLogs(ctx context.Context, cli client.Client, podLogs *monitoringv1alpha2.PodLogs) ([]*kubetail.Target, DiscoveredPodLogs) {
var targets []*kubetail.Target

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ Name | Type | Description | Default | Required

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`enabled` | `bool` | Enables sharing targets with other cluster nodes. | `false` | yes
`enabled` | `bool` | Distribute log collection across other cluster nodes. | | yes

When the agent is [using clustering][], and `enabled` is set to true, then this
`loki.source.kubernetes` component instance opts-in to participating in the
Expand Down
17 changes: 17 additions & 0 deletions docs/sources/flow/reference/components/loki.source.podlogs.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ selector | [selector][] | Label selector for which `PodLogs` to discover. | no
selector > match_expression | [match_expression][] | Label selector expression for which `PodLogs` to discover. | no
namespace_selector | [selector][] | Label selector for which namespaces to discover `PodLogs` in. | no
namespace_selector > match_expression | [match_expression][] | Label selector expression for which namespaces to discover `PodLogs` in. | no
clustering | [clustering][] | Configure the component for when the Agent is running in clustered mode. | no

The `>` symbol indicates deeper levels of nesting. For example, `client >
basic_auth` refers to a `basic_auth` block defined
Expand All @@ -155,6 +156,7 @@ inside a `client` block.
[tls_config]: #tls_config-block
[selector]: #selector-block
[match_expression]: #match_expression-block
[clustering]: #clustering-beta

### client block

Expand Down Expand Up @@ -233,6 +235,21 @@ The `operator` argument must be one of the following strings:
Both `selector` and `namespace_selector` can make use of multiple
`match_expression` inner blocks which are treated as AND clauses.

### clustering (beta)

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`enabled` | `bool` | Distribute log collection across other cluster nodes. | | yes

When the agent is [using clustering][], and `enabled` is set to true, then this
`loki.source.podlogs` component instance opts-in to participating in the
cluster to distribute the load of log collection between all cluster nodes.

If the agent is _not_ running in clustered mode, then the block is a no-op and
`loki.source.podlogs` collects logs based on every PodLogs resource discovered.

[using clustering]: {{< relref "../../concepts/clustering.md" >}}

## Exported fields

`loki.source.podlogs` does not export any fields.
Expand Down

0 comments on commit 2973d4a

Please sign in to comment.