Handle cancellations before requests #742

Closed
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -200,6 +200,7 @@
* [BUGFIX] Ruler: fix formatting of rule groups in `/ruler/rule_groups` endpoint. #655
* [BUGFIX] Querier: Disable query scheduler SRV DNS lookup. #689
* [BUGFIX] Query-frontend: fix API error messages that were mentioning Prometheus `--enable-feature=promql-negative-offset` and `--enable-feature=promql-at-modifier` flags. #688
* [BUGFIX] Query-frontend: worker's cancellation channels are now buffered to ensure that all request cancellations are properly handled. #741

### Mixin (changes since `grafana/cortex-jsonnet` `1.9.0`)

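For context on the bugfix entry above: the frontend forwards cancellations to a scheduler worker with a non-blocking `select`/`default` send (see the `frontend.go` diff below), so with an unbuffered channel any cancellation that arrives while the worker goroutine is busy is silently dropped. The sketch below is a minimal, self-contained illustration of that failure mode, not Mimir code; the `notifyCancel` helper and both channels are made up for the example.

```go
package main

import "fmt"

// notifyCancel performs a best-effort, non-blocking send of a query ID,
// mirroring the frontend's pattern: if nothing can take the value right
// now, the cancellation is dropped instead of blocking the caller.
func notifyCancel(cancelCh chan uint64, queryID uint64) bool {
	select {
	case cancelCh <- queryID:
		return true
	default:
		return false
	}
}

func main() {
	// Nobody is reading from either channel yet, which stands in for a
	// scheduler worker that is busy forwarding other requests.
	unbuffered := make(chan uint64)
	buffered := make(chan uint64, 8)

	for id := uint64(0); id < 8; id++ {
		fmt.Printf("query %d: unbuffered=%v buffered=%v\n",
			id, notifyCancel(unbuffered, id), notifyCancel(buffered, id))
	}
	// Every unbuffered send is dropped; every buffered send is queued for
	// the worker to pick up once it gets around to it.
}
```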
12 changes: 3 additions & 9 deletions pkg/frontend/v2/frontend.go
@@ -196,21 +196,14 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers.

enqueueAgain:
var cancelCh chan<- uint64
select {
case <-ctx.Done():
return nil, ctx.Err()

case f.requestsCh <- freq:
// Enqueued, let's wait for response.
}

var cancelCh chan<- uint64

select {
case <-ctx.Done():
return nil, ctx.Err()

case enqRes := <-freq.enqueue:
enqRes := <-freq.enqueue
if enqRes.status == waitForResponse {
cancelCh = enqRes.cancelCh
break // go wait for response.
@@ -232,6 +225,7 @@
// cancellation sent.
default:
// failed to cancel, ignore.
level.Warn(f.log).Log("msg", "failed to send cancellation request to scheduler, queue full")
}
}
return nil, ctx.Err()
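The wait-for-response side of `RoundTripGRPC` (only partially visible in the hunk above) pairs with this change: when the caller's context is canceled, the query ID is pushed onto the worker's cancel channel with the same best-effort send, and the new warning is logged if even the buffered channel is full. The sketch below is a simplified, self-contained stand-in for that flow; `waitForResponse`, `queryID`, and `responseCh` are illustrative names, not Mimir's.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForResponse either returns the worker's response or, when the caller's
// context is canceled, forwards the query ID on the worker's cancel channel
// (best effort) before giving up. Illustrative only, not the Mimir code.
func waitForResponse(ctx context.Context, queryID uint64, cancelCh chan<- uint64, responseCh <-chan string) (string, error) {
	select {
	case <-ctx.Done():
		select {
		case cancelCh <- queryID:
			// Cancellation delivered to the worker.
		default:
			// Channel full; the real code logs a warning and gives up.
			fmt.Println("failed to send cancellation request to scheduler, queue full")
		}
		return "", ctx.Err()

	case resp := <-responseCh:
		return resp, nil
	}
}

func main() {
	cancelCh := make(chan uint64, 8) // buffered, as introduced by this PR.
	responseCh := make(chan string)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	_, err := waitForResponse(ctx, 42, cancelCh, responseCh)
	fmt.Println("err:", err, "| pending cancellations:", len(cancelCh))
}
```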
68 changes: 51 additions & 17 deletions pkg/frontend/v2/frontend_scheduler_worker.go
@@ -26,7 +26,13 @@ import (
"github.com/grafana/mimir/pkg/util"
)

const schedulerAddressLabel = "scheduler_address"
const (
schedulerAddressLabel = "scheduler_address"
// schedulerWorkerCancelChanCapacity should be at least as big as the number of sub-queries issued by a single query
// per scheduler (after splitting and sharding), so that all of them can be canceled while the scheduler worker is busy.
// Since the channel holds uint64, this is 8KB per scheduler worker.
schedulerWorkerCancelChanCapacity = 1000
)

type frontendSchedulerWorkers struct {
services.Service
@@ -197,7 +203,7 @@ func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, fro
schedulerAddr: schedulerAddr,
frontendAddr: frontendAddr,
requestCh: requestCh,
cancelCh: make(chan uint64),
cancelCh: make(chan uint64, schedulerWorkerCancelChanCapacity),
enqueuedRequests: enqueuedRequests,
}
w.ctx, w.cancel = context.WithCancel(context.Background())
@@ -272,6 +278,8 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
ctx := loop.Context()

for {
// First check if we're closing or there are canceled requests.
// These are more important than handling new requests.
select {
case <-ctx.Done():
// No need to report error if our internal context is canceled. This can happen during shutdown,
@@ -282,6 +290,21 @@
level.Debug(w.log).Log("msg", "stream context finished", "err", ctx.Err())
return nil

case reqID := <-w.cancelCh:
if err := w.handleCancel(loop, reqID); err != nil {
return err
}
// Start over, maybe there are more cancellations pending.
continue

default:
// No high-priority events; let's see if there are new requests, or otherwise just wait for the first event.
}

select {
case <-ctx.Done():
return nil // Not an error, see above.

case req := <-w.requestCh:
err := loop.Send(&schedulerpb.FrontendToScheduler{
Type: schedulerpb.ENQUEUE,
@@ -331,27 +354,38 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
Body: []byte("too many outstanding requests"),
},
}

default:
level.Error(w.log).Log("msg", "unknown response status from the scheduler", "status", resp.Status, "queryID", req.queryID)
req.enqueue <- enqueueResult{status: failed}
}

case reqID := <-w.cancelCh:
err := loop.Send(&schedulerpb.FrontendToScheduler{
Type: schedulerpb.CANCEL,
QueryID: reqID,
})

if err != nil {
if err := w.handleCancel(loop, reqID); err != nil {
return err
}
}
}
}

resp, err := loop.Recv()
if err != nil {
return err
}
func (w *frontendSchedulerWorker) handleCancel(loop schedulerpb.SchedulerForFrontend_FrontendLoopClient, reqID uint64) error {
err := loop.Send(&schedulerpb.FrontendToScheduler{
Type: schedulerpb.CANCEL,
QueryID: reqID,
})

// Scheduler may be shutting down, report that.
if resp.Status != schedulerpb.OK {
return errors.Errorf("unexpected status received for cancellation: %v", resp.Status)
}
}
if err != nil {
return err
}

resp, err := loop.Recv()
if err != nil {
return err
}

// Scheduler may be shutting down, report that.
if resp.Status != schedulerpb.OK {
return errors.Errorf("unexpected status received for cancellation: %v", resp.Status)
}
return nil
}
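The loop restructuring in `schedulerLoop` above is a priority-select pattern: Go's `select` chooses among ready cases pseudo-randomly, so a worker with a steady stream of new requests on `requestCh` could starve pending cancellations. The first, non-blocking `select` therefore drains shutdown and cancellation events before the second, blocking `select` accepts anything else. The sketch below shows the pattern in isolation; it is a simplified stand-in with made-up names, not the actual worker.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// run drains high-priority events (shutdown, cancellations) before accepting
// new requests, mirroring the two-stage select in schedulerLoop.
func run(ctx context.Context, cancelCh <-chan uint64, requestCh <-chan string) {
	for {
		// Stage 1: non-blocking check for shutdown or pending cancellations.
		select {
		case <-ctx.Done():
			return
		case id := <-cancelCh:
			fmt.Println("cancel", id)
			continue // there may be more cancellations queued up.
		default:
			// Nothing urgent is pending; fall through and wait for any event.
		}

		// Stage 2: block until the next event of any kind.
		select {
		case <-ctx.Done():
			return
		case id := <-cancelCh:
			fmt.Println("cancel", id)
		case req := <-requestCh:
			fmt.Println("enqueue", req)
		}
	}
}

func main() {
	cancelCh := make(chan uint64, 4)
	requestCh := make(chan string, 4)

	// Queue one request and two cancellations: the cancellations are handled
	// first even though the request is ready at the same time.
	requestCh <- "query-1"
	cancelCh <- 7
	cancelCh <- 8

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	run(ctx, cancelCh, requestCh)
}
```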
49 changes: 49 additions & 0 deletions pkg/frontend/v2/frontend_test.go
@@ -228,6 +228,55 @@ func TestFrontendCancellation(t *testing.T) {
})
}

// If frontend workers are busy, a cancellation issued by the query-frontend may not reach
// the frontend workers, and therefore may never reach the scheduler either.
// Issue: https://github.com/grafana/mimir/issues/740
func TestFrontendWorkerCancellation(t *testing.T) {
f, ms := setupFrontend(t, nil, nil)

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()

// Send more requests than the frontend worker concurrency, so that all frontend workers are kept busy serving requests.
reqCount := testFrontendWorkerConcurrency + 5
var wg sync.WaitGroup
for i := 0; i < reqCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Nil(t, resp)
}()
}

wg.Wait()

// Wait a bit to make sure the scheduler receives the cancellation requests.
// We expect 2 * reqCount messages because every request should also have a corresponding cancel request.
test.Poll(t, 5*time.Second, 2*reqCount, func() interface{} {
ms.mu.Lock()
defer ms.mu.Unlock()

return len(ms.msgs)
})

ms.checkWithLock(func() {
require.Equal(t, 2*reqCount, len(ms.msgs))
msgTypeCounts := map[schedulerpb.FrontendToSchedulerType]int{}
for _, msg := range ms.msgs {
msgTypeCounts[msg.Type]++
}
expectedMsgTypeCounts := map[schedulerpb.FrontendToSchedulerType]int{
schedulerpb.ENQUEUE: reqCount,
schedulerpb.CANCEL: reqCount,
}
require.Equalf(t, expectedMsgTypeCounts, msgTypeCounts,
"Should receive %d enqueue (%d) requests, and %d cancel (%d) requests.", reqCount, schedulerpb.ENQUEUE, reqCount, schedulerpb.CANCEL,
)
})
}

func TestFrontendFailedCancellation(t *testing.T) {
f, ms := setupFrontend(t, nil, nil)
