
Support push model for service discovery #492

Merged: 8 commits, Jun 4, 2017

Conversation

yurishkuro
Contributor

This is a POC to solve #475, #403, and #209, and is based on #463/#478.

Main changes:

  • introduce a Notifier interface that allows subscribing/unsubscribing to notifications from the service discovery system
  • introduce a Dispatcher helper struct that implements the Notifier mechanics
  • add cache.Observer, which uses cache.Cache to keep track of instances and convert them to endpoints, while acting as an observer of some Notifier
  • change Consul to only implement Notifier, not Subscriber
  • change examples/profilesvc/client/client.go to demonstrate how the new approach can be used

Some open questions:

  • agree on naming: Discoverer, Notifier
  • consider moving sd/cache/Cache to endpoint/cache and sd.Factory to endpoint.Factory
  • if cache moved, then rename sd/cache/Observer to just Cache
  • cache.Observer requires both a discoverer and a notifier: the former to seed the initial list of instances, the latter to receive further notifications. This creates a race condition if the two are not in sync. In the example the same object satisfies both interfaces, so in practice it isn't a problem, but it smells. One possible solution is to make Notifier extend Discoverer, which creates a semantic requirement that the race condition is resolved internally in the Notifier (see the sketch below). It might also be beneficial for discovery systems that do not support a push model: subscribing the observer would always be a no-op, and only the Instances() method would be meaningful.
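A minimal sketch of that last idea, using the working names from this PR (illustrative only, not the final API):

// Hypothetical: Notifier extends Discoverer, so a single object both seeds
// the initial list of instances and pushes subsequent updates, and resolves
// the race between the two internally.
type Discoverer interface {
    Instances() ([]string, error)
}

type Notifier interface {
    Discoverer
    Register(chan<- []string)   // subscribe to instance updates
    Deregister(chan<- []string) // unsubscribe
}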

var _ sd.Subscriber = &Observer{} // API check
var _ sd.Discoverer = &Observer{} // API check

// NewObserver crates a new Observer.

crates -> creates

@peterbourgon
Member

Thanks for this. I'll need a little while before I can review it. Please don't get discouraged :)

@yurishkuro
Contributor Author

@peterbourgon I can wait. At the moment I have wired our internal service discovery system to essentially the same API, so I know that this approach can be made to work.

@yurishkuro
Contributor Author

@peterbourgon ping

@peterbourgon
Member

peterbourgon commented Apr 2, 2017

@yurishkuro Thanks for the ongoing discussion and the PR. Please accept my apologies for the delay, I was inundated with other responsibilities in the past weeks. Before I give my feedback I'd like to establish a canonical set of requirements that this effort should satisfy. Here is my first guess:

  • Allow consumption of both raw instances []string and endpoints []endpoint.Endpoint
    • In contrast, the current implementation allows consuming only endpoints
  • Allow both sync and async consumption models
    • In contrast, the current implementation allows only the sync model

How do you feel about that? Am I missing important details?

@yurishkuro
Contributor Author

@peterbourgon yes, I think that covers it. The only "however" I'd add is that "both" is perhaps not a requirement. The sync/async models are two independent interfaces, and an implementation can support either or both. As for strings vs. Endpoint, I think none of the discovery implementations needs to support the Endpoint API, because it can always be added as a composition of the string-based API and a cache + endpoint factory.

@peterbourgon
Member

peterbourgon commented Apr 2, 2017

OK, great. So, I will attempt to describe the type catalog of the PR as it stands right now. We have

  • The existing Subscriber interface, with method Endpoints() ([]endpoint.Endpoint, error), which I will rename the Endpointer for the purposes of this discussion, to retain my sanity :)
  • A new Discoverer interface with method Instances() ([]string, error), which I will likewise rename the Instancer interface
  • A new Notifier interface with methods De/register(chan<- []string) to describe async capabilities of an Instancer, which I will likewise rename the InstanceNotifier interface

One could imagine, but we do not currently have,

  • A new EndpointNotifier interface with methods De/register(chan<- []endpoint.Endpoint)
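For reference, a minimal Go sketch of the interfaces under the renamed (working, not final) names used in this discussion:

type Instancer interface {
    Instances() ([]string, error)
}

type InstanceNotifier interface {
    Register(chan<- []string)
    Deregister(chan<- []string)
}

type Endpointer interface {
    Endpoints() ([]endpoint.Endpoint, error)
}

// Imaginable, but not part of this PR:
type EndpointNotifier interface {
    Register(chan<- []endpoint.Endpoint)
    Deregister(chan<- []endpoint.Endpoint)
}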

We have also

  • A new Notifier struct defined in each SD system's package, taking the place of the concrete Subscriber struct, implementing InstanceNotifier and (notably) ceasing to implement Endpointer
  • A new Dispatcher struct defined in package sd, implementing Instancer and InstanceNotifier, designed to be embedded in per-system Notifier structs

So far this gets us instance strings, but in order to get actual endpoints we need to compose an endpoint Factory with

  • The existing Cache struct, with existing method Endpoints() []endpoint.Endpoint and new method Instances() []string, which allows it to help satisfy the Endpointer and Instancer interfaces respectively if it is embedded/included in a type

Finally we have

  • A new Observer struct defined in package sd, which wraps a Discoverer (typically a concrete Dispatcher from a specific SD system package), a Notifier (likewise a concrete Notifier from a specific SD system package), and Factory; and constructs/includes a Cache struct to help it to implement Instancer and Endpointer.

With these types, whoever wants to provide complete support for a new SD system will need to write

  • A concrete Notifier struct, holding a system-specific client, and embedding a Dispatcher, in order to implement Instancer and InstanceNotifier; updates received via the client should update the Dispatcher via Notify

Whoever wants to use a concrete SD system to generate callable Endpoints will need to construct

  • A single, system-specific client — provided by Go kit or the client library package
  • A single, system-specific Notifier — provided by Go kit
  • For each endpoint,
    • An endpoint Factory — written by the user
    • An sd/cache.Observer, wrapping the single Notifier for its Instancer and InstanceNotifier capabilities, and the Factory, to provide Endpointer capabilities — provided by Go kit
    • A load balancer, wrapping the Observer for its Endpointer capabilities — provided by Go kit
    • A retry wrapper — provided by Go kit

This is roughly the same as before. The extra value is for those who want to subscribe to the raw feed of instance strings: they may now do so by leveraging the InstanceNotifier capabilities of the concrete Notifier struct.
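A rough wiring sketch of the recipe above, using the PR's working names; the constructors and their arguments here are illustrative assumptions, not the actual API:

// Illustrative only; constructor names and signatures are assumptions.
client := consulsd.NewClient(apiClient)                 // system-specific client
notifier := consulsd.NewNotifier(client, logger, "svc") // Instancer + InstanceNotifier

// Per endpoint:
factory := func(instance string) (endpoint.Endpoint, io.Closer, error) {
    // user-written: turn an instance string into a transport client endpoint
    return makeProfileEndpoint(instance) // hypothetical helper
}
observer := cache.NewObserver(notifier, notifier, factory, logger) // Endpointer
balancer := lb.NewRoundRobin(observer)
profileEndpoint := lb.Retry(3, time.Second, balancer)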

So, reviewing our requirements and how we meet them, we see

  • Consuming raw instances []string sync via the concrete Notifier type's Instancer capabilities
  • Consuming raw instances async via its InstanceNotifier capabilities
  • Consuming endpoints []endpoint.Endpoint via the sd/cache.Observer wrapper of the concrete Notifier type
  • No way at the moment to consume endpoints async — though I guess we can imagine a way

---

IMO there are too many new, intermediating types and concepts for our goals. I think there is an alternative that reduces the number of both of these things. Very roughly,

  • A concrete type (previously Subscriber, in this PR Notifier, future name TBD) per SD system package, wrapping the system client, always implementing Instancer, and optionally implementing InstanceNotifier if the system has natural pub/sub capabilities
  • A single concrete type provided in package sd, taking an Instancer and a Factory, implementing Endpointer
  • A single concrete type provided in package sd, taking an InstanceNotifier and a Factory, implementing EndpointNotifier

The rest follows, I hope, naturally. What do you think?

@yurishkuro
Contributor Author

A few thoughts:

  • can you elaborate on "there are too many new, intermediating types and concepts for our goals"? I.e. which ones do you see as extraneous? Specifically, I want to call attention to the fact that the profilesvc example only adds one extra line, which is actually a good thing because it allows reusing the single notifier for different endpoints (addressing #403, "package sd treats every endpoint independently, which is inefficient").
    • NB: perhaps the "too many" observation comes from poor naming, i.e. "Observer" sounds like a new concept but in fact is just a composable implementation.
  • I'd argue against EndpointNotifier because endpoints are functions whose identities cannot be compared, thus in practice the only thing one could do with them is to store the new array somewhere, which is already accomplished by the Endpointer. In other words, I'd defer implementing EndpointNotifier until a real use case comes up. In practice, though, implementing it is fairly straightforward and agnostic to the underlying SD implementation, just another composable util struct.

@peterbourgon
Member

peterbourgon commented Apr 5, 2017

can you elaborate on "there are too many new, intermediating types and concepts for our goals"? I.e. which ones do you see as extraneous?

I think both Dispatcher and Observer are extraneous, but on reflection I think a lot of my feeling of being overwhelmed is indeed a function of the names.

I would like to see each concrete package need to provide the least possible code; to me this seems like a (using old terminology) Subscriber struct, connecting to the SD system and implementing (using new terminology) sd.Instancer, and optionally sd.InstanceNotifier if the system supports pub/sub semantics. This job is so simple I don't think there is any need for package sd to provide an embeddable instance Cache or subscription Dispatcher helper or anything like that, but I may be convinced otherwise.

If that's the contract for each concrete package, then everything else can be provided by us in package sd, but I guess "everything else" reduces to a concrete type (name TBD, I guess) that wraps an Instancer and a Factory to implement the sd.Endpointer interface. It could even do something tricky; here I'm speculating, but something like

func NewEndpointer(src Instancer, f Factory, ...) Endpointer {
    if notifier, ok := src.(InstanceNotifier); ok {
        c := make(chan []string)
        notifier.Register(c)
        return streamCachingEndpointer{c, f, ...}
    }
    return passthruEndpointer{src, f, ...}
}

Does this make sense? Am I oversimplifying, or ignoring any necessary complexity?

I'd argue against EndpointNotifier because endpoints are functions whose identities cannot be compared, thus in practice the only thing one could do with them is to store the new array somewhere…

OK, I am convinced :)

@yurishkuro
Contributor Author

(quotes rephrased)

each concrete implementation needs to provide Instancer and optionally InstanceNotifier

Agreed. That's exactly what I have in this PR. We should have a common name for such a struct; the former "Subscriber" (and the new Observer) increase the mental overhead. Should the guideline be to name those NewInstancer or NewInstanceNotifier, depending on the capability of the implementation?

I think Dispatcher is a useful util to have, just like the Cache struct before: it's some 50 lines of code that every implementation would otherwise have to repeat to support channel-based observers. It could be renamed SimpleInstanceNotifier.

func NewEndpointer(src Instancer, f Factory, ...) Endpointer {

sgtm

@yurishkuro
Contributor Author

@peterbourgon
PS: I don't have a strong opinion on the Instancer/Endpointer naming. Do you want to go with these names? I can update the PR to make it in line with the discussion.

Btw, somewhat relevant prior art: github.com/coreos/etcd/client/discover.go

// Discoverer is an interface that wraps the Discover method.
type Discoverer interface {
        // Discover looks up the etcd servers for the domain.
        Discover(domain string) ([]string, error)
}

@peterbourgon
Member

peterbourgon commented Apr 15, 2017

OK, so iterating on all of that, how does this sound?

package sd

type Instancer interface {
    Instances() ([]string, error)
}

type InstanceNotifier interface {
    Register(chan []string)
    Deregister(chan []string)
}

type Endpointer interface {
    Endpoints() ([]endpoint.Endpoint, error)
}

// EndpointNotifier possible but elided for now.

type Factory func(string) endpoint.Endpoint

func NewEndpointer(src Instancer, f Factory) Endpointer {
    if notifier, ok := src.(InstanceNotifier); ok {
        return newStreamCachingEndpointer(notifier, f)
    }
    return newSimpleEndpointer(src, f)
}

package sd/internal/instance

type Cache struct { ... }

func (c *Cache) Update(instances []string)    { ... }
func (c *Cache) Instances() ([]string, error) { ... }

type Notifier struct { ... }

func (n *Notifier) Update(instances []string)  { ... }
func (n *Notifier) Register(c chan []string)   { ... }
func (n *Notifier) Deregister(c chan []string) { ... }

package sd/whateversystem

import (
    "github.com/go-kit/kit/sd/internal/instance"
)

type Instancer struct {
    instance.Cache    // always
    instance.Notifier // optional, if the system supports pub/sub
}

func NewInstancer(c *whatever.Client, ...) *Instancer {
    i := &Instancer{...}
    go i.loop()
    return i
}

func (i *Instancer) loop() {
    for instances := range i.client.Stream() {
        i.Cache.Update(instances)    // always
        i.Notifier.Update(instances) // optional
    }
}

Key points:

  • Current Subscriber structs are renamed Instancer
  • Helper types (to be embedded into concrete Instancers) go to package sd/internal/instance
    • instance.Cache implements Instancer
    • instance.Notifier implements InstanceNotifier
    • These don't belong in package endpoint IMO — an instance is a pure SD concept

WDYT?

@yurishkuro
Contributor Author

Sounds good. I will update the PR to match. Will be traveling next week, so probably around the next weekend.

@yurishkuro
Contributor Author

@peterbourgon I'm running into a couple of issues with func NewEndpointer(src Instancer, f Factory) Endpointer:

  1. for the push model, creating newStreamCachingEndpointer means it has to register a channel with the InstanceNotifier and start a goroutine to read from it. But because the constructor is private, there is no way to implement a "Closer" for such an endpointer to do a clean shutdown (bad for tests). We could return (Endpointer, io.Closer) from NewEndpointer, and the closer could be a no-op for the pull model.
  2. for the pull model, newSimpleEndpointer is somewhat inadequate because a naive implementation would have to implement Endpoints() by calling into src.Instances() every time. Even if it uses the Cache, it still becomes a relatively expensive op. I noticed that the sd implementations that don't currently support a push model (e.g. dnssrv) run an internal polling loop. But if we want to move Endpointer away from the sd implementations, it seems we need to expose the push and pull endpointer versions to end users, since the latter needs a TTL parameter for the internal poll.

Thoughts?

@peterbourgon
Member

re: 1, yep, I understand, and your idea could work. Another way could be to add a Close method to the Endpointer interface, with a no-op implementation for the non-streaming version. My intuition prefers that one, WDYT?
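A minimal sketch of that alternative, just to make the shape concrete (hypothetical):

// Endpointer grows a Close method; pull-based implementations provide a
// no-op, streaming implementations use it to stop their goroutine.
type Endpointer interface {
    Endpoints() ([]endpoint.Endpoint, error)
    Close()
}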

re: 2, I don't think that calling Cache.Instances with every request is a high cost; it's a function call returning a slice (i.e. reference semantics). I'm happy to be proven wrong if you want to benchmark it against an alternative...

re: exposing TTL for polling-based implementations, I would expect it to be a parameter to e.g. the dnssrv.NewInstancer constructor, because that's what's responsible for fetching new instance strings. And there we have complete freedom in designing the constructor API. But perhaps I'm missing something?

@yurishkuro
Contributor Author

let's defer the Closer discussion as it depends on point 2.

The cost is not in calling Instances(). The current implementation of Cache has all Endpoints pre-created when Update() is called, so the call to Cache.Endpoints() is an atomic load. But we have nobody to call Update(), thus the new implementation of Endpoints() essentially looks like this:

    instances, _ := c.source.Instances()
    c.Update(instances)
    return c.Endpoints.Load().([]endpoint.Endpoint)

The cost is in invoking Update() on every read of Endpoints, as opposed to the previous solution where Update was only called on TTL expiry (in the case of dnssrv). At a minimum, Update sorts the instances and compares them with the current map in the cache - not terribly expensive, but just silly to do for data that changes 2-3 orders of magnitude slower than the frequency of calls to Endpoints() (which could be fanout * ServiceQPS).

@peterbourgon
Member

Ah, right. Hm.

@peterbourgon
Member

OK, further iteration — and bringing us closer to the current system, a bit...

package sd

type Instancer interface {
    Instances() ([]string, error)
    Register(chan<- []string)
    Deregister(chan<- []string)
}

type Endpointer interface {
    Endpoints() ([]endpoint.Endpoint, error)
}

// EndpointNotifier possible but elided for now.

type Factory func(string) endpoint.Endpoint

func NewEndpointer(src Instancer, f Factory) *SimpleEndpointer {
    return &SimpleEndpointer{src, f}
}

package sd/internal/instance

import "sync"

type Cache struct {
	mtx       sync.RWMutex
	instances []string
	reg       registry
}

func NewCache() *Cache {
	return &Cache{
		reg: registry{},
	}
}

func (c *Cache) Update(instances []string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	sort.Strings(instances)
	if reflect.DeepEqual(instances, c.instances) {
		return
	}
	c.instances = instances
	c.reg.broadcast(instances)
}

func (c *Cache) Instances() []string {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	return c.instances
}

func (c *Cache) Register(ch chan<- []string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.reg.register(ch)
}

func (c *Cache) Deregister(ch chan<- []string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.reg.deregister(ch)
}

// registry is not goroutine-safe.
type registry map[chan<- []string]struct{}

func (r registry) broadcast(instances []string) {
	for c := range r {
		c <- instances
	}
}

func (r registry) register(c chan<- []string) {
	r[c] = struct{}{}
}

func (r registry) deregister(c chan<- []string) {
	delete(r, c)
}

package sd/whateversystem

import (
    "github.com/go-kit/kit/sd/internal/instance"
)

type Instancer struct {
    instance.Cache
}

func NewInstancer(c *whatever.Client, ...) *Instancer {
    i := &Instancer{...}
    go i.loop()
    return i
}

func (i *Instancer) loop() {
    for instances := range i.client.Stream() {
        i.Cache.Update(instances)
    }
}

In summary,

  • sd.Instancer interface combines Instancer and InstanceNotifier
  • internal/instance.Cache type implements new Instancer interface
  • Concrete implementations embed the instance.Cache and Update via polling or streaming
  • SimpleEndpointer can leverage Register/Deregister to get efficient updates

Does this improve things?

@yurishkuro
Contributor Author

Yes, basically if we make Register() a requirement for Instancer, then it solves the problem I had. I am not sure if we should even have an Instances() method on the Instancer in this case, because mixing push and pull models creates unnecessary concurrency challenges during initialization. I'll try to play with it, but I'd prefer to just say that any time a new channel is Register()-ed, it is immediately sent the current state of the world known to the Instancer, and that's the only way to get the initial state to the subscriber.
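A sketch of that "seed on Register" behavior, building on the Cache type above (hypothetical, not the final implementation):

// Register adds the subscriber and immediately sends it the current state,
// which becomes the only way a subscriber learns the initial set of instances.
func (c *Cache) Register(ch chan<- []string) {
    c.mtx.Lock()
    defer c.mtx.Unlock()
    c.reg.register(ch)
    ch <- c.instances // assumes the subscriber is ready to receive
}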

@peterbourgon
Member

Sounds reasonable to me.

@@ -82,6 +82,11 @@ func (p *Subscriber) loop(t *time.Ticker, lookup Lookup) {
}
}

// Instances implements the Discoverer interface.
Contributor Author

ignore this, I have not updated dnssrv yet

sd/endpointer.go Outdated

"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
iEndpoint "github.com/go-kit/kit/sd/internal/endpoint"
Contributor Author

I'm unhappy with the name clash here. Thoughts?

Member

Yeah, naming another package endpoint is a non-starter.

// to the service discovery system, or within the system itself; an Endpointer
// may yield no endpoints without error.
type Endpointer interface {
Endpoints() ([]endpoint.Endpoint, error)
Contributor Author

Open question: I think it makes sense to remove the error return value. In practice the real Endpointer gets instances via channel, so it never gets errors from sd implementation, and it never has any error to return here.

Member

If the connection between a service and the SD system is broken, Endpoints should return an error. If that's not currently true, we should make it true. It's important that the client be able to distinguish "my connection to the rest of the world is broken" from "my dependency simply has no instances at the moment".

Contributor Author

I agree that it's important in theory, but this is not how the current implementations behave: consul and dnssrv, as examples of the internal push and pull models, both return nil for the error from their current Endpoints() method. If an implementation uses the push model both internally and in the Instancer (via internal/instance.Cache), how do you see it returning an error from the pull model's Endpoints() method? I.e. which error would it be, the last one received via push (such as a lost connection to the sd backend)?

Note that I had a TBD in the Instancer interface about using richer types in the channels than plain []string. Doing it that way would at least allow the internal sd errors to bubble up to the Instancer subscribers. But it still runs into the same issue as above when the push model of the Instancer flips to the pull model of the Endpointer.

Member

I agree that it's important in theory, but this is not how the current implementations behave

Yeah, that's a problem, and my bad :(

If an implementation uses push model both internally and in the Instancer (via internal/instance.Cache), how do you see it returning an error from the pull model's Endpoints() method? I.e. which error would it be, the last one received via push (such as lost connection to sd backend)?

If the subscription to the Consul server is broken, then for the duration of the break, I would expect Instancer to return some sd.UnavailableError, perhaps with a Cause or Reason error field with the specific error returned by the consul client library. Likewise if (say) 3 DNS SRV polls in a row fail, I would expect to get exactly the same sd.UnavailableError.

Is this enough to go on?

Contributor Author

@peterbourgon Take a look at the latest commit. I changed Instancer from pushing []string to pushing Event struct that contains either instances or an error. consul implementation updated accordingly.

I have not introduced any special error; it is simply whichever error the underlying sd system returns, because it may be difficult to find common error patterns across all sd implementations. The semantics are simple: if a notification contains an error, previously pushed instances are unreliable, stale, "use at your own risk".
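For clarity, the pushed Event is roughly of this shape, per the description above:

// Event carries either a fresh set of instances or an error from the
// underlying sd system; it replaces the plain []string on the channels.
type Event struct {
    Instances []string
    Err       error
}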

This leaves one open question. Typically, discovery middleware should avoid breaking the application when temporarily losing the connection to the discovery backend. However, in the current form losing a connection will trigger a push of an Event with the error, and the endpointCache will close all Endpoints, even though the actual service instances are probably just fine at their previously known locations. The previous pull model did not have that behavior because all errors from the SD backend were simply logged, without erasing the Endpoints. We can restore that (desired) behavior by not closing Endpoints in the endpointCache when an error is received; however, that effectively puts us back to the state the code was in yesterday, i.e. the Endpointer always returns Endpoints and never returns the error. If we change the API contract to say that the Endpointer may return both Endpoints and an error, then it's a breaking change, and in particular it breaks the lb implementations since they bail on error right away.

Thus, while I like the latest commit, I am back to my suggestion of removing the error from the Endpointer signature - it doesn't do any good. If the application needs to be notified of an issue with the discovery system and take some measures, then it can easily register another channel on the Instancer. Maybe I am missing some use cases (like a decorator Endpointer that does health checks - but that seems like it should sit between the Instancer and the endpointCache).

Member

I had typed a more thorough reply but in the end it boils down to: LGTM. So,

Event struct . . .

Yep, LGTM.

I am back to my suggestion of removing the error from the Endpointer signature

Also SGTM to remove the error from the Endpointer interface. It means you must choose what to do when the endpointCache receives a non-nil error Event from the Instancer. The most obvious choice is to leave the current behavior and immediately invalidate all of the Endpoints. If you want to do something smarter, like only invalidate them after a timeout, you'll need to parameterize that behavior in the endpointCache constructor. Without having made prototype implementations to see how each option feels, I don't have strong opinions which choice is better.

Member

@peterbourgon peterbourgon left a comment

Yeah, the internal/endpoint package is problematic...


sd/endpointer.go Outdated
//
// Users are expected to provide their own factory functions that assume
// specific transports, or can deduce transports by parsing the instance string.
type Factory func(instance string) (endpoint.Endpoint, io.Closer, error)
Member

Yeah, we can't have package sd/internal/endpoint.

sd/endpointer.go Outdated
return f(instance)
}
se := &simpleEndpointer{
Cache: *iEndpoint.NewCache(factory, logger),
Member

What prevents e.g. endpointCache from being defined in this package as a non-exported type?

Contributor Author

yeah, I can do that.

@yurishkuro yurishkuro changed the title POC: service discovery via push model WIP: Enable push model for service discovery Apr 28, 2017
@yurishkuro yurishkuro changed the title WIP: Enable push model for service discovery WIP: Support push model for service discovery Apr 28, 2017
@yurishkuro
Contributor Author

@peterbourgon take a look at 54081bf

your comment about timeout made me realize that I can keep the error in the Endpointer signature and still have "fail open" behavior by default.

I think this was the last open question. If we agree on the current approach, I can go & fix the rest of the implementations and write tests.

@yurishkuro
Contributor Author

@peterbourgon are we good with the current approach?

@peterbourgon
Member

In general yes. I will probably have some code-organization-style feedback when it's complete but nothing structural.

@yurishkuro yurishkuro force-pushed the ys-discovery-via-push branch 2 times, most recently from 1ff1ee7 to 449bebb on May 17, 2017 03:21
@yurishkuro
Contributor Author

I will probably have some code-organization-style feedback when it's complete

@peterbourgon bring it on :-) I fixed all other sd implementations and all examples, it's ready for review & clean-up (& possibly more tests). Note a couple TODOs, including one "thought experiment" in the examples which makes the initialization code a lot more compact.

Also, the prior endpoints-based tests were flaky due to the Endpointer now being async. I've converted some of the tests (consul / eureka) to test just the Instancer behavior by calling .State(), which is mostly equivalent to the former .Endpoints() due to the built-in cache. We may want to update the other sd impl tests to use just the instances as well.

Member

@peterbourgon peterbourgon left a comment

Yeah, this actually looks pretty good so far. I was afraid the new abstractions would be awkward to use in practice, but they actually don't change very much. A few things, especially re: embedding; otherwise mostly LGTM. Ping me again when you consider it done?

sd/endpointer.go Outdated
// NewEndpointer creates an Endpointer that subscribes to updates from Instancer src
// and uses factory f to create Endpoints. If src notifies of an error, the Endpointer
// keeps returning previously created Endpoints assuming they are still good, unless
// this behavior is disabled with ResetOnError option.
Member

InvalidateOnError

sd/endpointer.go Outdated
// and uses factory f to create Endpoints. If src notifies of an error, the Endpointer
// keeps returning previously created Endpoints assuming they are still good, unless
// this behavior is disabled with ResetOnError option.
func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) Endpointer {
Member

I'd prefer it if SimpleEndpointer were exported, and this returned *SimpleEndpointer. At the moment, users have no way to invoke Close...

sd/cache.go Outdated
}

c.mtx.RUnlock()
c.mtx.Lock()
Member

I think this is racy; you'd better take the write Lock for the whole method.

Contributor Author

I'd rather check again after W-lock. I think it's good to keep the top under R-lock only, since it's on the hot path and very likely to be used concurrently.

Member

I agree that the current version is less contentious, but (unless I'm missing something) it's incorrect. Between line 129 and 130, another goroutine can e.g. invoke Update and change the value of c.err.

Contributor Author

it can, but the behavior will be the same as if we had just one write-lock and that other update happened before we took the lock. It's a standard double-checked locking pattern. The only time it will be a problem is if in the future someone makes a code change so that L123 and L133 use different conditions.
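For readers following along, the pattern being described looks roughly like this (field names are hypothetical):

// Double-checked locking: the fast path runs under the read lock; if the
// slow path is needed, the same condition is re-checked under the write lock.
func (c *endpointCache) Endpoints() []endpoint.Endpoint {
    c.mtx.RLock()
    if c.err == nil { // fast path: not in an error state
        defer c.mtx.RUnlock()
        return c.endpoints
    }
    c.mtx.RUnlock()

    c.mtx.Lock()
    defer c.mtx.Unlock()
    if c.err == nil { // re-check the same condition under the write lock
        return c.endpoints
    }
    c.updateCache(nil) // handle the error state under the write lock
    return c.endpoints
}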

Member

@peterbourgon peterbourgon May 30, 2017

Maybe I'm dense, but I don't believe you.

  • Goroutine 1: enters Endpoints, takes RLock
  • Goroutine 1: observes c.err != nil, skips early return
  • Goroutine 1: releases RLock
  • Goroutine 2: enters Update, takes Lock
  • Goroutine 2: observes event.Err == nil, sets c.err = nil
  • Goroutine 2: leaves Update, releases Lock
  • Goroutine 1: takes Lock
  • Goroutine 1: calls c.updateCache(nil) under the prior assumption that c.err != nil — but it isn't anymore

AFAIK, there's no generally safe way to "upgrade" read locks to write locks in Go.

func (t *thing) foo() {
    t.mtx.RLock()
    // ...
    t.mtx.RUnlock() // this is
    t.mtx.Lock()    // not safe
    // unless this code makes no assumptions from the previous block
    t.mtx.Unlock()
}

Contributor Author

Are you looking at the latest code? When I navigate through the comment links, GitHub shows a stale commit. In the latest version the last step in your sequence won't happen, because the code again checks for the same err == nil condition.

Member

I guess I was looking at an old version. My mistake. I still don't believe the performance penalty of the Lock is worth this dance, but I defer to you :)

sd/cache.go Outdated
c.err = nil
}

c.logger.Log("err", event.Err)
Member

Should this be in an else block or something? I guess we don't want to see err=<nil> logged with every update...

Contributor Author

was missing a return above

sd/endpointer.go Outdated
}

type endpointerOptions struct {
invalidateOnErrorTimeout *time.Duration
Member

Pointers as ersatz Maybe types make me :( Can we have this be a value, and treat 0 as not set, please?

Contributor Author

@peterbourgon The zero value for this field is actually meaningful, i.e. it can be used to make the cache invalidate all endpoints as soon as an error event is received from sd, whereas having a nil timeout means the endpoints will remain in use indefinitely, until updated by a real event from sd. Given that the pointer is an implementation detail of a private struct, how strongly do you feel about it?

Member

Then I would prefer to see this expressed as

invalidateOnError bool
invalidateTimeout time.Duration

sd/cache.go Outdated
// set new deadline to invalidate Endpoints unless non-error Event is received
c.invalidateDeadline = time.Now().Add(*c.options.invalidateOnErrorTimeout)
return
}
Member

// Happy path.
if event.Err == nil {
    c.updateCache(event.Instances)
    c.invalidateDeadline = time.Time{}
    c.err = nil
    return
}

// Sad path. Something's gone wrong.
c.logger.Log("err", event.Err)
if c.options.invalidateOnErrorTimeout.IsZero() { // assuming you make the change suggested below 
    return // keep the old instances
}
if c.err != nil { // I think this is a better sigil for the error state...
    return // already in the error state, do nothing & keep original error
}
c.err = event.Err
c.invalidateDeadline = time.Now().Add(c.options.invalidateOnErrorTimeout)

?

sd/endpointer.go Outdated
}

type simpleEndpointer struct {
endpointCache
Member

I'm irrationally afraid of struct embedding, especially in cases like this when it's not strictly true that simpleEndpointer is-an endpointCache. Specifically I'm afraid of endpointCache methods leaking through to the public type definition. Would you mind making it a named member and deliberately plumbing through the methods you intend to support—even if, at the moment, that's most or all of them?
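For illustration, the named-member alternative would look something like this (hypothetical sketch):

// Instead of embedding endpointCache, keep it as a named field and plumb
// through only the methods simpleEndpointer is meant to expose.
type simpleEndpointer struct {
    cache *endpointCache
}

func (s *simpleEndpointer) Endpoints() ([]endpoint.Endpoint, error) {
    return s.cache.Endpoints()
}
// ...and likewise for Close, etc.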

// Instancer yields instances stored in a certain etcd keyspace. Any kind of
// change in that keyspace is watched and will update the Instancer's Instancers.
type Instancer struct {
instance.Cache
Member

Likewise to the other comment, I'd feel safer if these weren't embedded but were regular named fields, with necessary methods plumbed through.

@@ -40,17 +37,18 @@ func NewSubscriber(c Client, path string, factory sd.Factory, logger log.Logger)
instances, eventc, err := s.client.GetEntries(s.path)
if err != nil {
logger.Log("path", s.path, "msg", "failed to retrieve entries", "err", err)
// TODO why zk constructor exits when other implementations continue?
Member

Yep, looks like inconsistency, please feel free to update.

@yurishkuro yurishkuro changed the title WIP: Support push model for service discovery Support push model for service discovery May 27, 2017
@yurishkuro
Contributor Author

@peterbourgon any other comments?

.gitignore Outdated
glide.lock
glide.yaml
vendor/

Member

Please remove these, if you don't mind.

coverage.bash Outdated
@@ -7,7 +7,7 @@
set -e

function go_files { find . -name '*_test.go' ; }
function filter { grep -v '/_' ; }
function filter { grep -v -e '/_' -e vendor ; }
Member

Likewise here.

Member

@peterbourgon peterbourgon left a comment

Just those two minor things. Are you happy with the current state? — Thanks for bearing with me on this incredible journey 😲

@yurishkuro
Contributor Author

@peterbourgon yeah, I think it's good to go. The main Instancer interface is very similar to what we have in Jaeger internally already, so will be easy to adapt.

I added a bit more tests to have coverage in core functions close to 100%.
