Skip to content

Commit

Permalink
Merge pull request #492 from yurishkuro/ys-discovery-via-push
Browse files Browse the repository at this point in the history
Support push model for service discovery
  • Loading branch information
peterbourgon committed Jun 4, 2017
2 parents 454871a + f5bc66e commit 62f8228
Show file tree
Hide file tree
Showing 43 changed files with 1,181 additions and 757 deletions.
18 changes: 10 additions & 8 deletions examples/apigateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,19 @@ func main() {
tags = []string{}
passingOnly = true
endpoints = addsvc.Endpoints{}
instancer = consulsd.NewInstancer(client, logger, "addsvc", tags, passingOnly)
)
{
factory := addsvcFactory(addsvc.MakeSumEndpoint, tracer, logger)
subscriber := consulsd.NewSubscriber(client, factory, logger, "addsvc", tags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(*retryMax, *retryTimeout, balancer)
endpoints.SumEndpoint = retry
}
{
factory := addsvcFactory(addsvc.MakeConcatEndpoint, tracer, logger)
subscriber := consulsd.NewSubscriber(client, factory, logger, "addsvc", tags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(*retryMax, *retryTimeout, balancer)
endpoints.ConcatEndpoint = retry
}
Expand All @@ -120,18 +121,19 @@ func main() {
passingOnly = true
uppercase endpoint.Endpoint
count endpoint.Endpoint
instancer = consulsd.NewInstancer(client, logger, "stringsvc", tags, passingOnly)
)
{
factory := stringsvcFactory(ctx, "GET", "/uppercase")
subscriber := consulsd.NewSubscriber(client, factory, logger, "stringsvc", tags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(*retryMax, *retryTimeout, balancer)
uppercase = retry
}
{
factory := stringsvcFactory(ctx, "GET", "/count")
subscriber := consulsd.NewSubscriber(client, factory, logger, "stringsvc", tags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(*retryMax, *retryTimeout, balancer)
count = retry
}
Expand Down
37 changes: 19 additions & 18 deletions examples/profilesvc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,68 +40,69 @@ func New(consulAddr string, logger log.Logger) (profilesvc.Service, error) {

var (
sdclient = consul.NewClient(apiclient)
instancer = consul.NewInstancer(sdclient, logger, consulService, consulTags, passingOnly)
endpoints profilesvc.Endpoints
)
{
factory := factoryFor(profilesvc.MakePostProfileEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.PostProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeGetProfileEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.GetProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakePutProfileEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.PutProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakePatchProfileEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.PatchProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeDeleteProfileEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.DeleteProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeGetAddressesEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.GetAddressesEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeGetAddressEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.GetAddressEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakePostAddressEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.PostAddressEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeDeleteAddressEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.DeleteAddressEndpoint = retry
}
Expand Down
6 changes: 3 additions & 3 deletions examples/stringsvc3/proxying.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,20 @@ func proxyingMiddleware(ctx context.Context, instances string, logger log.Logger
// discovery system.
var (
instanceList = split(instances)
subscriber sd.FixedSubscriber
endpointer sd.FixedEndpointer
)
logger.Log("proxy_to", fmt.Sprint(instanceList))
for _, instance := range instanceList {
var e endpoint.Endpoint
e = makeUppercaseProxy(ctx, instance)
e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
e = ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e)
subscriber = append(subscriber, e)
endpointer = append(endpointer, e)
}

// Now, build a single, retrying, load-balancing endpoint out of all of
// those individual endpoints.
balancer := lb.NewRoundRobin(subscriber)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(maxAttempts, maxTime, balancer)

// And finally, return the ServiceMiddleware, implemented by proxymw.
Expand Down
6 changes: 3 additions & 3 deletions sd/cache/benchmark_test.go → sd/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cache
package sd

import (
"io"
Expand All @@ -14,12 +14,12 @@ func BenchmarkEndpoints(b *testing.B) {
cb = make(closer)
cmap = map[string]io.Closer{"a": ca, "b": cb}
factory = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, cmap[instance], nil }
c = New(factory, log.NewNopLogger())
c = newEndpointCache(factory, log.NewNopLogger(), endpointerOptions{})
)

b.ReportAllocs()

c.Update([]string{"a", "b"})
c.Update(Event{Instances: []string{"a", "b"}})

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
Expand Down
96 changes: 0 additions & 96 deletions sd/cache/cache.go

This file was deleted.

91 changes: 0 additions & 91 deletions sd/cache/cache_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion sd/consul/doc.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
// Package consul provides subscriber and registrar implementations for Consul.
// Package consul provides Instancer and Registrar implementations for Consul.
package consul
Loading

0 comments on commit 62f8228

Please sign in to comment.