internal/grpcsync: support two ways to schedule a callback with the serializer #7408

Merged
merged 3 commits on Jul 12, 2024

Changes from all commits
22 changes: 14 additions & 8 deletions balancer_wrapper.go
@@ -95,7 +95,7 @@ func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
// it is safe to call into the balancer here.
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
errCh := make(chan error)
ok := ccb.serializer.Schedule(func(ctx context.Context) {
uccs := func(ctx context.Context) {
defer close(errCh)
if ctx.Err() != nil || ccb.balancer == nil {
return
@@ -110,17 +110,23 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
logger.Infof("error from balancer.UpdateClientConnState: %v", err)
}
errCh <- err
})
if !ok {
return nil
}
onFailure := func() { close(errCh) }

// UpdateClientConnState can race with Close, and when the latter wins, the
// serializer is closed, and the attempt to schedule the callback will fail.
// It is acceptable to ignore this failure. But since we want to handle the
// state update in a blocking fashion (when we successfully schedule the
// callback), we have to use the ScheduleOr method and not the MaybeSchedule
// method on the serializer.
ccb.serializer.ScheduleOr(uccs, onFailure)
return <-errCh
}

// resolverError is invoked by grpc to push a resolver error to the underlying
// balancer. The call to the balancer is executed from the serializer.
func (ccb *ccBalancerWrapper) resolverError(err error) {
ccb.serializer.Schedule(func(ctx context.Context) {
ccb.serializer.TrySchedule(func(ctx context.Context) {
if ctx.Err() != nil || ccb.balancer == nil {
return
}
@@ -136,7 +142,7 @@ func (ccb *ccBalancerWrapper) close() {
ccb.closed = true
ccb.mu.Unlock()
channelz.Info(logger, ccb.cc.channelz, "ccBalancerWrapper: closing")
ccb.serializer.Schedule(func(context.Context) {
ccb.serializer.TrySchedule(func(context.Context) {
if ccb.balancer == nil {
return
}
@@ -148,7 +154,7 @@

// exitIdle invokes the balancer's exitIdle method in the serializer.
func (ccb *ccBalancerWrapper) exitIdle() {
ccb.serializer.Schedule(func(ctx context.Context) {
ccb.serializer.TrySchedule(func(ctx context.Context) {
if ctx.Err() != nil || ccb.balancer == nil {
return
}
@@ -256,7 +262,7 @@ type acBalancerWrapper struct {
// updateState is invoked by grpc to push a subConn state update to the
// underlying balancer.
func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolver.Address, err error) {
acbw.ccb.serializer.Schedule(func(ctx context.Context) {
acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
if ctx.Err() != nil || acbw.ccb.balancer == nil {
return
}
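The comment block above is the heart of this change: callers that must block on the result of the callback use ScheduleOr, while fire-and-forget callers use TrySchedule. Below is a minimal sketch of the blocking pattern, written as if it lived inside the grpc module (internal/grpcsync is not importable from other modules); doUpdate is a hypothetical stand-in for the actual call into the balancer.

package grpc

import (
	"context"

	"google.golang.org/grpc/internal/grpcsync"
)

// blockingUpdate mirrors the shape of updateClientConnState above: it blocks
// until the scheduled callback runs, or returns nil immediately if the
// serializer was already closed.
func blockingUpdate(cs *grpcsync.CallbackSerializer, doUpdate func(context.Context) error) error {
	errCh := make(chan error)
	update := func(ctx context.Context) {
		defer close(errCh)
		if ctx.Err() != nil {
			return // Serializer is shutting down; drop the update.
		}
		errCh <- doUpdate(ctx)
	}
	// When the serializer is already closed, onFailure runs inline and closes
	// errCh, so the receive below yields nil instead of blocking forever.
	cs.ScheduleOr(update, func() { close(errCh) })
	return <-errCh
}
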
24 changes: 18 additions & 6 deletions internal/grpcsync/callback_serializer.go
@@ -53,16 +53,28 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
return cs
}

// Schedule adds a callback to be scheduled after existing callbacks are run.
// TrySchedule tries to schedule the provided callback function f to be
// executed in the order it was added. This is a best-effort operation. If the
// context passed to NewCallbackSerializer was canceled before this method is
// called, the callback will not be scheduled.
//
// Callbacks are expected to honor the context when performing any blocking
// operations, and should return early when the context is canceled.
func (cs *CallbackSerializer) TrySchedule(f func(ctx context.Context)) {
cs.callbacks.Put(f)
}

// ScheduleOr schedules the provided callback function f to be executed in the
// order it was added. If the context passed to NewCallbackSerializer has been
// canceled before this method is called, the onFailure callback will be
// executed inline instead.
//
// Return value indicates if the callback was successfully added to the list of
// callbacks to be executed by the serializer. It is not possible to add
// callbacks once the context passed to NewCallbackSerializer is cancelled.
func (cs *CallbackSerializer) Schedule(f func(ctx context.Context)) bool {
return cs.callbacks.Put(f) == nil
// Callbacks are expected to honor the context when performing any blocking
// operations, and should return early when the context is canceled.
func (cs *CallbackSerializer) ScheduleOr(f func(ctx context.Context), onFailure func()) {
if cs.callbacks.Put(f) != nil {
onFailure()
}
}

func (cs *CallbackSerializer) run(ctx context.Context) {
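The pair of methods makes the failure mode explicit at each call site. A short usage sketch follows (assumed usage, written as if inside the grpc module; the error text and log messages are illustrative):

package main

import (
	"context"
	"errors"
	"log"

	"google.golang.org/grpc/internal/grpcsync"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cs := grpcsync.NewCallbackSerializer(ctx)

	// Best effort: if the serializer's context is already canceled, the
	// callback is silently dropped, which suits fire-and-forget callers.
	cs.TrySchedule(func(ctx context.Context) {
		log.Println("notification delivered")
	})

	// Must know about failure: onFailure runs inline when the serializer is
	// closed, so the channel is buffered to keep that inline send from
	// deadlocking this goroutine.
	errCh := make(chan error, 1)
	cs.ScheduleOr(
		func(ctx context.Context) { errCh <- nil },
		func() { errCh <- errors.New("serializer closed") },
	)
	if err := <-errCh; err != nil {
		log.Printf("update not handled: %v", err)
	}
}

Because callbacks run in FIFO order, the TrySchedule callback above is guaranteed to have executed before the ScheduleOr callback sends on errCh.
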
26 changes: 13 additions & 13 deletions internal/grpcsync/callback_serializer_test.go
@@ -55,7 +55,7 @@ func (s) TestCallbackSerializer_Schedule_FIFO(t *testing.T) {
mu.Lock()
defer mu.Unlock()
scheduleOrderCh <- id
cs.Schedule(func(ctx context.Context) {
cs.TrySchedule(func(ctx context.Context) {
select {
case <-ctx.Done():
return
@@ -115,7 +115,7 @@ func (s) TestCallbackSerializer_Schedule_Concurrent(t *testing.T) {
wg.Add(numCallbacks)
for i := 0; i < numCallbacks; i++ {
go func() {
cs.Schedule(func(context.Context) {
cs.TrySchedule(func(context.Context) {
wg.Done()
})
}()
@@ -148,7 +148,7 @@ func (s) TestCallbackSerializer_Schedule_Close(t *testing.T) {
// Schedule a callback which blocks until the context passed to it is
// canceled. It also closes a channel to signal that it has started.
firstCallbackStartedCh := make(chan struct{})
cs.Schedule(func(ctx context.Context) {
cs.TrySchedule(func(ctx context.Context) {
close(firstCallbackStartedCh)
<-ctx.Done()
})
@@ -159,9 +159,9 @@
callbackCh := make(chan int, numCallbacks)
for i := 0; i < numCallbacks; i++ {
num := i
if !cs.Schedule(func(context.Context) { callbackCh <- num }) {
t.Fatal("Schedule failed to accept a callback when the serializer is yet to be closed")
}
callback := func(context.Context) { callbackCh <- num }
onFailure := func() { t.Fatal("Schedule failed to accept a callback when the serializer is yet to be closed") }
cs.ScheduleOr(callback, onFailure)
}

// Ensure that none of the newer callbacks are executed at this point.
Expand Down Expand Up @@ -192,15 +192,15 @@ func (s) TestCallbackSerializer_Schedule_Close(t *testing.T) {
}
<-cs.Done()

// Ensure that a callback cannot be scheduled after the serializer is
// closed.
done := make(chan struct{})
if cs.Schedule(func(context.Context) { close(done) }) {
t.Fatal("Scheduled a callback after closing the serializer")
}

// Ensure that the latest callback is executed at this point.
callback := func(context.Context) { t.Fatal("Scheduled a callback after closing the serializer") }
onFailure := func() { close(done) }
cs.ScheduleOr(callback, onFailure)
select {
case <-time.After(defaultTestShortTimeout):
case <-time.After(defaultTestTimeout):
t.Fatal("Successfully scheduled callback after serializer is closed")
case <-done:
t.Fatal("Newer callback executed when scheduled after closing serializer")
}
}
4 changes: 2 additions & 2 deletions internal/grpcsync/pubsub.go
@@ -77,7 +77,7 @@ func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {

if ps.msg != nil {
msg := ps.msg
ps.cs.Schedule(func(context.Context) {
ps.cs.TrySchedule(func(context.Context) {
ps.mu.Lock()
defer ps.mu.Unlock()
if !ps.subscribers[sub] {
@@ -103,7 +103,7 @@ func (ps *PubSub) Publish(msg any) {
ps.msg = msg
for sub := range ps.subscribers {
s := sub
ps.cs.Schedule(func(context.Context) {
ps.cs.TrySchedule(func(context.Context) {
ps.mu.Lock()
defer ps.mu.Unlock()
if !ps.subscribers[s] {
6 changes: 3 additions & 3 deletions resolver_wrapper.go
@@ -66,7 +66,7 @@ func newCCResolverWrapper(cc *ClientConn) *ccResolverWrapper {
// any newly created ccResolverWrapper, except that close may be called instead.
func (ccr *ccResolverWrapper) start() error {
errCh := make(chan error)
ccr.serializer.Schedule(func(ctx context.Context) {
ccr.serializer.TrySchedule(func(ctx context.Context) {
if ctx.Err() != nil {
return
}
@@ -85,7 +85,7 @@ }
}

func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
ccr.serializer.Schedule(func(ctx context.Context) {
ccr.serializer.TrySchedule(func(ctx context.Context) {
if ctx.Err() != nil || ccr.resolver == nil {
return
}
@@ -102,7 +102,7 @@ func (ccr *ccResolverWrapper) close() {
ccr.closed = true
ccr.mu.Unlock()

ccr.serializer.Schedule(func(context.Context) {
ccr.serializer.TrySchedule(func(context.Context) {
if ccr.resolver == nil {
return
}
22 changes: 11 additions & 11 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
@@ -309,8 +309,8 @@
b.lbCfg = lbCfg

// Handle the update in a blocking fashion.
done := make(chan struct{})
ok = b.serializer.Schedule(func(context.Context) {
errCh := make(chan error, 1)
[Review thread on this line]

Member: Similar here as above.

Contributor Author: I can't make it unbuffered here: if ScheduleOr fails, it calls onFailure inline, which means the write to the channel on line 327 (errCh <- errBalancerClosed) would happen before the read on line 330, and the method would just hang. (See the sketch after this file's diff.)

Member: Ah yes, that makes sense.

callback := func(context.Context) {
// A config update with a changed top-level cluster name means that none
// of our old watchers make any sense any more.
b.closeAllWatchers()
@@ -319,20 +319,20 @@
// could end up creating more watchers if turns out to be an aggregate
// cluster.
b.createAndAddWatcherForCluster(lbCfg.ClusterName)
close(done)
})
if !ok {
errCh <- nil
}
onFailure := func() {
// The call to Schedule returns false *only* if the serializer has been
// closed, which happens only when we receive an update after close.
return errBalancerClosed
errCh <- errBalancerClosed

(Codecov: added line xds/internal/balancer/cdsbalancer/cdsbalancer.go#L327 was not covered by tests)
}
<-done
return nil
b.serializer.ScheduleOr(callback, onFailure)
return <-errCh
}

// ResolverError handles errors reported by the xdsResolver.
func (b *cdsBalancer) ResolverError(err error) {
b.serializer.Schedule(func(context.Context) {
b.serializer.TrySchedule(func(context.Context) {
// Resource not found error is reported by the resolver when the
// top-level cluster resource is removed by the management server.
if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
@@ -364,7 +364,7 @@
// Close cancels the CDS watch, closes the child policy and closes the
// cdsBalancer.
func (b *cdsBalancer) Close() {
b.serializer.Schedule(func(ctx context.Context) {
b.serializer.TrySchedule(func(ctx context.Context) {
b.closeAllWatchers()

if b.childLB != nil {
@@ -384,7 +384,7 @@
}

func (b *cdsBalancer) ExitIdle() {
b.serializer.Schedule(func(context.Context) {
b.serializer.TrySchedule(func(context.Context) {
if b.childLB == nil {
b.logger.Warningf("Received ExitIdle with no child policy")
return
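The buffering concern raised in the review thread above can be reproduced in isolation. In this self-contained sketch, scheduleOr is a stand-in that models only the failure path of CallbackSerializer.ScheduleOr, where onFailure runs inline on the caller's goroutine; errBalancerClosed is declared locally for illustration.

package main

import (
	"errors"
	"fmt"
)

var errBalancerClosed = errors.New("balancer is closed")

// scheduleOr models ScheduleOr after the serializer has been closed: the
// failure callback runs inline, before the caller can reach its receive.
func scheduleOr(f func(), onFailure func()) {
	onFailure()
}

func main() {
	// Buffered channel of size 1: the inline send in onFailure completes
	// immediately, and the receive below observes errBalancerClosed.
	errCh := make(chan error, 1)
	scheduleOr(
		func() { errCh <- nil },
		func() { errCh <- errBalancerClosed },
	)
	fmt.Println(<-errCh) // balancer is closed

	// With errCh := make(chan error) (unbuffered), the same sequence would
	// deadlock: the inline send blocks forever because the matching receive
	// is only reached after scheduleOr returns, which is exactly the hang
	// described in the review thread.
}
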
6 changes: 3 additions & 3 deletions xds/internal/balancer/cdsbalancer/cluster_watcher.go
@@ -33,19 +33,19 @@ type clusterWatcher struct {
}

func (cw *clusterWatcher) OnUpdate(u *xdsresource.ClusterResourceData) {
cw.parent.serializer.Schedule(func(context.Context) {
cw.parent.serializer.TrySchedule(func(context.Context) {
cw.parent.onClusterUpdate(cw.name, u.Resource)
})
}

func (cw *clusterWatcher) OnError(err error) {
cw.parent.serializer.Schedule(func(context.Context) {
cw.parent.serializer.TrySchedule(func(context.Context) {
cw.parent.onClusterError(cw.name, err)
})
}

func (cw *clusterWatcher) OnResourceDoesNotExist() {
cw.parent.serializer.Schedule(func(context.Context) {
cw.parent.serializer.TrySchedule(func(context.Context) {
cw.parent.onClusterResourceNotFound(cw.name)
})
}
2 changes: 1 addition & 1 deletion xds/internal/balancer/clusterresolver/resource_resolver.go
@@ -287,7 +287,7 @@ func (rr *resourceResolver) generateLocked() {
}

func (rr *resourceResolver) onUpdate() {
rr.serializer.Schedule(func(context.Context) {
rr.serializer.TrySchedule(func(context.Context) {
rr.mu.Lock()
rr.generateLocked()
rr.mu.Unlock()
4 changes: 2 additions & 2 deletions xds/internal/resolver/serviceconfig.go
@@ -182,7 +182,7 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
if v := atomic.AddInt32(ref, -1); v == 0 {
// This entry will be removed from activeClusters when
// producing the service config for the empty update.
cs.r.serializer.Schedule(func(context.Context) {
cs.r.serializer.TrySchedule(func(context.Context) {
cs.r.onClusterRefDownToZero()
})
}
@@ -326,7 +326,7 @@ func (cs *configSelector) stop() {
// selector; we need another update to delete clusters from the config (if
// we don't have another update pending already).
if needUpdate {
cs.r.serializer.Schedule(func(context.Context) {
cs.r.serializer.TrySchedule(func(context.Context) {
cs.r.onClusterRefDownToZero()
})
}
12 changes: 6 additions & 6 deletions xds/internal/resolver/watch_service.go
@@ -37,19 +37,19 @@
}

func (l *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData) {
l.parent.serializer.Schedule(func(context.Context) {
l.parent.serializer.TrySchedule(func(context.Context) {
l.parent.onListenerResourceUpdate(update.Resource)
})
}

func (l *listenerWatcher) OnError(err error) {
l.parent.serializer.Schedule(func(context.Context) {
l.parent.serializer.TrySchedule(func(context.Context) {
l.parent.onListenerResourceError(err)
})
}

func (l *listenerWatcher) OnResourceDoesNotExist() {
l.parent.serializer.Schedule(func(context.Context) {
l.parent.serializer.TrySchedule(func(context.Context) {
l.parent.onListenerResourceNotFound()
})
}
@@ -72,19 +72,19 @@
}

func (r *routeConfigWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
r.parent.serializer.Schedule(func(context.Context) {
r.parent.serializer.TrySchedule(func(context.Context) {
r.parent.onRouteConfigResourceUpdate(r.resourceName, update.Resource)
})
}

func (r *routeConfigWatcher) OnError(err error) {
r.parent.serializer.Schedule(func(context.Context) {
r.parent.serializer.TrySchedule(func(context.Context) {
r.parent.onRouteConfigResourceError(r.resourceName, err)
})
}

func (r *routeConfigWatcher) OnResourceDoesNotExist() {
r.parent.serializer.Schedule(func(context.Context) {
r.parent.serializer.TrySchedule(func(context.Context) {

(Codecov: added line xds/internal/resolver/watch_service.go#L87 was not covered by tests)
r.parent.onRouteConfigResourceNotFound(r.resourceName)
})
}