Skip to content

Commit

Permalink
Refactor to new naming
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuri Shkuro committed Apr 22, 2017
1 parent 171ce31 commit dfc00d3
Show file tree
Hide file tree
Showing 19 changed files with 294 additions and 298 deletions.
61 changes: 41 additions & 20 deletions examples/profilesvc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/go-kit/kit/examples/profilesvc"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd"
"github.com/go-kit/kit/sd/cache"
"github.com/go-kit/kit/sd/consul"
"github.com/go-kit/kit/sd/lb"
)
Expand Down Expand Up @@ -41,69 +40,91 @@ func New(consulAddr string, logger log.Logger) (profilesvc.Service, error) {

var (
sdclient = consul.NewClient(apiclient)
notifier = consul.NewNotifier(sdclient, logger, consulService, consulTags, passingOnly)
instancer = consul.NewInstancer(sdclient, logger, consulService, consulTags, passingOnly)
endpoints profilesvc.Endpoints
)
// TODO: thought experiment
mapping := []struct {
factory func(s profilesvc.Service) endpoint.Endpoint
endpoint *endpoint.Endpoint
}{
{
factory: profilesvc.MakePostProfileEndpoint,
endpoint: &endpoints.PostProfileEndpoint,
},
{
factory: profilesvc.MakeGetProfileEndpoint,
endpoint: &endpoints.GetProfileEndpoint,
},
}
for _, m := range mapping {
factory := factoryFor(m.factory)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
*m.endpoint = retry
}
// TODO: why not 2 lines per endpoint registration above instead of 7 lines per endpoint below?
{
factory := factoryFor(profilesvc.MakePostProfileEndpoint)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.PostProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeGetProfileEndpoint)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.GetProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakePutProfileEndpoint)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.PutProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakePatchProfileEndpoint)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.PatchProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeDeleteProfileEndpoint)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.DeleteProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeGetAddressesEndpoint)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.GetAddressesEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeGetAddressEndpoint)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.GetAddressEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakePostAddressEndpoint)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.PostAddressEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeDeleteAddressEndpoint)
subscriber := cache.NewObserver(notifier, notifier, factory, logger)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.DeleteAddressEndpoint = retry
}
Expand Down
72 changes: 0 additions & 72 deletions sd/cache/observer.go

This file was deleted.

28 changes: 14 additions & 14 deletions sd/consul/notifier.go → sd/consul/instancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd"
"github.com/go-kit/kit/sd/internal/instance"
)

const defaultIndex = 0

// Notifier yields endpoints for a service in Consul. Updates to the service
// are watched and will update the Discoverer instances as well as be used to
// notify any registered observers.
type Notifier struct {
sd.Dispatcher
// Instancer yields endpoints for a service in Consul.
type Instancer struct {
instance.Cache
client Client
logger log.Logger
service string
Expand All @@ -25,13 +24,14 @@ type Notifier struct {
quitc chan struct{}
}

var _ sd.Notifier = &Notifier{}
var _ sd.Instancer = &Instancer{}

// NewNotifier returns a Consul notifier which returns instances for the
// NewInstancer returns a Consul notifier which returns instances for the
// requested service. It only returns instances for which all of the passed tags
// are present.
func NewNotifier(client Client, logger log.Logger, service string, tags []string, passingOnly bool) *Notifier {
s := &Notifier{
func NewInstancer(client Client, logger log.Logger, service string, tags []string, passingOnly bool) *Instancer {
s := &Instancer{
Cache: *instance.NewCache(),
client: client,
logger: log.With(logger, "service", service, "tags", fmt.Sprint(tags)),
service: service,
Expand All @@ -47,17 +47,17 @@ func NewNotifier(client Client, logger log.Logger, service string, tags []string
s.logger.Log("err", err)
}

s.Notify(instances)
s.Update(instances)
go s.loop(index)
return s
}

// Stop terminates the subscriber.
func (s *Notifier) Stop() {
func (s *Instancer) Stop() {
close(s.quitc)
}

func (s *Notifier) loop(lastIndex uint64) {
func (s *Instancer) loop(lastIndex uint64) {
var (
instances []string
err error
Expand All @@ -70,12 +70,12 @@ func (s *Notifier) loop(lastIndex uint64) {
case err != nil:
s.logger.Log("err", err)
default:
s.Notify(instances)
s.Update(instances)
}
}
}

func (s *Notifier) getInstances(lastIndex uint64, interruptc chan struct{}) ([]string, uint64, error) {
func (s *Instancer) getInstances(lastIndex uint64, interruptc chan struct{}) ([]string, uint64, error) {
tag := ""
if len(s.tags) > 0 {
tag = s.tags[0]
Expand Down
32 changes: 8 additions & 24 deletions sd/consul/notifier_test.go → sd/consul/instancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,10 @@ func TestSubscriber(t *testing.T) {
client = newTestClient(consulState)
)

s := NewNotifier(client, logger, "search", []string{"api"}, true)
s := NewInstancer(client, logger, "search", []string{"api"}, true)
defer s.Stop()

instances, err := s.Instances()
if err != nil {
t.Fatal(err)
}

instances := s.Instances()
if want, have := 2, len(instances); want != have {
t.Errorf("want %d, have %d", want, have)
}
Expand All @@ -82,14 +78,10 @@ func TestSubscriberNoService(t *testing.T) {
client = newTestClient(consulState)
)

s := NewNotifier(client, logger, "feed", []string{}, true)
s := NewInstancer(client, logger, "feed", []string{}, true)
defer s.Stop()

instances, err := s.Instances()
if err != nil {
t.Fatal(err)
}

instances := s.Instances()
if want, have := 0, len(instances); want != have {
t.Fatalf("want %d, have %d", want, have)
}
Expand All @@ -101,28 +93,20 @@ func TestSubscriberWithTags(t *testing.T) {
client = newTestClient(consulState)
)

s := NewNotifier(client, logger, "search", []string{"api", "v2"}, true)
s := NewInstancer(client, logger, "search", []string{"api", "v2"}, true)
defer s.Stop()

instances, err := s.Instances()
if err != nil {
t.Fatal(err)
}

instances := s.Instances()
if want, have := 1, len(instances); want != have {
t.Fatalf("want %d, have %d", want, have)
}
}

func TestSubscriberAddressOverride(t *testing.T) {
s := NewNotifier(newTestClient(consulState), log.NewNopLogger(), "search", []string{"db"}, true)
s := NewInstancer(newTestClient(consulState), log.NewNopLogger(), "search", []string{"db"}, true)
defer s.Stop()

instances, err := s.Instances()
if err != nil {
t.Fatal(err)
}

instances := s.Instances()
if want, have := 1, len(instances); want != have {
t.Fatalf("want %d, have %d", want, have)
}
Expand Down
9 changes: 0 additions & 9 deletions sd/discoverer.go

This file was deleted.

Loading

0 comments on commit dfc00d3

Please sign in to comment.