-
Notifications
You must be signed in to change notification settings - Fork 521
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Incremental Server Implementation #422
Changes from 8 commits
8e9ef1f
607d6d7
e60f37c
46df9d4
8acda3e
983070f
5154b8a
502507b
5ee43a8
3a8a63e
92cf396
55e6010
c5e6ff5
5d42783
3f75e52
29dc60f
286d696
aca74f9
fce6e70
ad8acc8
f140e2d
f8460e1
efc9bd3
50f8fc2
12d90bb
b24673b
5a95db6
bfc32da
8e386e5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,9 +2,17 @@ package delta | |
|
||
import ( | ||
"context" | ||
"errors" | ||
"strconv" | ||
"sync/atomic" | ||
|
||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
|
||
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" | ||
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" | ||
"github.com/envoyproxy/go-control-plane/pkg/cache/v3" | ||
"github.com/envoyproxy/go-control-plane/pkg/resource/v3" | ||
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" | ||
) | ||
|
||
|
@@ -26,6 +34,8 @@ type Callbacks interface { | |
OnStreamDeltaResponse(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse) | ||
} | ||
|
||
var deltaErrorResponse = &cache.RawDeltaResponse{} | ||
|
||
type server struct { | ||
cache cache.ConfigWatcher | ||
callbacks Callbacks | ||
|
@@ -44,6 +54,299 @@ func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callba | |
} | ||
} | ||
|
||
func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error { | ||
streamID := atomic.AddInt64(&s.streamCount, 1) | ||
|
||
// streamNonce holds a unique nonce for req-resp pairs per xDS stream. The server | ||
alecholmez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// ignores stale nonces and nonce is only modified within send() function. | ||
alecholmez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
var streamNonce int64 | ||
|
||
// a collection of stack allocated watches per request type | ||
watches := newWatches() | ||
|
||
defer func() { | ||
watches.Cancel() | ||
if s.callbacks != nil { | ||
s.callbacks.OnDeltaStreamClosed(streamID) | ||
} | ||
}() | ||
|
||
// Sends a response, returns the new stream nonce | ||
send := func(resp cache.DeltaResponse) (string, error) { | ||
if resp == nil { | ||
return "", errors.New("missing response") | ||
} | ||
|
||
out, err := resp.GetDeltaDiscoveryResponse() | ||
alecholmez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err != nil { | ||
return "", err | ||
} | ||
|
||
streamNonce = streamNonce + 1 | ||
out.Nonce = strconv.FormatInt(streamNonce, 10) | ||
if s.callbacks != nil { | ||
s.callbacks.OnStreamDeltaResponse(streamID, resp.GetDeltaRequest(), out) | ||
} | ||
|
||
return out.Nonce, str.Send(out) | ||
} | ||
|
||
// Processes a response and updates the current server state | ||
process := func(resp cache.DeltaResponse, more bool) error { | ||
if more { | ||
typ := resp.GetDeltaRequest().GetTypeUrl() | ||
|
||
if isErr := isDeltaErrorResponse(resp); isErr { | ||
return status.Errorf(codes.Unavailable, typ+" watch failed") | ||
} | ||
|
||
nonce, err := send(resp) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
watch := watches.deltaResponses[typ] | ||
watch.nonce = nonce | ||
watches.deltaResponses[typ] = watch | ||
|
||
state := watches.deltaStreamStates[typ] | ||
if state.ResourceVersions == nil { | ||
state.ResourceVersions = make(map[string]string) | ||
} | ||
|
||
state.ResourceVersions = resp.GetNextVersionMap() | ||
watches.deltaStreamStates[typ] = state | ||
} | ||
|
||
return nil | ||
} | ||
|
||
rerequest := func(typ, nonce string, req *discovery.DeltaDiscoveryRequest, state *stream.StreamState) { | ||
watch := watches.deltaResponses[typ] | ||
if watch.nonce == "" || watch.nonce == nonce { | ||
if watch.cancel != nil { | ||
if cancel := watch.cancel; cancel != nil { | ||
cancel() | ||
} | ||
} | ||
watch.responses, watch.cancel = s.cache.CreateDeltaWatch(req, state) | ||
watches.deltaResponses[typ] = watch | ||
} | ||
} | ||
|
||
if s.callbacks != nil { | ||
if err := s.callbacks.OnDeltaStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
var node = &core.Node{} | ||
isWildcard := map[string]bool{} | ||
|
||
for { | ||
select { | ||
case <-s.ctx.Done(): | ||
return nil | ||
case resp, more := <-watches.deltaResponses[resource.EndpointType].responses: | ||
err := process(resp, more) | ||
if err != nil { | ||
return err | ||
} | ||
case resp, more := <-watches.deltaResponses[resource.ClusterType].responses: | ||
err := process(resp, more) | ||
if err != nil { | ||
return err | ||
} | ||
case resp, more := <-watches.deltaResponses[resource.RouteType].responses: | ||
err := process(resp, more) | ||
if err != nil { | ||
return err | ||
} | ||
case resp, more := <-watches.deltaResponses[resource.ListenerType].responses: | ||
err := process(resp, more) | ||
if err != nil { | ||
return err | ||
} | ||
case resp, more := <-watches.deltaResponses[resource.SecretType].responses: | ||
err := process(resp, more) | ||
if err != nil { | ||
return err | ||
} | ||
case resp, more := <-watches.deltaResponses[resource.RuntimeType].responses: | ||
err := process(resp, more) | ||
if err != nil { | ||
return err | ||
} | ||
case resp, more := <-watches.deltaMuxedResponses: | ||
err := process(resp, more) | ||
if err != nil { | ||
return err | ||
} | ||
case req, more := <-reqCh: | ||
// input stream ended or errored out | ||
if !more { | ||
return nil | ||
} | ||
if req == nil { | ||
return status.Errorf(codes.Unavailable, "empty request") | ||
} | ||
|
||
if s.callbacks != nil { | ||
if err := s.callbacks.OnStreamDeltaRequest(streamID, req); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
// TODO: potentially do something with this error detail? | ||
if req.ErrorDetail != nil { | ||
} | ||
|
||
// we verify nonce only if nonce is not initialized | ||
alecholmez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
var nonce string | ||
// the node information may only be set on the first incoming delta discovery request | ||
alecholmez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if req.Node != nil { | ||
node = req.Node | ||
nonce = req.GetResponseNonce() | ||
} else { | ||
req.Node = node | ||
|
||
// This request carries no node: reuse the node seen earlier on the stream and use the current stream nonce for the staleness check | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see a conditional here? Seems like we always set this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess that's true, we set that only while looking at the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the comment here is incorrect but the code is doing the right thing. Envoy can send node info only on the first request in the stream if we set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You mean if we receive node information again (if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, definitely not. Nonces are only valid in the context of the stream in which they are sent. Versions persist, e.g. in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oops sorry I meant "aren't nonces not supposed to live past streams"! You're definitely right there There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see how the presence of Node affects how we handle the nonce? Nonce isn't part of the |
||
nonce = strconv.FormatInt(streamNonce, 10) | ||
} | ||
|
||
// type URL is required for ADS but is implicit for xDS | ||
alecholmez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if defaultTypeURL == resource.AnyType { | ||
if req.TypeUrl == "" { | ||
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") | ||
} | ||
} else if req.TypeUrl == "" { | ||
req.TypeUrl = defaultTypeURL | ||
} | ||
|
||
var state stream.StreamState | ||
// Initialize the state map if we haven't already. | ||
// This means that we're handling the first request for this type on the stream. | ||
if s, ok := watches.deltaStreamStates[req.GetTypeUrl()]; !ok { | ||
state.ResourceVersions = make(map[string]string) | ||
} else { | ||
state = s | ||
} | ||
|
||
// We are in the wildcard mode if the first request of a particular type has an empty subscription list | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know the answer to this but what happens if we start getting resource subscriptions on a stream that was previously wild card? Does the spec say anything about this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well I'm not sure if I know the answer either. I assumed the stream will stay wildcard. The spec has extremely limited info on wildcard mode for delta. @htuch Do you have any insight here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Funny you should ask, we're having this exact discussion in envoyproxy/envoy#15857 (comment). I suggest tagging along as that is likely to settle the question on how this works. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
var found bool | ||
if state.IsWildcard, found = isWildcard[req.TypeUrl]; !found { | ||
state.IsWildcard = len(req.GetResourceNamesSubscribe()) == 0 | ||
isWildcard[req.TypeUrl] = state.IsWildcard | ||
} | ||
|
||
s.subscribe(req.GetResourceNamesSubscribe(), state.ResourceVersions) | ||
// We assume this is the first request on the stream | ||
if streamNonce == 0 { | ||
for r, v := range req.InitialResourceVersions { | ||
state.ResourceVersions[r] = v | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is incorrect for ADS, we can read There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh that's a good point, and the nonce will increment since ADS will send multiple // StreamState will keep track of resource state on a stream
type StreamState struct {
// Indicates whether the original DeltaRequest was a wildcard LDS/RDS request.
IsWildcard bool
// Indicates whether this is the first request on the stream for the corresponding resource type
IsFirst map[string]bool
// ResourceVersions contains a hash of the resource as the value and the resource name as the key.
// This field stores the last state sent to the client.
ResourceVersions map[string]string
}
func NewStreamState() StreamState {
return StreamState{
IsWildcard: false,
IsFirst: make(map[string]bool),
ResourceVersions: make(map[string]string),
}
} Which we can just use as a toggle and check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm thinking something along these lines? // Check if this is the first request on the stream for a type
if first := state.IsFirst[typeURL]; first {
for r, v := range req.InitialResourceVersions {
state.ResourceVersions[r] = v
}
// We've processed our initial resource versions (which can only happen on the first request on a stream)
// so now we mark false.
state.IsFirst[typeURL] = false
} else if !first && len(req.InitialResourceVersions) > 0 {
return status.Errorf(codes.InvalidArgument, "InitialResourceVersions can only be set on initial stream request")
} |
||
} else if streamNonce > 0 && len(req.InitialResourceVersions) > 0 { | ||
return status.Errorf(codes.InvalidArgument, "InitialResourceVersions can only be set on initial stream request") | ||
} | ||
s.unsubscribe(req.GetResourceNamesUnsubscribe(), state.ResourceVersions) | ||
alecholmez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// cancel existing watches to (re-)request a newer version | ||
for typ := range watches.deltaResponses { | ||
// If we've found our type, we go ahead and initiate the createWatch cycle | ||
if typ == req.TypeUrl { | ||
rerequest(typ, nonce, req, &state) | ||
continue | ||
} | ||
|
||
typeURL := req.TypeUrl | ||
responseNonce, seen := watches.deltaNonces[typeURL] | ||
if !seen || responseNonce == nonce { | ||
// We must signal goroutine termination to prevent a race between the cancel closing the watch | ||
// and the producer closing the watch. | ||
if terminate, exists := watches.deltaTerminations[typeURL]; exists { | ||
close(terminate) | ||
} | ||
if cancel, seen := watches.deltaCancellations[typeURL]; seen && cancel != nil { | ||
cancel() | ||
} | ||
|
||
var watch chan cache.DeltaResponse | ||
watch, watches.deltaCancellations[typeURL] = s.cache.CreateDeltaWatch(req, &state) | ||
|
||
// Muxing watches across multiple type URLs onto a single channel requires spawning a go-routine. Golang does not allow selecting over a dynamic set of channels. | ||
alecholmez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
terminate := make(chan struct{}) | ||
watches.deltaTerminations[typeURL] = terminate | ||
go func() { | ||
select { | ||
case resp, more := <-watch: | ||
if more { | ||
watches.deltaMuxedResponses <- resp | ||
} else { | ||
// Check again if the watch is cancelled. | ||
select { | ||
case <-terminate: // do nothing | ||
default: | ||
// We cannot close the responses channel since it can be closed twice. | ||
// Instead we send a fake error response. | ||
watches.deltaMuxedResponses <- deltaErrorResponse | ||
} | ||
} | ||
break | ||
case <-terminate: | ||
break | ||
} | ||
}() | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (s *server) DeltaStreamHandler(str stream.DeltaStream, typeURL string) error { | ||
return nil | ||
// a channel for receiving incoming delta requests | ||
reqCh := make(chan *discovery.DeltaDiscoveryRequest) | ||
|
||
go func() { | ||
for { | ||
select { | ||
case <-str.Context().Done(): | ||
close(reqCh) | ||
return | ||
default: | ||
req, err := str.Recv() | ||
if err != nil { | ||
close(reqCh) | ||
return | ||
} | ||
|
||
reqCh <- req | ||
} | ||
} | ||
}() | ||
alecholmez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
return s.processDelta(str, reqCh, typeURL) | ||
} | ||
|
||
// When we subscribe, we just want to make the cache know we are subscribing to a resource. | ||
// Providing a name with an empty version is enough to make that happen. | ||
func (s *server) subscribe(resources []string, sv map[string]string) { | ||
for _, resource := range resources { | ||
sv[resource] = "" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This makes me think that on the cache side it does need to leave entries in the map when they are deleted in order to keep the subscription going? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that's a true statement. We also encounter the issue of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think in this case we are explicitly letting envoy know that the resource no longer exists. https://github.com/alecholmez/go-control-plane/blob/incremental/pkg/cache/v3/delta_common.go#L91 And if it still needs the resource it should resubscribe. If we never delete resources then There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, and in the current cache implementation we still signal what needs to be removed and entries still get deleted: https://github.com/envoyproxy/go-control-plane/blob/main/pkg/cache/v3/delta.go#L81 so I think the functionality should be preserved. @snowp do you know if envoy would just resubscribe if it still needs it? I feel like that makes sense so we don't hold on to potentially stale subscriptions There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think it makes sense for resource additions/removals to affect the subscription state, but I could be wrong here. I think we need a way to communicate that a resource has been subscribed to within the stream but we haven't been able to provide the resource. 
Similarly if this resource goes away we want the server to be able to communicate this via I think it would be good to cover this case in a test, seems like a likely edge case. |
||
} | ||
} | ||
|
||
// Unsubscriptions remove resources from the stream state to | ||
// indicate to the cache that we don't care about the resource anymore | ||
func (s *server) unsubscribe(resources []string, sv map[string]string) { | ||
for _, resource := range resources { | ||
delete(sv, resource) | ||
} | ||
} | ||
|
||
func isDeltaErrorResponse(resp cache.DeltaResponse) bool { | ||
if resp == deltaErrorResponse { | ||
return true | ||
} | ||
|
||
return false | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason for this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope not sure why that happened, my editor might've messed up there. Should I revert?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might as well, no need to pollute the git history