Skip to content

Commit

Permalink
Refactor agent configuration (#1092)
Browse files Browse the repository at this point in the history
* Make agent reporters configurable

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Add nil check

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Fix docker compose

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Remove redundant if

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Deprecate flags

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Pass metrics explicitly

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Collector proxy

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* interface collector proxy

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Fix all-in-one

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Remove comments

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Use directly reporters

Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay authored and yurishkuro committed Oct 29, 2018
1 parent 28c8ba2 commit 3bdb8e6
Show file tree
Hide file tree
Showing 14 changed files with 382 additions and 147 deletions.
6 changes: 4 additions & 2 deletions cmd/agent/app/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

jmetrics "github.com/jaegertracing/jaeger/pkg/metrics"
Expand All @@ -33,7 +34,7 @@ import (

func TestAgentStartError(t *testing.T) {
cfg := &Builder{}
agent, err := cfg.CreateAgent(zap.NewNop())
agent, err := cfg.CreateAgent(fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory)
require.NoError(t, err)
agent.httpServer.Addr = "bad-address"
assert.Error(t, agent.Run())
Expand Down Expand Up @@ -100,7 +101,8 @@ func withRunningAgent(t *testing.T, testcase func(string, chan error)) {
},
}
logger, logBuf := testutils.NewLogger()
agent, err := cfg.CreateAgent(logger)
f, _ := cfg.Metrics.CreateMetricsFactory("jaeger")
agent, err := cfg.CreateAgent(fakeCollectorProxy{}, logger, f)
require.NoError(t, err)
ch := make(chan error, 2)
go func() {
Expand Down
92 changes: 33 additions & 59 deletions cmd/agent/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,15 @@ package app
import (
"fmt"
"net/http"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/pkg/errors"
"github.com/uber/jaeger-lib/metrics"
"github.com/uber/tchannel-go"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/agent/app/httpserver"
"github.com/jaegertracing/jaeger/cmd/agent/app/processors"
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
tchreporter "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel"
"github.com/jaegertracing/jaeger/cmd/agent/app/servers"
"github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp"
jmetrics "github.com/jaegertracing/jaeger/pkg/metrics"
Expand All @@ -37,11 +34,9 @@ import (
)

const (
defaultQueueSize = 1000
defaultMaxPacketSize = 65000
defaultServerWorkers = 10
defaultMinPeers = 3
defaultConnCheckTimeout = 250 * time.Millisecond
defaultQueueSize = 1000
defaultMaxPacketSize = 65000
defaultServerWorkers = 10

defaultHTTPServerHostPort = ":5778"

Expand All @@ -67,16 +62,19 @@ var (
}
)

// CollectorProxy provides access to Reporter and ClientConfigManager
type CollectorProxy interface {
GetReporter() reporter.Reporter
GetManager() httpserver.ClientConfigManager
}

// Builder Struct to hold configurations
type Builder struct {
Processors []ProcessorConfiguration `yaml:"processors"`
HTTPServer HTTPServerConfiguration `yaml:"httpServer"`
Metrics jmetrics.Builder `yaml:"metrics"`

tchreporter.Builder `yaml:",inline"`

otherReporters []reporter.Reporter
metricsFactory metrics.Factory
reporters []reporter.Reporter
}

// ProcessorConfiguration holds config for a processor that receives spans from Server
Expand All @@ -100,62 +98,35 @@ type HTTPServerConfiguration struct {
}

// WithReporter adds auxiliary reporters.
func (b *Builder) WithReporter(r reporter.Reporter) *Builder {
b.otherReporters = append(b.otherReporters, r)
return b
}

// WithMetricsFactory sets an externally initialized metrics factory.
func (b *Builder) WithMetricsFactory(mf metrics.Factory) *Builder {
b.metricsFactory = mf
func (b *Builder) WithReporter(r ...reporter.Reporter) *Builder {
b.reporters = append(b.reporters, r...)
return b
}

func (b *Builder) createMainReporter(mFactory metrics.Factory, logger *zap.Logger) (*tchreporter.Reporter, error) {
return b.CreateReporter(mFactory, logger)
}

func (b *Builder) getMetricsFactory() (metrics.Factory, error) {
if b.metricsFactory != nil {
return b.metricsFactory, nil
}

baseFactory, err := b.Metrics.CreateMetricsFactory("jaeger")
// CreateAgent creates the Agent
func (b *Builder) CreateAgent(primaryProxy CollectorProxy, logger *zap.Logger, mFactory metrics.Factory) (*Agent, error) {
r := b.getReporter(primaryProxy)
processors, err := b.getProcessors(r, mFactory, logger)
if err != nil {
return nil, err
}

return baseFactory.Namespace("agent", nil), nil
server := b.HTTPServer.getHTTPServer(primaryProxy.GetManager(), mFactory, &b.Metrics)
return NewAgent(processors, server, logger), nil
}

// CreateAgent creates the Agent
func (b *Builder) CreateAgent(logger *zap.Logger) (*Agent, error) {
mFactory, err := b.getMetricsFactory()
if err != nil {
return nil, errors.Wrap(err, "cannot create metrics factory")
}
mainReporter, err := b.createMainReporter(mFactory, logger)
if err != nil {
return nil, errors.Wrap(err, "cannot create main Reporter")
func (b *Builder) getReporter(primaryProxy CollectorProxy) reporter.Reporter {
if len(b.reporters) == 0 {
return primaryProxy.GetReporter()
}
var rep reporter.Reporter = mainReporter
if len(b.otherReporters) > 0 {
reps := append([]reporter.Reporter{mainReporter}, b.otherReporters...)
rep = reporter.NewMultiReporter(reps...)
rep := make([]reporter.Reporter, len(b.reporters)+1)
rep[0] = primaryProxy.GetReporter()
for i, r := range b.reporters {
rep[i+1] = r
}
processors, err := b.GetProcessors(rep, mFactory, logger)
if err != nil {
return nil, err
}
httpServer := b.HTTPServer.GetHTTPServer(b.CollectorServiceName, mainReporter.Channel(), mFactory)
if h := b.Metrics.Handler(); mFactory != nil && h != nil {
httpServer.Handler.(*http.ServeMux).Handle(b.Metrics.HTTPRoute, h)
}
return NewAgent(processors, httpServer, logger), nil
return reporter.NewMultiReporter(rep...)
}

// GetProcessors creates Processors with attached Reporter
func (b *Builder) GetProcessors(rep reporter.Reporter, mFactory metrics.Factory, logger *zap.Logger) ([]processors.Processor, error) {
func (b *Builder) getProcessors(rep reporter.Reporter, mFactory metrics.Factory, logger *zap.Logger) ([]processors.Processor, error) {
retMe := make([]processors.Processor, len(b.Processors))
for idx, cfg := range b.Processors {
protoFactory, ok := protocolFactoryMap[cfg.Protocol]
Expand Down Expand Up @@ -185,12 +156,15 @@ func (b *Builder) GetProcessors(rep reporter.Reporter, mFactory metrics.Factory,
}

// GetHTTPServer creates an HTTP server that provides sampling strategies and baggage restrictions to client libraries.
func (c HTTPServerConfiguration) GetHTTPServer(svc string, channel *tchannel.Channel, mFactory metrics.Factory) *http.Server {
mgr := httpserver.NewCollectorProxy(svc, channel, mFactory)
func (c HTTPServerConfiguration) getHTTPServer(manager httpserver.ClientConfigManager, mFactory metrics.Factory, mBuilder *jmetrics.Builder) *http.Server {
if c.HostPort == "" {
c.HostPort = defaultHTTPServerHostPort
}
return httpserver.NewHTTPServer(c.HostPort, mgr, mFactory)
server := httpserver.NewHTTPServer(c.HostPort, manager, mFactory)
if h := mBuilder.Handler(); mFactory != nil && h != nil {
server.Handler.(*http.ServeMux).Handle(mBuilder.HTTPRoute, h)
}
return server
}

// GetThriftProcessor gets a TBufferedServer backed Processor using the collector configuration
Expand Down
91 changes: 37 additions & 54 deletions cmd/agent/app/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package app

import (
"errors"
"fmt"
"strings"
"testing"

Expand All @@ -25,7 +26,11 @@ import (
"go.uber.org/zap"
"gopkg.in/yaml.v2"

"github.com/jaegertracing/jaeger/cmd/agent/app/httpserver"
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

Expand All @@ -51,14 +56,6 @@ processors:
httpServer:
hostPort: 4.4.4.4:5778
collectorHostPorts:
- 127.0.0.1:14267
- 127.0.0.1:14268
- 127.0.0.1:14269
collectorServiceName: some-collector-service
minPeers: 4
`

func TestBuilderFromConfig(t *testing.T) {
Expand Down Expand Up @@ -101,59 +98,24 @@ func TestBuilderFromConfig(t *testing.T) {
},
}, cfg.Processors[2])
assert.Equal(t, "4.4.4.4:5778", cfg.HTTPServer.HostPort)

assert.Equal(t, 4, cfg.DiscoveryMinPeers)
assert.Equal(t, "some-collector-service", cfg.CollectorServiceName)
assert.Equal(
t,
[]string{"127.0.0.1:14267", "127.0.0.1:14268", "127.0.0.1:14269"},
cfg.CollectorHostPorts)
}

func TestBuilderWithExtraReporter(t *testing.T) {
cfg := &Builder{}
cfg.WithReporter(fakeReporter{})
agent, err := cfg.CreateAgent(zap.NewNop())
agent, err := cfg.CreateAgent(fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory)
assert.NoError(t, err)
assert.NotNil(t, agent)
}

func TestBuilderMetrics(t *testing.T) {
mf := metrics.NullFactory
b := new(Builder).WithMetricsFactory(mf)
mf2, err := b.getMetricsFactory()
assert.NoError(t, err)
assert.Equal(t, mf, mf2)
}

func TestBuilderMetricsHandler(t *testing.T) {
b := &Builder{}
b.Metrics.Backend = "expvar"
b.Metrics.HTTPRoute = "/expvar"
factory, err := b.Metrics.CreateMetricsFactory("test")
assert.NoError(t, err)
assert.NotNil(t, factory)
b.metricsFactory = factory
agent, err := b.CreateAgent(zap.NewNop())
agent, err := b.CreateAgent(fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory)
assert.NoError(t, err)
assert.NotNil(t, agent)
}

func TestBuilderMetricsError(t *testing.T) {
b := &Builder{}
b.Metrics.Backend = "invalid"
_, err := b.CreateAgent(zap.NewNop())
assert.EqualError(t, err, "cannot create metrics factory: unknown metrics backend specified")
}

func TestBuilderWithDiscoveryError(t *testing.T) {
cfg := &Builder{}
cfg.WithDiscoverer(fakeDiscoverer{})
agent, err := cfg.CreateAgent(zap.NewNop())
assert.EqualError(t, err, "cannot create main Reporter: cannot enable service discovery: both discovery.Discoverer and discovery.Notifier must be specified")
assert.Nil(t, agent)
}

func TestBuilderWithProcessorErrors(t *testing.T) {
testCases := []struct {
model Model
Expand All @@ -180,7 +142,7 @@ func TestBuilderWithProcessorErrors(t *testing.T) {
},
},
}
_, err := cfg.CreateAgent(zap.NewNop())
_, err := cfg.CreateAgent(&fakeCollectorProxy{}, zap.NewNop(), metrics.NullFactory)
assert.Error(t, err)
if testCase.err != "" {
assert.EqualError(t, err, testCase.err)
Expand All @@ -190,18 +152,39 @@ func TestBuilderWithProcessorErrors(t *testing.T) {
}
}

type fakeReporter struct{}
func TestMultipleCollectorProxies(t *testing.T) {
b := Builder{}
ra := fakeCollectorProxy{}
rb := fakeCollectorProxy{}
b.WithReporter(ra)
r := b.getReporter(rb)
mr, ok := r.(reporter.MultiReporter)
require.True(t, ok)
fmt.Println(mr)
assert.Equal(t, rb, mr[0])
assert.Equal(t, ra, mr[1])
}

func (fr fakeReporter) EmitZipkinBatch(spans []*zipkincore.Span) (err error) {
return nil
type fakeCollectorProxy struct {
}

func (fr fakeReporter) EmitBatch(batch *jaeger.Batch) (err error) {
return nil
func (f fakeCollectorProxy) GetReporter() reporter.Reporter {
return fakeCollectorProxy{}
}
func (f fakeCollectorProxy) GetManager() httpserver.ClientConfigManager {
return fakeCollectorProxy{}
}

type fakeDiscoverer struct{}
func (fakeCollectorProxy) EmitZipkinBatch(spans []*zipkincore.Span) (err error) {
return nil
}
func (fakeCollectorProxy) EmitBatch(batch *jaeger.Batch) (err error) {
return nil
}

func (fd fakeDiscoverer) Instances() ([]string, error) {
return nil, errors.New("discoverer error")
func (f fakeCollectorProxy) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
return nil, errors.New("no peers available")
}
func (fakeCollectorProxy) GetBaggageRestrictions(serviceName string) ([]*baggage.BaggageRestriction, error) {
return nil, nil
}
21 changes: 0 additions & 21 deletions cmd/agent/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package app
import (
"flag"
"fmt"
"strings"

"github.com/spf13/viper"
)
Expand All @@ -27,10 +26,7 @@ const (
suffixServerQueueSize = "server-queue-size"
suffixServerMaxPacketSize = "server-max-packet-size"
suffixServerHostPort = "server-host-port"
collectorHostPort = "collector.host-port"
httpServerHostPort = "http-server.host-port"
discoveryMinPeers = "discovery.min-peers"
discoveryConnCheckTimeout = "discovery.conn-check-timeout"
)

var defaultProcessors = []struct {
Expand All @@ -52,22 +48,10 @@ func AddFlags(flags *flag.FlagSet) {
flags.Int(prefix+suffixServerMaxPacketSize, defaultMaxPacketSize, "max packet size for the UDP server")
flags.String(prefix+suffixServerHostPort, processor.hostPort, "host:port for the UDP server")
}
flags.String(
collectorHostPort,
"",
"comma-separated string representing host:ports of a static list of collectors to connect to directly (e.g. when not using service discovery)")
flags.String(
httpServerHostPort,
defaultHTTPServerHostPort,
"host:port of the http server (e.g. for /sampling point and /baggage endpoint)")
flags.Int(
discoveryMinPeers,
defaultMinPeers,
"if using service discovery, the min number of connections to maintain to the backend")
flags.Duration(
discoveryConnCheckTimeout,
defaultConnCheckTimeout,
"sets the timeout used when establishing new connections")
}

// InitFromViper initializes Builder with properties retrieved from Viper.
Expand All @@ -84,11 +68,6 @@ func (b *Builder) InitFromViper(v *viper.Viper) *Builder {
b.Processors = append(b.Processors, *p)
}

if len(v.GetString(collectorHostPort)) > 0 {
b.CollectorHostPorts = strings.Split(v.GetString(collectorHostPort), ",")
}
b.HTTPServer.HostPort = v.GetString(httpServerHostPort)
b.DiscoveryMinPeers = v.GetInt(discoveryMinPeers)
b.ConnCheckTimeout = v.GetDuration(discoveryConnCheckTimeout)
return b
}
Loading

0 comments on commit 3bdb8e6

Please sign in to comment.