Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a roundrobin connector, that can help single thread components to scale #32853

Merged
merged 1 commit into from
May 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .chloggen/roundrobinconnector.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: roundrobinconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add a roundrobin connector, that can help single thread components to scale

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32853]

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user, api]
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ connector/datadogconnector/ @open-telemetry/collect
connector/exceptionsconnector/ @open-telemetry/collector-contrib-approvers @jpkrohling @marctc
connector/failoverconnector/ @open-telemetry/collector-contrib-approvers @akats7 @djaglowski @fatsheep9146
connector/grafanacloudconnector/ @open-telemetry/collector-contrib-approvers @jpkrohling @rlankfo @jcreixell
connector/roundrobinconnector/ @open-telemetry/collector-contrib-approvers @bogdandrutu
connector/routingconnector/ @open-telemetry/collector-contrib-approvers @jpkrohling @mwear
connector/servicegraphconnector/ @open-telemetry/collector-contrib-approvers @jpkrohling @mapno
connector/spanmetricsconnector/ @open-telemetry/collector-contrib-approvers @portertech @Frapschen
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ body:
- connector/exceptions
- connector/failover
- connector/grafanacloud
- connector/roundrobin
- connector/routing
- connector/servicegraph
- connector/spanmetrics
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ body:
- connector/exceptions
- connector/failover
- connector/grafanacloud
- connector/roundrobin
- connector/routing
- connector/servicegraph
- connector/spanmetrics
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ body:
- connector/exceptions
- connector/failover
- connector/grafanacloud
- connector/roundrobin
- connector/routing
- connector/servicegraph
- connector/spanmetrics
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/unmaintained.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ body:
- connector/exceptions
- connector/failover
- connector/grafanacloud
- connector/roundrobin
- connector/routing
- connector/servicegraph
- connector/spanmetrics
Expand Down
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ connectors:
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/connector/exceptionsconnector v0.99.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector v0.99.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/connector/grafanacloudconnector v0.99.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/connector/roundrobinconnector v0.99.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector v0.99.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector v0.99.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector v0.99.0
Expand Down Expand Up @@ -434,6 +435,7 @@ replaces:
- github.com/open-telemetry/opentelemetry-collector-contrib/connector/exceptionsconnector => ../../connector/exceptionsconnector
- github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector => ../../connector/failoverconnector
- github.com/open-telemetry/opentelemetry-collector-contrib/connector/grafanacloudconnector => ../../connector/grafanacloudconnector
- github.com/open-telemetry/opentelemetry-collector-contrib/connector/roundrobinconnector => ../../connector/roundrobinconnector
- github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector => ../../connector/routingconnector
- github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector => ../../connector/servicegraphconnector
- github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector => ../../connector/spanmetricsconnector
Expand Down
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/components.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/connector/exceptionsconnector v0.99.0
github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector v0.99.0
github.com/open-telemetry/opentelemetry-collector-contrib/connector/grafanacloudconnector v0.99.0
github.com/open-telemetry/opentelemetry-collector-contrib/connector/roundrobinconnector v0.99.0
github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector v0.99.0
github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector v0.99.0
github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector v0.99.0
Expand Down Expand Up @@ -1170,6 +1171,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/connector/fail

replace github.com/open-telemetry/opentelemetry-collector-contrib/connector/grafanacloudconnector => ../../connector/grafanacloudconnector

replace github.com/open-telemetry/opentelemetry-collector-contrib/connector/roundrobinconnector => ../../connector/roundrobinconnector

replace github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector => ../../connector/routingconnector

replace github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector => ../../connector/servicegraphconnector
Expand Down
1 change: 1 addition & 0 deletions connector/roundrobinconnector/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
71 changes: 71 additions & 0 deletions connector/roundrobinconnector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Round-Robin Connector
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aconnector%2Froundrobin%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aconnector%2Froundrobin) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aconnector%2Froundrobin%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aconnector%2Froundrobin) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@bogdandrutu](https://www.github.com/bogdandrutu) |

[beta]: https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib

## Supported Pipeline Types

| [Exporter Pipeline Type] | [Receiver Pipeline Type] | [Stability Level] |
| ------------------------ | ------------------------ | ----------------- |
| traces | traces | [beta] |
| metrics | metrics | [beta] |
| logs | logs | [beta] |

[Exporter Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#exporter-pipeline-type
[Receiver Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#receiver-pipeline-type
[Stability Level]: https://github.com/open-telemetry/opentelemetry-collector#stability-levels
<!-- end autogenerated section -->

The `roundrobin` connector can fork pipelines of the same type and equally split the load between them.

## Configuration

If you are not already familiar with connectors, you may find it helpful to first visit the [Connectors README].

The `roundrobin` connector does not have any configuration settings.

```yaml
receivers:
otlp:
exporters:
prometheusremotewrite/1:
prometheusremotewrite/2:
connectors:
roundrobin:
```

Preprocess data, then export using multiple exporter instances to scale the throughput if the exporter
does not support scale well (e.g. prometheusremotewrite).

```yaml
receivers:
otlp:
processors:
resourcedetection:
batch:
exporters:
prometheusremotewrite/1:
prometheusremotewrite/2:
connectors:
roundrobin:
service:
pipelines:
metrics:
receivers: [otlp]
processors: [resourcedetection, batch]
exporters: [roundrobin]
metrics/1:
receivers: [roundrobin]
exporters: [prometheusremotewrite/1]
metrics/2:
receivers: [roundrobin]
exporters: [prometheusremotewrite/2]
```

[Connectors README]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md
7 changes: 7 additions & 0 deletions connector/roundrobinconnector/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package roundrobinconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/roundrobinconnector"

// Config for the connector
type Config struct{}
86 changes: 86 additions & 0 deletions connector/roundrobinconnector/connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package roundrobinconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/roundrobinconnector"

import (
"context"
"sync/atomic"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func allConsumers[T any](r router[T]) ([]T, error) {
pipeIDs := r.PipelineIDs()
consumers := make([]T, len(pipeIDs))
for i, pipeID := range pipeIDs {
cons, err := r.Consumer(pipeID)
if err != nil {
return nil, err
}
consumers[i] = cons
}
return consumers, nil
}

type router[T any] interface {
PipelineIDs() []component.ID
Consumer(pipelineIDs ...component.ID) (T, error)
}
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved

func newLogs(nextConsumer consumer.Logs) (connector.Logs, error) {
nextConsumers, err := allConsumers[consumer.Logs](nextConsumer.(connector.LogsRouterAndConsumer))
if err != nil {
return nil, err
}
return &roundRobin{nextLogs: nextConsumers}, nil
}

func newMetrics(nextConsumer consumer.Metrics) (connector.Metrics, error) {
nextConsumers, err := allConsumers[consumer.Metrics](nextConsumer.(connector.MetricsRouterAndConsumer))
if err != nil {
return nil, err
}
return &roundRobin{nextMetrics: nextConsumers}, nil
}

func newTraces(nextConsumer consumer.Traces) (connector.Traces, error) {
nextConsumers, err := allConsumers[consumer.Traces](nextConsumer.(connector.TracesRouterAndConsumer))
if err != nil {
return nil, err
}
return &roundRobin{nextTraces: nextConsumers}, nil
}

// roundRobin is used to pass signals directly from one pipeline to one of the configured once in a round-robin mode.
// This is useful when there is a need to scale (shard) data processing and downstream components do not
// handle concurrent requests very well.
type roundRobin struct {
component.StartFunc
component.ShutdownFunc
nextConsumer atomic.Uint64
nextMetrics []consumer.Metrics
nextLogs []consumer.Logs
nextTraces []consumer.Traces
}

func (rr *roundRobin) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (rr *roundRobin) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
return rr.nextLogs[rr.nextConsumer.Add(1)%uint64(len(rr.nextLogs))].ConsumeLogs(ctx, ld)
}

func (rr *roundRobin) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
return rr.nextMetrics[rr.nextConsumer.Add(1)%uint64(len(rr.nextMetrics))].ConsumeMetrics(ctx, md)
}

func (rr *roundRobin) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
return rr.nextTraces[rr.nextConsumer.Add(1)%uint64(len(rr.nextTraces))].ConsumeTraces(ctx, td)
}
Loading