Skip to content

Commit

Permalink
loki.source.kubernetes: support clustering
Browse files Browse the repository at this point in the history
Add support for loki.source.kubernetes to distribute targets using
clustering.

Closes grafana#4502

Co-authored-by: Edward Welch <edward.welch@grafana.com>
  • Loading branch information
rfratto and slim-bean committed Oct 26, 2023
1 parent 0d388a0 commit d762bc3
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 10 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ Main (unreleased)
- Improve detection of rolled log files in `loki.source.kubernetes` and
`loki.source.podlogs` (@slim-bean).

- Support clustering in `loki.source.kubernetes` (@slim-bean).

v0.37.3 (2023-10-26)
-----------------

Expand Down
53 changes: 43 additions & 10 deletions component/loki/source/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
"github.com/grafana/agent/component/discovery"
"github.com/grafana/agent/component/loki/source/kubernetes/kubetail"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/grafana/agent/service/cluster"
"k8s.io/client-go/kubernetes"
)

func init() {
component.Register(component.Registration{
Name: "loki.source.kubernetes",
Args: Arguments{},
Name: "loki.source.kubernetes",
Args: Arguments{},
NeedsServices: []string{cluster.ServiceName},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))
Expand All @@ -40,6 +42,14 @@ type Arguments struct {

// Client settings to connect to Kubernetes.
Client commonk8s.ClientArguments `river:"client,block,optional"`

Clustering Clustering `river:"clustering,block,optional"`
}

// Clustering holds values that configure clustering-specific behavior.
//
// It is exposed on Arguments as the optional `clustering` block.
type Clustering struct {
// TODO(@tpaschalis) Move this block to a shared place for all components using clustering.

// Enabled is the flag handed to discovery.NewDistributedTargets when the
// component resyncs its targets, and NotifyClusterChange does nothing
// unless it is true. The river tag declares it as an attribute without the
// optional flag, so it has to be set whenever the clustering block is
// present.
Enabled bool `river:"enabled,attr"`
}

// DefaultArguments holds default settings for loki.source.kubernetes.
Expand All @@ -57,6 +67,7 @@ type Component struct {
log log.Logger
opts component.Options
positions positions.Positions
cluster cluster.Cluster

mut sync.Mutex
args Arguments
Expand All @@ -72,6 +83,7 @@ type Component struct {
var (
_ component.Component = (*Component)(nil)
_ component.DebugComponent = (*Component)(nil)
_ cluster.Component = (*Component)(nil)
)

// New creates a new loki.source.kubernetes component.
Expand All @@ -89,7 +101,13 @@ func New(o component.Options, args Arguments) (*Component, error) {
return nil, err
}

data, err := o.GetServiceData(cluster.ServiceName)
if err != nil {
return nil, err
}

c := &Component{
cluster: data.(cluster.Cluster),
log: o.Logger,
opts: o,
handler: loki.NewLogsReceiver(),
Expand Down Expand Up @@ -168,28 +186,43 @@ func (c *Component) Update(args component.Arguments) error {
// No-op: manager already exists and options didn't change.
}

// Convert input targets into targets to give to tailer.
targets := make([]*kubetail.Target, 0, len(newArgs.Targets))
c.resyncTargets(newArgs.Targets)
c.args = newArgs
return nil
}

// resyncTargets recomputes the set of targets the tailer should follow.
//
// The incoming discovery targets are first passed through
// discovery.NewDistributedTargets together with the clustering-enabled flag
// and the cluster handle; only the targets returned by Get are kept. Each
// remaining target has its labels prepared with kubetail.PrepareLabels;
// targets for which that fails are logged and skipped. The resulting list is
// handed to the tailer through SyncTargets.
//
// NotifyClusterChange calls this with c.mut held; presumably Update does the
// same -- confirm against the full function.
//
// NOTE(review): this reads c.args.Clustering.Enabled, but Update calls
// resyncTargets before it assigns c.args = newArgs, so during an Update the
// flag comes from the previous arguments (the zero value on the very first
// call). Confirm whether Update should assign c.args first.
func (c *Component) resyncTargets(targets []discovery.Target) {
distTargets := discovery.NewDistributedTargets(c.args.Clustering.Enabled, c.cluster, targets)
targets = distTargets.Get()

for _, inTarget := range newArgs.Targets {
lset := inTarget.Labels()
tailTargets := make([]*kubetail.Target, 0, len(targets))
for _, target := range targets {
lset := target.Labels()
processed, err := kubetail.PrepareLabels(lset, c.opts.ID)
if err != nil {
// TODO(rfratto): should this set the health of the component?
level.Error(c.log).Log("msg", "failed to process input target", "target", lset.String(), "err", err)
continue
}
targets = append(targets, kubetail.NewTarget(lset, processed))
tailTargets = append(tailTargets, kubetail.NewTarget(lset, processed))
}

// This will never fail because it only fails if the context gets canceled.
//
// TODO(rfratto): should we have a generous update timeout to prevent this
// from potentially hanging forever?
_ = c.tailer.SyncTargets(context.Background(), targets)
_ = c.tailer.SyncTargets(context.Background(), tailTargets)
}

c.args = newArgs
return nil
// NotifyClusterChange implements cluster.Component.
//
// While holding the component mutex, it resyncs the currently stored targets
// when the clustering block is enabled; otherwise it does nothing.
func (c *Component) NotifyClusterChange() {
	c.mut.Lock()
	defer c.mut.Unlock()

	if c.args.Clustering.Enabled {
		c.resyncTargets(c.args.Targets)
	}
}

// getTailerOptions gets tailer options from arguments. If args hasn't changed
Expand Down
18 changes: 18 additions & 0 deletions docs/sources/flow/reference/components/loki.source.kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ client > authorization | [authorization][] | Configure generic authorization to
client > oauth2 | [oauth2][] | Configure OAuth2 for authenticating to the endpoint. | no
client > oauth2 > tls_config | [tls_config][] | Configure TLS settings for connecting to the endpoint. | no
client > tls_config | [tls_config][] | Configure TLS settings for connecting to the endpoint. | no
clustering | [clustering][] | Configure the component for when the Agent is running in clustered mode. | no

The `>` symbol indicates deeper levels of nesting. For example, `client >
basic_auth` refers to a `basic_auth` block defined
Expand All @@ -92,6 +93,7 @@ inside a `client` block.
[authorization]: #authorization-block
[oauth2]: #oauth2-block
[tls_config]: #tls_config-block
[clustering]: #clustering-beta

### client block

Expand Down Expand Up @@ -135,6 +137,22 @@ Name | Type | Description | Default | Required

{{< docs/shared lookup="flow/reference/components/tls-config-block.md" source="agent" version="<AGENT VERSION>" >}}

### clustering (beta)

Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`enabled` | `bool` | Enables sharing targets with other cluster nodes. | `false` | yes

When the agent is [using clustering][], and `enabled` is set to true, then this
`loki.source.kubernetes` component instance opts in to participating in the
cluster to distribute the load of log collection between all cluster nodes.

If the agent is _not_ running in clustered mode, then the block is a no-op and
`loki.source.kubernetes` collects logs from every target it receives in its
arguments.

[using clustering]: {{< relref "../../concepts/clustering.md" >}}

## Exported fields

`loki.source.kubernetes` does not export any fields.
Expand Down

0 comments on commit d762bc3

Please sign in to comment.