Support best effort FIFO #135
Conversation
Force-pushed from 980513e to e4b539d.
I prefer not to merge this on its own, but we can iterate on it, and then once "lgtmed" we add another commit to the same PR with the rest of the implementation. What do you think?
Nothing to add to @ahg-g's comments.
Force-pushed from e4b539d to aac6f51.
Force-pushed from aac6f51 to 1f2edb2.
sgtm. I've added the rest of the implementation in this commit. I've tested it in a real cluster to make sure it works. Integration tests will be added in a follow-up commit in this PR.
Thanks Alex, this is great as a first implementation. I have a couple of macro comments:
- It would be nice if we could come up with an "interface" for QueueingStrategy that each queueing strategy implements and that gets invoked agnostically by the queue manager. This would establish a clear framework for where the queueing strategy can impact the logic, rather than hardcoding things per strategy all over the place (a rough sketch of the idea follows this list).
- More immediately, we should document all events that could make a workload admissible; this helps us cross-check whether we actually covered all cases (and typically warrants a test for each case).
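For illustration only, here is a minimal sketch of that idea. The method set, the factory, and newClusterQueueStrictFIFO are assumptions made for this sketch (it presumes the kueue API exposes a QueueingStrategy field with StrictFIFO and BestEffortFIFO values), not the interface the PR ultimately settles on:

// ClusterQueue is a per-strategy queue implementation that the queue manager
// drives without knowing which queueing strategy backs it (hypothetical sketch).
type ClusterQueue interface {
    // PushOrUpdate adds or updates a workload coming from the controllers.
    PushOrUpdate(w *kueue.QueuedWorkload)
    // Delete removes a workload from the queue.
    Delete(w *kueue.QueuedWorkload)
    // Pop returns the next workload to attempt admission for, or nil if empty.
    Pop() *workload.Info
    // RequeueIfNotPresent puts a workload back after a failed scheduling attempt.
    RequeueIfNotPresent(wInfo *workload.Info) bool
}

// newClusterQueue would pick the implementation based on the queueing strategy.
func newClusterQueue(cq *kueue.ClusterQueue) (ClusterQueue, error) {
    switch cq.Spec.QueueingStrategy {
    case kueue.StrictFIFO:
        return newClusterQueueStrictFIFO(cq)
    case kueue.BestEffortFIFO:
        return newClusterQueueBestEffortFIFO(cq)
    default:
        return nil, fmt.Errorf("unknown queueing strategy %q", cq.Spec.QueueingStrategy)
    }
}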
Force-pushed from 1f2edb2 to f23d697.
Thanks @ahg-g. I copied this from https://docs.google.com/document/d/1VQ0qxWA-jwgvLq_WYG46OkXWW00O6q7b1BsR_Uv-acs/edit?usp=sharing where I documented it before. It is similar to the implementation of the default-scheduler: we need to trigger the re-queue operation based on the relevant events.
The first version is a rough and simple operation. As a next step, we need to select the workloads to move based on the exact event, like RequestableResources / NamespaceSelector / Cohort changed ....
I created a new PR to come up with an interface.
Can we please document those in the code?
Force-pushed from f23d697 to f828b1c.
Done, documented.
First round of review. We need integration tests, but let's leave that until we converge on the logic itself.
func newClusterQueueBestEffortFIFO(cq *kueue.ClusterQueue) (ClusterQueue, error) {
    cqImpl := ClusterQueueImpl{
        heap: heapImpl{
            less: strictFIFO,
a bit surprising to see "strictFIFO" here, let's rename it to byCreationTime
Done
cqImpl := ClusterQueueImpl{
    heap: heapImpl{
        less:  strictFIFO,
        items: make(map[string]*heapItem),
As a follow-up, I would put the heap in its own pkg and have a NewHeap function; this is leaking implementation details.
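A rough sketch of what that follow-up could look like, with a standalone heap package and a constructor; the package layout and all names here are assumptions, not code from this PR:

package heap

// LessFunc orders two queued objects; KeyFunc derives the map key for an object.
type LessFunc func(a, b interface{}) bool
type KeyFunc func(obj interface{}) string

type heapItem struct {
    obj   interface{}
    index int
}

// Heap keeps its fields unexported so callers go through the constructor and
// methods instead of reaching into the internals.
type Heap struct {
    key   KeyFunc
    less  LessFunc
    items map[string]*heapItem
    data  []*heapItem
}

// New returns an empty Heap ordered by the given less function.
func New(keyFn KeyFunc, lessFn LessFunc) *Heap {
    return &Heap{
        key:   keyFn,
        less:  lessFn,
        items: make(map[string]*heapItem),
    }
}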
item := cq.heap.items[workload.Key(w)]
info := *workload.NewInfo(w)
if item == nil {
    heap.Push(&cq.heap, info)
    return
}
item.obj = info
heap.Fix(&cq.heap, item.index)
Can we call ClusterQueueImpl.PushOrUpdate(w) instead of replicating this code? That was my hope with the base/derived class analogy. Same thing with the Delete function below.
I considered this in the first version, but it's hard to determine whether the workload is new or re-queued.
One idea is to add a flag in the workload info that gets updated if scheduling fails.
why do we need the flag? This piece of code seems like an exact replica of the PushOrUpdate logic; to be clear, I am suggesting the following:
func (cq *ClusterQueueBestEffortFIFO) PushOrUpdate(w *kueue.QueuedWorkload) {
    if oldInfo := cq.inadmissibleWorkloads.get(w); oldInfo != nil {
        cq.inadmissibleWorkloads.delete(w)
    }
    cq.ClusterQueueImpl.PushOrUpdate(w)
}
Done
}

// Delete deletes a workload from the workloads.
func (i *InadmissibleWorkloads) delete(w *kueue.QueuedWorkload) {
The input parameter should be the string key only, not the whole type. Same thing with the get function below.
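For example, the signatures could take the key directly and callers would pass workload.Key(w); this is a sketch of the suggested change, not the final code:

// delete removes the entry for the given workload key, if present.
func (i *InadmissibleWorkloads) delete(key string) {
    delete(i.workloads, key)
}

// get returns the stored info for the given workload key, or nil if absent.
func (i *InadmissibleWorkloads) get(key string) *workload.Info {
    return i.workloads[key]
}

A call site would then look like cq.inadmissibleWorkloads.delete(workload.Key(w)).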
}

// Clear removes all the entries from the workloads.
func (i *InadmissibleWorkloads) clear() {
This seems to be used in the unit test only, so I would remove the function (just so we offer a smaller API surface) and just do i.workloads = make(map[string]*workload.Info) in the unit test itself.
    cq.inadmissibleWorkloads.delete(w)
}

func (cq *ClusterQueueBestEffortFIFO) RequeueWorkload(wInfo *workload.Info) (bool, error) {
why do we need a new interface? a reimplementation of PushOrUpdate should suffice.
Sorry, the reply should be here.
I considered this in the first version, but it's hard to determine whether the workload is new or re-queued.
One idea is to add a flag in the workload info that gets updated if scheduling fails.
ok, let's address the other comments first and then come back to this at the end.
I kept only the functions AddInadmissibleIfNotPresent and QueueInadmissibleWorkloads.
pkg/queue/manager.go
@@ -296,7 +305,34 @@ func (m *Manager) deleteWorkloadFromQueueAndClusterQueue(w *kueue.QueuedWorkload
    cq := m.clusterQueues[q.ClusterQueue]
    if cq != nil {
        cq.Delete(w)
        if w.Spec.Admission != nil && m.queueAllInadmissibleWorkloadsInCohort(cq) {
deleteWorkloadFromQueueAndClusterQueue is invoked from DeleteWorkload above. DeleteWorkload isn't called when the workload was admitted (because it is not supposed to be in the queue); see the check if wl.Spec.Admission == nil. But perhaps we should remove that condition and just call DeleteWorkload in all cases.
I think we should keep that logic in the controller. We also need to requeue elements in the case when a workload finishes. I think that's not covered in this PR yet.
I call the function DeleteWorkload in manager.go even if the workload was admitted.
See #135 (comment); I think it is better not to do it here and have an explicit API that we can evolve later to handle finer grained moving of workloads.
pkg/queue/manager.go
}

queued := false
for _, c := range m.clusterQueues {
We should try to do better than that by having a cohort-to-ClusterQueues index.
I'm fine doing it in a follow up. This PR is already complex enough.
Done.
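A rough sketch of such a cohort index in the Manager (omitting its other fields). The cohorts field, the bool return of QueueInadmissibleWorkloads, and keying the helper by cohort name are assumptions for this sketch; it uses sets.String from k8s.io/apimachinery/pkg/util/sets:

// cohorts maps a cohort name to the names of its member ClusterQueues, so
// requeueing can visit only the queues in the affected cohort rather than all of them.
type Manager struct {
    clusterQueues map[string]ClusterQueue
    cohorts       map[string]sets.String
}

func (m *Manager) queueAllInadmissibleWorkloadsInCohort(cohort string) bool {
    queued := false
    for name := range m.cohorts[cohort] {
        if cq := m.clusterQueues[name]; cq != nil && cq.QueueInadmissibleWorkloads() {
            queued = true
        }
    }
    return queued
}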
pkg/queue/queue.go
@@ -113,3 +113,11 @@ func (h *heapImpl) Pop() interface{} {
    delete(h.items, key)
    return obj
}

// Get returns the requested item, or sets exists=false.
Suggested change:
- // Get returns the requested item, or sets exists=false.
+ // Get returns the requested item, or false if it doesn't exist.
is this implementing an interface? which one?
No. It isn't in the interface.
@@ -0,0 +1,163 @@
/*
the filename has a typo
Done
pkg/scheduler/scheduler.go
if ok {
    log.V(2).Info("workload re-queued", "queuedWorkload", klog.KObj(w.Obj), "queue", klog.KRef(w.Obj.Namespace, w.Obj.Spec.QueueName))
} else if err != nil {
    log.Error(err, "workload re-queued", "queuedWorkload", klog.KObj(w.Obj), "queue", klog.KRef(w.Obj.Namespace, w.Obj.Spec.QueueName))
Suggested change:
- log.Error(err, "workload re-queued", "queuedWorkload", klog.KObj(w.Obj), "queue", klog.KRef(w.Obj.Namespace, w.Obj.Spec.QueueName))
+ log.Error(err, "Failed re-queuing workload", "queuedWorkload", klog.KObj(w.Obj), "queue", klog.KRef(w.Obj.Namespace, w.Obj.Spec.QueueName))
Done
pkg/queue/cluster_queue_interface.go
// RequeueWorkload pushes the workload back to ClusterQueue after scheduling failure.
RequeueWorkload(*workload.Info) (bool, error)
// AddInadmissibleIfNotPresent inserts a workload that cannot be admitted into
// the inadmissibleWorkloads in ClusterQueue, unless it is already in the queue.
AddInadmissibleIfNotPresent(*workload.Info) error
We should only have the methods that the scheduler needs in the interface: AddInadmissibleIfNotPresent and RequeueIfNotPresent (previously named PushIfNotPresent). The StrictFIFO implementation should be that AddInadmissibleIfNotPresent just calls RequeueIfNotPresent.
PushOrUpdate is what you would use to receive objects from the controllers, and that will create a workload.Info internally. The scheduler needs to push workload.Info objects, as we might want to carry information for the next scheduling attempts.
I kept only the functions AddInadmissibleIfNotPresent and QueueInadmissibleWorkloads. AddInadmissibleIfNotPresent calls PushIfNotPresent when it's the StrictFIFO implementation. Please take a look.
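For StrictFIFO, the delegation described above could be as small as the sketch below; the ClusterQueueStrictFIFO type name and the bool return of PushIfNotPresent are assumptions based on the snippets in this review:

// StrictFIFO has no separate inadmissible placeholder stage, so a workload
// that could not be admitted simply goes back into the heap.
func (cq *ClusterQueueStrictFIFO) AddInadmissibleIfNotPresent(wInfo *workload.Info) bool {
    return cq.PushIfNotPresent(wInfo)
}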
Force-pushed from ddc123f to 0940bb6.
workload1 := &kueue.QueuedWorkload{}
gomega.Eventually(func() bool {
    lookupKey := types.NamespacedName{Name: job1.Name, Namespace: job1.Namespace}
    err := k8sClient.Get(ctx, lookupKey, workload1)
    return err == nil
}, framework.Timeout, framework.Interval).Should(gomega.BeTrue())
we don't need this eventually block
@ahg-g Thanks for your review in the early morning. Updated per all the comments. Please review it again.
gomega.Eventually(func() bool {
    lookupKey := types.NamespacedName{Name: job2.Name,
        Namespace: job2.Namespace}
    return k8sClient.Get(ctx, lookupKey,
ditto
createdJob3 := &batchv1.Job{}
gomega.Eventually(func() bool {
    lookupKey := types.NamespacedName{Name: job3.Name, Namespace: job3.Namespace}
    return k8sClient.Get(ctx, lookupKey, createdJob3) == nil && !*createdJob3.Spec.Suspend
we can do the same here, return pointer bool
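One way to read "return pointer bool" is to have the polling function return the job's *bool Suspend field and let gomega assert on its value, roughly as below. This is an interpretation of the comment, not necessarily the exact code that landed, and it assumes k8s.io/utils/pointer is available:

createdJob3 := &batchv1.Job{}
gomega.Eventually(func() *bool {
    lookupKey := types.NamespacedName{Name: job3.Name, Namespace: job3.Namespace}
    if err := k8sClient.Get(ctx, lookupKey, createdJob3); err != nil {
        // Returning nil keeps the assertion failing until the job exists.
        return nil
    }
    return createdJob3.Spec.Suspend
}, framework.Timeout, framework.Interval).Should(gomega.Equal(pointer.Bool(false)))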
createdJob2 := &batchv1.Job{}
gomega.Consistently(func() bool {
    lookupKey := types.NamespacedName{Name: job2.Name, Namespace: job2.Namespace}
    return k8sClient.Get(ctx, lookupKey, createdJob2) == nil && *createdJob2.Spec.Suspend
ditto, return pointer bool
createdJob3 := &batchv1.Job{}
gomega.Eventually(func() bool {
    lookupKey := types.NamespacedName{Name: job3.Name, Namespace: job3.Namespace}
    return k8sClient.Get(ctx, lookupKey, createdJob3) == nil && !*createdJob3.Spec.Suspend
ditto
    Name).Request(corev1.ResourceCPU, "8").Obj()
gomega.Expect(k8sClient.Create(ctx, job2)).Should(gomega.Succeed())
createdJob2 := &batchv1.Job{}
gomega.Consistently(func() bool {
ditto, return pointer bool

ginkgo.By("updating ClusterQueue")
devCq := &kueue.ClusterQueue{}
gomega.Eventually(func() bool {
return error
gomega.Eventually(func() bool {
    lookupKey := types.NamespacedName{Name: devBEClusterQ.Name}
    return k8sClient.Get(ctx, lookupKey, devCq) == nil
}, framework.Timeout, framework.Interval).Should(gomega.BeTrue())
Suggested change:
- }, framework.Timeout, framework.Interval).Should(gomega.BeTrue())
+ }, framework.Timeout, framework.Interval).Should(gomega.Succeed())
    Namespace: job1.Namespace,
}})).Should(gomega.Succeed())

gomega.Eventually(func() bool {
ditto, return pointer bool
job3 := testing.MakeJob("on-demand-job3", ns.Name).Queue(prodBEQueue.Name).Request(corev1.ResourceCPU, "2").Obj()
gomega.Expect(k8sClient.Create(ctx, job3)).Should(gomega.Succeed())
createdJob3 := &batchv1.Job{}
gomega.Eventually(func() bool {
ditto, return pointer bool
This implementation is looking great!
    w.Finalizers = nil
    return w
}
return !reflect.DeepEqual(strip(old), strip(new))
Use equality.Semantic.DeepEqual from k8s.io/apimachinery/pkg/api/equality.
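In other words, the comparison would become roughly the following (a sketch of the suggested change, using the import path named above):

import "k8s.io/apimachinery/pkg/api/equality"

// equality.Semantic compares API types semantically (e.g. resource.Quantity and
// metav1.Time get dedicated equality functions) instead of plain reflect.DeepEqual.
return !equality.Semantic.DeepEqual(strip(old), strip(new))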
Done
// ClusterQueueBestEffortFIFO is the implementation for the ClusterQueue for
// BestEffortFIFO.
type ClusterQueueBestEffortFIFO struct {
    *ClusterQueueImpl
nit: does it need to be a pointer?
pkg/queue/cluster_queue_interface.go
@@ -50,6 +49,13 @@ type ClusterQueue interface {
    // queue is empty.
    Pop() *workload.Info

    // AddInadmissibleIfNotPresent inserts a workload that cannot be admitted into
    // the inadmissibleWorkloads in ClusterQueue, unless it is already in the queue.
This is an implementation detail that doesn't apply to a strictFIFO queue.
Maybe you can say:
inserts a workload that could not be admitted back into the ClusterQueue. The implementation might choose to keep it in a temporary placeholder stage where it doesn't compete with other workloads, until cluster events free up quota. The workload should not be reinserted if it's already in the ClusterQueue.
Done
pkg/queue/cluster_queue_interface.go
// AddInadmissibleIfNotPresent inserts a workload that cannot be admitted into
// the inadmissibleWorkloads in ClusterQueue, unless it is already in the queue.
AddInadmissibleIfNotPresent(*workload.Info) bool
// QueueInadmissibleWorkloads moves all workloads from inadmissibleWorkloads to heap.
Again, an implementation detail. You can refer to this as a notification. Maybe the name could be NotifyPotentialAvailability.
I changed the comments to be less implementation-detailed. But maybe we should still keep the func name QueueInadmissibleWorkloads; it is easier to understand, and it's hard to find a proper name.
@@ -109,6 +123,12 @@ func (m *Manager) UpdateClusterQueue(cq *kueue.ClusterQueue) error {
    }
    // TODO(#8): recreate heap based on a change of queueing policy.
    cqImpl.Update(cq)
what if this changes the cohort?
I added the part that updates the cohorts in the manager.
But I'm curious whether we need to prevent users from updating the cohort of a ClusterQueue in the future.
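If the Manager keeps a cohort index (as discussed earlier in this review), a cohort change on update could be handled along these lines; the helper and field names here are made up for illustration and assume the sets package from k8s.io/apimachinery/pkg/util/sets:

// updateCohortIndex moves a ClusterQueue name between cohort sets when its
// cohort changes, dropping cohort entries that become empty.
func (m *Manager) updateCohortIndex(name, oldCohort, newCohort string) {
    if oldCohort == newCohort {
        return
    }
    if oldCohort != "" {
        m.cohorts[oldCohort].Delete(name)
        if m.cohorts[oldCohort].Len() == 0 {
            delete(m.cohorts, oldCohort)
        }
    }
    if newCohort != "" {
        if m.cohorts[newCohort] == nil {
            m.cohorts[newCohort] = sets.NewString()
        }
        m.cohorts[newCohort].Insert(name)
    }
}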
pkg/scheduler/scheduler.go
@@ -400,6 +400,7 @@ func (e entryOrdering) Less(i, j int) bool {
func (s *Scheduler) requeueAndUpdate(log logr.Logger, ctx context.Context, w *workload.Info, message string) {
    added := s.queues.RequeueWorkload(ctx, w)
I think we might want to distinguish two scenarios of re-queueing (but fine to do in a follow up).
We have a case where we requeue because the workload didn't fit. But we also have a case where we requeue because we admitted workloads in another CQ that had higher priority, even though they might still fit in the next scheduling cycle. I don't think we should punish those workloads. They should skip the inadmissible stage.
Agreed. Since we admit only one workload per Cohort in one scheduling cycle, some other workloads may be rejected directly and re-enqueued. We need to distinguish these two scenarios and let those workloads skip the inadmissible stage.
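A purely hypothetical way to carry that distinction through the requeue path; none of these names come from this PR, and the bool returns of PushIfNotPresent and AddInadmissibleIfNotPresent are assumed:

// RequeueReason tells the queue why a workload is coming back, so BestEffortFIFO
// can decide whether it belongs in the inadmissible stage or straight in the heap.
type RequeueReason string

const (
    // ReasonDidNotFit: the workload did not fit the available quota.
    ReasonDidNotFit RequeueReason = "DidNotFit"
    // ReasonLostRace: another workload in the cohort was admitted this cycle;
    // this one may still fit next cycle and should not be parked.
    ReasonLostRace RequeueReason = "LostRace"
)

func (cq *ClusterQueueBestEffortFIFO) requeue(wInfo *workload.Info, reason RequeueReason) bool {
    if reason == ReasonLostRace {
        // Skip the inadmissible stage; retry in the next scheduling cycle.
        return cq.PushIfNotPresent(wInfo)
    }
    return cq.AddInadmissibleIfNotPresent(wInfo)
}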
gomega.Eventually(func() error {
    lookupKey := types.NamespacedName{Name: devBEClusterQ.Name}
    return k8sClient.Get(ctx, lookupKey, devCq)
}, framework.Timeout, framework.Interval).Should(gomega.Succeed())
no need for the eventually
Suggested change:
- gomega.Eventually(func() error {
-     lookupKey := types.NamespacedName{Name: devBEClusterQ.Name}
-     return k8sClient.Get(ctx, lookupKey, devCq)
- }, framework.Timeout, framework.Interval).Should(gomega.Succeed())
+ gomega.Expect(k8sClient.Get(ctx, lookupKey, devCq)).Should(gomega.Succeed())
Done
I will squash these after Aldo confirms. :)
/approve
Ready for squash
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: alculquicondor, denkensk. The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files.
Approvers can indicate their approval by writing /approve in a comment.
Signed-off-by: Alex Wang <wangqingcan1990@gmail.com>
Force-pushed from fec27b3 to fdcb6a5.
squashed
/lgtm
/hold cancel
/lgtm Thanks @denkensk, this is great!
Signed-off-by: Alex Wang wangqingcan1990@gmail.com
What type of PR is this?
/kind feature
What this PR does / why we need it:
In order to facilitate review and to be able to merge quickly, I split the implementation into multiple PRs.
This PR adds UnscheduleQ to the internal ClusterQueue struct.
Which issue(s) this PR fixes:
2/4 #8
Special notes for your reviewer: