client: de-duplicate alloc updates and gate during restore #17074

Merged: 1 commit, May 11, 2023
3 changes: 3 additions & 0 deletions .changelog/17074.txt
@@ -0,0 +1,3 @@
```release-note:improvement
client: de-duplicate allocation client status updates and prevent them from being sent until the client has first synchronized with the server
```
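The "gate during restore" half of this change hinges on the client refusing to send any allocation updates until its serversContactedCh has been closed by the first successful server sync. Below is a minimal, self-contained sketch of that gating pattern (illustrative names only, not Nomad code); the de-duplication half is handled separately by the LastAcknowledgedStateIsCurrent comparison added in alloc_runner.go below.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Stand-in for the client's serversContactedCh: closed exactly once,
	// after the first successful sync with a server.
	serversContacted := make(chan struct{})

	// Simulate the first server contact arriving a little later.
	go func() {
		time.Sleep(150 * time.Millisecond)
		close(serversContacted)
	}()

	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()

	for i := 0; i < 6; i++ {
		<-ticker.C
		// Non-blocking check: until the channel is closed the default
		// branch is taken and the update is skipped; once it is closed
		// the first case always wins.
		select {
		case <-serversContacted:
			fmt.Println("tick", i, "- gate open, would send alloc updates")
		default:
			fmt.Println("tick", i, "- gate closed, skipping update")
		}
	}
}
```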
57 changes: 55 additions & 2 deletions client/allocrunner/alloc_runner.go
@@ -11,6 +11,7 @@ import (

log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"golang.org/x/exp/maps"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
@@ -123,6 +124,10 @@ type allocRunner struct {
state *state.State
stateLock sync.RWMutex

// lastAcknowledgedState is the alloc runner state that was last
// acknowledged by the server (may lag behind ar.state)
lastAcknowledgedState *state.State

stateDB cstate.StateDB

// allocDir is used to build the allocations directory structure.
@@ -738,8 +743,9 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
return states
}

// clientAlloc takes in the task states and returns an Allocation populated
// with Client specific fields
// clientAlloc takes in the task states and returns an Allocation populated with
// Client specific fields. Note: this mutates the allocRunner's state to store
// the taskStates!
func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *structs.Allocation {
ar.stateLock.Lock()
defer ar.stateLock.Unlock()
@@ -1394,3 +1400,50 @@ func (ar *allocRunner) GetTaskDriverCapabilities(taskName string) (*drivers.Capa

return tr.DriverCapabilities()
}

// AcknowledgeState is called by the client's alloc sync when a given client
// state has been acknowledged by the server
func (ar *allocRunner) AcknowledgeState(a *state.State) {
ar.stateLock.Lock()
defer ar.stateLock.Unlock()
ar.lastAcknowledgedState = a
ar.persistLastAcknowledgedState(a)
}

// persistLastAcknowledgedState stores the last client state acknowledged by the server
func (ar *allocRunner) persistLastAcknowledgedState(a *state.State) {
if err := ar.stateDB.PutAcknowledgedState(ar.id, a); err != nil {
// While any persistence errors are very bad, the worst case scenario
// for failing to persist last acknowledged state is that if the agent
// is restarted it will send the update again.
ar.logger.Error("error storing acknowledged allocation status", "error", err)
}
}

// LastAcknowledgedStateIsCurrent returns true if the current state matches the
// state that was last acknowledged from a server update. This is called from
// the client in the same goroutine that called AcknowledgeState so that we
// can't get a TOCTOU error.
func (ar *allocRunner) LastAcknowledgedStateIsCurrent(a *structs.Allocation) bool {
ar.stateLock.RLock()
defer ar.stateLock.RUnlock()

last := ar.lastAcknowledgedState
if last == nil {
return false
}

switch {
case last.ClientStatus != a.ClientStatus:
return false
case last.ClientDescription != a.ClientDescription:
return false
case !last.DeploymentStatus.Equal(a.DeploymentStatus):
return false
case !last.NetworkStatus.Equal(a.NetworkStatus):
return false
}
return maps.EqualFunc(last.TaskStates, a.TaskStates, func(st, o *structs.TaskState) bool {
return st.Equal(o)
})
}
64 changes: 61 additions & 3 deletions client/allocrunner/alloc_runner_test.go
@@ -14,8 +14,13 @@ import (

"github.com/hashicorp/consul/api"
multierror "github.com/hashicorp/go-multierror"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/allochealth"
arstate "github.com/hashicorp/nomad/client/allocrunner/state"
"github.com/hashicorp/nomad/client/allocrunner/tasklifecycle"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
"github.com/hashicorp/nomad/client/allocwatcher"
@@ -26,9 +31,6 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/require"
)

// destroy does a blocking destroy on an alloc runner
@@ -2443,3 +2445,59 @@ func TestAllocRunner_PreKill_RunOnDone(t *testing.T) {
wait.Gap(500*time.Millisecond),
))
}

func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) {
ci.Parallel(t)

alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config = map[string]interface{}{"run_for": "2ms"}
alloc.DesiredStatus = "stop"

conf, cleanup := testAllocRunnerConfig(t, alloc.Copy())
t.Cleanup(cleanup)

ar, err := NewAllocRunner(conf)
must.NoError(t, err)

ar.SetNetworkStatus(&structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "192.168.1.1",
DNS: &structs.DNSConfig{},
})

calloc := ar.clientAlloc(map[string]*structs.TaskState{})
ar.AcknowledgeState(&arstate.State{
ClientStatus: calloc.ClientStatus,
ClientDescription: calloc.ClientDescription,
DeploymentStatus: calloc.DeploymentStatus,
TaskStates: calloc.TaskStates,
NetworkStatus: calloc.NetworkStatus,
})

must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc))

// clientAlloc mutates the state, so verify this doesn't break the check
// without state having been updated
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc))

// make a no-op state update
ar.SetNetworkStatus(&structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "192.168.1.1",
DNS: &structs.DNSConfig{},
})
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc))

// make a state update that should be detected as a change
ar.SetNetworkStatus(&structs.AllocNetworkStatus{
InterfaceName: "eth0",
Address: "192.168.2.1",
DNS: &structs.DNSConfig{},
})
calloc = ar.clientAlloc(map[string]*structs.TaskState{})
must.False(t, ar.LastAcknowledgedStateIsCurrent(calloc))
}
66 changes: 58 additions & 8 deletions client/client.go
@@ -160,6 +160,8 @@ type AllocRunner interface {
Signal(taskName, signal string) error
GetTaskEventHandler(taskName string) drivermanager.EventHandler
PersistState() error
AcknowledgeState(*arstate.State)
LastAcknowledgedStateIsCurrent(*structs.Allocation) bool

RestartTask(taskName string, taskEvent *structs.TaskEvent) error
RestartRunning(taskEvent *structs.TaskEvent) error
@@ -512,7 +514,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.serviceRegWrapper = wrapper.NewHandlerWrapper(c.logger, c.consulService, c.nomadService)

// Batching of initial fingerprints is done to reduce the number of node
// updates sent to the server on startup. This is the first RPC to the servers
// updates sent to the server on startup.
go c.batchFirstFingerprints()

// create heartbeatStop. We go after the first attempt to connect to the server, so
@@ -1270,6 +1272,14 @@ func (c *Client) restoreState() error {
continue
}

allocState, err := c.stateDB.GetAcknowledgedState(alloc.ID)
if err != nil {
c.logger.Error("error restoring last acknowledged alloc state, will update again",
err, "alloc_id", alloc.ID)
} else {
ar.AcknowledgeState(allocState)
}

// Maybe mark the alloc for halt on missing server heartbeats
if c.heartbeatStop.shouldStop(alloc) {
err = c.heartbeatStop.stopAlloc(alloc.ID)
@@ -2144,10 +2154,20 @@ func (c *Client) allocSync() {
if len(updates) == 0 {
continue
}
// Ensure we never send an update before we've had at least one sync
// from the server
select {
case <-c.serversContactedCh:
default:
continue
}

sync := make([]*structs.Allocation, 0, len(updates))
for _, alloc := range updates {
sync = append(sync, alloc)
sync := c.filterAcknowledgedUpdates(updates)
if len(sync) == 0 {
// No updates to send
updates = make(map[string]*structs.Allocation, len(updates))
Review comment (Member):
In Go 1.21 this can become clear(updates) which... doesn't really matter because a single malloc and single piece of garbage here isn't going to make any noticeable performance difference 😅
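For context, clear is a builtin added in Go 1.21 that empties a map in place; a tiny self-contained sketch of the simplification the reviewer is alluding to (the merged code keeps the make call):

```go
package main

import "fmt"

func main() {
	// Hypothetical stand-in for the client's pending `updates` map.
	updates := map[string]int{"alloc-1": 1, "alloc-2": 2}

	// Go 1.21+: clear removes every entry but reuses the existing map
	// allocation, rather than replacing it with make(...).
	clear(updates)

	fmt.Println(len(updates)) // prints 0
}
```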

syncTicker.Reset(allocSyncIntv)
continue
}

// Send to server.
@@ -2162,21 +2182,51 @@
// Error updating allocations, do *not* clear
// updates and retry after backoff
c.logger.Error("error updating allocations", "error", err)
syncTicker.Stop()
syncTicker = time.NewTicker(c.retryIntv(allocSyncRetryIntv))
syncTicker.Reset(c.retryIntv(allocSyncRetryIntv))
continue
}

c.allocLock.RLock()
for _, update := range sync {
if ar, ok := c.allocs[update.ID]; ok {
ar.AcknowledgeState(&arstate.State{
Review comment (Member):
If you wanted to avoid storing a bunch of duplicated client alloc state and implementing a deep equality, I think we could make this a uint64, increment it on every Client.AllocStateUpdated call, and have the Client persist it here.

That relies on ARs/TRs not calling Client.AllocStateUpdated even when nothing has been updated though, so I think your approach is safer.

Reply (Member Author, @tgross, May 9, 2023):
I really wanted to do exactly that sort of thing by retaining an index on both the current client state and the last-acknowledged state, but that doesn't help across a restore because the AR/TR will make repeat updates as their hooks complete.

If the index were a hash it'd require less storage and potentially be less complex, but we wouldn't be able to return early on the comparison checks (whereas with the check we've got, if e.g. the ClientStatus field changes we can immediately bail out and say "yes, need to update"). That said, we could maybe cut down on the complexity of the equality check by not making it an equality check -- a bunch of the ClientState is immutable events, so we could get away with a simple length check on those. (edit: as it turns out, not so much)

As for relying on ARs/TRs not calling Client.AllocStateUpdated when nothing has changed: I tried pushing the LastAcknowledgedStateIsCurrent check into the AllocStateUpdated method in an earlier draft, but then the check and the write+ACK happen in different goroutines and there's a TOCTOU bug.

(A rough sketch of this counter-based alternative appears after the allocSync function below.)

ClientStatus: update.ClientStatus,
ClientDescription: update.ClientDescription,
DeploymentStatus: update.DeploymentStatus,
TaskStates: update.TaskStates,
NetworkStatus: update.NetworkStatus,
})
}
}
c.allocLock.RUnlock()

// Successfully updated allocs, reset map and ticker.
// Always reset ticker to give loop time to receive
// alloc updates. If the RPC took the ticker interval
// we may call it in a tight loop before draining
// buffered updates.
updates = make(map[string]*structs.Allocation, len(updates))
syncTicker.Stop()
syncTicker = time.NewTicker(allocSyncIntv)
syncTicker.Reset(allocSyncIntv)
}
}
}
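For comparison, here is a rough, self-contained sketch of the index-counter alternative floated in the review thread above (hypothetical names; not what this PR implements). As the thread notes, the in-memory index is lost on client restart, which is why the PR persists and compares the last acknowledged state instead.

```go
package sketch

import "sync"

// indexedRunner sketches the rejected alternative: bump a counter on every
// client-side state change and record the counter value the server last
// acknowledged, instead of storing a copy of the acknowledged state.
type indexedRunner struct {
	mu         sync.RWMutex
	stateIndex uint64 // incremented on each state change reported to the client
	ackedIndex uint64 // index covered by the last successful server update
}

// noteStateChanged would be called wherever the alloc runner mutates its
// client-visible state (roughly where Client.AllocStateUpdated fires today).
func (r *indexedRunner) noteStateChanged() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.stateIndex++
}

// acknowledge records the index that was current when the server update
// succeeded.
func (r *indexedRunner) acknowledge(idx uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.ackedIndex = idx
}

// isCurrent reports whether the server has already acknowledged the latest
// state; if true, the update could be dropped from the next sync batch.
func (r *indexedRunner) isCurrent() bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.ackedIndex == r.stateIndex
}
```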

func (c *Client) filterAcknowledgedUpdates(updates map[string]*structs.Allocation) []*structs.Allocation {
sync := make([]*structs.Allocation, 0, len(updates))
c.allocLock.RLock()
defer c.allocLock.RUnlock()
for allocID, update := range updates {
if ar, ok := c.allocs[allocID]; ok {
if !ar.LastAcknowledgedStateIsCurrent(update) {
sync = append(sync, update)
}
} else {
// no allocrunner (typically a failed placement), so we need
// to send the update
sync = append(sync, update)
}
}
return sync
}

// allocUpdates holds the results of receiving updated allocations from the