Skip to content

Commit

Permalink
add DeliverLatest as common function for use by Manager and ProxyTrac…
Browse files Browse the repository at this point in the history
…ker Open (#19564)

Open
add DeliverLatest as common function for use by Manager and ProxyTracker
  • Loading branch information
jmurret authored Nov 7, 2023
1 parent 8d6545e commit caaff73
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 55 deletions.
37 changes: 8 additions & 29 deletions agent/proxycfg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package proxycfg

import (
"errors"
"github.com/hashicorp/consul/lib/channels"
"runtime/debug"
"sync"

Expand Down Expand Up @@ -259,37 +260,15 @@ func (m *Manager) notify(snap *ConfigSnapshot) {
// it will drain the chan and then re-attempt delivery so that a slow consumer
// gets the latest config earlier. This MUST be called from a method where m.mu
// is held to be safe since it assumes we are the only goroutine sending on ch.
func (m *Manager) deliverLatest(snap *ConfigSnapshot, ch chan proxysnapshot.ProxySnapshot) {
// Send if chan is empty
select {
case ch <- snap:
return
default:
}

// Not empty, drain the chan of older snapshots and redeliver. For now we only
// use 1-buffered chans but this will still work if we change that later.
OUTER:
for {
select {
case <-ch:
continue
default:
break OUTER
}
}

// Now send again
select {
case ch <- snap:
return
default:
// This should not be possible since we should be the only sender, enforced
// by m.mu but error and drop the update rather than panic.
m.Logger.Error("failed to deliver ConfigSnapshot to proxy",
"proxy", snap.ProxyID.String(),
func (m *Manager) deliverLatest(snap proxysnapshot.ProxySnapshot, ch chan proxysnapshot.ProxySnapshot) {
m.Logger.Trace("delivering latest proxy snapshot to proxy", "proxyID", snap.(*ConfigSnapshot).ProxyID)
err := channels.DeliverLatest(snap, ch)
if err != nil {
m.Logger.Error("failed to deliver proxyState to proxy",
"proxy", snap.(*ConfigSnapshot).ProxyID,
)
}

}

// Watch registers a watch on a proxy. It might not exist yet in which case this
Expand Down
29 changes: 3 additions & 26 deletions internal/mesh/proxy-tracker/proxy_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package proxytracker
import (
"errors"
"fmt"
"github.com/hashicorp/consul/lib/channels"
"sync"

"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -207,32 +208,8 @@ func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState proxysnaps

func (pt *ProxyTracker) deliverLatest(proxyID *pbresource.ID, proxyState proxysnapshot.ProxySnapshot, ch chan proxysnapshot.ProxySnapshot) {
pt.config.Logger.Trace("delivering latest proxy snapshot to proxy", "proxyID", proxyID)
// Send if chan is empty
select {
case ch <- proxyState:
return
default:
}

// Not empty, drain the chan of older snapshots and redeliver. For now we only
// use 1-buffered chans but this will still work if we change that later.
OUTER:
for {
select {
case <-ch:
continue
default:
break OUTER
}
}

// Now send again
select {
case ch <- proxyState:
return
default:
// This should not be possible since we should be the only sender, enforced
// by m.mu but error and drop the update rather than panic.
err := channels.DeliverLatest(proxyState, ch)
if err != nil {
pt.config.Logger.Error("failed to deliver proxyState to proxy",
"proxy", proxyID.String(),
)
Expand Down
35 changes: 35 additions & 0 deletions lib/channels/deliver_latest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package channels

import "fmt"

// DeliverLatest will drain the channel discarding any messages if there are any and sends the current message.
func DeliverLatest[T any](val T, ch chan T) error {
// Send if chan is empty
select {
case ch <- val:
return nil
default:
}

// If it falls through to here, the channel is not empty.
// Drain the channel.
done := false
for !done {
select {
case <-ch:
continue
default:
done = true
}
}

// Attempt to send again. If it is not empty, throw an error
select {
case ch <- val:
return nil
default:
return fmt.Errorf("failed to deliver latest event: chan full again after draining")
}
}

0 comments on commit caaff73

Please sign in to comment.