Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for retry and sending queue for Jaeger exporter #1401

Merged
merged 1 commit into from
Jul 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions exporter/jaegerexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ connection. See [grpc.WithInsecure()](https://godoc.org/google.golang.org/grpc#W
of authority (e.g. :authority header field) in requests (typically used for testing).
- `balancer_name`(default = pick_first): Sets the balancer in grpclb_policy to discover the servers.
See [grpc loadbalancing example](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md).
- `timeout` (default = 5s): Is the timeout for every attempt to send data to the backend.
- `retry_on_failure`
- `disabled` (default = false)
- `initial_interval` (default = 5s): Time to wait after the first failure before retrying; ignored if `disabled` is `true`
- `max_interval` (default = 30s): Is the upper bound on backoff; ignored if `disabled` is `true`
- `max_elapsed_time` (default = 120s): Is the maximum amount of time spent trying to send a batch; ignored if `disabled` is `true`
- `sending_queue`
- `disabled` (default = true)
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `disabled` is `true`
- `queue_size` (default = 5000): Maximum number of batches kept in memory before data; ignored if `disabled` is `true`;
User should calculate this as `num_seconds * requests_per_second` where:
- `num_seconds` is the number of seconds to buffer in case of a backend outage
- `requests_per_second` is the average number of requests per seconds.

Example:

Expand Down
6 changes: 5 additions & 1 deletion exporter/jaegerexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ package jaegerexporter
import (
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Config defines configuration for Jaeger gRPC exporter.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

configgrpc.GRPCClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
}
45 changes: 32 additions & 13 deletions exporter/jaegerexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ import (
"context"
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func TestLoadConfig(t *testing.T) {
Expand All @@ -40,20 +43,36 @@ func TestLoadConfig(t *testing.T) {
require.NotNil(t, cfg)

e0 := cfg.Exporters["jaeger"]

// Endpoint doesn't have a default value so set it directly.
defaultCfg := factory.CreateDefaultConfig().(*Config)
defaultCfg.Endpoint = "some.target:55678"
defaultCfg.GRPCClientSettings.Endpoint = defaultCfg.Endpoint
defaultCfg.GRPCClientSettings.TLSSetting = configtls.TLSClientSetting{
Insecure: true,
}
assert.Equal(t, defaultCfg, e0)
assert.Equal(t, e0, factory.CreateDefaultConfig())

e1 := cfg.Exporters["jaeger/2"]
assert.Equal(t, "jaeger/2", e1.(*Config).Name())
assert.Equal(t, "a.new.target:1234", e1.(*Config).Endpoint)
assert.Equal(t, "round_robin", e1.(*Config).GRPCClientSettings.BalancerName)
assert.Equal(t, e1,
&Config{
ExporterSettings: configmodels.ExporterSettings{
NameVal: "jaeger/2",
TypeVal: "jaeger",
},
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 10 * time.Second,
},
RetrySettings: exporterhelper.RetrySettings{
Disabled: false,
InitialInterval: 10 * time.Second,
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
QueueSettings: exporterhelper.QueueSettings{
Disabled: false,
NumConsumers: 2,
QueueSize: 10,
},
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: "a.new.target:1234",
WriteBufferSize: 512 * 1024,
BalancerName: "round_robin",
},
})

params := component.ExporterCreateParams{Logger: zap.NewNop()}
te, err := factory.CreateTraceExporter(context.Background(), params, e1)
require.NoError(t, err)
Expand Down
19 changes: 12 additions & 7 deletions exporter/jaegerexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,34 @@ import (
jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
)

// New returns a new Jaeger gRPC exporter.
// newTraceExporter returns a new Jaeger gRPC exporter.
// The exporter name is the name to be used in the observability of the exporter.
// The collectorEndpoint should be of the form "hostname:14250" (a gRPC target).
func New(config *Config) (component.TraceExporter, error) {
func newTraceExporter(cfg *Config) (component.TraceExporter, error) {

opts, err := config.GRPCClientSettings.ToDialOptions()
opts, err := cfg.GRPCClientSettings.ToDialOptions()
if err != nil {
return nil, err
}

client, err := grpc.Dial(config.GRPCClientSettings.Endpoint, opts...)
client, err := grpc.Dial(cfg.GRPCClientSettings.Endpoint, opts...)
if err != nil {
return nil, err
}

collectorServiceClient := jaegerproto.NewCollectorServiceClient(client)
s := &protoGRPCSender{
client: collectorServiceClient,
metadata: metadata.New(config.GRPCClientSettings.Headers),
waitForReady: config.WaitForReady,
metadata: metadata.New(cfg.GRPCClientSettings.Headers),
waitForReady: cfg.WaitForReady,
}

exp, err := exporterhelper.NewTraceExporter(config, s.pushTraceData)
exp, err := exporterhelper.NewTraceExporter(
cfg, s.pushTraceData,
exporterhelper.WithTimeout(cfg.TimeoutSettings),
exporterhelper.WithRetry(cfg.RetrySettings),
exporterhelper.WithQueue(cfg.QueueSettings),
)

return exp, err
}
Expand Down
4 changes: 2 additions & 2 deletions exporter/jaegerexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ func TestNew(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := New(&tt.args.config)
got, err := newTraceExporter(&tt.args.config)
if (err != nil) != tt.wantErr {
t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("newTraceExporter() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got == nil {
Expand Down
8 changes: 7 additions & 1 deletion exporter/jaegerexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,17 @@ func NewFactory() component.ExporterFactory {
}

func createDefaultConfig() configmodels.Exporter {
// TODO: Enable the queued settings.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the blocker for this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to keep PRs focused, and also I need to deprecate the queue retry processor before enabling this on all the exporters

qs := exporterhelper.CreateDefaultQueueSettings()
qs.Disabled = true
return &Config{
ExporterSettings: configmodels.ExporterSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(),
RetrySettings: exporterhelper.CreateDefaultRetrySettings(),
QueueSettings: qs,
GRPCClientSettings: configgrpc.GRPCClientSettings{
// We almost read 0 bytes, so no need to tune ReadBufferSize.
WriteBufferSize: 512 * 1024,
Expand All @@ -65,7 +71,7 @@ func createTraceExporter(
return nil, err
}

exp, err := New(expCfg)
exp, err := newTraceExporter(expCfg)
if err != nil {
return nil, err
}
Expand Down
13 changes: 10 additions & 3 deletions exporter/jaegerexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,19 @@ processors:

exporters:
jaeger:
endpoint: "some.target:55678"
insecure: true
jaeger/2:
endpoint: "a.new.target:1234"
balancer_name: "round_robin"

timeout: 10s
sending_queue:
disabled: false
num_consumers: 2
queue_size: 10
retry_on_failure:
disabled: false
initial_interval: 10s
max_interval: 60s
max_elapsed_time: 10m

service:
pipelines:
Expand Down
2 changes: 2 additions & 0 deletions testbed/testbed/senders.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ func NewJaegerGRPCDataSender(host string, port int) *JaegerGRPCDataSender {
func (je *JaegerGRPCDataSender) Start() error {
factory := jaegerexporter.NewFactory()
cfg := factory.CreateDefaultConfig().(*jaegerexporter.Config)
// Disable retries, we should push data and if error just log it.
cfg.RetrySettings.Disabled = true
cfg.Endpoint = fmt.Sprintf("%s:%d", je.Host, je.Port)
cfg.TLSSetting = configtls.TLSClientSetting{
Insecure: true,
Expand Down