Skip to content

Commit

Permalink
Merge pull request #3144 from dperny/fix-volume-scheduling
Browse files Browse the repository at this point in the history
Free unused volumes in more cases
  • Loading branch information
dperny authored Jul 18, 2023
2 parents 467b19e + 570d132 commit 88a8e0a
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 20 deletions.
16 changes: 13 additions & 3 deletions manager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,18 @@ func (s *Scheduler) tick(ctx context.Context) {
}

func (s *Scheduler) applySchedulingDecisions(ctx context.Context, schedulingDecisions map[string]schedulingDecision) (successful, failed []schedulingDecision) {
// applySchedulingDecisions is the only place where we make store
// transactions in the scheduler. the scheduler is responsible for freeing
// volumes that are no longer in use. this means that volumes should be
// freed in this function. sometimes, there are no scheduling decisions to
// be made, so we return early in the if statement below.
//
// however, in all cases, any activity that results in a tick could result
// in needing volumes to be freed, even if nothing new is scheduled. this
// freeing of volumes should always happen *after* all of the scheduling
// decisions have been committed, hence the defer.
defer s.store.Batch(s.volumes.freeVolumes)

if len(schedulingDecisions) == 0 {
return
}
Expand Down Expand Up @@ -619,9 +631,7 @@ func (s *Scheduler) applySchedulingDecisions(ctx context.Context, schedulingDeci
}
// finally, every time we make new scheduling decisions, take the
// opportunity to release volumes.
return batch.Update(func(tx store.Tx) error {
return s.volumes.freeVolumes(tx)
})
return nil
})

if err != nil {
Expand Down
37 changes: 23 additions & 14 deletions manager/scheduler/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,24 +183,33 @@ func (vs *volumeSet) releaseVolume(volumeID, taskID string) {
//
// TODO(dperny): this is messy and has a lot of overhead. it should be reworked
// to something more streamlined.
func (vs *volumeSet) freeVolumes(tx store.Tx) error {
func (vs *volumeSet) freeVolumes(batch *store.Batch) error {
for volumeID, info := range vs.volumes {
v := store.GetVolume(tx, volumeID)
if v == nil {
continue
}
if err := batch.Update(func(tx store.Tx) error {
v := store.GetVolume(tx, volumeID)
if v == nil {
return nil
}

changed := false
for _, status := range v.PublishStatus {
if info.nodes[status.NodeID] == 0 && status.State == api.VolumePublishStatus_PUBLISHED {
status.State = api.VolumePublishStatus_PENDING_NODE_UNPUBLISH
changed = true
// when we are freeing a volume, we may update more than one of the
// volume's PublishStatuses. this means we can't simply put the
// Update call inside of the if statement; we need to know if we've
// changed anything once we've checked *all* of the statuses.
changed := false
for _, status := range v.PublishStatus {
if info.nodes[status.NodeID] == 0 && status.State == api.VolumePublishStatus_PUBLISHED {
status.State = api.VolumePublishStatus_PENDING_NODE_UNPUBLISH
changed = true
}
}
}
if changed {
if err := store.UpdateVolume(tx, v); err != nil {
return err
if changed {
if err := store.UpdateVolume(tx, v); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
}
return nil
Expand Down
4 changes: 1 addition & 3 deletions manager/scheduler/volumes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,9 +652,7 @@ var _ = Describe("volumeSet", func() {
vs.releaseVolume(volumes[0].ID, tasks[0].ID)
vs.releaseVolume(allVolume.ID, tasks[0].ID)

err := s.Update(func(tx store.Tx) error {
return vs.freeVolumes(tx)
})
err := s.Batch(vs.freeVolumes)
Expect(err).ToNot(HaveOccurred())

var freshVolumes []*api.Volume
Expand Down

0 comments on commit 88a8e0a

Please sign in to comment.