-
-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Eureka service discovery implementation #504
Changes from 2 commits
cb03da6
72a93ad
f5b8fe6
2a823da
443f6ea
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
package eureka | ||
|
||
import ( | ||
fargo "github.com/hudl/fargo" | ||
) | ||
|
||
// Client is a wrapper around the Eureka API. | ||
type Client interface { | ||
// Register an instance with Eureka. | ||
Register(i *fargo.Instance) error | ||
|
||
// Deregister an instance from Eureka. | ||
Deregister(i *fargo.Instance) error | ||
|
||
// Send an instance heartbeat to Eureka. | ||
Heartbeat(i *fargo.Instance) error | ||
|
||
// Get all instances for an app in Eureka. | ||
Instances(app string) ([]*fargo.Instance, error) | ||
|
||
// Receive scheduled updates about an app's instances in Eureka. | ||
ScheduleUpdates(app string, quitc chan struct{}) <-chan fargo.AppUpdate | ||
} | ||
|
||
type client struct { | ||
connection *fargo.EurekaConnection | ||
} | ||
|
||
// NewClient returns an implementation of the Client interface, wrapping a | ||
// concrete connection to Eureka using the Fargo library. | ||
// Taking in Fargo's own connection abstraction gives the user maximum | ||
// freedom in regards to how that connection is configured. | ||
func NewClient(ec *fargo.EurekaConnection) Client { | ||
return &client{connection: ec} | ||
} | ||
|
||
func (c *client) Register(i *fargo.Instance) error { | ||
if c.instanceRegistered(i) { | ||
// Already registered. Send a heartbeat instead. | ||
return c.Heartbeat(i) | ||
} | ||
return c.connection.RegisterInstance(i) | ||
} | ||
|
||
func (c *client) Deregister(i *fargo.Instance) error { | ||
return c.connection.DeregisterInstance(i) | ||
} | ||
|
||
func (c *client) Heartbeat(i *fargo.Instance) (err error) { | ||
if err = c.connection.HeartBeatInstance(i); err != nil && c.instanceNotFoundErr(err) { | ||
// Instance not registered. Register first before sending heartbeats. | ||
return c.Register(i) | ||
} | ||
return err | ||
} | ||
|
||
func (c *client) Instances(app string) ([]*fargo.Instance, error) { | ||
stdApp, err := c.connection.GetApp(app) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return stdApp.Instances, nil | ||
} | ||
|
||
func (c *client) ScheduleUpdates(app string, quitc chan struct{}) <-chan fargo.AppUpdate { | ||
return c.connection.ScheduleAppUpdates(app, false, quitc) | ||
} | ||
|
||
func (c *client) instanceRegistered(i *fargo.Instance) bool { | ||
_, err := c.connection.GetInstance(i.App, i.Id()) | ||
return err == nil | ||
} | ||
|
||
func (c *client) instanceNotFoundErr(err error) bool { | ||
code, ok := fargo.HTTPResponseStatusCode(err) | ||
return ok && code == 404 | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
package eureka | ||
|
||
import ( | ||
"errors" | ||
"reflect" | ||
|
||
fargo "github.com/hudl/fargo" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Likewise above (further comments elided) |
||
|
||
"github.com/go-kit/kit/log" | ||
) | ||
|
||
var ( | ||
errTest = errors.New("kaboom") | ||
loggerTest = log.NewNopLogger() | ||
instanceTest1 = &fargo.Instance{ | ||
HostName: "server1.acme.org", | ||
Port: 8080, | ||
App: "go-kit", | ||
IPAddr: "192.168.0.1", | ||
VipAddress: "192.168.0.1", | ||
SecureVipAddress: "192.168.0.1", | ||
HealthCheckUrl: "http://server1.acme.org:8080/healthz", | ||
StatusPageUrl: "http://server1.acme.org:8080/status", | ||
HomePageUrl: "http://server1.acme.org:8080/", | ||
Status: fargo.UP, | ||
DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn}, | ||
LeaseInfo: fargo.LeaseInfo{RenewalIntervalInSecs: 1}, | ||
} | ||
instanceTest2 = &fargo.Instance{ | ||
HostName: "server2.acme.org", | ||
Port: 8080, | ||
App: "go-kit", | ||
IPAddr: "192.168.0.2", | ||
VipAddress: "192.168.0.2", | ||
SecureVipAddress: "192.168.0.2", | ||
HealthCheckUrl: "http://server2.acme.org:8080/healthz", | ||
StatusPageUrl: "http://server2.acme.org:8080/status", | ||
HomePageUrl: "http://server2.acme.org:8080/", | ||
Status: fargo.UP, | ||
DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn}, | ||
} | ||
applicationTest = &fargo.Application{ | ||
Name: "go-kit", | ||
Instances: []*fargo.Instance{instanceTest1, instanceTest2}, | ||
} | ||
) | ||
|
||
type testClient struct { | ||
instances []*fargo.Instance | ||
application *fargo.Application | ||
errInstances error | ||
errApplication error | ||
errHeartbeat error | ||
} | ||
|
||
func (c *testClient) Register(i *fargo.Instance) error { | ||
for _, instance := range c.instances { | ||
if reflect.DeepEqual(*instance, *i) { | ||
return errors.New("already registered") | ||
} | ||
} | ||
|
||
c.instances = append(c.instances, i) | ||
return nil | ||
} | ||
|
||
func (c *testClient) Deregister(i *fargo.Instance) error { | ||
var newInstances []*fargo.Instance | ||
for _, instance := range c.instances { | ||
if reflect.DeepEqual(*instance, *i) { | ||
continue | ||
} | ||
newInstances = append(newInstances, instance) | ||
} | ||
if len(newInstances) == len(c.instances) { | ||
return errors.New("not registered") | ||
} | ||
|
||
c.instances = newInstances | ||
return nil | ||
} | ||
|
||
func (c *testClient) Heartbeat(i *fargo.Instance) (err error) { | ||
return c.errHeartbeat | ||
} | ||
|
||
func (c *testClient) Instances(app string) ([]*fargo.Instance, error) { | ||
return c.instances, c.errInstances | ||
} | ||
|
||
func (c *testClient) ScheduleUpdates(service string, quitc chan struct{}) <-chan fargo.AppUpdate { | ||
updatec := make(chan fargo.AppUpdate, 1) | ||
updatec <- fargo.AppUpdate{App: c.application, Err: c.errApplication} | ||
return updatec | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
// Package eureka provides subscriber and registrar implementations for Netflix OSS's Eureka | ||
package eureka |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
// +build integration | ||
|
||
package eureka | ||
|
||
import ( | ||
"io" | ||
"os" | ||
"testing" | ||
"time" | ||
|
||
fargo "github.com/hudl/fargo" | ||
|
||
"github.com/go-kit/kit/endpoint" | ||
"github.com/go-kit/kit/log" | ||
) | ||
|
||
// Package sd/eureka provides a wrapper around the Netflix Eureka service | ||
// registry by way of the Fargo library. This test assumes the user has an | ||
// instance of Eureka available at the address in the environment variable. | ||
// Example `${EUREKA_ADDR}` format: http://localhost:8761/eureka | ||
// | ||
// NOTE: when starting a Eureka server for integration testing, ensure | ||
// the response cache interval is reduced to one second. This can be | ||
// achieved with the following Java argument: | ||
// `-Deureka.server.responseCacheUpdateIntervalMs=1000` | ||
func TestIntegration(t *testing.T) { | ||
eurekaAddr := os.Getenv("EUREKA_ADDR") | ||
if eurekaAddr == "" { | ||
t.Skip("EUREKA_ADDR is not set") | ||
} | ||
|
||
var client Client | ||
{ | ||
var fargoConfig fargo.Config | ||
fargoConfig.Eureka.ServiceUrls = []string{eurekaAddr} | ||
fargoConfig.Eureka.PollIntervalSeconds = 1 | ||
|
||
fargoConnection := fargo.NewConnFromConfig(fargoConfig) | ||
client = NewClient(&fargoConnection) | ||
} | ||
|
||
logger := log.NewLogfmtLogger(os.Stderr) | ||
logger = log.With(logger, "ts", log.DefaultTimestamp) | ||
|
||
// Register one instance. | ||
registrar1 := NewRegistrar(client, instanceTest1, log.With(logger, "component", "registrar1")) | ||
registrar1.Register() | ||
defer registrar1.Deregister() | ||
|
||
// This should be enough time for the Eureka server response cache to update. | ||
time.Sleep(time.Second) | ||
|
||
// Build a subscriber. | ||
factory := func(instance string) (endpoint.Endpoint, io.Closer, error) { | ||
t.Logf("factory invoked for %q", instance) | ||
return endpoint.Nop, nil, nil | ||
} | ||
s := NewSubscriber( | ||
client, | ||
factory, | ||
log.With(logger, "component", "subscriber"), | ||
instanceTest1.App, | ||
) | ||
defer s.Stop() | ||
|
||
// We should have one endpoint immediately after subscriber instantiation. | ||
endpoints, err := s.Endpoints() | ||
if err != nil { | ||
t.Error(err) | ||
} | ||
if want, have := 1, len(endpoints); want != have { | ||
t.Errorf("want %d, have %d", want, have) | ||
} | ||
|
||
// Register a second instance | ||
registrar2 := NewRegistrar(client, instanceTest2, log.With(logger, "component", "registrar2")) | ||
registrar2.Register() | ||
defer registrar2.Deregister() // In case of exceptional circumstances. | ||
|
||
// This should be enough time for a scheduled update assuming Eureka is | ||
// configured with the properties mentioned in the function comments. | ||
time.Sleep(2 * time.Second) | ||
|
||
// Now we should have two endpoints. | ||
endpoints, err = s.Endpoints() | ||
if err != nil { | ||
t.Error(err) | ||
} | ||
if want, have := 2, len(endpoints); want != have { | ||
t.Errorf("want %d, have %d", want, have) | ||
} | ||
|
||
// Deregister the second instance. | ||
registrar2.Deregister() | ||
|
||
// Wait for another scheduled update. | ||
time.Sleep(2 * time.Second) | ||
|
||
// And then there was one. | ||
endpoints, err = s.Endpoints() | ||
if err != nil { | ||
t.Error(err) | ||
} | ||
if want, have := 1, len(endpoints); want != have { | ||
t.Errorf("want %d, have %d", want, have) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package eureka | ||
|
||
import ( | ||
"fmt" | ||
"time" | ||
|
||
fargo "github.com/hudl/fargo" | ||
|
||
"github.com/go-kit/kit/log" | ||
) | ||
|
||
// Registrar maintains service instance liveness information in Eureka. | ||
type Registrar struct { | ||
client Client | ||
instance *fargo.Instance | ||
logger log.Logger | ||
quit chan bool | ||
} | ||
|
||
// NewRegistrar returns an Eureka Registrar acting on behalf of the provided | ||
// Fargo instance. | ||
func NewRegistrar(client Client, i *fargo.Instance, logger log.Logger) *Registrar { | ||
return &Registrar{ | ||
client: client, | ||
instance: i, | ||
logger: log.With(logger, "service", i.App, "address", fmt.Sprintf("%s:%d", i.IPAddr, i.Port)), | ||
} | ||
} | ||
|
||
// Register implements sd.Registrar interface. | ||
func (r *Registrar) Register() { | ||
if err := r.client.Register(r.instance); err != nil { | ||
r.logger.Log("err", err) | ||
} else { | ||
r.logger.Log("action", "register") | ||
} | ||
|
||
if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 { | ||
// User has opted for heartbeat functionality in Eureka. | ||
if r.quit == nil { | ||
r.quit = make(chan bool) | ||
go r.loop() | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, no I'm afraid this is racy, now... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I fear I may need to reintroduce that mutex to remove the racy-ness completely (latest commit). I'm still getting up to speed on Go's concurrency patterns and I would welcome any thoughts you had on this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe we can start by gathering requirements. What behaviors are you trying to allow in this implementation of Register/Deregister? How do those behaviors compare to the other registrars? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The behaviour is to make periodic heartbeat calls to the Eureka registry after initial registration. By default Eureka will expect a heartbeat every 30s, and 3 missed heartbeats will result in the instance being expired. This is why I felt it was a must-have, and while Fargo makes available a Heartbeat RPC for the correct Eureka endpoint, it does not provide the periodic functionality. Also, these heartbeat periods and expiry thresholds are decided when setting up a Eureka server cluster and will vary in the wild. To cater for this I made the heartbeat interval configurable in the go-kit implementation using the instance's Regarding how the behaviours compare, I had a look around and found similar functionality (I think) in |
||
} | ||
} | ||
|
||
// Deregister implements sd.Registrar interface. | ||
func (r *Registrar) Deregister() { | ||
if err := r.client.Deregister(r.instance); err != nil { | ||
r.logger.Log("err", err) | ||
} else { | ||
r.logger.Log("action", "deregister") | ||
} | ||
|
||
if r.quit != nil { | ||
r.quit <- true | ||
r.quit = nil | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, this seems like a lot of unnecessary ceremony. Can you think of a way to get the semantics you want without a mutex? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. I was able to shuffle things around and ditch the mutex entirely. Thanks. |
||
} | ||
|
||
func (r *Registrar) loop() { | ||
tick := time.NewTicker(time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second) | ||
defer tick.Stop() | ||
for { | ||
select { | ||
case <-tick.C: | ||
if err := r.client.Heartbeat(r.instance); err != nil { | ||
r.logger.Log("err", err) | ||
} | ||
case <-r.quit: | ||
return | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no need to alias the import with its own name :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doh, thanks. Too hasty with a
s/stdeureka/fargo/g