Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cancel issue between Query Frontend and Query Scheduler #5113

Merged
merged 12 commits into from
Jan 13, 2022
9 changes: 3 additions & 6 deletions pkg/lokifrontend/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ enqueueAgain:

select {
case <-ctx.Done():
level.Warn(f.log).Log("msg", "request enqueued, but not cancelable", "queryID", freq.queryID)
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
return nil, ctx.Err()

case enqRes := <-freq.enqueue:
Expand All @@ -223,12 +224,8 @@ enqueueAgain:
select {
case <-ctx.Done():
if cancelCh != nil {
select {
case cancelCh <- freq.queryID:
// cancellation sent.
default:
// failed to cancel, ignore.
}
// Let it block until its worker receives it. We don't want to exit RoundTripGRPC without cancelling the downstream request started by this request.
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
f.schedulerWorkers.sendRequestCancel(freq.queryID, cancelCh)
}
return nil, ctx.Err()

Expand Down
14 changes: 14 additions & 0 deletions pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,20 @@ func (f *frontendSchedulerWorkers) getWorkersCount() int {
return len(f.workers)
}

// sendRequestCancel sends cancellation to the "already scheduled" frontendRequest.
// It will make sure the frontend worker that is responsible for the `reqID`
// receives the cancel signal.
func (f *frontendSchedulerWorkers) sendRequestCancel(reqID uint64, cancelCh chan<- uint64) {
f.mu.Lock()
defer f.mu.Unlock()
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved

// There can be the case, where Frontend is running without any Frontend workers,
// sending cancel shoudn't block in those cases.
if len(f.workers) > 0 {
cancelCh <- reqID
}
}

func (f *frontendSchedulerWorkers) connectToScheduler(ctx context.Context, address string) (*grpc.ClientConn, error) {
// Because we only use single long-running method, it doesn't make sense to inject user ID, send over tracing or add metrics.
opts, err := f.cfg.GRPCClientConfig.DialOption(nil, nil)
Expand Down
39 changes: 39 additions & 0 deletions pkg/lokifrontend/frontend/v2/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,45 @@ func TestFrontendCancellation(t *testing.T) {
})
}

// Bug: If FrontendWorkers are busy, cancellation passed by Query frontend may not reach
// all the frontend workers thus not reaching the scheduler as well.
func TestFrontendWorkerCancellation(t *testing.T) {
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
f, ms := setupFrontend(t, nil)

// fmt.Println("workers count", f.schedulerWorkers.getWorkersCount(), "max-concurrency per worker", f.cfg.WorkerConcurrency)

cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()

// send multiple requests > maxconcurrency of scheduler. So that it keeps all the frontend worker busy in serving requests.
reqCount := testFrontendWorkerConcurrency + 5
var wg sync.WaitGroup
for i := 0; i < reqCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Nil(t, resp)
}()
}

wg.Wait()

// We wait a bit to make sure scheduler receives the cancellation request.
// 2 * reqCount because for every request, should also be corresponding cancel request
test.Poll(t, 5*time.Second, 2*reqCount, func() interface{} {
ms.mu.Lock()
defer ms.mu.Unlock()

return len(ms.msgs)
})

ms.checkWithLock(func() {
require.Equal(t, 2*reqCount, len(ms.msgs))
})
}

func TestFrontendFailedCancellation(t *testing.T) {
f, ms := setupFrontend(t, nil)

Expand Down