Skip to content

Commit f6cfb78

Browse files
guanwyurishkuro
guanw
authored andcommitted
Add grpc_resolver using external discovery service (#1498)
* add grpc_resolver which gets notified from external discovery service Signed-off-by: Jude Wang <judew@uber.com> * address feedback Signed-off-by: Jude Wang <judew@uber.com> * minor update on naming Signed-off-by: Jude Wang <judew@uber.com> * address more feedback Signed-off-by: Jude Wang <judew@uber.com> * revert gopkg.lock change Signed-off-by: Jude Wang <judew@uber.com> * update naming for waitGroup Signed-off-by: Jude Wang <judew@uber.com> * more feedback Signed-off-by: Jude Wang <judew@uber.com> * minor updates Signed-off-by: Jude Wang <judew@uber.com> * resolve merge conflict Signed-off-by: Jude Wang <judew@uber.com> * more feedback Signed-off-by: Jude Wang <judew@uber.com> * more feedback Signed-off-by: Jude Wang <judew@uber.com>
1 parent 2aacfaf commit f6cfb78

File tree

9 files changed

+392
-42
lines changed

9 files changed

+392
-42
lines changed

Gopkg.lock

+6-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/agent/app/reporter/grpc/builder.go

+23-26
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"google.golang.org/grpc/resolver/manual"
2828

2929
"github.com/jaegertracing/jaeger/pkg/discovery"
30+
"github.com/jaegertracing/jaeger/pkg/discovery/grpcresolver"
3031
)
3132

3233
// ConnBuilder Struct to hold configurations
@@ -39,15 +40,9 @@ type ConnBuilder struct {
3940
TLSCA string
4041
TLSServerName string
4142

42-
notifier discovery.Notifier
43-
}
44-
45-
// WithDiscoveryNotifier sets service discovery notifier
46-
// TODO User should provide their own notifier so that notifier can push address updates to grpc resolver by invoking notifier.Notify(instances []string)
47-
// We will add integation code with custom notifier and resolver in next PR
48-
func (b *ConnBuilder) WithDiscoveryNotifier(n discovery.Notifier) *ConnBuilder {
49-
b.notifier = n
50-
return b
43+
DiscoveryMinPeers int
44+
Notifier discovery.Notifier
45+
Discoverer discovery.Discoverer
5146
}
5247

5348
// NewConnBuilder creates a new grpc connection builder.
@@ -81,26 +76,28 @@ func (b *ConnBuilder) CreateConnection(logger *zap.Logger) (*grpc.ClientConn, er
8176
dialOptions = append(dialOptions, grpc.WithInsecure())
8277
}
8378

84-
if b.notifier != nil {
85-
return nil, errors.New("not implemented")
86-
}
87-
if b.CollectorHostPorts == nil {
88-
return nil, errors.New("at least one collector hostPort address is required when resolver is not available")
89-
}
90-
if len(b.CollectorHostPorts) > 1 {
91-
r, _ := manual.GenerateAndRegisterManualResolver()
92-
var resolvedAddrs []resolver.Address
93-
for _, addr := range b.CollectorHostPorts {
94-
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: addr})
95-
}
96-
r.InitialState(resolver.State{Addresses: resolvedAddrs})
97-
dialTarget = r.Scheme() + ":///round_robin"
98-
logger.Info("Agent is connecting to a static list of collectors", zap.String("dialTarget", dialTarget), zap.String("collector hosts", strings.Join(b.CollectorHostPorts, ",")))
79+
if b.Notifier != nil && b.Discoverer != nil {
80+
logger.Info("Using external discovery service with roundrobin load balancer")
81+
grpcResolver := grpcresolver.New(b.Notifier, b.Discoverer, logger, b.DiscoveryMinPeers)
82+
dialTarget = grpcResolver.Scheme() + ":///round_robin"
9983
} else {
100-
dialTarget = b.CollectorHostPorts[0]
84+
if b.CollectorHostPorts == nil {
85+
return nil, errors.New("at least one collector hostPort address is required when resolver is not available")
86+
}
87+
if len(b.CollectorHostPorts) > 1 {
88+
r, _ := manual.GenerateAndRegisterManualResolver()
89+
var resolvedAddrs []resolver.Address
90+
for _, addr := range b.CollectorHostPorts {
91+
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: addr})
92+
}
93+
r.InitialState(resolver.State{Addresses: resolvedAddrs})
94+
dialTarget = r.Scheme() + ":///round_robin"
95+
logger.Info("Agent is connecting to a static list of collectors", zap.String("dialTarget", dialTarget), zap.String("collector hosts", strings.Join(b.CollectorHostPorts, ",")))
96+
} else {
97+
dialTarget = b.CollectorHostPorts[0]
98+
}
10199
}
102100
dialOptions = append(dialOptions, grpc.WithBalancerName(roundrobin.Name))
103-
104101
dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(b.MaxRetry))))
105102
return grpc.Dial(dialTarget, dialOptions...)
106103
}

cmd/agent/app/reporter/grpc/builder_test.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func TestBuilderWithCollectors(t *testing.T) {
7878
hostPorts []string
7979
checkSuffixOnly bool
8080
notifier discovery.Notifier
81+
discoverer discovery.Discoverer
8182
err error
8283
}{
8384
{
@@ -86,28 +87,31 @@ func TestBuilderWithCollectors(t *testing.T) {
8687
hostPorts: []string{"127.0.0.1:9876", "127.0.0.1:9877", "127.0.0.1:9878"},
8788
checkSuffixOnly: true,
8889
notifier: nil,
90+
discoverer: nil,
8991
},
9092
{
9193
target: "127.0.0.1:9876",
9294
name: "with single host",
9395
hostPorts: []string{"127.0.0.1:9876"},
9496
checkSuffixOnly: false,
9597
notifier: nil,
98+
discoverer: nil,
9699
},
97100
{
98-
target: "dns://random_stuff",
99-
name: "with custom resolver",
101+
target: "///round_robin",
102+
name: "with custom resolver and fixed discoverer",
100103
hostPorts: []string{"dns://random_stuff"},
101-
checkSuffixOnly: false,
104+
checkSuffixOnly: true,
102105
notifier: noopNotifier{},
103-
err: errors.New("not implemented"),
106+
discoverer: discovery.FixedDiscoverer{},
104107
},
105108
{
106109
target: "",
107110
name: "without collectorPorts and resolver",
108111
hostPorts: nil,
109112
checkSuffixOnly: false,
110113
notifier: nil,
114+
discoverer: nil,
111115
err: errors.New("at least one collector hostPort address is required when resolver is not available"),
112116
},
113117
}
@@ -117,7 +121,8 @@ func TestBuilderWithCollectors(t *testing.T) {
117121
// Use NewBuilder for code coverage consideration
118122
cfg := NewConnBuilder()
119123
cfg.CollectorHostPorts = test.hostPorts
120-
cfg.WithDiscoveryNotifier(test.notifier)
124+
cfg.Notifier = test.notifier
125+
cfg.Discoverer = test.discoverer
121126

122127
conn, err := cfg.CreateConnection(zap.NewNop())
123128
if err != nil {

cmd/agent/app/reporter/grpc/flags.go

+3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const (
2929
collectorTLS = gRPCPrefix + "tls"
3030
collectorTLSCA = gRPCPrefix + "tls.ca"
3131
collectorTLSServerName = gRPCPrefix + "tls.server-name"
32+
discoveryMinPeers = gRPCPrefix + "discovery.min-peers"
3233
)
3334

3435
// AddFlags adds flags for Options.
@@ -38,6 +39,7 @@ func AddFlags(flags *flag.FlagSet) {
3839
flags.Bool(collectorTLS, false, "Enable TLS.")
3940
flags.String(collectorTLSCA, "", "Path to a TLS CA file. (default use the systems truststore)")
4041
flags.String(collectorTLSServerName, "", "Override the TLS server name.")
42+
flags.Int(discoveryMinPeers, 3, "Max number of collectors to which the agent will try to connect at any given time")
4143
}
4244

4345
// InitFromViper initializes Options with properties retrieved from Viper.
@@ -50,5 +52,6 @@ func (b *ConnBuilder) InitFromViper(v *viper.Viper) *ConnBuilder {
5052
b.TLS = v.GetBool(collectorTLS)
5153
b.TLSCA = v.GetString(collectorTLSCA)
5254
b.TLSServerName = v.GetString(collectorTLSServerName)
55+
b.DiscoveryMinPeers = v.GetInt(discoveryMinPeers)
5356
return b
5457
}

cmd/agent/app/reporter/grpc/flags_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@ func TestBindFlags(t *testing.T) {
3030
expected *ConnBuilder
3131
}{
3232
{cOpts: []string{"--reporter.grpc.host-port=localhost:1111", "--reporter.grpc.retry.max=15"},
33-
expected: &ConnBuilder{CollectorHostPorts: []string{"localhost:1111"}, MaxRetry: 15}},
33+
expected: &ConnBuilder{CollectorHostPorts: []string{"localhost:1111"}, MaxRetry: 15, DiscoveryMinPeers: 3}},
3434
{cOpts: []string{"--reporter.grpc.host-port=localhost:1111,localhost:2222"},
35-
expected: &ConnBuilder{CollectorHostPorts: []string{"localhost:1111", "localhost:2222"}, MaxRetry: defaultMaxRetry}},
35+
expected: &ConnBuilder{CollectorHostPorts: []string{"localhost:1111", "localhost:2222"}, MaxRetry: defaultMaxRetry, DiscoveryMinPeers: 3}},
36+
{cOpts: []string{"--reporter.grpc.host-port=localhost:1111,localhost:2222", "--reporter.grpc.discovery.min-peers=5"},
37+
expected: &ConnBuilder{CollectorHostPorts: []string{"localhost:1111", "localhost:2222"}, MaxRetry: defaultMaxRetry, DiscoveryMinPeers: 5}},
3638
}
3739
for _, test := range tests {
3840
v := viper.New()

cmd/agent/main.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ func main() {
5454
Namespace(metrics.NSOptions{Name: "agent"})
5555

5656
rOpts := new(reporter.Options).InitFromViper(v)
57-
tChanOpts := tchannel.NewBuilder().InitFromViper(v, logger)
58-
grpcOpts := grpc.NewConnBuilder().InitFromViper(v)
59-
cp, err := app.CreateCollectorProxy(rOpts, tChanOpts, grpcOpts, logger, mFactory)
57+
tchanBuilder := tchannel.NewBuilder().InitFromViper(v, logger)
58+
grpcBuilder := grpc.NewConnBuilder().InitFromViper(v)
59+
cp, err := app.CreateCollectorProxy(rOpts, tchanBuilder, grpcBuilder, logger, mFactory)
6060
if err != nil {
6161
logger.Fatal("Could not create collector proxy", zap.Error(err))
6262
}

cmd/all-in-one/main.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,12 @@ func main() {
119119

120120
aOpts := new(agentApp.Builder).InitFromViper(v)
121121
repOpts := new(agentRep.Options).InitFromViper(v)
122-
tchannelRepOpts := agentTchanRep.NewBuilder().InitFromViper(v, logger)
123-
grpcRepOpts := agentGrpcRep.NewConnBuilder().InitFromViper(v)
122+
tchanBuilder := agentTchanRep.NewBuilder().InitFromViper(v, logger)
123+
grpcBuilder := agentGrpcRep.NewConnBuilder().InitFromViper(v)
124124
cOpts := new(collector.CollectorOptions).InitFromViper(v)
125125
qOpts := new(queryApp.QueryOptions).InitFromViper(v)
126126

127-
startAgent(aOpts, repOpts, tchannelRepOpts, grpcRepOpts, cOpts, logger, metricsFactory)
127+
startAgent(aOpts, repOpts, tchanBuilder, grpcBuilder, cOpts, logger, metricsFactory)
128128
collectorSrv := startCollector(cOpts, spanWriter, logger, metricsFactory, strategyStore, svc.HC())
129129
querySrv := startQuery(
130130
svc, qOpts, archiveOptions(storageFactory, logger),

0 commit comments

Comments
 (0)