Skip to content

Commit

Permalink
server: add Node to stream closed callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
tony612 committed Jul 8, 2022
1 parent e4b1168 commit 3a65643
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 22 deletions.
9 changes: 5 additions & 4 deletions docs/server.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"log"
"sync"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)

Expand All @@ -46,9 +47,9 @@ func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error
}
return nil
}
func (cb *Callbacks) OnStreamClosed(id int64) {
func (cb *Callbacks) OnStreamClosed(id int64, node *core.Node) {
if cb.Debug {
log.Printf("stream %d closed\n", id)
log.Printf("stream %d of node %s closed\n", id, node.Id)
}
}
func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string) error {
Expand All @@ -57,9 +58,9 @@ func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string)
}
return nil
}
func (cb *Callbacks) OnDeltaStreamClosed(id int64) {
func (cb *Callbacks) OnDeltaStreamClosed(id int64, node *core.Node) {
if cb.Debug {
log.Printf("delta stream %d closed\n", id)
log.Printf("delta stream %d of node %s closed\n", id, node.Id)
}
}
func (cb *Callbacks) OnStreamRequest(int64, *discovery.DiscoveryRequest) error {
Expand Down
7 changes: 4 additions & 3 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Callbacks interface {
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnDeltaStreamOpen(context.Context, int64, string) error
// OnDeltaStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
OnDeltaStreamClosed(int64)
OnDeltaStreamClosed(int64, *core.Node)
// OnStreamDeltaRequest is called once a request is received on a stream.
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnStreamDeltaRequest(int64, *discovery.DeltaDiscoveryRequest) error
Expand Down Expand Up @@ -63,10 +63,12 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
// a collection of stack allocated watches per request type
watches := newWatches()

var node = &core.Node{}

defer func() {
watches.Cancel()
if s.callbacks != nil {
s.callbacks.OnDeltaStreamClosed(streamID)
s.callbacks.OnDeltaStreamClosed(streamID, node)
}
}()

Expand Down Expand Up @@ -96,7 +98,6 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
}
}

var node = &core.Node{}
for {
select {
case <-s.ctx.Done():
Expand Down
10 changes: 5 additions & 5 deletions pkg/server/sotw/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Callbacks interface {
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnStreamOpen(context.Context, int64, string) error
// OnStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
OnStreamClosed(int64)
OnStreamClosed(int64, *core.Node)
// OnStreamRequest is called once a request is received on a stream.
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnStreamRequest(int64, *discovery.DiscoveryRequest) error
Expand Down Expand Up @@ -87,10 +87,13 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
// a collection of stack allocated watches per request type
watches := newWatches()

// node may only be set on the first discovery request
var node = &core.Node{}

defer func() {
watches.close()
if s.callbacks != nil {
s.callbacks.OnStreamClosed(streamID)
s.callbacks.OnStreamClosed(streamID, node)
}
}()

Expand Down Expand Up @@ -130,9 +133,6 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
}
}

// node may only be set on the first discovery request
var node = &core.Node{}

// recompute dynamic channels for this stream
watches.recompute(s.ctx, reqCh)

Expand Down
13 changes: 7 additions & 6 deletions pkg/server/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
Expand Down Expand Up @@ -68,9 +69,9 @@ type Callbacks interface {
// CallbackFuncs is a convenience type for implementing the Callbacks interface.
type CallbackFuncs struct {
StreamOpenFunc func(context.Context, int64, string) error
StreamClosedFunc func(int64)
StreamClosedFunc func(int64, *core.Node)
DeltaStreamOpenFunc func(context.Context, int64, string) error
DeltaStreamClosedFunc func(int64)
DeltaStreamClosedFunc func(int64, *core.Node)
StreamRequestFunc func(int64, *discovery.DiscoveryRequest) error
StreamResponseFunc func(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
StreamDeltaRequestFunc func(int64, *discovery.DeltaDiscoveryRequest) error
Expand All @@ -91,9 +92,9 @@ func (c CallbackFuncs) OnStreamOpen(ctx context.Context, streamID int64, typeURL
}

// OnStreamClosed invokes StreamClosedFunc.
func (c CallbackFuncs) OnStreamClosed(streamID int64) {
func (c CallbackFuncs) OnStreamClosed(streamID int64, node *core.Node) {
if c.StreamClosedFunc != nil {
c.StreamClosedFunc(streamID)
c.StreamClosedFunc(streamID, node)
}
}

Expand All @@ -107,9 +108,9 @@ func (c CallbackFuncs) OnDeltaStreamOpen(ctx context.Context, streamID int64, ty
}

// OnDeltaStreamClosed invokes DeltaStreamClosedFunc.
func (c CallbackFuncs) OnDeltaStreamClosed(streamID int64) {
func (c CallbackFuncs) OnDeltaStreamClosed(streamID int64, node *core.Node) {
if c.DeltaStreamClosedFunc != nil {
c.DeltaStreamClosedFunc(streamID)
c.DeltaStreamClosedFunc(streamID, node)
}
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/test/v3/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"log"
"sync"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/v3"
)

type Callbacks struct {
Expand All @@ -18,6 +20,8 @@ type Callbacks struct {
mu sync.Mutex
}

var _ server.Callbacks = &Callbacks{}

func (cb *Callbacks) Report() {
cb.mu.Lock()
defer cb.mu.Unlock()
Expand All @@ -29,9 +33,9 @@ func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error
}
return nil
}
func (cb *Callbacks) OnStreamClosed(id int64) {
func (cb *Callbacks) OnStreamClosed(id int64, node *core.Node) {
if cb.Debug {
log.Printf("stream %d closed\n", id)
log.Printf("stream %d of node %s closed\n", id, node.Id)
}
}
func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string) error {
Expand All @@ -40,9 +44,9 @@ func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string)
}
return nil
}
func (cb *Callbacks) OnDeltaStreamClosed(id int64) {
func (cb *Callbacks) OnDeltaStreamClosed(id int64, node *core.Node) {
if cb.Debug {
log.Printf("delta stream %d closed\n", id)
log.Printf("delta stream %d of node %s closed\n", id, node.Id)
}
}
func (cb *Callbacks) OnStreamRequest(int64, *discovery.DiscoveryRequest) error {
Expand Down

0 comments on commit 3a65643

Please sign in to comment.