Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2980] DaemonSet preemption: don't flood the logs if victim selection fails #998

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ const PreemptionPreconditionsFailed = "Preemption preconditions failed"
const PreemptionDoesNotGuarantee = "Preemption queue guarantees check failed"
const PreemptionShortfall = "Preemption helped but short of resources"
const PreemptionDoesNotHelp = "Preemption does not help"
const NoVictimForRequiredNode = "No fit on required node, preemption does not help"
5 changes: 5 additions & 0 deletions pkg/scheduler/objects/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,11 @@ func (a *Allocation) SendPredicatesFailedEvent(predicateErrors map[string]int) {
a.askEvents.SendPredicatesFailed(a.allocationKey, a.applicationID, predicateErrors, a.GetAllocatedResource())
}

// SendRequiredNodePreemptionFailedEvent updates the event system with required node preemption failed event.
func (a *Allocation) SendRequiredNodePreemptionFailedEvent(node string) {
a.askEvents.SendRequiredNodePreemptionFailed(a.allocationKey, a.applicationID, node, a.GetAllocatedResource())
}

// GetAllocationLog returns a list of log entries corresponding to allocation preconditions not being met.
func (a *Allocation) GetAllocationLog() []*AllocationLogEntry {
a.RLock()
Expand Down
8 changes: 2 additions & 6 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,9 +1409,6 @@ func (sa *Application) tryPreemption(headRoom *resources.Resource, preemptionDel
}

func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask *Allocation) bool {
log.Log(log.SchedApplication).Info("Triggering preemption process for daemon set ask",
zap.String("ds allocation key", ask.GetAllocationKey()))

// try preemption and see if we can free up resource
preemptor := NewRequiredNodePreemptor(reserve.node, ask)
preemptor.filterAllocations()
Expand All @@ -1434,9 +1431,8 @@ func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask *Allo
"preempting allocations to free up resources to run daemon set ask: "+ask.GetAllocationKey())
return true
}
log.Log(log.SchedApplication).Warn("Problem in finding the victims for preempting resources to meet required ask requirements",
zap.String("ds allocation key", ask.GetAllocationKey()),
zap.String("node id", reserve.nodeID))
ask.LogAllocationFailure(common.NoVictimForRequiredNode, true)
ask.SendRequiredNodePreemptionFailedEvent(reserve.node.NodeID)
return false
}

Expand Down
175 changes: 175 additions & 0 deletions pkg/scheduler/objects/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package objects
import (
"fmt"
"math"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -2916,6 +2917,172 @@ func TestPredicateFailedEvents(t *testing.T) {
assert.Equal(t, "Unschedulable request 'alloc-0': fake predicate plugin failed (2x); ", event.Message)
}

func TestRequiredNodePreemption(t *testing.T) {
// tests successful RequiredNode (DaemonSet) preemption
app := newApplication(appID0, "default", "root.default")
var releaseEvents []*rmevent.RMReleaseAllocationEvent
app.rmEventHandler = &mockAppEventHandler{
callback: func(ev interface{}) {
if rmEvent, ok := ev.(*rmevent.RMReleaseAllocationEvent); ok {
releaseEvents = append(releaseEvents, rmEvent)
go func() {
rmEvent.Channel <- &rmevent.Result{
Succeeded: true,
}
}()
}
},
}
node := newNode(nodeID1, map[string]resources.Quantity{"first": 20})
node.nodeEvents = schedEvt.NewNodeEvents(mock.NewEventSystemDisabled())
iterator := getNodeIteratorFn(node)
getNode := func(nodeID string) *Node {
return node
}

// set queue
rootQ, err := createRootQueue(map[string]string{"first": "20"})
assert.NilError(t, err)
childQ, err := createManagedQueue(rootQ, "default", false, map[string]string{"first": "20"})
assert.NilError(t, err)
app.SetQueue(childQ)

// add an ask
mockEvents := mock.NewEventSystem()
askRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15})
ask1 := newAllocationAsk("ask-1", "app-1", askRes)
ask1.askEvents = schedEvt.NewAskEvents(mockEvents)
err = app.AddAllocationAsk(ask1)
assert.NilError(t, err, "could not add ask-1")
preemptionAttemptsRemaining := 0

// allocate ask
headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
result := app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
assert.Equal(t, result.ResultType, Allocated, "could not allocate ask-1")
assert.Equal(t, result.Request.allocationKey, "ask-1", "unexpected allocation key")

// add ask2 with required node
ask2 := newAllocationAsk("ask-2", "app-1", askRes)
ask2.askEvents = schedEvt.NewAskEvents(mockEvents)
ask2.requiredNode = nodeID1
err = app.AddAllocationAsk(ask2)
assert.NilError(t, err, "could not add ask-2")

// try to allocate ask2 with node being full - expect a reservation
result = app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
assert.Equal(t, result.ResultType, Reserved, "allocation result is not reserved")
assert.Equal(t, result.Request.allocationKey, "ask-2", "unexpected allocation key")
err = app.Reserve(node, ask2)
assert.NilError(t, err, "reservation failed")

// preemption
assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
assert.Assert(t, ask1.IsPreempted(), "ask1 has not been preempted")
assert.Assert(t, ask2.HasTriggeredPreemption(), "ask2 has not triggered preemption")
assert.Equal(t, 1, len(releaseEvents), "unexpected number of release events")
assert.Equal(t, 1, len(releaseEvents[0].ReleasedAllocations), "unexpected number of release allocations")
assert.Equal(t, "ask-1", releaseEvents[0].ReleasedAllocations[0].AllocationKey, "allocation key")

// 2nd attempt - no preemption this time
releaseEvents = nil
assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
assert.Assert(t, releaseEvents == nil, "unexpected release event")

// check for preemption related events
for _, event := range mockEvents.Events {
assert.Assert(t, !strings.Contains(strings.ToLower(event.Message), "preemption"), "received a preemption related event")
}
}

func TestRequiredNodePreemptionFailed(t *testing.T) {
// tests RequiredNode (DaemonSet) preemption where the victim pod has a high priority, hence preemption is not possible
app := newApplication(appID0, "default", "root.default")
var releaseEvents []*rmevent.RMReleaseAllocationEvent
app.rmEventHandler = &mockAppEventHandler{
callback: func(ev interface{}) {
if rmEvent, ok := ev.(*rmevent.RMReleaseAllocationEvent); ok {
releaseEvents = append(releaseEvents, rmEvent)
go func() {
rmEvent.Channel <- &rmevent.Result{
Succeeded: true,
}
}()
}
},
}
node := newNode(nodeID1, map[string]resources.Quantity{"first": 20})
node.nodeEvents = schedEvt.NewNodeEvents(mock.NewEventSystemDisabled())
iterator := getNodeIteratorFn(node)
getNode := func(nodeID string) *Node {
return node
}

// set queue
rootQ, err := createRootQueue(map[string]string{"first": "20"})
assert.NilError(t, err)
childQ, err := createManagedQueue(rootQ, "default", false, map[string]string{"first": "20"})
assert.NilError(t, err)
app.SetQueue(childQ)

// add an ask with high priority
mockEvents := mock.NewEventSystem()
askRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 15})
ask1 := newAllocationAsk("ask-1", "app-1", askRes)
ask1.askEvents = schedEvt.NewAskEvents(mockEvents)
ask1.priority = 1000
err = app.AddAllocationAsk(ask1)
assert.NilError(t, err, "could not add ask-1")
preemptionAttemptsRemaining := 0

// allocate ask
headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50})
result := app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
assert.Equal(t, result.ResultType, Allocated, "could not allocate ask-1")
assert.Equal(t, result.Request.allocationKey, "ask-1", "unexpected allocation key")

// add ask2 with required node
ask2 := newAllocationAsk("ask-2", "app-1", askRes)
ask2.askEvents = schedEvt.NewAskEvents(mockEvents)
ask2.requiredNode = nodeID1
err = app.AddAllocationAsk(ask2)
assert.NilError(t, err, "could not add ask-2")

// try to allocate ask2 with node being full - expect a reservation
result = app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode)
assert.Equal(t, result.ResultType, Reserved, "allocation result is not reserved")
assert.Equal(t, result.Request.allocationKey, "ask-2", "unexpected allocation key")
err = app.Reserve(node, ask2)
assert.NilError(t, err, "reservation failed")

// try preemption - should not succeed
assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
assert.Assert(t, !ask1.IsPreempted(), "unexpected preemption of ask1")
assert.Assert(t, !ask2.HasTriggeredPreemption(), "unexpected preemption triggered from ask2")
assert.Equal(t, 0, len(releaseEvents), "unexpected number of release events")
// check for events
noEvents := 0
var requestEvt *si.EventRecord
for _, event := range mockEvents.Events {
if event.Type == si.EventRecord_REQUEST && strings.Contains(strings.ToLower(event.Message), "preemption") {
noEvents++
requestEvt = event
}
}
assert.Equal(t, 1, noEvents, "unexpected number of REQUEST events")
assert.Equal(t, "Unschedulable request 'ask-2' with required node 'node-1', no preemption victim found", requestEvt.Message)
assert.Equal(t, 1, len(ask2.allocLog), "unexpected number of entries in the allocation log")
assert.Equal(t, int32(1), ask2.allocLog[common.NoVictimForRequiredNode].Count, "incorrect number of entry count")
assert.Equal(t, common.NoVictimForRequiredNode, ask2.allocLog[common.NoVictimForRequiredNode].Message, "unexpected log message")

// check counting & event throttling
assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
assert.Assert(t, app.tryReservedAllocate(headRoom, iterator) == nil, "unexpected result from reserved allocation")
assert.Equal(t, 1, noEvents, "unexpected number of REQUEST events")
assert.Equal(t, int32(4), ask2.allocLog[common.NoVictimForRequiredNode].Count, "incorrect number of entry count")
}

type testIterator struct{}

func (testIterator) ForEachNode(fn func(*Node) bool) {
Expand Down Expand Up @@ -3035,3 +3202,11 @@ func TestGetUint64Tag(t *testing.T) {
})
}
}

type mockAppEventHandler struct {
callback func(ev interface{})
}

func (m mockAppEventHandler) HandleEvent(ev interface{}) {
m.callback(ev)
}
22 changes: 17 additions & 5 deletions pkg/scheduler/objects/events/ask_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ import (

// AskEvents Request-specific events. These events are of REQUEST type, so they are eventually sent to the respective pods in K8s.
type AskEvents struct {
eventSystem events.EventSystem
limiter *rate.Limiter
eventSystem events.EventSystem
predicateLimiter *rate.Limiter
reqNodeLimiter *rate.Limiter
}

func (ae *AskEvents) SendRequestExceedsQueueHeadroom(allocKey, appID string, headroom, allocatedResource *resources.Resource, queuePath string) {
Expand Down Expand Up @@ -73,7 +74,7 @@ func (ae *AskEvents) SendRequestFitsInUserQuota(allocKey, appID string, allocate
}

func (ae *AskEvents) SendPredicatesFailed(allocKey, appID string, predicateErrors map[string]int, allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() || !ae.limiter.Allow() {
if !ae.eventSystem.IsEventTrackingEnabled() || !ae.predicateLimiter.Allow() {
return
}

Expand All @@ -94,13 +95,24 @@ func (ae *AskEvents) SendPredicatesFailed(allocKey, appID string, predicateError
ae.eventSystem.AddEvent(event)
}

func (ae *AskEvents) SendRequiredNodePreemptionFailed(allocKey, appID, node string, allocatedResource *resources.Resource) {
if !ae.eventSystem.IsEventTrackingEnabled() || !ae.reqNodeLimiter.Allow() {
return
}

message := fmt.Sprintf("Unschedulable request '%s' with required node '%s', no preemption victim found", allocKey, node)
event := events.CreateRequestEventRecord(allocKey, appID, message, allocatedResource)
ae.eventSystem.AddEvent(event)
}

func NewAskEvents(evt events.EventSystem) *AskEvents {
return newAskEventsWithRate(evt, 15*time.Second, 1)
}

func newAskEventsWithRate(evt events.EventSystem, interval time.Duration, burst int) *AskEvents {
return &AskEvents{
eventSystem: evt,
limiter: rate.NewLimiter(rate.Every(interval), burst),
eventSystem: evt,
predicateLimiter: rate.NewLimiter(rate.Every(interval), burst),
reqNodeLimiter: rate.NewLimiter(rate.Every(interval), burst),
}
}
40 changes: 38 additions & 2 deletions pkg/scheduler/objects/events/ask_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,48 @@ func TestPredicateFailedEvents(t *testing.T) {
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
assert.Equal(t, "Unschedulable request 'alloc-0': error#0 (2x); error#1 (123x); error#2 (44x); ", event.Message)
assert.Equal(t, si.EventRecord_REQUEST, event.Type)
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
assert.Equal(t, "alloc-0", event.ObjectID)
assert.Equal(t, "app-0", event.ReferenceID)

eventSystem.Reset()
// wait a bit, a new event is expected
time.Sleep(100 * time.Millisecond)
events.SendPredicatesFailed("alloc-0", "app-0", errors, resource)
events.SendPredicatesFailed("alloc-1", "app-0", errors, resource)
assert.Equal(t, 1, len(eventSystem.Events))
event = eventSystem.Events[0]
assert.Equal(t, "Unschedulable request 'alloc-0': error#0 (2x); error#1 (123x); error#2 (44x); ", event.Message)
assert.Equal(t, "Unschedulable request 'alloc-1': error#0 (2x); error#1 (123x); error#2 (44x); ", event.Message)
}

func TestRequiredNodePreemptionFailedEvents(t *testing.T) {
resource := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 1})
eventSystem := mock.NewEventSystemDisabled()
events := NewAskEvents(eventSystem)
events.SendRequiredNodePreemptionFailed("alloc-0", "app-0", nodeID1, resource)
assert.Equal(t, 0, len(eventSystem.Events))

eventSystem = mock.NewEventSystem()
events = newAskEventsWithRate(eventSystem, 50*time.Millisecond, 1)
// only the first event is expected to be emitted due to rate limiting
for i := 0; i < 200; i++ {
events.SendRequiredNodePreemptionFailed("alloc-0", "app-0", nodeID1, resource)
}
assert.Equal(t, 1, len(eventSystem.Events))
event := eventSystem.Events[0]
assert.Equal(t, "Unschedulable request 'alloc-0' with required node 'node-1', no preemption victim found", event.Message)
assert.Equal(t, si.EventRecord_REQUEST, event.Type)
assert.Equal(t, si.EventRecord_DETAILS_NONE, event.EventChangeDetail)
assert.Equal(t, "alloc-0", event.ObjectID)
assert.Equal(t, "app-0", event.ReferenceID)
protoRes := resources.NewResourceFromProto(event.Resource)
assert.DeepEqual(t, resource, protoRes)

eventSystem.Reset()
// wait a bit, a new event is expected
time.Sleep(100 * time.Millisecond)
events.SendRequiredNodePreemptionFailed("alloc-1", "app-0", nodeID1, resource)
assert.Equal(t, 1, len(eventSystem.Events))
event = eventSystem.Events[0]
assert.Equal(t, "Unschedulable request 'alloc-1' with required node 'node-1', no preemption victim found", event.Message)
}