Skip to content

Commit

Permalink
Merge pull request kubernetes#105050 from shyamjvs/automated-cherry-p…
Browse files Browse the repository at this point in the history
…ick-of-#104833-upstream-release-1.21

Automated cherry pick of kubernetes#104833 (1.21): Refine locking in API Priority and Fairness config controller
  • Loading branch information
k8s-ci-robot authored Sep 17, 2021
2 parents f383f14 + 71a7a04 commit 4dac5c1
Showing 1 changed file with 31 additions and 18 deletions.
49 changes: 31 additions & 18 deletions staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ const timeFmt = "2006-01-02T15:04:05.999"
// undesired becomes completely unused, all the config objects are
// read and processed as a whole.

// The funcs in this package follow the naming convention that the suffix
// "Locked" means the relevant mutex must be locked at the start of each
// call and will be locked upon return. For a configController, the
// suffix "ReadLocked" stipulates a read lock while just "Locked"
// stipulates a full lock. Absence of either suffix means that either
// (a) the lock must NOT be held at call time and will not be held
// upon return or (b) locking is irrelevant.

// StartFunction begins the process of handling a request. If the
// request gets queued then this function uses the given hashValue as
// the source of entropy as it shuffle-shards the request into a
Expand Down Expand Up @@ -123,10 +131,22 @@ type configController struct {
// requestWaitLimit comes from server configuration.
requestWaitLimit time.Duration

// the most recent update attempts, ordered by increasing age.
// Consumer trims to keep only the last minute's worth of entries.
// The controller uses this to limit itself to at most six updates
// to a given FlowSchema in any minute.
// This may only be accessed from the one and only worker goroutine.
mostRecentUpdates []updateAttempt

// This must be locked while accessing flowSchemas or
// priorityLevelStates. It is the lock involved in
// LockingWriteMultiple.
lock sync.Mutex
// priorityLevelStates. A lock for writing is needed
// for writing to any of the following:
// - the flowSchemas field
// - the slice held in the flowSchemas field
// - the priorityLevelStates field
// - the map held in the priorityLevelStates field
// - any field of a priorityLevelState held in that map
lock sync.RWMutex

// flowSchemas holds the flow schema objects, sorted by increasing
// numerical (decreasing logical) matching precedence. Every
Expand All @@ -137,13 +157,6 @@ type configController struct {
// name to the state for that level. Every name referenced from a
// member of `flowSchemas` has an entry here.
priorityLevelStates map[string]*priorityLevelState

// the most recent update attempts, ordered by increasing age.
// Consumer trims to keep only the last minute's worth of entries.
// The controller uses this to limit itself to at most six updates
// to a given FlowSchema in any minute.
// This may only be accessed from the one and only worker goroutine.
mostRecentUpdates []updateAttempt
}

type updateAttempt struct {
Expand Down Expand Up @@ -276,8 +289,8 @@ func (cfgCtlr *configController) MaintainObservations(stopCh <-chan struct{}) {
}

func (cfgCtlr *configController) updateObservations() {
cfgCtlr.lock.Lock()
defer cfgCtlr.lock.Unlock()
cfgCtlr.lock.RLock()
defer cfgCtlr.lock.RUnlock()
for _, plc := range cfgCtlr.priorityLevelStates {
if plc.queues != nil {
plc.queues.UpdateObservations()
Expand Down Expand Up @@ -760,8 +773,8 @@ func (immediateRequest) Finish(execute func()) bool {
// waiting in its queue, or `Time{}` if this did not happen.
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
klog.V(7).Infof("startRequest(%#+v)", rd)
cfgCtlr.lock.Lock()
defer cfgCtlr.lock.Unlock()
cfgCtlr.lock.RLock()
defer cfgCtlr.lock.RUnlock()
var selectedFlowSchema, catchAllFlowSchema *flowcontrol.FlowSchema
for _, fs := range cfgCtlr.flowSchemas {
if matchesFlowSchema(rd, fs) {
Expand Down Expand Up @@ -806,7 +819,7 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
req, idle := plState.queues.StartRequest(ctx, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
if idle {
cfgCtlr.maybeReapLocked(plName, plState)
cfgCtlr.maybeReapReadLocked(plName, plState)
}
return selectedFlowSchema, plState.pl, false, req, startWaitingTime
}
Expand All @@ -815,8 +828,8 @@ func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDig
// priority level if it has no more use. Call this after getting a
// clue that the given priority level is undesired and idle.
func (cfgCtlr *configController) maybeReap(plName string) {
cfgCtlr.lock.Lock()
defer cfgCtlr.lock.Unlock()
cfgCtlr.lock.RLock()
defer cfgCtlr.lock.RUnlock()
plState := cfgCtlr.priorityLevelStates[plName]
if plState == nil {
klog.V(7).Infof("plName=%s, plState==nil", plName)
Expand All @@ -838,7 +851,7 @@ func (cfgCtlr *configController) maybeReap(plName string) {
// it has no more use. Call this if both (1) plState.queues is
// non-nil and reported being idle, and (2) cfgCtlr's lock has not
// been released since then.
func (cfgCtlr *configController) maybeReapLocked(plName string, plState *priorityLevelState) {
func (cfgCtlr *configController) maybeReapReadLocked(plName string, plState *priorityLevelState) {
if !(plState.quiescing && plState.numPending == 0) {
return
}
Expand Down

0 comments on commit 4dac5c1

Please sign in to comment.