Skip to content

Commit

Permalink
Use roundRobin balancing even if only one hostname (jaegertracing#1329)
Browse files Browse the repository at this point in the history
If you run jaeger-agent with `--reporter.grpc.host-port
dns:///jaeger-collector-hostname:14250` (note the `dns:///` part!),
the GRPC client resolves the given hostname to one or more IPs and can
load balance among the backend addresses it finds.  It even looks up
and obeys SRV records, if they exist.  This change makes jaeger-agent
use round-robin load balancing in that situation, rather than the
default of `pick_first`.  If there is only one backend, or the user
doesn't explicitly put `dns:///` before the hostname, this will
have no effect.

Signed-off-by: Benjamin Staffin <benley@gmail.com>
  • Loading branch information
benley authored and iori-yja committed Feb 15, 2019
1 parent 56475b1 commit 06afbff
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/grpc/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewCollectorProxy(o *Options, mFactory metrics.Factory, logger *zap.Logger)
conn, _ = grpc.Dial(r.Scheme()+":///round_robin", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
} else {
// It does not return error if the collector is not running
conn, _ = grpc.Dial(o.CollectorHostPort[0], grpc.WithInsecure())
conn, _ = grpc.Dial(o.CollectorHostPort[0], grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
}
grpcMetrics := mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "grpc"}})
return &ProxyBuilder{
Expand Down
7 changes: 7 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Configuration struct {
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
Enabled bool
TLS TLSConfig
UseReadWriteAliases bool
}
Expand All @@ -80,6 +81,7 @@ type ClientBuilder interface {
GetTagDotReplacement() string
GetUseReadWriteAliases() bool
GetTokenFilePath() string
IsEnabled() bool
}

// NewClient creates a new ElasticSearch client
Expand Down Expand Up @@ -233,6 +235,11 @@ func (c *Configuration) GetTokenFilePath() string {
return c.TokenFilePath
}

// IsEnabled determines whether storage is enabled
func (c *Configuration) IsEnabled() bool {
return c.Enabled
}

// getConfigOptions wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) {
options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)}
Expand Down
21 changes: 15 additions & 6 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,12 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger,
return errors.Wrap(err, "failed to create primary Elasticsearch client")
}
f.primaryClient = primaryClient
archiveClient, err := f.archiveConfig.NewClient(logger, metricsFactory)
if err != nil {
return errors.Wrap(err, "failed to create archive Elasticsearch client")
if f.archiveConfig.IsEnabled() {
f.archiveClient, err = f.archiveConfig.NewClient(logger, metricsFactory)
if err != nil {
return errors.Wrap(err, "failed to create archive Elasticsearch client")
}
}
f.archiveClient = archiveClient
return nil
}

Expand Down Expand Up @@ -126,12 +127,20 @@ func loadTagsFromFile(filePath string) ([]string, error) {

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, f.Options.Get(archiveNamespace), true)
cfg := f.Options.Get(archiveNamespace)
if !cfg.Enabled {
return nil, nil
}
return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, cfg, true)
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.Options.Get(archiveNamespace), true)
cfg := f.Options.Get(archiveNamespace)
if !cfg.Enabled {
return nil, nil
}
return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, cfg, true)
}

func createSpanReader(
Expand Down
39 changes: 35 additions & 4 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,13 @@ func TestElasticsearchFactory(t *testing.T) {
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop(), healthcheck.GetNullStatusReporter()), "failed to create primary Elasticsearch client: made-up error")

f.primaryConfig = &mockClientBuilder{}
<<<<<<< HEAD
f.archiveConfig = &mockClientBuilder{err: errors.New("made-up error2")}
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop(), healthcheck.GetNullStatusReporter()), "failed to create archive Elasticsearch client: made-up error2")
=======
f.archiveConfig = &mockClientBuilder{err: errors.New("made-up error2"), Configuration:escfg.Configuration{Enabled:true}}
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create archive Elasticsearch client: made-up error2")
>>>>>>> e96fe91... Make Elasticsearch archive storage optional (#1334)

f.archiveConfig = &mockClientBuilder{}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop(), healthcheck.GetNullStatusReporter()))
Expand Down Expand Up @@ -128,12 +133,12 @@ func TestLoadTagsFromFile(t *testing.T) {

func TestFactory_LoadMapping(t *testing.T) {
spanMapping, serviceMapping := GetMappings(10, 0)
tests := []struct{
name string
tests := []struct {
name string
toTest string
}{
{name: "jaeger-span.json", toTest:spanMapping},
{name: "jaeger-service.json", toTest:serviceMapping},
{name: "jaeger-span.json", toTest: spanMapping},
{name: "jaeger-service.json", toTest: serviceMapping},
}
for _, test := range tests {
mapping := loadMapping(test.name)
Expand All @@ -150,3 +155,29 @@ func TestFactory_LoadMapping(t *testing.T) {
assert.Equal(t, expectedMapping, test.toTest)
}
}

func TestArchiveDisabled(t *testing.T) {
f := NewFactory()
f.Options.Get(archiveNamespace).Enabled = false
w, err := f.CreateArchiveSpanWriter()
assert.Nil(t, w)
assert.Nil(t, err)
r, err := f.CreateArchiveSpanReader()
assert.Nil(t, r)
assert.Nil(t, err)
}

func TestArchiveEnabled2(t *testing.T) {
f := NewFactory()
f.primaryConfig = &mockClientBuilder{}
f.archiveConfig = &mockClientBuilder{}
err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
f.Options.Get(archiveNamespace).Enabled = true
w, err := f.CreateArchiveSpanWriter()
require.NoError(t, err)
assert.NotNil(t, w)
r, err := f.CreateArchiveSpanReader()
require.NoError(t, err)
assert.NotNil(t, r)
}
9 changes: 9 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
suffixTagsFile = suffixTagsAsFields + ".config-file"
suffixTagDeDotChar = suffixTagsAsFields + ".dot-replacement"
suffixReadAlias = ".use-aliases"
suffixEnabled = ".enabled"
)

// TODO this should be moved next to config.Configuration struct (maybe ./flags package)
Expand Down Expand Up @@ -89,6 +90,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
BulkActions: 1000,
BulkFlushInterval: time.Millisecond * 200,
TagDotReplacement: "@",
Enabled: true,
},
servers: "http://127.0.0.1:9200",
namespace: primaryNamespace,
Expand Down Expand Up @@ -206,6 +208,12 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
"(experimental) Use read and write aliases for indices. Use this option with Elasticsearch rollover "+
"API. It requires an external component to create aliases before startup and then performing its management. "+
"Note that "+nsConfig.namespace+suffixMaxSpanAge+" is not taken into the account and has to be substituted by external component managing read alias.")
if nsConfig.namespace == archiveNamespace {
flagSet.Bool(
nsConfig.namespace+suffixEnabled,
nsConfig.Enabled,
"Enable extra storage")
}
}

// InitFromViper initializes Options with properties from viper
Expand Down Expand Up @@ -240,6 +248,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.TagsFilePath = v.GetString(cfg.namespace + suffixTagsFile)
cfg.TagDotReplacement = v.GetString(cfg.namespace + suffixTagDeDotChar)
cfg.UseReadWriteAliases = v.GetBool(cfg.namespace + suffixReadAlias)
cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled)
}

// GetPrimary returns primary configuration.
Expand Down

0 comments on commit 06afbff

Please sign in to comment.