-
-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Service Discovery support for etcd v3 #663
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,233 @@ | ||
package etcdv3 | ||
|
||
import ( | ||
"context" | ||
"crypto/tls" | ||
"errors" | ||
"time" | ||
|
||
"github.com/coreos/etcd/clientv3" | ||
"github.com/coreos/etcd/pkg/transport" | ||
) | ||
|
||
var ( | ||
// ErrNoKey indicates a client method needs a key but receives none. | ||
ErrNoKey = errors.New("no key provided") | ||
|
||
// ErrNoValue indicates a client method needs a value but receives none. | ||
ErrNoValue = errors.New("no value provided") | ||
) | ||
|
||
// Client is a wrapper around the etcd client. | ||
type Client interface { | ||
// GetEntries queries the given prefix in etcd and returns a slice | ||
// containing the values of all keys found, recursively, underneath that | ||
// prefix. | ||
GetEntries(prefix string) ([]string, error) | ||
|
||
// WatchPrefix watches the given prefix in etcd for changes. When a change | ||
// is detected, it will signal on the passed channel. Clients are expected | ||
// to call GetEntries to update themselves with the latest set of complete | ||
// values. WatchPrefix will always send an initial sentinel value on the | ||
// channel after establishing the watch, to ensure that clients always | ||
// receive the latest set of values. WatchPrefix will block until the | ||
// context passed to the NewClient constructor is terminated. | ||
WatchPrefix(prefix string, ch chan struct{}) | ||
|
||
// Register a service with etcd. | ||
Register(s Service) error | ||
|
||
// Deregister a service with etcd. | ||
Deregister(s Service) error | ||
|
||
// LeaseID returns the lease id created for this service instance | ||
LeaseID() int64 | ||
} | ||
|
||
type client struct { | ||
cli *clientv3.Client | ||
ctx context.Context | ||
|
||
kv clientv3.KV | ||
|
||
// Watcher interface instance, used to leverage Watcher.Close() | ||
watcher clientv3.Watcher | ||
// watcher context | ||
wctx context.Context | ||
// watcher cancel func | ||
wcf context.CancelFunc | ||
|
||
// leaseID will be 0 (clientv3.NoLease) if a lease was not created | ||
leaseID clientv3.LeaseID | ||
|
||
hbch <-chan *clientv3.LeaseKeepAliveResponse | ||
// Lease interface instance, used to leverage Lease.Close() | ||
leaser clientv3.Lease | ||
} | ||
|
||
// ClientOptions defines options for the etcd client. All values are optional. | ||
// If any duration is not specified, a default of 3 seconds will be used. | ||
type ClientOptions struct { | ||
Cert string | ||
Key string | ||
CACert string | ||
DialTimeout time.Duration | ||
DialKeepAlive time.Duration | ||
Username string | ||
Password string | ||
} | ||
|
||
// NewClient returns Client with a connection to the named machines. It will | ||
// return an error if a connection to the cluster cannot be made. | ||
func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) { | ||
if options.DialTimeout == 0 { | ||
options.DialTimeout = 3 * time.Second | ||
} | ||
if options.DialKeepAlive == 0 { | ||
options.DialKeepAlive = 3 * time.Second | ||
} | ||
|
||
var err error | ||
var tlscfg *tls.Config | ||
|
||
if options.Cert != "" && options.Key != "" { | ||
tlsInfo := transport.TLSInfo{ | ||
CertFile: options.Cert, | ||
KeyFile: options.Key, | ||
TrustedCAFile: options.CACert, | ||
} | ||
tlscfg, err = tlsInfo.ClientConfig() | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
cli, err := clientv3.New(clientv3.Config{ | ||
Context: ctx, | ||
Endpoints: machines, | ||
DialTimeout: options.DialTimeout, | ||
DialKeepAliveTime: options.DialKeepAlive, | ||
TLS: tlscfg, | ||
Username: options.Username, | ||
Password: options.Password, | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &client{ | ||
cli: cli, | ||
ctx: ctx, | ||
kv: clientv3.NewKV(cli), | ||
}, nil | ||
} | ||
|
||
func (c *client) LeaseID() int64 { return int64(c.leaseID) } | ||
|
||
// GetEntries implements the etcd Client interface. | ||
func (c *client) GetEntries(key string) ([]string, error) { | ||
resp, err := c.kv.Get(c.ctx, key, clientv3.WithPrefix()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
entries := make([]string, len(resp.Kvs)) | ||
for i, kv := range resp.Kvs { | ||
entries[i] = string(kv.Value) | ||
} | ||
|
||
return entries, nil | ||
} | ||
|
||
// WatchPrefix implements the etcd Client interface. | ||
func (c *client) WatchPrefix(prefix string, ch chan struct{}) { | ||
c.wctx, c.wcf = context.WithCancel(c.ctx) | ||
c.watcher = clientv3.NewWatcher(c.cli) | ||
|
||
wch := c.watcher.Watch(c.wctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(0)) | ||
ch <- struct{}{} | ||
for wr := range wch { | ||
if wr.Canceled { | ||
return | ||
} | ||
ch <- struct{}{} | ||
} | ||
} | ||
|
||
func (c *client) Register(s Service) error { | ||
var err error | ||
|
||
if s.Key == "" { | ||
return ErrNoKey | ||
} | ||
if s.Value == "" { | ||
return ErrNoValue | ||
} | ||
|
||
if c.leaser != nil { | ||
c.leaser.Close() | ||
} | ||
c.leaser = clientv3.NewLease(c.cli) | ||
|
||
if c.watcher != nil { | ||
c.watcher.Close() | ||
} | ||
c.watcher = clientv3.NewWatcher(c.cli) | ||
if c.kv == nil { | ||
c.kv = clientv3.NewKV(c.cli) | ||
} | ||
|
||
if s.TTL == nil { | ||
s.TTL = NewTTLOption(time.Second*3, time.Second*10) | ||
} | ||
|
||
grantResp, err := c.leaser.Grant(c.ctx, int64(s.TTL.ttl.Seconds())) | ||
if err != nil { | ||
return err | ||
} | ||
c.leaseID = grantResp.ID | ||
|
||
_, err = c.kv.Put( | ||
c.ctx, | ||
s.Key, | ||
s.Value, | ||
clientv3.WithLease(c.leaseID), | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// this will keep the key alive 'forever' or until we revoke it or | ||
// the context is canceled | ||
c.hbch, err = c.leaser.KeepAlive(c.ctx, c.leaseID) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (c *client) Deregister(s Service) error { | ||
defer c.close() | ||
|
||
if s.Key == "" { | ||
return ErrNoKey | ||
} | ||
if _, err := c.cli.Delete(c.ctx, s.Key, clientv3.WithIgnoreLease()); err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// close will close any open clients and call | ||
// the watcher cancel func | ||
func (c *client) close() { | ||
if c.leaser != nil { | ||
c.leaser.Close() | ||
} | ||
if c.watcher != nil { | ||
c.watcher.Close() | ||
c.wcf() | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
// Package etcdv3 provides an Instancer and Registrar implementation for etcd v3. If | ||
// you use etcd v3 as your service discovery system, this package will help you | ||
// implement the registration and client-side load balancing patterns. | ||
package etcdv3 | ||
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
package etcdv3 | ||
|
||
import ( | ||
"context" | ||
"io" | ||
"time" | ||
|
||
"github.com/go-kit/kit/endpoint" | ||
"github.com/go-kit/kit/log" | ||
"github.com/go-kit/kit/sd" | ||
"github.com/go-kit/kit/sd/lb" | ||
) | ||
|
||
func Example() { | ||
// Let's say this is a service that means to register itself. | ||
// First, we will set up some context. | ||
var ( | ||
etcdServer = "10.0.0.1:2379" // in the change from v2 to v3, the schema is no longer necessary if connecting directly to an etcd v3 instance | ||
prefix = "/services/foosvc/" // known at compile time | ||
instance = "1.2.3.4:8080" // taken from runtime or platform, somehow | ||
key = prefix + instance // should be globally unique | ||
value = "http://" + instance // based on our transport | ||
ctx = context.Background() | ||
) | ||
|
||
options := ClientOptions{ | ||
// Path to trusted ca file | ||
CACert: "", | ||
|
||
// Path to certificate | ||
Cert: "", | ||
|
||
// Path to private key | ||
Key: "", | ||
|
||
// Username if required | ||
Username: "", | ||
|
||
// Password if required | ||
Password: "", | ||
|
||
// If DialTimeout is 0, it defaults to 3s | ||
DialTimeout: time.Second * 3, | ||
|
||
// If DialKeepAlive is 0, it defaults to 3s | ||
DialKeepAlive: time.Second * 3, | ||
} | ||
|
||
// Build the client. | ||
client, err := NewClient(ctx, []string{etcdServer}, options) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
// Build the registrar. | ||
registrar := NewRegistrar(client, Service{ | ||
Key: key, | ||
Value: value, | ||
}, log.NewNopLogger()) | ||
|
||
// Register our instance. | ||
registrar.Register() | ||
|
||
// At the end of our service lifecycle, for example at the end of func main, | ||
// we should make sure to deregister ourselves. This is important! Don't | ||
// accidentally skip this step by invoking a log.Fatal or os.Exit in the | ||
// interim, which bypasses the defer stack. | ||
defer registrar.Deregister() | ||
|
||
// It's likely that we'll also want to connect to other services and call | ||
// their methods. We can build an Instancer to listen for changes from etcd, | ||
// create Endpointer, wrap it with a load-balancer to pick a single | ||
// endpoint, and finally wrap it with a retry strategy to get something that | ||
// can be used as an endpoint directly. | ||
barPrefix := "/services/barsvc" | ||
logger := log.NewNopLogger() | ||
instancer, err := NewInstancer(client, barPrefix, logger) | ||
if err != nil { | ||
panic(err) | ||
} | ||
endpointer := sd.NewEndpointer(instancer, barFactory, logger) | ||
balancer := lb.NewRoundRobin(endpointer) | ||
retry := lb.Retry(3, 3*time.Second, balancer) | ||
|
||
// And now retry can be used like any other endpoint. | ||
req := struct{}{} | ||
if _, err = retry(ctx, req); err != nil { | ||
panic(err) | ||
} | ||
} | ||
|
||
func barFactory(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One or two sentences of doc comment would be appreciated!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I copied and updated the doc comment from sd/etcd to sd/etcdv3 for consistency sake.