From 16c48ef9f0e83a9bfe2f97266764cf2dc555869c Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Thu, 27 Sep 2018 14:28:12 +0200 Subject: [PATCH 01/11] Make agent reporters configurable Signed-off-by: Pavol Loffay --- cmd/agent/app/agent_test.go | 2 + cmd/agent/app/builder.go | 77 ++++++++++--------- cmd/agent/app/builder_test.go | 46 ++++------- cmd/agent/app/flags.go | 21 ----- cmd/agent/app/flags_test.go | 4 - .../app/reporter/tchannel/builder_test.go | 20 +++++ cmd/agent/app/reporter/tchannel/flags.go | 57 ++++++++++++++ cmd/agent/app/reporter/tchannel/flags_test.go | 48 ++++++++++++ cmd/agent/app/reporter/tchannel/reporter.go | 7 ++ cmd/agent/main.go | 15 +++- cmd/all-in-one/main.go | 17 +++- 11 files changed, 216 insertions(+), 98 deletions(-) create mode 100644 cmd/agent/app/reporter/tchannel/flags.go create mode 100644 cmd/agent/app/reporter/tchannel/flags_test.go diff --git a/cmd/agent/app/agent_test.go b/cmd/agent/app/agent_test.go index a9422e579a0..1afcadcb3c9 100644 --- a/cmd/agent/app/agent_test.go +++ b/cmd/agent/app/agent_test.go @@ -33,6 +33,7 @@ import ( func TestAgentStartError(t *testing.T) { cfg := &Builder{} + configureSamplingManager(t, cfg) agent, err := cfg.CreateAgent(zap.NewNop()) require.NoError(t, err) agent.httpServer.Addr = "bad-address" @@ -100,6 +101,7 @@ func withRunningAgent(t *testing.T, testcase func(string, chan error)) { }, } logger, logBuf := testutils.NewLogger() + configureSamplingManager(t, &cfg) agent, err := cfg.CreateAgent(logger) require.NoError(t, err) ch := make(chan error, 2) diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index 284f534f34f..75c51c240a4 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -17,18 +17,15 @@ package app import ( "fmt" "net/http" - "time" "github.com/apache/thrift/lib/go/thrift" "github.com/pkg/errors" "github.com/uber/jaeger-lib/metrics" - "github.com/uber/tchannel-go" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" "github.com/jaegertracing/jaeger/cmd/agent/app/processors" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" - tchreporter "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" "github.com/jaegertracing/jaeger/cmd/agent/app/servers" "github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp" jmetrics "github.com/jaegertracing/jaeger/pkg/metrics" @@ -37,11 +34,9 @@ import ( ) const ( - defaultQueueSize = 1000 - defaultMaxPacketSize = 65000 - defaultServerWorkers = 10 - defaultMinPeers = 3 - defaultConnCheckTimeout = 250 * time.Millisecond + defaultQueueSize = 1000 + defaultMaxPacketSize = 65000 + defaultServerWorkers = 10 defaultHTTPServerHostPort = ":5778" @@ -73,10 +68,9 @@ type Builder struct { HTTPServer HTTPServerConfiguration `yaml:"httpServer"` Metrics jmetrics.Builder `yaml:"metrics"` - tchreporter.Builder `yaml:",inline"` - - otherReporters []reporter.Reporter + reporters []reporter.Reporter metricsFactory metrics.Factory + configManager httpserver.ClientConfigManager } // ProcessorConfiguration holds config for a processor that receives spans from Server @@ -99,9 +93,9 @@ type HTTPServerConfiguration struct { HostPort string `yaml:"hostPort" validate:"nonzero"` } -// WithReporter adds auxiliary reporters. -func (b *Builder) WithReporter(r reporter.Reporter) *Builder { - b.otherReporters = append(b.otherReporters, r) +// WithReporters adds auxiliary reporters. +func (b *Builder) WithReporters(r ...reporter.Reporter) *Builder { + b.reporters = append(b.reporters, r...) return b } @@ -111,47 +105,46 @@ func (b *Builder) WithMetricsFactory(mf metrics.Factory) *Builder { return b } -func (b *Builder) createMainReporter(mFactory metrics.Factory, logger *zap.Logger) (*tchreporter.Reporter, error) { - return b.CreateReporter(mFactory, logger) -} - -func (b *Builder) getMetricsFactory() (metrics.Factory, error) { +// GetMetricsFactory returns metrics factory used by the agent. +func (b *Builder) GetMetricsFactory() (metrics.Factory, error) { if b.metricsFactory != nil { return b.metricsFactory, nil } baseFactory, err := b.Metrics.CreateMetricsFactory("jaeger") + fmt.Println("creating metrics factory") if err != nil { return nil, err } - return baseFactory.Namespace("agent", nil), nil + fmt.Println("creating metrics factory") + b.metricsFactory = baseFactory.Namespace("agent", nil) + return b.metricsFactory, nil } // CreateAgent creates the Agent func (b *Builder) CreateAgent(logger *zap.Logger) (*Agent, error) { - mFactory, err := b.getMetricsFactory() + mFactory, err := b.GetMetricsFactory() if err != nil { return nil, errors.Wrap(err, "cannot create metrics factory") } - mainReporter, err := b.createMainReporter(mFactory, logger) + if err != nil { - return nil, errors.Wrap(err, "cannot create main Reporter") - } - var rep reporter.Reporter = mainReporter - if len(b.otherReporters) > 0 { - reps := append([]reporter.Reporter{mainReporter}, b.otherReporters...) - rep = reporter.NewMultiReporter(reps...) + return nil, err } - processors, err := b.GetProcessors(rep, mFactory, logger) + processors, err := b.GetProcessors(b.getReporter(logger), mFactory, logger) if err != nil { return nil, err } - httpServer := b.HTTPServer.GetHTTPServer(b.CollectorServiceName, mainReporter.Channel(), mFactory) - if h := b.Metrics.Handler(); mFactory != nil && h != nil { - httpServer.Handler.(*http.ServeMux).Handle(b.Metrics.HTTPRoute, h) + server, err := b.HTTPServer.getHTTPServer(b.configManager, mFactory, &b.Metrics) + if err != nil { + return nil, err } - return NewAgent(processors, httpServer, logger), nil + return NewAgent(processors, server, logger), nil +} + +func (b *Builder) getReporter(logger *zap.Logger) reporter.Reporter { + return reporter.NewMultiReporter(b.reporters...) } // GetProcessors creates Processors with attached Reporter @@ -184,13 +177,25 @@ func (b *Builder) GetProcessors(rep reporter.Reporter, mFactory metrics.Factory, return retMe, nil } +// WithClientConfigManager adds configuration manager. +func (b *Builder) WithClientConfigManager(manager httpserver.ClientConfigManager) *Builder { + b.configManager = manager + return b +} + // GetHTTPServer creates an HTTP server that provides sampling strategies and baggage restrictions to client libraries. -func (c HTTPServerConfiguration) GetHTTPServer(svc string, channel *tchannel.Channel, mFactory metrics.Factory) *http.Server { - mgr := httpserver.NewCollectorProxy(svc, channel, mFactory) +func (c HTTPServerConfiguration) getHTTPServer(manager httpserver.ClientConfigManager, mFactory metrics.Factory, mBuilder *jmetrics.Builder) (*http.Server, error) { + if manager == nil { + return nil, errors.New("Http manager is null") + } if c.HostPort == "" { c.HostPort = defaultHTTPServerHostPort } - return httpserver.NewHTTPServer(c.HostPort, mgr, mFactory) + server := httpserver.NewHTTPServer(c.HostPort, manager, mFactory) + if h := mBuilder.Handler(); mFactory != nil && h != nil { + server.Handler.(*http.ServeMux).Handle(mBuilder.HTTPRoute, h) + } + return server, nil } // GetThriftProcessor gets a TBufferedServer backed Processor using the collector configuration diff --git a/cmd/agent/app/builder_test.go b/cmd/agent/app/builder_test.go index 955e425e9b5..cd640210855 100644 --- a/cmd/agent/app/builder_test.go +++ b/cmd/agent/app/builder_test.go @@ -15,7 +15,6 @@ package app import ( - "errors" "strings" "testing" @@ -25,6 +24,8 @@ import ( "go.uber.org/zap" "gopkg.in/yaml.v2" + "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" + "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) @@ -51,14 +52,6 @@ processors: httpServer: hostPort: 4.4.4.4:5778 - -collectorHostPorts: - - 127.0.0.1:14267 - - 127.0.0.1:14268 - - 127.0.0.1:14269 - -collectorServiceName: some-collector-service -minPeers: 4 ` func TestBuilderFromConfig(t *testing.T) { @@ -101,18 +94,12 @@ func TestBuilderFromConfig(t *testing.T) { }, }, cfg.Processors[2]) assert.Equal(t, "4.4.4.4:5778", cfg.HTTPServer.HostPort) - - assert.Equal(t, 4, cfg.DiscoveryMinPeers) - assert.Equal(t, "some-collector-service", cfg.CollectorServiceName) - assert.Equal( - t, - []string{"127.0.0.1:14267", "127.0.0.1:14268", "127.0.0.1:14269"}, - cfg.CollectorHostPorts) } func TestBuilderWithExtraReporter(t *testing.T) { cfg := &Builder{} - cfg.WithReporter(fakeReporter{}) + configureSamplingManager(t, cfg) + cfg.WithReporters(fakeReporter{}) agent, err := cfg.CreateAgent(zap.NewNop()) assert.NoError(t, err) assert.NotNil(t, agent) @@ -121,13 +108,14 @@ func TestBuilderWithExtraReporter(t *testing.T) { func TestBuilderMetrics(t *testing.T) { mf := metrics.NullFactory b := new(Builder).WithMetricsFactory(mf) - mf2, err := b.getMetricsFactory() + mf2, err := b.GetMetricsFactory() assert.NoError(t, err) assert.Equal(t, mf, mf2) } func TestBuilderMetricsHandler(t *testing.T) { b := &Builder{} + configureSamplingManager(t, b) b.Metrics.Backend = "expvar" b.Metrics.HTTPRoute = "/expvar" factory, err := b.Metrics.CreateMetricsFactory("test") @@ -146,14 +134,6 @@ func TestBuilderMetricsError(t *testing.T) { assert.EqualError(t, err, "cannot create metrics factory: unknown metrics backend specified") } -func TestBuilderWithDiscoveryError(t *testing.T) { - cfg := &Builder{} - cfg.WithDiscoverer(fakeDiscoverer{}) - agent, err := cfg.CreateAgent(zap.NewNop()) - assert.EqualError(t, err, "cannot create main Reporter: cannot enable service discovery: both discovery.Discoverer and discovery.Notifier must be specified") - assert.Nil(t, agent) -} - func TestBuilderWithProcessorErrors(t *testing.T) { testCases := []struct { model Model @@ -190,6 +170,14 @@ func TestBuilderWithProcessorErrors(t *testing.T) { } } +func configureSamplingManager(t *testing.T, cfg *Builder) { + m, err := cfg.GetMetricsFactory() + require.NoError(t, err) + r, err := tchannel.NewBuilder().CreateReporter(m, zap.NewNop()) + require.NoError(t, err) + cfg.WithReporters(r).WithClientConfigManager(httpserver.NewCollectorProxy(r.CollectorServiceName(), r.Channel(), m)) +} + type fakeReporter struct{} func (fr fakeReporter) EmitZipkinBatch(spans []*zipkincore.Span) (err error) { @@ -199,9 +187,3 @@ func (fr fakeReporter) EmitZipkinBatch(spans []*zipkincore.Span) (err error) { func (fr fakeReporter) EmitBatch(batch *jaeger.Batch) (err error) { return nil } - -type fakeDiscoverer struct{} - -func (fd fakeDiscoverer) Instances() ([]string, error) { - return nil, errors.New("discoverer error") -} diff --git a/cmd/agent/app/flags.go b/cmd/agent/app/flags.go index 6eb747bf2ce..f9b5dbde689 100644 --- a/cmd/agent/app/flags.go +++ b/cmd/agent/app/flags.go @@ -17,7 +17,6 @@ package app import ( "flag" "fmt" - "strings" "github.com/spf13/viper" ) @@ -27,10 +26,7 @@ const ( suffixServerQueueSize = "server-queue-size" suffixServerMaxPacketSize = "server-max-packet-size" suffixServerHostPort = "server-host-port" - collectorHostPort = "collector.host-port" httpServerHostPort = "http-server.host-port" - discoveryMinPeers = "discovery.min-peers" - discoveryConnCheckTimeout = "discovery.conn-check-timeout" ) var defaultProcessors = []struct { @@ -52,22 +48,10 @@ func AddFlags(flags *flag.FlagSet) { flags.Int(prefix+suffixServerMaxPacketSize, defaultMaxPacketSize, "max packet size for the UDP server") flags.String(prefix+suffixServerHostPort, processor.hostPort, "host:port for the UDP server") } - flags.String( - collectorHostPort, - "", - "comma-separated string representing host:ports of a static list of collectors to connect to directly (e.g. when not using service discovery)") flags.String( httpServerHostPort, defaultHTTPServerHostPort, "host:port of the http server (e.g. for /sampling point and /baggage endpoint)") - flags.Int( - discoveryMinPeers, - defaultMinPeers, - "if using service discovery, the min number of connections to maintain to the backend") - flags.Duration( - discoveryConnCheckTimeout, - defaultConnCheckTimeout, - "sets the timeout used when establishing new connections") } // InitFromViper initializes Builder with properties retrieved from Viper. @@ -84,11 +68,6 @@ func (b *Builder) InitFromViper(v *viper.Viper) *Builder { b.Processors = append(b.Processors, *p) } - if len(v.GetString(collectorHostPort)) > 0 { - b.CollectorHostPorts = strings.Split(v.GetString(collectorHostPort), ",") - } b.HTTPServer.HostPort = v.GetString(httpServerHostPort) - b.DiscoveryMinPeers = v.GetInt(discoveryMinPeers) - b.ConnCheckTimeout = v.GetDuration(discoveryConnCheckTimeout) return b } diff --git a/cmd/agent/app/flags_test.go b/cmd/agent/app/flags_test.go index 65a6025524c..9a271f0287a 100644 --- a/cmd/agent/app/flags_test.go +++ b/cmd/agent/app/flags_test.go @@ -34,8 +34,6 @@ func TestBingFlags(t *testing.T) { v.BindPFlags(command.PersistentFlags()) err := command.ParseFlags([]string{ - "--collector.host-port=1.2.3.4:555,1.2.3.4:666", - "--discovery.min-peers=42", "--http-server.host-port=:8080", "--processor.jaeger-binary.server-host-port=:1111", "--processor.jaeger-binary.server-max-packet-size=4242", @@ -46,8 +44,6 @@ func TestBingFlags(t *testing.T) { b.InitFromViper(v) assert.Equal(t, 3, len(b.Processors)) - assert.Equal(t, []string{"1.2.3.4:555", "1.2.3.4:666"}, b.CollectorHostPorts) - assert.Equal(t, 42, b.DiscoveryMinPeers) assert.Equal(t, ":8080", b.HTTPServer.HostPort) assert.Equal(t, ":1111", b.Processors[2].Server.HostPort) assert.Equal(t, 4242, b.Processors[2].Server.MaxPacketSize) diff --git a/cmd/agent/app/reporter/tchannel/builder_test.go b/cmd/agent/app/reporter/tchannel/builder_test.go index e92a18e9d2c..6ed8be97d67 100644 --- a/cmd/agent/app/reporter/tchannel/builder_test.go +++ b/cmd/agent/app/reporter/tchannel/builder_test.go @@ -15,6 +15,7 @@ package tchannel import ( + "errors" "testing" "github.com/stretchr/testify/assert" @@ -49,6 +50,11 @@ func TestBuilderFromConfig(t *testing.T) { t, []string{"127.0.0.1:14267", "127.0.0.1:14268", "127.0.0.1:14269"}, cfg.CollectorHostPorts) + r, err := cfg.CreateReporter(metrics.NullFactory, zap.NewNop()) + require.NoError(t, err) + assert.NotNil(t, r) + assert.Equal(t, "some-collector-service", r.CollectorServiceName()) + } func TestBuilderWithDiscovery(t *testing.T) { @@ -66,6 +72,13 @@ func TestBuilderWithDiscovery(t *testing.T) { assert.NotNil(t, agent) } +func TestBuilderWithDiscoveryError(t *testing.T) { + tbuilder := NewBuilder().WithDiscoverer(fakeDiscoverer{}) + rep, err := tbuilder.CreateReporter(metrics.NullFactory, zap.NewNop()) + assert.EqualError(t, err, "cannot enable service discovery: both discovery.Discoverer and discovery.Notifier must be specified") + assert.Nil(t, rep) +} + func TestBuilderWithCollectorServiceName(t *testing.T) { cfg := &Builder{} cfg.WithCollectorServiceName("svc") @@ -88,6 +101,7 @@ func TestBuilderWithChannel(t *testing.T) { rep, err := cfg.CreateReporter(metrics.NullFactory, zap.NewNop()) assert.NoError(t, err) assert.NotNil(t, rep.Channel()) + assert.Equal(t, defaultCollectorServiceName, rep.CollectorServiceName()) } func TestBuilderWithCollectors(t *testing.T) { @@ -103,3 +117,9 @@ func TestBuilderWithCollectors(t *testing.T) { assert.NoError(t, err) assert.Equal(t, c, hostPorts) } + +type fakeDiscoverer struct{} + +func (fd fakeDiscoverer) Instances() ([]string, error) { + return nil, errors.New("discoverer error") +} diff --git a/cmd/agent/app/reporter/tchannel/flags.go b/cmd/agent/app/reporter/tchannel/flags.go new file mode 100644 index 00000000000..1849e3184f1 --- /dev/null +++ b/cmd/agent/app/reporter/tchannel/flags.go @@ -0,0 +1,57 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tchannel + +import ( + "flag" + "strings" + "time" + + "github.com/spf13/viper" +) + +const ( + defaultConnCheckTimeout = 250 * time.Millisecond + tchannelPrefix = "reporter.tchannel." + collectorHostPort = tchannelPrefix + "collector.host-port" + discoveryMinPeers = tchannelPrefix + "discovery.min-peers" + discoveryConnCheckTimeout = tchannelPrefix + "discovery.conn-check-timeout" +) + +// AddFlags adds flags for Builder. +func AddFlags(flags *flag.FlagSet) { + flags.String( + collectorHostPort, + "", + "comma-separated string representing host:ports of a static list of collectors to connect to directly (e.g. when not using service discovery)") + flags.Int( + discoveryMinPeers, + defaultMinPeers, + "if using service discovery, the min number of connections to maintain to the backend") + flags.Duration( + discoveryConnCheckTimeout, + defaultConnCheckTimeout, + "sets the timeout used when establishing new connections") +} + +// InitFromViper initializes Builder with properties retrieved from Viper. +func (b *Builder) InitFromViper(v *viper.Viper) *Builder { + if len(v.GetString(collectorHostPort)) > 0 { + b.CollectorHostPorts = strings.Split(v.GetString(collectorHostPort), ",") + } + b.DiscoveryMinPeers = v.GetInt(discoveryMinPeers) + b.ConnCheckTimeout = v.GetDuration(discoveryConnCheckTimeout) + return b +} diff --git a/cmd/agent/app/reporter/tchannel/flags_test.go b/cmd/agent/app/reporter/tchannel/flags_test.go new file mode 100644 index 00000000000..a8e2ccac1e8 --- /dev/null +++ b/cmd/agent/app/reporter/tchannel/flags_test.go @@ -0,0 +1,48 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tchannel + +import ( + "flag" + "testing" + "time" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBingFlags(t *testing.T) { + v := viper.New() + b := &Builder{} + command := cobra.Command{} + flags := &flag.FlagSet{} + AddFlags(flags) + command.PersistentFlags().AddGoFlagSet(flags) + v.BindPFlags(command.PersistentFlags()) + + err := command.ParseFlags([]string{ + "--reporter.tchannel.collector.host-port=1.2.3.4:555,1.2.3.4:666", + "--reporter.tchannel.discovery.min-peers=42", + "--reporter.tchannel.discovery.conn-check-timeout=85s", + }) + require.NoError(t, err) + + b.InitFromViper(v) + assert.Equal(t, 42, b.DiscoveryMinPeers) + assert.Equal(t, []string{"1.2.3.4:555", "1.2.3.4:666"}, b.CollectorHostPorts) + assert.Equal(t, time.Second*85, b.ConnCheckTimeout) +} diff --git a/cmd/agent/app/reporter/tchannel/reporter.go b/cmd/agent/app/reporter/tchannel/reporter.go index 4c83fe27727..619893de76b 100644 --- a/cmd/agent/app/reporter/tchannel/reporter.go +++ b/cmd/agent/app/reporter/tchannel/reporter.go @@ -57,6 +57,7 @@ type Reporter struct { peerListMgr *peerlistmgr.PeerListManager batchesMetrics map[string]batchMetrics logger *zap.Logger + serviceName string } // New creates new TChannel-based Reporter. @@ -83,6 +84,7 @@ func New( peerListMgr: peerListMgr, logger: zlogger, batchesMetrics: batchesMetrics, + serviceName: collectorServiceName, } } @@ -136,3 +138,8 @@ func (r *Reporter) submitAndReport(submissionFunc func(ctx thrift.Context) error batchMetrics.SpansSubmitted.Inc(size) return nil } + +// CollectorServiceName returns collector service name. +func (r *Reporter) CollectorServiceName() string { + return r.serviceName +} diff --git a/cmd/agent/main.go b/cmd/agent/main.go index e6481fe47ee..659d2f743c6 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -24,6 +24,8 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/agent/app" + "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" + "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" "github.com/jaegertracing/jaeger/cmd/flags" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" @@ -50,9 +52,19 @@ func main() { builder := &app.Builder{} builder.InitFromViper(v) + tchanRep := tchannel.NewBuilder().InitFromViper(v) + mFactory, err := builder.GetMetricsFactory() + if err != nil { + logger.Fatal("Could not create metrics", zap.Error(err)) + } + r, err := tchanRep.CreateReporter(mFactory, logger) + if err != nil { + logger.Fatal("Could not create tchannel reporter", zap.Error(err)) + } + builder.WithReporters(r) + builder.WithClientConfigManager(httpserver.NewCollectorProxy(r.CollectorServiceName(), r.Channel(), mFactory)) // TODO illustrate discovery service wiring - // TODO illustrate additional reporter agent, err := builder.CreateAgent(logger) if err != nil { @@ -75,6 +87,7 @@ func main() { flags.AddConfigFileFlag, flags.AddLoggingFlag, app.AddFlags, + tchannel.AddFlags, metrics.AddFlags, ) diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 155ee7a47e6..3b1f21ad773 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -38,6 +38,8 @@ import ( "go.uber.org/zap" agentApp "github.com/jaegertracing/jaeger/cmd/agent/app" + "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" + agentTchanRep "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" basic "github.com/jaegertracing/jaeger/cmd/builder" collectorApp "github.com/jaegertracing/jaeger/cmd/collector/app" collector "github.com/jaegertracing/jaeger/cmd/collector/app/builder" @@ -124,10 +126,11 @@ func main() { samplingHandler := initializeSamplingHandler(strategyStoreFactory, v, metricsFactory, logger) aOpts := new(agentApp.Builder).InitFromViper(v) + tchannelRep := agentTchanRep.NewBuilder().InitFromViper(v) cOpts := new(collector.CollectorOptions).InitFromViper(v) qOpts := new(queryApp.QueryOptions).InitFromViper(v) - startAgent(aOpts, cOpts, logger, metricsFactory) + startAgent(aOpts, tchannelRep, cOpts, logger, metricsFactory) startCollector(cOpts, spanWriter, logger, metricsFactory, samplingHandler, hc) startQuery(qOpts, spanReader, dependencyReader, logger, metricsFactory, mBldr, hc) hc.Ready() @@ -156,6 +159,7 @@ func main() { flags.AddFlags, storageFactory.AddFlags, agentApp.AddFlags, + agentTchanRep.AddFlags, collector.AddFlags, queryApp.AddFlags, pMetrics.AddFlags, @@ -170,15 +174,20 @@ func main() { func startAgent( b *agentApp.Builder, + tchanRep *agentTchanRep.Builder, cOpts *collector.CollectorOptions, logger *zap.Logger, baseFactory metrics.Factory, ) { metricsFactory := baseFactory.Namespace("agent", nil) - - if len(b.CollectorHostPorts) == 0 { - b.CollectorHostPorts = append(b.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorPort)) + tchanRep.CollectorHostPorts = append(tchanRep.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorPort)) + r, err := tchanRep.CreateReporter(metricsFactory, logger) + if err != nil { + log.Fatal("Could not create tchannel reporter", zap.Error(err)) } + b.WithReporters(r) + b.WithClientConfigManager(httpserver.NewCollectorProxy(r.CollectorServiceName(), r.Channel(), metricsFactory)) + agent, err := b.WithMetricsFactory(metricsFactory).CreateAgent(logger) if err != nil { logger.Fatal("Unable to initialize Jaeger Agent", zap.Error(err)) From 3e3c47ab8222dabb6bc0f5deeb4b122861d71b52 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Tue, 2 Oct 2018 16:44:56 +0200 Subject: [PATCH 02/11] Add nil check Signed-off-by: Pavol Loffay --- cmd/agent/app/builder.go | 17 +++++++++++------ cmd/agent/app/builder_test.go | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index 75c51c240a4..5591b237a7a 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -112,12 +112,10 @@ func (b *Builder) GetMetricsFactory() (metrics.Factory, error) { } baseFactory, err := b.Metrics.CreateMetricsFactory("jaeger") - fmt.Println("creating metrics factory") if err != nil { return nil, err } - fmt.Println("creating metrics factory") b.metricsFactory = baseFactory.Namespace("agent", nil) return b.metricsFactory, nil } @@ -132,7 +130,11 @@ func (b *Builder) CreateAgent(logger *zap.Logger) (*Agent, error) { if err != nil { return nil, err } - processors, err := b.GetProcessors(b.getReporter(logger), mFactory, logger) + r, err := b.getReporter(logger) + if err != nil { + return nil, err + } + processors, err := b.GetProcessors(r, mFactory, logger) if err != nil { return nil, err } @@ -143,8 +145,11 @@ func (b *Builder) CreateAgent(logger *zap.Logger) (*Agent, error) { return NewAgent(processors, server, logger), nil } -func (b *Builder) getReporter(logger *zap.Logger) reporter.Reporter { - return reporter.NewMultiReporter(b.reporters...) +func (b *Builder) getReporter(logger *zap.Logger) (reporter.Reporter, error) { + if len(b.reporters) == 0 { + return nil, errors.New("Missing required reporters") + } + return reporter.NewMultiReporter(b.reporters...), nil } // GetProcessors creates Processors with attached Reporter @@ -186,7 +191,7 @@ func (b *Builder) WithClientConfigManager(manager httpserver.ClientConfigManager // GetHTTPServer creates an HTTP server that provides sampling strategies and baggage restrictions to client libraries. func (c HTTPServerConfiguration) getHTTPServer(manager httpserver.ClientConfigManager, mFactory metrics.Factory, mBuilder *jmetrics.Builder) (*http.Server, error) { if manager == nil { - return nil, errors.New("Http manager is null") + return nil, errors.New("Missing required Client config manager") } if c.HostPort == "" { c.HostPort = defaultHTTPServerHostPort diff --git a/cmd/agent/app/builder_test.go b/cmd/agent/app/builder_test.go index cd640210855..e5298b42574 100644 --- a/cmd/agent/app/builder_test.go +++ b/cmd/agent/app/builder_test.go @@ -30,6 +30,23 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) +func TestDefault(t *testing.T) { + tests := []struct { + b *Builder + errMsg string + }{ + {b: &Builder{}, errMsg: "Missing required reporters"}, + {b: (&Builder{}).WithReporters(&fakeReporter{}), errMsg: "Missing required Client config manager"}, + } + + for _, test := range tests { + a, err := test.b.CreateAgent(zap.NewNop()) + require.Error(t, err) + assert.Equal(t, test.errMsg, err.Error()) + assert.Nil(t, a) + } +} + var yamlConfig = ` ignored: abcd @@ -160,6 +177,7 @@ func TestBuilderWithProcessorErrors(t *testing.T) { }, }, } + cfg.WithReporters(&fakeReporter{}) _, err := cfg.CreateAgent(zap.NewNop()) assert.Error(t, err) if testCase.err != "" { From 4430db518b7df980f5975df84e16498cf9351773 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Tue, 2 Oct 2018 17:33:23 +0200 Subject: [PATCH 03/11] Fix docker compose Signed-off-by: Pavol Loffay --- docker-compose/jaeger-docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose/jaeger-docker-compose.yml b/docker-compose/jaeger-docker-compose.yml index 554c922800d..5fb80c27078 100644 --- a/docker-compose/jaeger-docker-compose.yml +++ b/docker-compose/jaeger-docker-compose.yml @@ -25,7 +25,7 @@ services: jaeger-agent: image: jaegertracing/jaeger-agent - command: ["--collector.host-port=jaeger-collector:14267"] + command: ["--reporter.tchannel.collector.host-port=jaeger-collector:14267"] ports: - "5775:5775/udp" - "6831:6831/udp" From ea03533d592b26dc3025ecf6f774e165e43faf61 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Wed, 3 Oct 2018 09:18:48 +0200 Subject: [PATCH 04/11] Remove redundant if Signed-off-by: Pavol Loffay --- cmd/agent/app/builder.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index 5591b237a7a..68b9fda926a 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -126,10 +126,6 @@ func (b *Builder) CreateAgent(logger *zap.Logger) (*Agent, error) { if err != nil { return nil, errors.Wrap(err, "cannot create metrics factory") } - - if err != nil { - return nil, err - } r, err := b.getReporter(logger) if err != nil { return nil, err From 5e31a0b4fc02d66c806fb154752a3df5758427e5 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Thu, 4 Oct 2018 17:45:55 +0200 Subject: [PATCH 05/11] Deprecate flags Signed-off-by: Pavol Loffay --- cmd/agent/app/builder.go | 2 + cmd/agent/app/reporter/tchannel/flags.go | 41 +++++++++++++--- cmd/agent/app/reporter/tchannel/flags_test.go | 48 ++++++++++++++----- cmd/agent/main.go | 2 +- cmd/all-in-one/main.go | 2 +- 5 files changed, 74 insertions(+), 21 deletions(-) diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index 68b9fda926a..6b990732867 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -144,6 +144,8 @@ func (b *Builder) CreateAgent(logger *zap.Logger) (*Agent, error) { func (b *Builder) getReporter(logger *zap.Logger) (reporter.Reporter, error) { if len(b.reporters) == 0 { return nil, errors.New("Missing required reporters") + } else if len(b.reporters) == 1 { + return b.reporters[0], nil } return reporter.NewMultiReporter(b.reporters...), nil } diff --git a/cmd/agent/app/reporter/tchannel/flags.go b/cmd/agent/app/reporter/tchannel/flags.go index 1849e3184f1..67d32e0c60d 100644 --- a/cmd/agent/app/reporter/tchannel/flags.go +++ b/cmd/agent/app/reporter/tchannel/flags.go @@ -20,38 +20,65 @@ import ( "time" "github.com/spf13/viper" + "go.uber.org/zap" ) const ( defaultConnCheckTimeout = 250 * time.Millisecond tchannelPrefix = "reporter.tchannel." - collectorHostPort = tchannelPrefix + "collector.host-port" - discoveryMinPeers = tchannelPrefix + "discovery.min-peers" - discoveryConnCheckTimeout = tchannelPrefix + "discovery.conn-check-timeout" + collectorHostPort = "collector.host-port" + discoveryMinPeers = "discovery.min-peers" + discoveryConnCheckTimeout = "discovery.conn-check-timeout" ) // AddFlags adds flags for Builder. func AddFlags(flags *flag.FlagSet) { flags.String( - collectorHostPort, + tchannelPrefix+collectorHostPort, "", "comma-separated string representing host:ports of a static list of collectors to connect to directly (e.g. when not using service discovery)") flags.Int( - discoveryMinPeers, + tchannelPrefix+discoveryMinPeers, defaultMinPeers, "if using service discovery, the min number of connections to maintain to the backend") flags.Duration( - discoveryConnCheckTimeout, + tchannelPrefix+discoveryConnCheckTimeout, defaultConnCheckTimeout, "sets the timeout used when establishing new connections") + // TODO remove deprecated in 1.9 + flags.String( + collectorHostPort, + "", + "Deprecated; comma-separated string representing host:ports of a static list of collectors to connect to directly (e.g. when not using service discovery)") + flags.Int( + discoveryMinPeers, + defaultMinPeers, + "Deprecated; if using service discovery, the min number of connections to maintain to the backend") + flags.Duration( + discoveryConnCheckTimeout, + defaultConnCheckTimeout, + "Deprecated; sets the timeout used when establishing new connections") } // InitFromViper initializes Builder with properties retrieved from Viper. -func (b *Builder) InitFromViper(v *viper.Viper) *Builder { +func (b *Builder) InitFromViper(v *viper.Viper, logger *zap.Logger) *Builder { if len(v.GetString(collectorHostPort)) > 0 { + logger.Warn("Using deprecated configuration", zap.String("option", collectorHostPort)) b.CollectorHostPorts = strings.Split(v.GetString(collectorHostPort), ",") } b.DiscoveryMinPeers = v.GetInt(discoveryMinPeers) + if b.DiscoveryMinPeers != defaultMinPeers { + logger.Warn("Using deprecated configuration", zap.String("option", discoveryMinPeers)) + } b.ConnCheckTimeout = v.GetDuration(discoveryConnCheckTimeout) + if b.ConnCheckTimeout != defaultConnCheckTimeout { + logger.Warn("Using deprecated configuration", zap.String("option", discoveryConnCheckTimeout)) + } + + if len(v.GetString(tchannelPrefix+collectorHostPort)) > 0 { + b.CollectorHostPorts = strings.Split(v.GetString(tchannelPrefix+collectorHostPort), ",") + } + b.DiscoveryMinPeers = v.GetInt(tchannelPrefix + discoveryMinPeers) + b.ConnCheckTimeout = v.GetDuration(tchannelPrefix + discoveryConnCheckTimeout) return b } diff --git a/cmd/agent/app/reporter/tchannel/flags_test.go b/cmd/agent/app/reporter/tchannel/flags_test.go index a8e2ccac1e8..473b9619321 100644 --- a/cmd/agent/app/reporter/tchannel/flags_test.go +++ b/cmd/agent/app/reporter/tchannel/flags_test.go @@ -23,26 +23,50 @@ import ( "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) func TestBingFlags(t *testing.T) { v := viper.New() - b := &Builder{} command := cobra.Command{} flags := &flag.FlagSet{} AddFlags(flags) command.PersistentFlags().AddGoFlagSet(flags) v.BindPFlags(command.PersistentFlags()) - err := command.ParseFlags([]string{ - "--reporter.tchannel.collector.host-port=1.2.3.4:555,1.2.3.4:666", - "--reporter.tchannel.discovery.min-peers=42", - "--reporter.tchannel.discovery.conn-check-timeout=85s", - }) - require.NoError(t, err) - - b.InitFromViper(v) - assert.Equal(t, 42, b.DiscoveryMinPeers) - assert.Equal(t, []string{"1.2.3.4:555", "1.2.3.4:666"}, b.CollectorHostPorts) - assert.Equal(t, time.Second*85, b.ConnCheckTimeout) + tests := []struct { + flags []string + builder Builder + }{ + {flags: []string{ + "--reporter.tchannel.collector.host-port=1.2.3.4:555,1.2.3.4:666", + "--reporter.tchannel.discovery.min-peers=42", + "--reporter.tchannel.discovery.conn-check-timeout=85s", + }, builder: Builder{ConnCheckTimeout: time.Second * 85, DiscoveryMinPeers: 42, CollectorHostPorts: []string{"1.2.3.4:555", "1.2.3.4:666"}}, + }, + {flags: []string{ + "--collector.host-port=1.2.3.4:555,1.2.3.4:666", + "--discovery.min-peers=42", + "--discovery.conn-check-timeout=85s", + }, + builder: Builder{ConnCheckTimeout: time.Second * 85, DiscoveryMinPeers: 42, CollectorHostPorts: []string{"1.2.3.4:555", "1.2.3.4:666"}}, + }, + {flags: []string{ + "--collector.host-port=1.2.3.4:555,1.2.3.4:666", + "--discovery.min-peers=42", + "--discovery.conn-check-timeout=85s", + "--reporter.tchannel.collector.host-port=1.2.3.4:5556,1.2.3.4:6667", + "--reporter.tchannel.discovery.min-peers=43", + "--reporter.tchannel.discovery.conn-check-timeout=86s", + }, + builder: Builder{ConnCheckTimeout: time.Second * 86, DiscoveryMinPeers: 43, CollectorHostPorts: []string{"1.2.3.4:5556", "1.2.3.4:6667"}}, + }, + } + for _, test := range tests { + err := command.ParseFlags(test.flags) + require.NoError(t, err) + b := Builder{} + b.InitFromViper(v, zap.NewNop()) + assert.Equal(t, test.builder, b) + } } diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 659d2f743c6..52a38d67541 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -52,7 +52,7 @@ func main() { builder := &app.Builder{} builder.InitFromViper(v) - tchanRep := tchannel.NewBuilder().InitFromViper(v) + tchanRep := tchannel.NewBuilder().InitFromViper(v, logger) mFactory, err := builder.GetMetricsFactory() if err != nil { logger.Fatal("Could not create metrics", zap.Error(err)) diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 3b1f21ad773..dd2e193cbb9 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -126,7 +126,7 @@ func main() { samplingHandler := initializeSamplingHandler(strategyStoreFactory, v, metricsFactory, logger) aOpts := new(agentApp.Builder).InitFromViper(v) - tchannelRep := agentTchanRep.NewBuilder().InitFromViper(v) + tchannelRep := agentTchanRep.NewBuilder().InitFromViper(v, logger) cOpts := new(collector.CollectorOptions).InitFromViper(v) qOpts := new(queryApp.QueryOptions).InitFromViper(v) From d166921d5e0625d57a001e18da12d402117aa29c Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Fri, 5 Oct 2018 10:17:37 +0200 Subject: [PATCH 06/11] Pass metrics explicitly Signed-off-by: Pavol Loffay --- cmd/agent/app/agent_test.go | 10 +++++---- cmd/agent/app/builder.go | 32 +++------------------------- cmd/agent/app/builder_test.go | 39 ++++++++--------------------------- cmd/agent/main.go | 8 +++++-- cmd/all-in-one/main.go | 2 +- 5 files changed, 25 insertions(+), 66 deletions(-) diff --git a/cmd/agent/app/agent_test.go b/cmd/agent/app/agent_test.go index 1afcadcb3c9..857b883a9a4 100644 --- a/cmd/agent/app/agent_test.go +++ b/cmd/agent/app/agent_test.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" jmetrics "github.com/jaegertracing/jaeger/pkg/metrics" @@ -33,8 +34,8 @@ import ( func TestAgentStartError(t *testing.T) { cfg := &Builder{} - configureSamplingManager(t, cfg) - agent, err := cfg.CreateAgent(zap.NewNop()) + configureSamplingManager(t, cfg, metrics.NullFactory) + agent, err := cfg.CreateAgent(zap.NewNop(), metrics.NullFactory) require.NoError(t, err) agent.httpServer.Addr = "bad-address" assert.Error(t, agent.Run()) @@ -101,8 +102,9 @@ func withRunningAgent(t *testing.T, testcase func(string, chan error)) { }, } logger, logBuf := testutils.NewLogger() - configureSamplingManager(t, &cfg) - agent, err := cfg.CreateAgent(logger) + f, _ := cfg.Metrics.CreateMetricsFactory("jaeger") + configureSamplingManager(t, &cfg, f) + agent, err := cfg.CreateAgent(logger, f) require.NoError(t, err) ch := make(chan error, 2) go func() { diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index 6b990732867..7d946e8efab 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -68,9 +68,8 @@ type Builder struct { HTTPServer HTTPServerConfiguration `yaml:"httpServer"` Metrics jmetrics.Builder `yaml:"metrics"` - reporters []reporter.Reporter - metricsFactory metrics.Factory - configManager httpserver.ClientConfigManager + reporters []reporter.Reporter + configManager httpserver.ClientConfigManager } // ProcessorConfiguration holds config for a processor that receives spans from Server @@ -99,33 +98,8 @@ func (b *Builder) WithReporters(r ...reporter.Reporter) *Builder { return b } -// WithMetricsFactory sets an externally initialized metrics factory. -func (b *Builder) WithMetricsFactory(mf metrics.Factory) *Builder { - b.metricsFactory = mf - return b -} - -// GetMetricsFactory returns metrics factory used by the agent. -func (b *Builder) GetMetricsFactory() (metrics.Factory, error) { - if b.metricsFactory != nil { - return b.metricsFactory, nil - } - - baseFactory, err := b.Metrics.CreateMetricsFactory("jaeger") - if err != nil { - return nil, err - } - - b.metricsFactory = baseFactory.Namespace("agent", nil) - return b.metricsFactory, nil -} - // CreateAgent creates the Agent -func (b *Builder) CreateAgent(logger *zap.Logger) (*Agent, error) { - mFactory, err := b.GetMetricsFactory() - if err != nil { - return nil, errors.Wrap(err, "cannot create metrics factory") - } +func (b *Builder) CreateAgent(logger *zap.Logger, mFactory metrics.Factory) (*Agent, error) { r, err := b.getReporter(logger) if err != nil { return nil, err diff --git a/cmd/agent/app/builder_test.go b/cmd/agent/app/builder_test.go index e5298b42574..350cba21fbf 100644 --- a/cmd/agent/app/builder_test.go +++ b/cmd/agent/app/builder_test.go @@ -40,7 +40,7 @@ func TestDefault(t *testing.T) { } for _, test := range tests { - a, err := test.b.CreateAgent(zap.NewNop()) + a, err := test.b.CreateAgent(zap.NewNop(), metrics.NullFactory) require.Error(t, err) assert.Equal(t, test.errMsg, err.Error()) assert.Nil(t, a) @@ -115,42 +115,23 @@ func TestBuilderFromConfig(t *testing.T) { func TestBuilderWithExtraReporter(t *testing.T) { cfg := &Builder{} - configureSamplingManager(t, cfg) + configureSamplingManager(t, cfg, metrics.NullFactory) cfg.WithReporters(fakeReporter{}) - agent, err := cfg.CreateAgent(zap.NewNop()) + agent, err := cfg.CreateAgent(zap.NewNop(), metrics.NullFactory) assert.NoError(t, err) assert.NotNil(t, agent) } -func TestBuilderMetrics(t *testing.T) { - mf := metrics.NullFactory - b := new(Builder).WithMetricsFactory(mf) - mf2, err := b.GetMetricsFactory() - assert.NoError(t, err) - assert.Equal(t, mf, mf2) -} - func TestBuilderMetricsHandler(t *testing.T) { b := &Builder{} - configureSamplingManager(t, b) + configureSamplingManager(t, b, metrics.NullFactory) b.Metrics.Backend = "expvar" b.Metrics.HTTPRoute = "/expvar" - factory, err := b.Metrics.CreateMetricsFactory("test") - assert.NoError(t, err) - assert.NotNil(t, factory) - b.metricsFactory = factory - agent, err := b.CreateAgent(zap.NewNop()) + agent, err := b.CreateAgent(zap.NewNop(), metrics.NullFactory) assert.NoError(t, err) assert.NotNil(t, agent) } -func TestBuilderMetricsError(t *testing.T) { - b := &Builder{} - b.Metrics.Backend = "invalid" - _, err := b.CreateAgent(zap.NewNop()) - assert.EqualError(t, err, "cannot create metrics factory: unknown metrics backend specified") -} - func TestBuilderWithProcessorErrors(t *testing.T) { testCases := []struct { model Model @@ -178,7 +159,7 @@ func TestBuilderWithProcessorErrors(t *testing.T) { }, } cfg.WithReporters(&fakeReporter{}) - _, err := cfg.CreateAgent(zap.NewNop()) + _, err := cfg.CreateAgent(zap.NewNop(), metrics.NullFactory) assert.Error(t, err) if testCase.err != "" { assert.EqualError(t, err, testCase.err) @@ -188,12 +169,10 @@ func TestBuilderWithProcessorErrors(t *testing.T) { } } -func configureSamplingManager(t *testing.T, cfg *Builder) { - m, err := cfg.GetMetricsFactory() - require.NoError(t, err) - r, err := tchannel.NewBuilder().CreateReporter(m, zap.NewNop()) +func configureSamplingManager(t *testing.T, cfg *Builder, mFactory metrics.Factory) { + r, err := tchannel.NewBuilder().CreateReporter(mFactory, zap.NewNop()) require.NoError(t, err) - cfg.WithReporters(r).WithClientConfigManager(httpserver.NewCollectorProxy(r.CollectorServiceName(), r.Channel(), m)) + cfg.WithReporters(r).WithClientConfigManager(httpserver.NewCollectorProxy(r.CollectorServiceName(), r.Channel(), mFactory)) } type fakeReporter struct{} diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 52a38d67541..540077fdca1 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -53,10 +53,14 @@ func main() { builder := &app.Builder{} builder.InitFromViper(v) tchanRep := tchannel.NewBuilder().InitFromViper(v, logger) - mFactory, err := builder.GetMetricsFactory() + mBldr := new(metrics.Builder).InitFromViper(v) + + mFactory, err := mBldr.CreateMetricsFactory("jaeger") if err != nil { logger.Fatal("Could not create metrics", zap.Error(err)) } + mFactory = mFactory.Namespace("agent", nil) + r, err := tchanRep.CreateReporter(mFactory, logger) if err != nil { logger.Fatal("Could not create tchannel reporter", zap.Error(err)) @@ -66,7 +70,7 @@ func main() { // TODO illustrate discovery service wiring - agent, err := builder.CreateAgent(logger) + agent, err := builder.CreateAgent(logger, mFactory) if err != nil { return errors.Wrap(err, "Unable to initialize Jaeger Agent") } diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index dd2e193cbb9..e5444caebae 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -188,7 +188,7 @@ func startAgent( b.WithReporters(r) b.WithClientConfigManager(httpserver.NewCollectorProxy(r.CollectorServiceName(), r.Channel(), metricsFactory)) - agent, err := b.WithMetricsFactory(metricsFactory).CreateAgent(logger) + agent, err := b.CreateAgent(logger, baseFactory) if err != nil { logger.Fatal("Unable to initialize Jaeger Agent", zap.Error(err)) } From 9fe3da0fcfa11bbcc5ca54fe63bf6cd9776de44c Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Fri, 5 Oct 2018 12:26:37 +0200 Subject: [PATCH 07/11] Collector proxy Signed-off-by: Pavol Loffay --- .../app/reporter/tchannel/collector_proxy.go | 50 +++++++++++++++++++ .../reporter/tchannel/collector_proxy_test.go | 46 +++++++++++++++++ cmd/agent/main.go | 9 ++-- cmd/all-in-one/main.go | 11 ++-- 4 files changed, 105 insertions(+), 11 deletions(-) create mode 100644 cmd/agent/app/reporter/tchannel/collector_proxy.go create mode 100644 cmd/agent/app/reporter/tchannel/collector_proxy_test.go diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy.go b/cmd/agent/app/reporter/tchannel/collector_proxy.go new file mode 100644 index 00000000000..e8fa01d859b --- /dev/null +++ b/cmd/agent/app/reporter/tchannel/collector_proxy.go @@ -0,0 +1,50 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tchannel + +import ( + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" +) + +// ProxyBuilder holds objects communicating with collector +type ProxyBuilder struct { + reporter *Reporter + manager httpserver.ClientConfigManager +} + +// NewCollectorProxy creates ProxyBuilder +func NewCollectorProxy(builder *Builder, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) { + b := &ProxyBuilder{} + r, err := builder.CreateReporter(mFactory, logger) + if err != nil { + return nil, err + } + b.reporter = r + b.manager = httpserver.NewCollectorProxy(b.reporter.CollectorServiceName(), b.reporter.Channel(), mFactory) + return b, nil +} + +// GetReporter returns Reporter +func (b *ProxyBuilder) GetReporter() *Reporter { + return b.reporter +} + +// GetManager returns manager +func (b *ProxyBuilder) GetManager() httpserver.ClientConfigManager { + return b.manager +} diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy_test.go b/cmd/agent/app/reporter/tchannel/collector_proxy_test.go new file mode 100644 index 00000000000..1f320548fa6 --- /dev/null +++ b/cmd/agent/app/reporter/tchannel/collector_proxy_test.go @@ -0,0 +1,46 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tchannel + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" +) + +func TestErrorReporterBuilder(t *testing.T) { + tbuilder := NewBuilder().WithDiscoverer(fakeDiscoverer{}) + b, err := NewCollectorProxy(tbuilder, metrics.NullFactory, zap.NewNop()) + require.Error(t, err) + assert.Nil(t, b) +} + +func TestCreate(t *testing.T) { + cfg := &Builder{} + mFactory := metrics.NullFactory + logger := zap.NewNop() + b, err := NewCollectorProxy(cfg, mFactory, logger) + require.NoError(t, err) + assert.NotNil(t, b) + r, _ := cfg.CreateReporter(mFactory, logger) + assert.Equal(t, r, b.GetReporter()) + m := httpserver.NewCollectorProxy(r.CollectorServiceName(), r.Channel(), mFactory) + assert.Equal(t, m, b.GetManager()) +} diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 540077fdca1..7e5ab1b72c0 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -24,7 +24,6 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/agent/app" - "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" "github.com/jaegertracing/jaeger/cmd/flags" "github.com/jaegertracing/jaeger/pkg/config" @@ -61,12 +60,12 @@ func main() { } mFactory = mFactory.Namespace("agent", nil) - r, err := tchanRep.CreateReporter(mFactory, logger) + cp, err := tchannel.NewCollectorProxy(tchanRep, mFactory, logger) if err != nil { - logger.Fatal("Could not create tchannel reporter", zap.Error(err)) + logger.Fatal("Could not create collector proxy", zap.Error(err)) } - builder.WithReporters(r) - builder.WithClientConfigManager(httpserver.NewCollectorProxy(r.CollectorServiceName(), r.Channel(), mFactory)) + builder.WithReporters(cp.GetReporter()) + builder.WithClientConfigManager(cp.GetManager()) // TODO illustrate discovery service wiring diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index e5444caebae..d63faba49b2 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -38,7 +38,6 @@ import ( "go.uber.org/zap" agentApp "github.com/jaegertracing/jaeger/cmd/agent/app" - "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" agentTchanRep "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" basic "github.com/jaegertracing/jaeger/cmd/builder" collectorApp "github.com/jaegertracing/jaeger/cmd/collector/app" @@ -180,13 +179,13 @@ func startAgent( baseFactory metrics.Factory, ) { metricsFactory := baseFactory.Namespace("agent", nil) - tchanRep.CollectorHostPorts = append(tchanRep.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorPort)) - r, err := tchanRep.CreateReporter(metricsFactory, logger) + + cp, err := agentTchanRep.NewCollectorProxy(tchanRep, metricsFactory, logger) if err != nil { - log.Fatal("Could not create tchannel reporter", zap.Error(err)) + logger.Fatal("Could not create collector proxy", zap.Error(err)) } - b.WithReporters(r) - b.WithClientConfigManager(httpserver.NewCollectorProxy(r.CollectorServiceName(), r.Channel(), metricsFactory)) + b.WithReporters(cp.GetReporter()) + b.WithClientConfigManager(cp.GetManager()) agent, err := b.CreateAgent(logger, baseFactory) if err != nil { From 24b18df25cff5c8d2e372d52889c4fdef89bbd16 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Fri, 5 Oct 2018 15:21:59 +0200 Subject: [PATCH 08/11] interface collector proxy Signed-off-by: Pavol Loffay --- cmd/agent/app/agent_test.go | 6 +- cmd/agent/app/builder.go | 60 +++++++--------- cmd/agent/app/builder_test.go | 72 ++++++++++--------- .../app/reporter/tchannel/collector_proxy.go | 5 +- cmd/agent/main.go | 4 +- cmd/all-in-one/main.go | 4 +- 6 files changed, 72 insertions(+), 79 deletions(-) diff --git a/cmd/agent/app/agent_test.go b/cmd/agent/app/agent_test.go index 857b883a9a4..0d9612f1d20 100644 --- a/cmd/agent/app/agent_test.go +++ b/cmd/agent/app/agent_test.go @@ -34,8 +34,7 @@ import ( func TestAgentStartError(t *testing.T) { cfg := &Builder{} - configureSamplingManager(t, cfg, metrics.NullFactory) - agent, err := cfg.CreateAgent(zap.NewNop(), metrics.NullFactory) + agent, err := cfg.CreateAgent(fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory) require.NoError(t, err) agent.httpServer.Addr = "bad-address" assert.Error(t, agent.Run()) @@ -103,8 +102,7 @@ func withRunningAgent(t *testing.T, testcase func(string, chan error)) { } logger, logBuf := testutils.NewLogger() f, _ := cfg.Metrics.CreateMetricsFactory("jaeger") - configureSamplingManager(t, &cfg, f) - agent, err := cfg.CreateAgent(logger, f) + agent, err := cfg.CreateAgent(fakeCollectorProxy{}, logger, f) require.NoError(t, err) ch := make(chan error, 2) go func() { diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index 7d946e8efab..0cb12802755 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -62,14 +62,19 @@ var ( } ) +// CollectorProxy provides access to Reporter and ClientConfigManager +type CollectorProxy interface { + GetReporter() reporter.Reporter + GetManager() httpserver.ClientConfigManager +} + // Builder Struct to hold configurations type Builder struct { Processors []ProcessorConfiguration `yaml:"processors"` HTTPServer HTTPServerConfiguration `yaml:"httpServer"` Metrics jmetrics.Builder `yaml:"metrics"` - reporters []reporter.Reporter - configManager httpserver.ClientConfigManager + collectorProxy []CollectorProxy } // ProcessorConfiguration holds config for a processor that receives spans from Server @@ -92,40 +97,36 @@ type HTTPServerConfiguration struct { HostPort string `yaml:"hostPort" validate:"nonzero"` } -// WithReporters adds auxiliary reporters. -func (b *Builder) WithReporters(r ...reporter.Reporter) *Builder { - b.reporters = append(b.reporters, r...) +// WithCollectorProxy adds auxiliary reporters. +func (b *Builder) WithCollectorProxy(r ...CollectorProxy) *Builder { + b.collectorProxy = append(b.collectorProxy, r...) return b } // CreateAgent creates the Agent -func (b *Builder) CreateAgent(logger *zap.Logger, mFactory metrics.Factory) (*Agent, error) { - r, err := b.getReporter(logger) - if err != nil { - return nil, err - } - processors, err := b.GetProcessors(r, mFactory, logger) - if err != nil { - return nil, err - } - server, err := b.HTTPServer.getHTTPServer(b.configManager, mFactory, &b.Metrics) +func (b *Builder) CreateAgent(primaryProxy CollectorProxy, logger *zap.Logger, mFactory metrics.Factory) (*Agent, error) { + r := b.getReporter(primaryProxy) + processors, err := b.getProcessors(r, mFactory, logger) if err != nil { return nil, err } + server := b.HTTPServer.getHTTPServer(primaryProxy.GetManager(), mFactory, &b.Metrics) return NewAgent(processors, server, logger), nil } -func (b *Builder) getReporter(logger *zap.Logger) (reporter.Reporter, error) { - if len(b.reporters) == 0 { - return nil, errors.New("Missing required reporters") - } else if len(b.reporters) == 1 { - return b.reporters[0], nil +func (b *Builder) getReporter(primaryProxy CollectorProxy) reporter.Reporter { + if len(b.collectorProxy) == 0 { + return primaryProxy.GetReporter() + } + rep := make([]reporter.Reporter, len(b.collectorProxy)+1) + rep[0] = primaryProxy.GetReporter() + for i, p := range b.collectorProxy { + rep[i+1] = p.GetReporter() } - return reporter.NewMultiReporter(b.reporters...), nil + return reporter.NewMultiReporter(rep...) } -// GetProcessors creates Processors with attached Reporter -func (b *Builder) GetProcessors(rep reporter.Reporter, mFactory metrics.Factory, logger *zap.Logger) ([]processors.Processor, error) { +func (b *Builder) getProcessors(rep reporter.Reporter, mFactory metrics.Factory, logger *zap.Logger) ([]processors.Processor, error) { retMe := make([]processors.Processor, len(b.Processors)) for idx, cfg := range b.Processors { protoFactory, ok := protocolFactoryMap[cfg.Protocol] @@ -154,17 +155,8 @@ func (b *Builder) GetProcessors(rep reporter.Reporter, mFactory metrics.Factory, return retMe, nil } -// WithClientConfigManager adds configuration manager. -func (b *Builder) WithClientConfigManager(manager httpserver.ClientConfigManager) *Builder { - b.configManager = manager - return b -} - // GetHTTPServer creates an HTTP server that provides sampling strategies and baggage restrictions to client libraries. -func (c HTTPServerConfiguration) getHTTPServer(manager httpserver.ClientConfigManager, mFactory metrics.Factory, mBuilder *jmetrics.Builder) (*http.Server, error) { - if manager == nil { - return nil, errors.New("Missing required Client config manager") - } +func (c HTTPServerConfiguration) getHTTPServer(manager httpserver.ClientConfigManager, mFactory metrics.Factory, mBuilder *jmetrics.Builder) *http.Server { if c.HostPort == "" { c.HostPort = defaultHTTPServerHostPort } @@ -172,7 +164,7 @@ func (c HTTPServerConfiguration) getHTTPServer(manager httpserver.ClientConfigMa if h := mBuilder.Handler(); mFactory != nil && h != nil { server.Handler.(*http.ServeMux).Handle(mBuilder.HTTPRoute, h) } - return server, nil + return server } // GetThriftProcessor gets a TBufferedServer backed Processor using the collector configuration diff --git a/cmd/agent/app/builder_test.go b/cmd/agent/app/builder_test.go index 350cba21fbf..9d4237ee135 100644 --- a/cmd/agent/app/builder_test.go +++ b/cmd/agent/app/builder_test.go @@ -15,6 +15,8 @@ package app import ( + "errors" + "fmt" "strings" "testing" @@ -25,28 +27,13 @@ import ( "gopkg.in/yaml.v2" "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" - "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel" + "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" + "github.com/jaegertracing/jaeger/thrift-gen/baggage" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" + "github.com/jaegertracing/jaeger/thrift-gen/sampling" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) -func TestDefault(t *testing.T) { - tests := []struct { - b *Builder - errMsg string - }{ - {b: &Builder{}, errMsg: "Missing required reporters"}, - {b: (&Builder{}).WithReporters(&fakeReporter{}), errMsg: "Missing required Client config manager"}, - } - - for _, test := range tests { - a, err := test.b.CreateAgent(zap.NewNop(), metrics.NullFactory) - require.Error(t, err) - assert.Equal(t, test.errMsg, err.Error()) - assert.Nil(t, a) - } -} - var yamlConfig = ` ignored: abcd @@ -115,19 +102,18 @@ func TestBuilderFromConfig(t *testing.T) { func TestBuilderWithExtraReporter(t *testing.T) { cfg := &Builder{} - configureSamplingManager(t, cfg, metrics.NullFactory) - cfg.WithReporters(fakeReporter{}) - agent, err := cfg.CreateAgent(zap.NewNop(), metrics.NullFactory) + //configureSamplingManager(t, cfg, metrics.NullFactory) + agent, err := cfg.CreateAgent(fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory) assert.NoError(t, err) assert.NotNil(t, agent) } func TestBuilderMetricsHandler(t *testing.T) { b := &Builder{} - configureSamplingManager(t, b, metrics.NullFactory) + //configureSamplingManager(t, b, metrics.NullFactory) b.Metrics.Backend = "expvar" b.Metrics.HTTPRoute = "/expvar" - agent, err := b.CreateAgent(zap.NewNop(), metrics.NullFactory) + agent, err := b.CreateAgent(fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory) assert.NoError(t, err) assert.NotNil(t, agent) } @@ -158,8 +144,7 @@ func TestBuilderWithProcessorErrors(t *testing.T) { }, }, } - cfg.WithReporters(&fakeReporter{}) - _, err := cfg.CreateAgent(zap.NewNop(), metrics.NullFactory) + _, err := cfg.CreateAgent(&fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory) assert.Error(t, err) if testCase.err != "" { assert.EqualError(t, err, testCase.err) @@ -169,18 +154,39 @@ func TestBuilderWithProcessorErrors(t *testing.T) { } } -func configureSamplingManager(t *testing.T, cfg *Builder, mFactory metrics.Factory) { - r, err := tchannel.NewBuilder().CreateReporter(mFactory, zap.NewNop()) - require.NoError(t, err) - cfg.WithReporters(r).WithClientConfigManager(httpserver.NewCollectorProxy(r.CollectorServiceName(), r.Channel(), mFactory)) +func TestMultipleCollectorProxies(t *testing.T) { + b := Builder{} + ra := fakeCollectorProxy{} + rb := fakeCollectorProxy{} + b.WithCollectorProxy(ra) + r := b.getReporter(rb) + mr, ok := r.(reporter.MultiReporter) + require.True(t, ok) + fmt.Println(mr) + assert.Equal(t, rb, mr[0]) + assert.Equal(t, ra, mr[1]) } -type fakeReporter struct{} +type fakeCollectorProxy struct { +} -func (fr fakeReporter) EmitZipkinBatch(spans []*zipkincore.Span) (err error) { - return nil +func (f fakeCollectorProxy) GetReporter() reporter.Reporter { + return fakeCollectorProxy{} +} +func (f fakeCollectorProxy) GetManager() httpserver.ClientConfigManager { + return fakeCollectorProxy{} } -func (fr fakeReporter) EmitBatch(batch *jaeger.Batch) (err error) { +func (fakeCollectorProxy) EmitZipkinBatch(spans []*zipkincore.Span) (err error) { + return nil +} +func (fakeCollectorProxy) EmitBatch(batch *jaeger.Batch) (err error) { return nil } + +func (f fakeCollectorProxy) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) { + return nil, errors.New("no peers available") +} +func (fakeCollectorProxy) GetBaggageRestrictions(serviceName string) ([]*baggage.BaggageRestriction, error) { + return nil, nil +} diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy.go b/cmd/agent/app/reporter/tchannel/collector_proxy.go index e8fa01d859b..fa855b68dd6 100644 --- a/cmd/agent/app/reporter/tchannel/collector_proxy.go +++ b/cmd/agent/app/reporter/tchannel/collector_proxy.go @@ -19,6 +19,7 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" + "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" ) // ProxyBuilder holds objects communicating with collector @@ -40,11 +41,11 @@ func NewCollectorProxy(builder *Builder, mFactory metrics.Factory, logger *zap.L } // GetReporter returns Reporter -func (b *ProxyBuilder) GetReporter() *Reporter { +func (b ProxyBuilder) GetReporter() reporter.Reporter { return b.reporter } // GetManager returns manager -func (b *ProxyBuilder) GetManager() httpserver.ClientConfigManager { +func (b ProxyBuilder) GetManager() httpserver.ClientConfigManager { return b.manager } diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 7e5ab1b72c0..0a0b7ecae23 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -64,12 +64,10 @@ func main() { if err != nil { logger.Fatal("Could not create collector proxy", zap.Error(err)) } - builder.WithReporters(cp.GetReporter()) - builder.WithClientConfigManager(cp.GetManager()) // TODO illustrate discovery service wiring - agent, err := builder.CreateAgent(logger, mFactory) + agent, err := builder.CreateAgent(cp, logger, mFactory) if err != nil { return errors.Wrap(err, "Unable to initialize Jaeger Agent") } diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index d63faba49b2..a6933568aa3 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -184,10 +184,8 @@ func startAgent( if err != nil { logger.Fatal("Could not create collector proxy", zap.Error(err)) } - b.WithReporters(cp.GetReporter()) - b.WithClientConfigManager(cp.GetManager()) - agent, err := b.CreateAgent(logger, baseFactory) + agent, err := b.CreateAgent(cp, logger, baseFactory) if err != nil { logger.Fatal("Unable to initialize Jaeger Agent", zap.Error(err)) } From f8876dc7a286eb3daaaa834570a9913c2f51856f Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Fri, 5 Oct 2018 15:37:00 +0200 Subject: [PATCH 09/11] Fix all-in-one Signed-off-by: Pavol Loffay --- cmd/all-in-one/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index a6933568aa3..5877be903ae 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -180,6 +180,7 @@ func startAgent( ) { metricsFactory := baseFactory.Namespace("agent", nil) + tchanRep.CollectorHostPorts = append(tchanRep.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorPort)) cp, err := agentTchanRep.NewCollectorProxy(tchanRep, metricsFactory, logger) if err != nil { logger.Fatal("Could not create collector proxy", zap.Error(err)) From d238d12d8bfe8cfb1f22e870d0a2dc5738f3e8a4 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Fri, 5 Oct 2018 17:25:56 +0200 Subject: [PATCH 10/11] Remove comments Signed-off-by: Pavol Loffay --- cmd/agent/app/builder_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cmd/agent/app/builder_test.go b/cmd/agent/app/builder_test.go index 9d4237ee135..b308e43f879 100644 --- a/cmd/agent/app/builder_test.go +++ b/cmd/agent/app/builder_test.go @@ -102,7 +102,6 @@ func TestBuilderFromConfig(t *testing.T) { func TestBuilderWithExtraReporter(t *testing.T) { cfg := &Builder{} - //configureSamplingManager(t, cfg, metrics.NullFactory) agent, err := cfg.CreateAgent(fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory) assert.NoError(t, err) assert.NotNil(t, agent) @@ -110,7 +109,6 @@ func TestBuilderWithExtraReporter(t *testing.T) { func TestBuilderMetricsHandler(t *testing.T) { b := &Builder{} - //configureSamplingManager(t, b, metrics.NullFactory) b.Metrics.Backend = "expvar" b.Metrics.HTTPRoute = "/expvar" agent, err := b.CreateAgent(fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory) From 8af5d3e37d8305ae4c8e36b7b1675d824f7baee7 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Thu, 11 Oct 2018 14:31:15 +0200 Subject: [PATCH 11/11] Use directly reporters Signed-off-by: Pavol Loffay --- cmd/agent/app/builder.go | 16 ++++++++-------- cmd/agent/app/builder_test.go | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index 0cb12802755..e779313ac18 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -74,7 +74,7 @@ type Builder struct { HTTPServer HTTPServerConfiguration `yaml:"httpServer"` Metrics jmetrics.Builder `yaml:"metrics"` - collectorProxy []CollectorProxy + reporters []reporter.Reporter } // ProcessorConfiguration holds config for a processor that receives spans from Server @@ -97,9 +97,9 @@ type HTTPServerConfiguration struct { HostPort string `yaml:"hostPort" validate:"nonzero"` } -// WithCollectorProxy adds auxiliary reporters. -func (b *Builder) WithCollectorProxy(r ...CollectorProxy) *Builder { - b.collectorProxy = append(b.collectorProxy, r...) +// WithReporter adds auxiliary reporters. +func (b *Builder) WithReporter(r ...reporter.Reporter) *Builder { + b.reporters = append(b.reporters, r...) return b } @@ -115,13 +115,13 @@ func (b *Builder) CreateAgent(primaryProxy CollectorProxy, logger *zap.Logger, m } func (b *Builder) getReporter(primaryProxy CollectorProxy) reporter.Reporter { - if len(b.collectorProxy) == 0 { + if len(b.reporters) == 0 { return primaryProxy.GetReporter() } - rep := make([]reporter.Reporter, len(b.collectorProxy)+1) + rep := make([]reporter.Reporter, len(b.reporters)+1) rep[0] = primaryProxy.GetReporter() - for i, p := range b.collectorProxy { - rep[i+1] = p.GetReporter() + for i, r := range b.reporters { + rep[i+1] = r } return reporter.NewMultiReporter(rep...) } diff --git a/cmd/agent/app/builder_test.go b/cmd/agent/app/builder_test.go index b308e43f879..89f0de528c0 100644 --- a/cmd/agent/app/builder_test.go +++ b/cmd/agent/app/builder_test.go @@ -156,7 +156,7 @@ func TestMultipleCollectorProxies(t *testing.T) { b := Builder{} ra := fakeCollectorProxy{} rb := fakeCollectorProxy{} - b.WithCollectorProxy(ra) + b.WithReporter(ra) r := b.getReporter(rb) mr, ok := r.(reporter.MultiReporter) require.True(t, ok)