Skip to content

Commit

Permalink
Initial POC
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuri Shkuro committed Mar 8, 2017
1 parent 212ef2c commit 39f0c3e
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 54 deletions.
20 changes: 11 additions & 9 deletions examples/profilesvc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/go-kit/kit/examples/profilesvc"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd"
"github.com/go-kit/kit/sd/cache"
"github.com/go-kit/kit/sd/consul"
"github.com/go-kit/kit/sd/lb"
)
Expand Down Expand Up @@ -40,67 +41,68 @@ func New(consulAddr string, logger log.Logger) (profilesvc.Service, error) {

var (
sdclient = consul.NewClient(apiclient)
notifier = consul.NewNotifier(sdclient, logger, consulService, consulTags, passingOnly)
endpoints profilesvc.Endpoints
)
{
factory := factoryFor(profilesvc.MakePostProfileEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.PostProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeGetProfileEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.GetProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakePutProfileEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.PutProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakePatchProfileEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.PatchProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeDeleteProfileEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.DeleteProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeGetAddressesEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.GetAddressesEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeGetAddressEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.GetAddressEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakePostAddressEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.PostAddressEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeDeleteAddressEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.DeleteAddressEndpoint = retry
Expand Down
9 changes: 4 additions & 5 deletions sd/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ import (
"github.com/go-kit/kit/sd"
)

// TODO move this to endpoint/cache
// TODO move this to endpoint/cache?

// Cache collects the most recent set of endpoints from a service discovery
// system via a subscriber, and makes them available to consumers. Cache is
// meant to be embedded inside of a concrete subscriber, and can serve Service
// invocations directly.
// Cache collects the most recent set of instances from a service discovery
// system, creates endpoints for them using a factory, and makes them available
// to consumers.
type Cache struct {
mtx sync.RWMutex
factory sd.Factory
Expand Down
72 changes: 72 additions & 0 deletions sd/cache/observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package cache

import (
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd"
)

// TODO if Cache is moved to endpoint/cache, then this can be renamed to Cache.

// Observer subscribes to updates from a service discovery system,
// creates endpoints for them using a factory, and makes them available
// to consumers. It implements Subscriber and Discoverer interfaces.
type Observer struct {
cache *Cache
notifications chan []string // used to receive notifications
notifier sd.Notifier
}

var _ sd.Subscriber = &Observer{} // API check
var _ sd.Discoverer = &Observer{} // API check

// NewObserver crates a new Observer.
func NewObserver(
discoverer sd.Discoverer,
notifier sd.Notifier,
factory sd.Factory,
logger log.Logger,
) *Observer {
obs := &Observer{
cache: New(factory, logger),
notifier: notifier,
notifications: make(chan []string, 10),
}

go obs.observe()
notifier.Register(obs.notifications)

// TBD - a bit of a race condition here if notifier and discoverer are not in sync

instances, err := discoverer.Instances()
if err == nil {
logger.Log("instances", len(instances))
} else {
logger.Log("err", err)
}
obs.cache.Update(instances)

return obs
}

// Close stops the observer.
func (o *Observer) Close() {
o.notifier.Unregister(o.notifications)
close(o.notifications)
}

// Instances implements the Discoverer interface.
func (o *Observer) Instances() ([]string, error) {
return o.cache.Instances(), nil
}

// Endpoints implements the Subscriber interface.
func (o *Observer) Endpoints() ([]endpoint.Endpoint, error) {
return o.cache.Endpoints(), nil
}

func (o *Observer) observe() {
for instances := range o.notifications {
o.cache.Update(instances)
}
}
41 changes: 14 additions & 27 deletions sd/consul/subscriber.go → sd/consul/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,32 @@ import (

consul "github.com/hashicorp/consul/api"

"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd"
"github.com/go-kit/kit/sd/cache"
)

const defaultIndex = 0

// Subscriber yields endpoints for a service in Consul. Updates to the service
// are watched and will update the Subscriber endpoints.
type Subscriber struct {
cache *cache.Cache
// Notifier yields endpoints for a service in Consul. Updates to the service
// are watched and will update the Discoverer instances as well as be used to
// notify any registered observers.
type Notifier struct {
sd.Dispatcher
client Client
logger log.Logger
service string
tags []string
passingOnly bool
endpointsc chan []endpoint.Endpoint
quitc chan struct{}
}

var _ sd.Subscriber = &Subscriber{}
var _ sd.Notifier = &Notifier{}

// NewSubscriber returns a Consul subscriber which returns endpoints for the
// NewNotifier returns a Consul notifier which returns instances for the
// requested service. It only returns instances for which all of the passed tags
// are present.
func NewSubscriber(client Client, factory sd.Factory, logger log.Logger, service string, tags []string, passingOnly bool) *Subscriber {
s := &Subscriber{
cache: cache.New(factory, logger),
func NewNotifier(client Client, logger log.Logger, service string, tags []string, passingOnly bool) *Notifier {
s := &Notifier{
client: client,
logger: log.With(logger, "service", service, "tags", fmt.Sprint(tags)),
service: service,
Expand All @@ -50,27 +47,17 @@ func NewSubscriber(client Client, factory sd.Factory, logger log.Logger, service
s.logger.Log("err", err)
}

s.cache.Update(instances)
s.Notify(instances)
go s.loop(index)
return s
}

// Instances implements the Discoverer interface.
func (s *Subscriber) Instances() ([]string, error) {
return s.cache.Instances(), nil
}

// Endpoints implements the Subscriber interface.
func (s *Subscriber) Endpoints() ([]endpoint.Endpoint, error) {
return s.cache.Endpoints(), nil
}

// Stop terminates the subscriber.
func (s *Subscriber) Stop() {
func (s *Notifier) Stop() {
close(s.quitc)
}

func (s *Subscriber) loop(lastIndex uint64) {
func (s *Notifier) loop(lastIndex uint64) {
var (
instances []string
err error
Expand All @@ -83,12 +70,12 @@ func (s *Subscriber) loop(lastIndex uint64) {
case err != nil:
s.logger.Log("err", err)
default:
s.cache.Update(instances)
s.Notify(instances)
}
}
}

func (s *Subscriber) getInstances(lastIndex uint64, interruptc chan struct{}) ([]string, uint64, error) {
func (s *Notifier) getInstances(lastIndex uint64, interruptc chan struct{}) ([]string, uint64, error) {
tag := ""
if len(s.tags) > 0 {
tag = s.tags[0]
Expand Down
34 changes: 21 additions & 13 deletions sd/consul/subscriber_test.go → sd/consul/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ func TestSubscriber(t *testing.T) {
client = newTestClient(consulState)
)

s := NewSubscriber(client, testFactory, logger, "search", []string{"api"}, true)
s := NewNotifier(client, logger, "search", []string{"api"}, true)
defer s.Stop()

endpoints, err := s.Endpoints()
instances, err := s.Instances()
if err != nil {
t.Fatal(err)
}

if want, have := 2, len(endpoints); want != have {
if want, have := 2, len(instances); want != have {
t.Errorf("want %d, have %d", want, have)
}
}
Expand All @@ -82,15 +82,15 @@ func TestSubscriberNoService(t *testing.T) {
client = newTestClient(consulState)
)

s := NewSubscriber(client, testFactory, logger, "feed", []string{}, true)
s := NewNotifier(client, logger, "feed", []string{}, true)
defer s.Stop()

endpoints, err := s.Endpoints()
instances, err := s.Instances()
if err != nil {
t.Fatal(err)
}

if want, have := 0, len(endpoints); want != have {
if want, have := 0, len(instances); want != have {
t.Fatalf("want %d, have %d", want, have)
}
}
Expand All @@ -101,33 +101,41 @@ func TestSubscriberWithTags(t *testing.T) {
client = newTestClient(consulState)
)

s := NewSubscriber(client, testFactory, logger, "search", []string{"api", "v2"}, true)
s := NewNotifier(client, logger, "search", []string{"api", "v2"}, true)
defer s.Stop()

endpoints, err := s.Endpoints()
instances, err := s.Instances()
if err != nil {
t.Fatal(err)
}

if want, have := 1, len(endpoints); want != have {
if want, have := 1, len(instances); want != have {
t.Fatalf("want %d, have %d", want, have)
}
}

func TestSubscriberAddressOverride(t *testing.T) {
s := NewSubscriber(newTestClient(consulState), testFactory, log.NewNopLogger(), "search", []string{"db"}, true)
s := NewNotifier(newTestClient(consulState), log.NewNopLogger(), "search", []string{"db"}, true)
defer s.Stop()

endpoints, err := s.Endpoints()
instances, err := s.Instances()
if err != nil {
t.Fatal(err)
}

if want, have := 1, len(endpoints); want != have {
if want, have := 1, len(instances); want != have {
t.Fatalf("want %d, have %d", want, have)
}

response, err := endpoints[0](context.Background(), struct{}{})
endpoint, closer, err := testFactory(instances[0])
if err != nil {
t.Fatal(err)
}
if closer != nil {
defer closer.Close()
}

response, err := endpoint(context.Background(), struct{}{})
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 39f0c3e

Please sign in to comment.