Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSI: volume watcher shutdown fixes #12439

Merged
merged 5 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 48 additions & 23 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2278,10 +2278,10 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
err := store.UpsertNode(structs.MsgTypeTestSetup, index, node)
require.NoError(t, err)

// Note that for volume writes in this test we need to use the
// RPCs rather than StateStore methods directly so that the GC
// job's RPC call updates a later index. otherwise the
// volumewatcher won't trigger for the final GC
// *Important*: for volume writes in this test we must use RPCs
// rather than StateStore methods directly, or the blocking query
// in volumewatcher won't get the final update for GC because it's
// watching on a different store at that point

// Register a volume
vols := []*structs.CSIVolume{{
Expand All @@ -2302,11 +2302,11 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
volReq, &structs.CSIVolumeRegisterResponse{})
require.NoError(t, err)

// Create a job with two allocations that claim the volume.
// Create a job with two allocs that claim the volume.
// We use two allocs here, one of which is not running, so
// that we can assert that the volumewatcher has made one
// complete pass (and removed the 2nd alloc) before running
// the GC.
// that we can assert the volumewatcher has made one
// complete pass (and removed the 2nd alloc) before we
// run the GC
eval := mock.Eval()
eval.Status = structs.EvalStatusFailed
index++
Expand All @@ -2331,6 +2331,7 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {

alloc2.NodeID = node.ID
alloc2.ClientStatus = structs.AllocClientStatusComplete
alloc2.DesiredStatus = structs.AllocDesiredStatusStop
alloc2.Job = job
alloc2.JobID = job.ID
alloc2.EvalID = eval.ID
Expand All @@ -2344,42 +2345,66 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
index++
require.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1, alloc2}))

// Claim the volume for the alloc
req := &structs.CSIVolumeClaimRequest{
AllocationID: alloc1.ID,
NodeID: node.ID,
VolumeID: volID,
Claim: structs.CSIVolumeClaimWrite,
VolumeID: volID,
AllocationID: alloc1.ID,
NodeID: uuid.Generate(), // doesn't exist so we don't get errors trying to unmount volumes from it
Claim: structs.CSIVolumeClaimWrite,
AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
State: structs.CSIVolumeClaimStateTaken,
WriteRequest: structs.WriteRequest{
Namespace: ns,
Region: srv.config.Region,
},
}
req.Namespace = ns
req.Region = srv.config.Region
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim",
req, &structs.CSIVolumeClaimResponse{})
require.NoError(t, err)
require.NoError(t, err, "write claim should succeed")

req.AllocationID = alloc2.ID
req.State = structs.CSIVolumeClaimStateUnpublishing

err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim",
req, &structs.CSIVolumeClaimResponse{})
require.NoError(t, err, "unpublishing claim should succeed")

// Delete allocation and job
require.Eventually(t, func() bool {
vol, err := store.CSIVolumeByID(ws, ns, volID)
require.NoError(t, err)
return len(vol.WriteClaims) == 1 &&
len(vol.WriteAllocs) == 1 &&
len(vol.PastClaims) == 0
}, time.Second*1, 100*time.Millisecond,
"volumewatcher should have released unpublishing claim without GC")

// At this point we can guarantee that volumewatcher is waiting
// for new work. Delete allocation and job so that the next pass
// thru volumewatcher has more work to do
index, _ = store.LatestIndex()
index++
err = store.DeleteJob(index, ns, job.ID)
require.NoError(t, err)
index, _ = store.LatestIndex()
index++
err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID, alloc2.ID})
err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID})
require.NoError(t, err)

// Create a core scheduler and attempt the volume claim GC
snap, err := store.Snapshot()
require.NoError(t, err)

core := NewCoreScheduler(srv, snap)

index, _ = snap.LatestIndex()
index++
gc := srv.coreJobEval(structs.CoreJobForceGC, index)
c := core.(*CoreScheduler)
require.NoError(t, c.csiVolumeClaimGC(gc))

// sending the GC claim will trigger the volumewatcher's normal
// code path. the volumewatcher will hit an error here because
// there's no path to the node, but this is a node-only plugin so
// we accept that the node has been GC'd and there's no point
// holding onto the claim
// the only remaining claim is for a deleted alloc with no path to
// the non-existent node, so volumewatcher will release the
// remaining claim
require.Eventually(t, func() bool {
vol, _ := store.CSIVolumeByID(ws, ns, volID)
return len(vol.WriteClaims) == 0 &&
Expand Down
4 changes: 3 additions & 1 deletion nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ NODE_DETACHED:
}

RELEASE_CLAIM:
v.logger.Trace("releasing claim", "vol", vol.ID)
// advance a CSIVolumeClaimStateControllerDetached claim
claim.State = structs.CSIVolumeClaimStateReadyToFree
err = v.checkpointClaim(vol, claim)
Expand All @@ -684,6 +685,7 @@ RELEASE_CLAIM:
}

func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
v.logger.Trace("node unpublish", "vol", vol.ID)
if claim.AllocationID != "" {
err := v.nodeUnpublishVolumeImpl(vol, claim)
if err != nil {
Expand Down Expand Up @@ -757,7 +759,7 @@ func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *struc
}

func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {

v.logger.Trace("controller unpublish", "vol", vol.ID)
if !vol.ControllerRequired {
claim.State = structs.CSIVolumeClaimStateReadyToFree
return nil
Expand Down
68 changes: 46 additions & 22 deletions nomad/volumewatcher/volume_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package volumewatcher
import (
"context"
"sync"
"time"

log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand All @@ -30,6 +32,11 @@ type volumeWatcher struct {
shutdownCtx context.Context // parent context
ctx context.Context // own context
exitFn context.CancelFunc
deleteFn func()

// quiescentTimeout is the time we wait until the volume has "settled"
// before stopping the child watcher goroutines
quiescentTimeout time.Duration

// updateCh is triggered when there is an updated volume
updateCh chan *structs.CSIVolume
Expand All @@ -43,13 +50,15 @@ type volumeWatcher struct {
func newVolumeWatcher(parent *Watcher, vol *structs.CSIVolume) *volumeWatcher {

w := &volumeWatcher{
updateCh: make(chan *structs.CSIVolume, 1),
v: vol,
state: parent.state,
rpc: parent.rpc,
leaderAcl: parent.leaderAcl,
logger: parent.logger.With("volume_id", vol.ID, "namespace", vol.Namespace),
shutdownCtx: parent.ctx,
updateCh: make(chan *structs.CSIVolume, 1),
v: vol,
state: parent.state,
rpc: parent.rpc,
leaderAcl: parent.leaderAcl,
logger: parent.logger.With("volume_id", vol.ID, "namespace", vol.Namespace),
shutdownCtx: parent.ctx,
deleteFn: func() { parent.remove(vol.ID + vol.Namespace) },
quiescentTimeout: parent.quiescentTimeout,
}

// Start the long lived watcher that scans for allocation updates
Expand Down Expand Up @@ -111,6 +120,9 @@ func (vw *volumeWatcher) watch() {
vol := vw.getVolume(vw.v)
vw.volumeReap(vol)

timer, stop := helper.NewSafeTimer(vw.quiescentTimeout)
defer stop()

for {
select {
// TODO(tgross): currently server->client RPC have no cancellation
Expand All @@ -120,17 +132,28 @@ func (vw *volumeWatcher) watch() {
case <-vw.ctx.Done():
return
case vol := <-vw.updateCh:
// while we won't make raft writes if we get a stale update,
// we can still fire extra CSI RPC calls if we don't check this
if vol.ModifyIndex >= vw.v.ModifyIndex {
vol = vw.getVolume(vol)
if vol == nil {
return
}
vw.volumeReap(vol)
vol = vw.getVolume(vol)
if vol == nil {
// We stop the goroutine whenever we have no more
// work, but only delete the watcher when the volume
// is gone to avoid racing the blocking query
vw.deleteFn()
vw.Stop()
return
}
vw.volumeReap(vol)
timer.Reset(vw.quiescentTimeout)
case <-timer.C:
// Wait until the volume has "settled" before stopping
// this goroutine so that the race between shutdown and
// the parent goroutine sending on <-updateCh is pushed to
// after the window we most care about quick freeing of
// claims (and the GC job will clean up anything we miss)
vol = vw.getVolume(vol)
if vol == nil {
vw.deleteFn()
}
default:
vw.Stop() // no pending work
vw.Stop()
return
}
}
Expand All @@ -145,9 +168,12 @@ func (vw *volumeWatcher) getVolume(vol *structs.CSIVolume) *structs.CSIVolume {
var err error
ws := memdb.NewWatchSet()

vol, err = vw.state.CSIVolumeDenormalizePlugins(ws, vol.Copy())
vol, err = vw.state.CSIVolumeByID(ws, vol.Namespace, vol.ID)
if err != nil {
vw.logger.Error("could not query plugins for volume", "error", err)
vw.logger.Error("could not query for volume", "error", err)
return nil
}
if vol == nil {
return nil
}

Expand All @@ -168,9 +194,6 @@ func (vw *volumeWatcher) volumeReap(vol *structs.CSIVolume) {
if err != nil {
vw.logger.Error("error releasing volume claims", "error", err)
}
if vw.isUnclaimed(vol) {
vw.Stop()
}
}

func (vw *volumeWatcher) isUnclaimed(vol *structs.CSIVolume) bool {
Expand Down Expand Up @@ -230,6 +253,7 @@ func (vw *volumeWatcher) collectPastClaims(vol *structs.CSIVolume) *structs.CSIV
}

func (vw *volumeWatcher) unpublish(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
vw.logger.Trace("unpublishing volume", "alloc", claim.AllocationID)
req := &structs.CSIVolumeUnpublishRequest{
VolumeID: vol.ID,
Claim: claim,
Expand Down
29 changes: 22 additions & 7 deletions nomad/volumewatcher/volumes_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package volumewatcher
import (
"context"
"sync"
"time"

log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
Expand Down Expand Up @@ -34,9 +35,15 @@ type Watcher struct {
ctx context.Context
exitFn context.CancelFunc

// quiescentTimeout is the time we wait until the volume has "settled"
// before stopping the child watcher goroutines
quiescentTimeout time.Duration

wlock sync.RWMutex
}

var defaultQuiescentTimeout = time.Minute * 5

// NewVolumesWatcher returns a volumes watcher that is used to watch
// volumes and trigger the scheduler as needed.
func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *Watcher {
Expand All @@ -47,11 +54,12 @@ func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *W
ctx, exitFn := context.WithCancel(context.Background())

return &Watcher{
rpc: rpc,
logger: logger.Named("volumes_watcher"),
ctx: ctx,
exitFn: exitFn,
leaderAcl: leaderAcl,
rpc: rpc,
logger: logger.Named("volumes_watcher"),
ctx: ctx,
exitFn: exitFn,
leaderAcl: leaderAcl,
quiescentTimeout: defaultQuiescentTimeout,
}
}

Expand Down Expand Up @@ -157,10 +165,10 @@ func (w *Watcher) getVolumesImpl(ws memdb.WatchSet, state *state.StateStore) (in
}

// add adds a volume to the watch list
func (w *Watcher) add(d *structs.CSIVolume) error {
func (w *Watcher) add(v *structs.CSIVolume) error {
w.wlock.Lock()
defer w.wlock.Unlock()
_, err := w.addLocked(d)
_, err := w.addLocked(v)
return err
}

Expand All @@ -182,3 +190,10 @@ func (w *Watcher) addLocked(v *structs.CSIVolume) (*volumeWatcher, error) {
w.watchers[v.ID+v.Namespace] = watcher
return watcher, nil
}

// removes a volume from the watch list
func (w *Watcher) remove(volID string) {
w.wlock.Lock()
defer w.wlock.Unlock()
delete(w.watchers, volID)
}
Loading