diff --git a/cmd/agent/app/agent_test.go b/cmd/agent/app/agent_test.go index a9422e579a0..0d9612f1d20 100644 --- a/cmd/agent/app/agent_test.go +++ b/cmd/agent/app/agent_test.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" jmetrics "github.com/jaegertracing/jaeger/pkg/metrics" @@ -33,7 +34,7 @@ import ( func TestAgentStartError(t *testing.T) { cfg := &Builder{} - agent, err := cfg.CreateAgent(zap.NewNop()) + agent, err := cfg.CreateAgent(fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory) require.NoError(t, err) agent.httpServer.Addr = "bad-address" assert.Error(t, agent.Run()) @@ -100,7 +101,8 @@ func withRunningAgent(t *testing.T, testcase func(string, chan error)) { }, } logger, logBuf := testutils.NewLogger() - agent, err := cfg.CreateAgent(logger) + f, _ := cfg.Metrics.CreateMetricsFactory("jaeger") + agent, err := cfg.CreateAgent(fakeCollectorProxy{}, logger, f) require.NoError(t, err) ch := make(chan error, 2) go func() { diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index 284f534f34f..e779313ac18 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -17,18 +17,15 @@ package app import ( "fmt" "net/http" - "time" "github.com/apache/thrift/lib/go/thrift" "github.com/pkg/errors" "github.com/uber/jaeger-lib/metrics" - "github.com/uber/tchannel-go" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" "github.com/jaegertracing/jaeger/cmd/agent/app/processors" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" - tchreporter "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" "github.com/jaegertracing/jaeger/cmd/agent/app/servers" "github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp" jmetrics "github.com/jaegertracing/jaeger/pkg/metrics" @@ -37,11 +34,9 @@ import ( ) const ( - defaultQueueSize = 1000 - defaultMaxPacketSize = 65000 - defaultServerWorkers = 10 - defaultMinPeers = 3 - defaultConnCheckTimeout = 250 * time.Millisecond + defaultQueueSize = 1000 + defaultMaxPacketSize = 65000 + defaultServerWorkers = 10 defaultHTTPServerHostPort = ":5778" @@ -67,16 +62,19 @@ var ( } ) +// CollectorProxy provides access to Reporter and ClientConfigManager +type CollectorProxy interface { + GetReporter() reporter.Reporter + GetManager() httpserver.ClientConfigManager +} + // Builder Struct to hold configurations type Builder struct { Processors []ProcessorConfiguration `yaml:"processors"` HTTPServer HTTPServerConfiguration `yaml:"httpServer"` Metrics jmetrics.Builder `yaml:"metrics"` - tchreporter.Builder `yaml:",inline"` - - otherReporters []reporter.Reporter - metricsFactory metrics.Factory + reporters []reporter.Reporter } // ProcessorConfiguration holds config for a processor that receives spans from Server @@ -100,62 +98,35 @@ type HTTPServerConfiguration struct { } // WithReporter adds auxiliary reporters. -func (b *Builder) WithReporter(r reporter.Reporter) *Builder { - b.otherReporters = append(b.otherReporters, r) - return b -} - -// WithMetricsFactory sets an externally initialized metrics factory. -func (b *Builder) WithMetricsFactory(mf metrics.Factory) *Builder { - b.metricsFactory = mf +func (b *Builder) WithReporter(r ...reporter.Reporter) *Builder { + b.reporters = append(b.reporters, r...) return b } -func (b *Builder) createMainReporter(mFactory metrics.Factory, logger *zap.Logger) (*tchreporter.Reporter, error) { - return b.CreateReporter(mFactory, logger) -} - -func (b *Builder) getMetricsFactory() (metrics.Factory, error) { - if b.metricsFactory != nil { - return b.metricsFactory, nil - } - - baseFactory, err := b.Metrics.CreateMetricsFactory("jaeger") +// CreateAgent creates the Agent +func (b *Builder) CreateAgent(primaryProxy CollectorProxy, logger *zap.Logger, mFactory metrics.Factory) (*Agent, error) { + r := b.getReporter(primaryProxy) + processors, err := b.getProcessors(r, mFactory, logger) if err != nil { return nil, err } - - return baseFactory.Namespace("agent", nil), nil + server := b.HTTPServer.getHTTPServer(primaryProxy.GetManager(), mFactory, &b.Metrics) + return NewAgent(processors, server, logger), nil } -// CreateAgent creates the Agent -func (b *Builder) CreateAgent(logger *zap.Logger) (*Agent, error) { - mFactory, err := b.getMetricsFactory() - if err != nil { - return nil, errors.Wrap(err, "cannot create metrics factory") - } - mainReporter, err := b.createMainReporter(mFactory, logger) - if err != nil { - return nil, errors.Wrap(err, "cannot create main Reporter") +func (b *Builder) getReporter(primaryProxy CollectorProxy) reporter.Reporter { + if len(b.reporters) == 0 { + return primaryProxy.GetReporter() } - var rep reporter.Reporter = mainReporter - if len(b.otherReporters) > 0 { - reps := append([]reporter.Reporter{mainReporter}, b.otherReporters...) - rep = reporter.NewMultiReporter(reps...) + rep := make([]reporter.Reporter, len(b.reporters)+1) + rep[0] = primaryProxy.GetReporter() + for i, r := range b.reporters { + rep[i+1] = r } - processors, err := b.GetProcessors(rep, mFactory, logger) - if err != nil { - return nil, err - } - httpServer := b.HTTPServer.GetHTTPServer(b.CollectorServiceName, mainReporter.Channel(), mFactory) - if h := b.Metrics.Handler(); mFactory != nil && h != nil { - httpServer.Handler.(*http.ServeMux).Handle(b.Metrics.HTTPRoute, h) - } - return NewAgent(processors, httpServer, logger), nil + return reporter.NewMultiReporter(rep...) } -// GetProcessors creates Processors with attached Reporter -func (b *Builder) GetProcessors(rep reporter.Reporter, mFactory metrics.Factory, logger *zap.Logger) ([]processors.Processor, error) { +func (b *Builder) getProcessors(rep reporter.Reporter, mFactory metrics.Factory, logger *zap.Logger) ([]processors.Processor, error) { retMe := make([]processors.Processor, len(b.Processors)) for idx, cfg := range b.Processors { protoFactory, ok := protocolFactoryMap[cfg.Protocol] @@ -185,12 +156,15 @@ func (b *Builder) GetProcessors(rep reporter.Reporter, mFactory metrics.Factory, } // GetHTTPServer creates an HTTP server that provides sampling strategies and baggage restrictions to client libraries. -func (c HTTPServerConfiguration) GetHTTPServer(svc string, channel *tchannel.Channel, mFactory metrics.Factory) *http.Server { - mgr := httpserver.NewCollectorProxy(svc, channel, mFactory) +func (c HTTPServerConfiguration) getHTTPServer(manager httpserver.ClientConfigManager, mFactory metrics.Factory, mBuilder *jmetrics.Builder) *http.Server { if c.HostPort == "" { c.HostPort = defaultHTTPServerHostPort } - return httpserver.NewHTTPServer(c.HostPort, mgr, mFactory) + server := httpserver.NewHTTPServer(c.HostPort, manager, mFactory) + if h := mBuilder.Handler(); mFactory != nil && h != nil { + server.Handler.(*http.ServeMux).Handle(mBuilder.HTTPRoute, h) + } + return server } // GetThriftProcessor gets a TBufferedServer backed Processor using the collector configuration diff --git a/cmd/agent/app/builder_test.go b/cmd/agent/app/builder_test.go index 955e425e9b5..89f0de528c0 100644 --- a/cmd/agent/app/builder_test.go +++ b/cmd/agent/app/builder_test.go @@ -16,6 +16,7 @@ package app import ( "errors" + "fmt" "strings" "testing" @@ -25,7 +26,11 @@ import ( "go.uber.org/zap" "gopkg.in/yaml.v2" + "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" + "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" + "github.com/jaegertracing/jaeger/thrift-gen/baggage" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" + "github.com/jaegertracing/jaeger/thrift-gen/sampling" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) @@ -51,14 +56,6 @@ processors: httpServer: hostPort: 4.4.4.4:5778 - -collectorHostPorts: - - 127.0.0.1:14267 - - 127.0.0.1:14268 - - 127.0.0.1:14269 - -collectorServiceName: some-collector-service -minPeers: 4 ` func TestBuilderFromConfig(t *testing.T) { @@ -101,59 +98,24 @@ func TestBuilderFromConfig(t *testing.T) { }, }, cfg.Processors[2]) assert.Equal(t, "4.4.4.4:5778", cfg.HTTPServer.HostPort) - - assert.Equal(t, 4, cfg.DiscoveryMinPeers) - assert.Equal(t, "some-collector-service", cfg.CollectorServiceName) - assert.Equal( - t, - []string{"127.0.0.1:14267", "127.0.0.1:14268", "127.0.0.1:14269"}, - cfg.CollectorHostPorts) } func TestBuilderWithExtraReporter(t *testing.T) { cfg := &Builder{} - cfg.WithReporter(fakeReporter{}) - agent, err := cfg.CreateAgent(zap.NewNop()) + agent, err := cfg.CreateAgent(fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory) assert.NoError(t, err) assert.NotNil(t, agent) } -func TestBuilderMetrics(t *testing.T) { - mf := metrics.NullFactory - b := new(Builder).WithMetricsFactory(mf) - mf2, err := b.getMetricsFactory() - assert.NoError(t, err) - assert.Equal(t, mf, mf2) -} - func TestBuilderMetricsHandler(t *testing.T) { b := &Builder{} b.Metrics.Backend = "expvar" b.Metrics.HTTPRoute = "/expvar" - factory, err := b.Metrics.CreateMetricsFactory("test") - assert.NoError(t, err) - assert.NotNil(t, factory) - b.metricsFactory = factory - agent, err := b.CreateAgent(zap.NewNop()) + agent, err := b.CreateAgent(fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory) assert.NoError(t, err) assert.NotNil(t, agent) } -func TestBuilderMetricsError(t *testing.T) { - b := &Builder{} - b.Metrics.Backend = "invalid" - _, err := b.CreateAgent(zap.NewNop()) - assert.EqualError(t, err, "cannot create metrics factory: unknown metrics backend specified") -} - -func TestBuilderWithDiscoveryError(t *testing.T) { - cfg := &Builder{} - cfg.WithDiscoverer(fakeDiscoverer{}) - agent, err := cfg.CreateAgent(zap.NewNop()) - assert.EqualError(t, err, "cannot create main Reporter: cannot enable service discovery: both discovery.Discoverer and discovery.Notifier must be specified") - assert.Nil(t, agent) -} - func TestBuilderWithProcessorErrors(t *testing.T) { testCases := []struct { model Model @@ -180,7 +142,7 @@ func TestBuilderWithProcessorErrors(t *testing.T) { }, }, } - _, err := cfg.CreateAgent(zap.NewNop()) + _, err := cfg.CreateAgent(&fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory) assert.Error(t, err) if testCase.err != "" { assert.EqualError(t, err, testCase.err) @@ -190,18 +152,39 @@ func TestBuilderWithProcessorErrors(t *testing.T) { } } -type fakeReporter struct{} +func TestMultipleCollectorProxies(t *testing.T) { + b := Builder{} + ra := fakeCollectorProxy{} + rb := fakeCollectorProxy{} + b.WithReporter(ra) + r := b.getReporter(rb) + mr, ok := r.(reporter.MultiReporter) + require.True(t, ok) + fmt.Println(mr) + assert.Equal(t, rb, mr[0]) + assert.Equal(t, ra, mr[1]) +} -func (fr fakeReporter) EmitZipkinBatch(spans []*zipkincore.Span) (err error) { - return nil +type fakeCollectorProxy struct { } -func (fr fakeReporter) EmitBatch(batch *jaeger.Batch) (err error) { - return nil +func (f fakeCollectorProxy) GetReporter() reporter.Reporter { + return fakeCollectorProxy{} +} +func (f fakeCollectorProxy) GetManager() httpserver.ClientConfigManager { + return fakeCollectorProxy{} } -type fakeDiscoverer struct{} +func (fakeCollectorProxy) EmitZipkinBatch(spans []*zipkincore.Span) (err error) { + return nil +} +func (fakeCollectorProxy) EmitBatch(batch *jaeger.Batch) (err error) { + return nil +} -func (fd fakeDiscoverer) Instances() ([]string, error) { - return nil, errors.New("discoverer error") +func (f fakeCollectorProxy) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) { + return nil, errors.New("no peers available") +} +func (fakeCollectorProxy) GetBaggageRestrictions(serviceName string) ([]*baggage.BaggageRestriction, error) { + return nil, nil } diff --git a/cmd/agent/app/flags.go b/cmd/agent/app/flags.go index 6eb747bf2ce..f9b5dbde689 100644 --- a/cmd/agent/app/flags.go +++ b/cmd/agent/app/flags.go @@ -17,7 +17,6 @@ package app import ( "flag" "fmt" - "strings" "github.com/spf13/viper" ) @@ -27,10 +26,7 @@ const ( suffixServerQueueSize = "server-queue-size" suffixServerMaxPacketSize = "server-max-packet-size" suffixServerHostPort = "server-host-port" - collectorHostPort = "collector.host-port" httpServerHostPort = "http-server.host-port" - discoveryMinPeers = "discovery.min-peers" - discoveryConnCheckTimeout = "discovery.conn-check-timeout" ) var defaultProcessors = []struct { @@ -52,22 +48,10 @@ func AddFlags(flags *flag.FlagSet) { flags.Int(prefix+suffixServerMaxPacketSize, defaultMaxPacketSize, "max packet size for the UDP server") flags.String(prefix+suffixServerHostPort, processor.hostPort, "host:port for the UDP server") } - flags.String( - collectorHostPort, - "", - "comma-separated string representing host:ports of a static list of collectors to connect to directly (e.g. when not using service discovery)") flags.String( httpServerHostPort, defaultHTTPServerHostPort, "host:port of the http server (e.g. for /sampling point and /baggage endpoint)") - flags.Int( - discoveryMinPeers, - defaultMinPeers, - "if using service discovery, the min number of connections to maintain to the backend") - flags.Duration( - discoveryConnCheckTimeout, - defaultConnCheckTimeout, - "sets the timeout used when establishing new connections") } // InitFromViper initializes Builder with properties retrieved from Viper. @@ -84,11 +68,6 @@ func (b *Builder) InitFromViper(v *viper.Viper) *Builder { b.Processors = append(b.Processors, *p) } - if len(v.GetString(collectorHostPort)) > 0 { - b.CollectorHostPorts = strings.Split(v.GetString(collectorHostPort), ",") - } b.HTTPServer.HostPort = v.GetString(httpServerHostPort) - b.DiscoveryMinPeers = v.GetInt(discoveryMinPeers) - b.ConnCheckTimeout = v.GetDuration(discoveryConnCheckTimeout) return b } diff --git a/cmd/agent/app/flags_test.go b/cmd/agent/app/flags_test.go index 65a6025524c..9a271f0287a 100644 --- a/cmd/agent/app/flags_test.go +++ b/cmd/agent/app/flags_test.go @@ -34,8 +34,6 @@ func TestBingFlags(t *testing.T) { v.BindPFlags(command.PersistentFlags()) err := command.ParseFlags([]string{ - "--collector.host-port=1.2.3.4:555,1.2.3.4:666", - "--discovery.min-peers=42", "--http-server.host-port=:8080", "--processor.jaeger-binary.server-host-port=:1111", "--processor.jaeger-binary.server-max-packet-size=4242", @@ -46,8 +44,6 @@ func TestBingFlags(t *testing.T) { b.InitFromViper(v) assert.Equal(t, 3, len(b.Processors)) - assert.Equal(t, []string{"1.2.3.4:555", "1.2.3.4:666"}, b.CollectorHostPorts) - assert.Equal(t, 42, b.DiscoveryMinPeers) assert.Equal(t, ":8080", b.HTTPServer.HostPort) assert.Equal(t, ":1111", b.Processors[2].Server.HostPort) assert.Equal(t, 4242, b.Processors[2].Server.MaxPacketSize) diff --git a/cmd/agent/app/reporter/tchannel/builder_test.go b/cmd/agent/app/reporter/tchannel/builder_test.go index e92a18e9d2c..6ed8be97d67 100644 --- a/cmd/agent/app/reporter/tchannel/builder_test.go +++ b/cmd/agent/app/reporter/tchannel/builder_test.go @@ -15,6 +15,7 @@ package tchannel import ( + "errors" "testing" "github.com/stretchr/testify/assert" @@ -49,6 +50,11 @@ func TestBuilderFromConfig(t *testing.T) { t, []string{"127.0.0.1:14267", "127.0.0.1:14268", "127.0.0.1:14269"}, cfg.CollectorHostPorts) + r, err := cfg.CreateReporter(metrics.NullFactory, zap.NewNop()) + require.NoError(t, err) + assert.NotNil(t, r) + assert.Equal(t, "some-collector-service", r.CollectorServiceName()) + } func TestBuilderWithDiscovery(t *testing.T) { @@ -66,6 +72,13 @@ func TestBuilderWithDiscovery(t *testing.T) { assert.NotNil(t, agent) } +func TestBuilderWithDiscoveryError(t *testing.T) { + tbuilder := NewBuilder().WithDiscoverer(fakeDiscoverer{}) + rep, err := tbuilder.CreateReporter(metrics.NullFactory, zap.NewNop()) + assert.EqualError(t, err, "cannot enable service discovery: both discovery.Discoverer and discovery.Notifier must be specified") + assert.Nil(t, rep) +} + func TestBuilderWithCollectorServiceName(t *testing.T) { cfg := &Builder{} cfg.WithCollectorServiceName("svc") @@ -88,6 +101,7 @@ func TestBuilderWithChannel(t *testing.T) { rep, err := cfg.CreateReporter(metrics.NullFactory, zap.NewNop()) assert.NoError(t, err) assert.NotNil(t, rep.Channel()) + assert.Equal(t, defaultCollectorServiceName, rep.CollectorServiceName()) } func TestBuilderWithCollectors(t *testing.T) { @@ -103,3 +117,9 @@ func TestBuilderWithCollectors(t *testing.T) { assert.NoError(t, err) assert.Equal(t, c, hostPorts) } + +type fakeDiscoverer struct{} + +func (fd fakeDiscoverer) Instances() ([]string, error) { + return nil, errors.New("discoverer error") +} diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy.go b/cmd/agent/app/reporter/tchannel/collector_proxy.go new file mode 100644 index 00000000000..fa855b68dd6 --- /dev/null +++ b/cmd/agent/app/reporter/tchannel/collector_proxy.go @@ -0,0 +1,51 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tchannel + +import ( + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" + "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" +) + +// ProxyBuilder holds objects communicating with collector +type ProxyBuilder struct { + reporter *Reporter + manager httpserver.ClientConfigManager +} + +// NewCollectorProxy creates ProxyBuilder +func NewCollectorProxy(builder *Builder, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) { + b := &ProxyBuilder{} + r, err := builder.CreateReporter(mFactory, logger) + if err != nil { + return nil, err + } + b.reporter = r + b.manager = httpserver.NewCollectorProxy(b.reporter.CollectorServiceName(), b.reporter.Channel(), mFactory) + return b, nil +} + +// GetReporter returns Reporter +func (b ProxyBuilder) GetReporter() reporter.Reporter { + return b.reporter +} + +// GetManager returns manager +func (b ProxyBuilder) GetManager() httpserver.ClientConfigManager { + return b.manager +} diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy_test.go b/cmd/agent/app/reporter/tchannel/collector_proxy_test.go new file mode 100644 index 00000000000..1f320548fa6 --- /dev/null +++ b/cmd/agent/app/reporter/tchannel/collector_proxy_test.go @@ -0,0 +1,46 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tchannel + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" +) + +func TestErrorReporterBuilder(t *testing.T) { + tbuilder := NewBuilder().WithDiscoverer(fakeDiscoverer{}) + b, err := NewCollectorProxy(tbuilder, metrics.NullFactory, zap.NewNop()) + require.Error(t, err) + assert.Nil(t, b) +} + +func TestCreate(t *testing.T) { + cfg := &Builder{} + mFactory := metrics.NullFactory + logger := zap.NewNop() + b, err := NewCollectorProxy(cfg, mFactory, logger) + require.NoError(t, err) + assert.NotNil(t, b) + r, _ := cfg.CreateReporter(mFactory, logger) + assert.Equal(t, r, b.GetReporter()) + m := httpserver.NewCollectorProxy(r.CollectorServiceName(), r.Channel(), mFactory) + assert.Equal(t, m, b.GetManager()) +} diff --git a/cmd/agent/app/reporter/tchannel/flags.go b/cmd/agent/app/reporter/tchannel/flags.go new file mode 100644 index 00000000000..67d32e0c60d --- /dev/null +++ b/cmd/agent/app/reporter/tchannel/flags.go @@ -0,0 +1,84 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tchannel + +import ( + "flag" + "strings" + "time" + + "github.com/spf13/viper" + "go.uber.org/zap" +) + +const ( + defaultConnCheckTimeout = 250 * time.Millisecond + tchannelPrefix = "reporter.tchannel." + collectorHostPort = "collector.host-port" + discoveryMinPeers = "discovery.min-peers" + discoveryConnCheckTimeout = "discovery.conn-check-timeout" +) + +// AddFlags adds flags for Builder. +func AddFlags(flags *flag.FlagSet) { + flags.String( + tchannelPrefix+collectorHostPort, + "", + "comma-separated string representing host:ports of a static list of collectors to connect to directly (e.g. when not using service discovery)") + flags.Int( + tchannelPrefix+discoveryMinPeers, + defaultMinPeers, + "if using service discovery, the min number of connections to maintain to the backend") + flags.Duration( + tchannelPrefix+discoveryConnCheckTimeout, + defaultConnCheckTimeout, + "sets the timeout used when establishing new connections") + // TODO remove deprecated in 1.9 + flags.String( + collectorHostPort, + "", + "Deprecated; comma-separated string representing host:ports of a static list of collectors to connect to directly (e.g. when not using service discovery)") + flags.Int( + discoveryMinPeers, + defaultMinPeers, + "Deprecated; if using service discovery, the min number of connections to maintain to the backend") + flags.Duration( + discoveryConnCheckTimeout, + defaultConnCheckTimeout, + "Deprecated; sets the timeout used when establishing new connections") +} + +// InitFromViper initializes Builder with properties retrieved from Viper. +func (b *Builder) InitFromViper(v *viper.Viper, logger *zap.Logger) *Builder { + if len(v.GetString(collectorHostPort)) > 0 { + logger.Warn("Using deprecated configuration", zap.String("option", collectorHostPort)) + b.CollectorHostPorts = strings.Split(v.GetString(collectorHostPort), ",") + } + b.DiscoveryMinPeers = v.GetInt(discoveryMinPeers) + if b.DiscoveryMinPeers != defaultMinPeers { + logger.Warn("Using deprecated configuration", zap.String("option", discoveryMinPeers)) + } + b.ConnCheckTimeout = v.GetDuration(discoveryConnCheckTimeout) + if b.ConnCheckTimeout != defaultConnCheckTimeout { + logger.Warn("Using deprecated configuration", zap.String("option", discoveryConnCheckTimeout)) + } + + if len(v.GetString(tchannelPrefix+collectorHostPort)) > 0 { + b.CollectorHostPorts = strings.Split(v.GetString(tchannelPrefix+collectorHostPort), ",") + } + b.DiscoveryMinPeers = v.GetInt(tchannelPrefix + discoveryMinPeers) + b.ConnCheckTimeout = v.GetDuration(tchannelPrefix + discoveryConnCheckTimeout) + return b +} diff --git a/cmd/agent/app/reporter/tchannel/flags_test.go b/cmd/agent/app/reporter/tchannel/flags_test.go new file mode 100644 index 00000000000..473b9619321 --- /dev/null +++ b/cmd/agent/app/reporter/tchannel/flags_test.go @@ -0,0 +1,72 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tchannel + +import ( + "flag" + "testing" + "time" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestBingFlags(t *testing.T) { + v := viper.New() + command := cobra.Command{} + flags := &flag.FlagSet{} + AddFlags(flags) + command.PersistentFlags().AddGoFlagSet(flags) + v.BindPFlags(command.PersistentFlags()) + + tests := []struct { + flags []string + builder Builder + }{ + {flags: []string{ + "--reporter.tchannel.collector.host-port=1.2.3.4:555,1.2.3.4:666", + "--reporter.tchannel.discovery.min-peers=42", + "--reporter.tchannel.discovery.conn-check-timeout=85s", + }, builder: Builder{ConnCheckTimeout: time.Second * 85, DiscoveryMinPeers: 42, CollectorHostPorts: []string{"1.2.3.4:555", "1.2.3.4:666"}}, + }, + {flags: []string{ + "--collector.host-port=1.2.3.4:555,1.2.3.4:666", + "--discovery.min-peers=42", + "--discovery.conn-check-timeout=85s", + }, + builder: Builder{ConnCheckTimeout: time.Second * 85, DiscoveryMinPeers: 42, CollectorHostPorts: []string{"1.2.3.4:555", "1.2.3.4:666"}}, + }, + {flags: []string{ + "--collector.host-port=1.2.3.4:555,1.2.3.4:666", + "--discovery.min-peers=42", + "--discovery.conn-check-timeout=85s", + "--reporter.tchannel.collector.host-port=1.2.3.4:5556,1.2.3.4:6667", + "--reporter.tchannel.discovery.min-peers=43", + "--reporter.tchannel.discovery.conn-check-timeout=86s", + }, + builder: Builder{ConnCheckTimeout: time.Second * 86, DiscoveryMinPeers: 43, CollectorHostPorts: []string{"1.2.3.4:5556", "1.2.3.4:6667"}}, + }, + } + for _, test := range tests { + err := command.ParseFlags(test.flags) + require.NoError(t, err) + b := Builder{} + b.InitFromViper(v, zap.NewNop()) + assert.Equal(t, test.builder, b) + } +} diff --git a/cmd/agent/app/reporter/tchannel/reporter.go b/cmd/agent/app/reporter/tchannel/reporter.go index 4c83fe27727..619893de76b 100644 --- a/cmd/agent/app/reporter/tchannel/reporter.go +++ b/cmd/agent/app/reporter/tchannel/reporter.go @@ -57,6 +57,7 @@ type Reporter struct { peerListMgr *peerlistmgr.PeerListManager batchesMetrics map[string]batchMetrics logger *zap.Logger + serviceName string } // New creates new TChannel-based Reporter. @@ -83,6 +84,7 @@ func New( peerListMgr: peerListMgr, logger: zlogger, batchesMetrics: batchesMetrics, + serviceName: collectorServiceName, } } @@ -136,3 +138,8 @@ func (r *Reporter) submitAndReport(submissionFunc func(ctx thrift.Context) error batchMetrics.SpansSubmitted.Inc(size) return nil } + +// CollectorServiceName returns collector service name. +func (r *Reporter) CollectorServiceName() string { + return r.serviceName +} diff --git a/cmd/agent/main.go b/cmd/agent/main.go index e6481fe47ee..0a0b7ecae23 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/agent/app" + "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" "github.com/jaegertracing/jaeger/cmd/flags" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" @@ -50,11 +51,23 @@ func main() { builder := &app.Builder{} builder.InitFromViper(v) + tchanRep := tchannel.NewBuilder().InitFromViper(v, logger) + mBldr := new(metrics.Builder).InitFromViper(v) + + mFactory, err := mBldr.CreateMetricsFactory("jaeger") + if err != nil { + logger.Fatal("Could not create metrics", zap.Error(err)) + } + mFactory = mFactory.Namespace("agent", nil) + + cp, err := tchannel.NewCollectorProxy(tchanRep, mFactory, logger) + if err != nil { + logger.Fatal("Could not create collector proxy", zap.Error(err)) + } // TODO illustrate discovery service wiring - // TODO illustrate additional reporter - agent, err := builder.CreateAgent(logger) + agent, err := builder.CreateAgent(cp, logger, mFactory) if err != nil { return errors.Wrap(err, "Unable to initialize Jaeger Agent") } @@ -75,6 +88,7 @@ func main() { flags.AddConfigFileFlag, flags.AddLoggingFlag, app.AddFlags, + tchannel.AddFlags, metrics.AddFlags, ) diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 155ee7a47e6..5877be903ae 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -38,6 +38,7 @@ import ( "go.uber.org/zap" agentApp "github.com/jaegertracing/jaeger/cmd/agent/app" + agentTchanRep "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" basic "github.com/jaegertracing/jaeger/cmd/builder" collectorApp "github.com/jaegertracing/jaeger/cmd/collector/app" collector "github.com/jaegertracing/jaeger/cmd/collector/app/builder" @@ -124,10 +125,11 @@ func main() { samplingHandler := initializeSamplingHandler(strategyStoreFactory, v, metricsFactory, logger) aOpts := new(agentApp.Builder).InitFromViper(v) + tchannelRep := agentTchanRep.NewBuilder().InitFromViper(v, logger) cOpts := new(collector.CollectorOptions).InitFromViper(v) qOpts := new(queryApp.QueryOptions).InitFromViper(v) - startAgent(aOpts, cOpts, logger, metricsFactory) + startAgent(aOpts, tchannelRep, cOpts, logger, metricsFactory) startCollector(cOpts, spanWriter, logger, metricsFactory, samplingHandler, hc) startQuery(qOpts, spanReader, dependencyReader, logger, metricsFactory, mBldr, hc) hc.Ready() @@ -156,6 +158,7 @@ func main() { flags.AddFlags, storageFactory.AddFlags, agentApp.AddFlags, + agentTchanRep.AddFlags, collector.AddFlags, queryApp.AddFlags, pMetrics.AddFlags, @@ -170,16 +173,20 @@ func main() { func startAgent( b *agentApp.Builder, + tchanRep *agentTchanRep.Builder, cOpts *collector.CollectorOptions, logger *zap.Logger, baseFactory metrics.Factory, ) { metricsFactory := baseFactory.Namespace("agent", nil) - if len(b.CollectorHostPorts) == 0 { - b.CollectorHostPorts = append(b.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorPort)) + tchanRep.CollectorHostPorts = append(tchanRep.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorPort)) + cp, err := agentTchanRep.NewCollectorProxy(tchanRep, metricsFactory, logger) + if err != nil { + logger.Fatal("Could not create collector proxy", zap.Error(err)) } - agent, err := b.WithMetricsFactory(metricsFactory).CreateAgent(logger) + + agent, err := b.CreateAgent(cp, logger, baseFactory) if err != nil { logger.Fatal("Unable to initialize Jaeger Agent", zap.Error(err)) } diff --git a/docker-compose/jaeger-docker-compose.yml b/docker-compose/jaeger-docker-compose.yml index 554c922800d..5fb80c27078 100644 --- a/docker-compose/jaeger-docker-compose.yml +++ b/docker-compose/jaeger-docker-compose.yml @@ -25,7 +25,7 @@ services: jaeger-agent: image: jaegertracing/jaeger-agent - command: ["--collector.host-port=jaeger-collector:14267"] + command: ["--reporter.tchannel.collector.host-port=jaeger-collector:14267"] ports: - "5775:5775/udp" - "6831:6831/udp"