query-frontend: cancel stream to scheduler when streaming failed (grafana#3302)

* query-frontend: cancel stream to scheduler when streaming failed

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
Co-authored-by: Peter Štibraný <pstibrany@gmail.com>

* Add changelog entry

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
Co-authored-by: Peter Štibraný <pstibrany@gmail.com>

* Add test for proper stream cleanup

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
Co-authored-by: Peter Štibraný <pstibrany@gmail.com>

* Simplify checkStreamGoroutines

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
Co-authored-by: Peter Štibraný <pstibrany@gmail.com>
2 people authored and mason committed Nov 4, 2022
1 parent c6edae0 commit 040fa26
Showing 3 changed files with 85 additions and 20 deletions.
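This commit fixes a goroutine and memory leak in the query-frontend: each scheduler worker opened its FrontendLoop stream with the worker's long-lived context, so a stream that failed was abandoned but never torn down. The fix wraps every connection attempt in its own cancellable context. A minimal sketch of the pattern, with simplified names (the literal committed code is in the diff below):

// Hypothetical, simplified form of the attemptLoop closure introduced in
// pkg/frontend/v2/frontend_scheduler_worker.go.
attempt := func() bool {
    // Derive a per-attempt context so the stream can always be torn down.
    ctx, cancel := context.WithCancel(ctx)
    defer cancel() // releases the gRPC stream's resources on every exit path

    loop, err := client.FrontendLoop(ctx) // open the bidirectional stream
    if err != nil {
        return false // caller backs off, then retries with a fresh context
    }
    // ... forward queries over the stream until it ends ...
    return true
}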
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -18,6 +18,7 @@
 * [ENHANCEMENT] Distributor: Include `X-Scope-OrgId` header in requests forwarded to configured forwarding endpoint. #3283
 * [BUGFIX] Flusher: Add `Overrides` as a dependency to prevent panics when starting with `-target=flusher`. #3151
 * [BUGFIX] Updated `golang.org/x/text` dependency to fix CVE-2022-32149. #3285
+* [BUGFIX] Query-frontend: properly close gRPC streams to the query-scheduler to stop memory and goroutines leak. #3302
 
 ### Mixin
 
30 changes: 19 additions & 11 deletions pkg/frontend/v2/frontend_scheduler_worker.go
@@ -266,18 +266,15 @@ func (w *frontendSchedulerWorker) stop() {
 }
 
 func (w *frontendSchedulerWorker) runOne(ctx context.Context, client schedulerpb.SchedulerForFrontendClient) {
-    backoffConfig := backoff.Config{
-        MinBackoff: 250 * time.Millisecond,
-        MaxBackoff: 2 * time.Second,
-    }
+    // attemptLoop returns false if there was any error with forwarding requests to scheduler.
+    attemptLoop := func() bool {
+        ctx, cancel := context.WithCancel(ctx)
+        defer cancel() // cancel the stream after we are done to release resources
 
-    backoff := backoff.New(ctx, backoffConfig)
-    for backoff.Ongoing() {
         loop, loopErr := client.FrontendLoop(ctx)
         if loopErr != nil {
             level.Error(w.log).Log("msg", "error contacting scheduler", "err", loopErr, "addr", w.schedulerAddr)
-            backoff.Wait()
-            continue
+            return false
         }
 
         loopErr = w.schedulerLoop(loop)
@@ -287,11 +284,22 @@ func (w *frontendSchedulerWorker) runOne(ctx context.Context, client schedulerpb.SchedulerForFrontendClient) {
 
         if loopErr != nil {
             level.Error(w.log).Log("msg", "error sending requests to scheduler", "err", loopErr, "addr", w.schedulerAddr)
-            backoff.Wait()
-            continue
+            return false
         }
+        return true
+    }
 
-        backoff.Reset()
+    backoffConfig := backoff.Config{
+        MinBackoff: 250 * time.Millisecond,
+        MaxBackoff: 2 * time.Second,
+    }
+    backoff := backoff.New(ctx, backoffConfig)
+    for backoff.Ongoing() {
+        if !attemptLoop() {
+            backoff.Wait()
+        } else {
+            backoff.Reset()
+        }
     }
 }
 

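Why the `defer cancel()` is necessary: grpc-go does not release a client stream merely because `CloseSend` was called or a `Send`/`Recv` returned an error; the caller must either drain the stream to completion or cancel its context, otherwise the goroutine spawned for the stream keeps running. The test below reproduces the leak by configuring the test server to close connections almost immediately through keepalive settings. A self-contained sketch of such a server (assumed setup mirroring the test; `newShortLivedServer` is a hypothetical helper):

package main

import (
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/keepalive"
)

// newShortLivedServer builds a gRPC server that recycles its connections
// almost immediately, forcing clients' open streams to fail.
func newShortLivedServer() *grpc.Server {
    return grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
        MaxConnectionIdle:     100 * time.Millisecond, // close idle connections quickly
        MaxConnectionAge:      100 * time.Millisecond, // retire even active connections quickly
        MaxConnectionAgeGrace: 100 * time.Millisecond, // short grace period for in-flight RPCs
        Time:                  time.Second,            // ping an idle client after 1s
        Timeout:               time.Second,            // drop the client if the ping goes unanswered
    }))
}

func main() { _ = newShortLivedServer() }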
74 changes: 65 additions & 9 deletions pkg/frontend/v2/frontend_test.go
@@ -10,6 +10,8 @@ import (
     "fmt"
     "net"
     "net/http"
+    "os"
+    "runtime"
     "strconv"
     "strings"
     "sync"
@@ -28,6 +30,7 @@
     "github.com/weaveworks/common/user"
     "go.uber.org/atomic"
     "google.golang.org/grpc"
+    "google.golang.org/grpc/keepalive"
 
     "github.com/grafana/mimir/pkg/frontend/v2/frontendv2pb"
     "github.com/grafana/mimir/pkg/querier/stats"
Expand All @@ -39,10 +42,14 @@ import (
const testFrontendWorkerConcurrency = 5

func setupFrontend(t *testing.T, reg prometheus.Registerer, schedulerReplyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend) (*Frontend, *mockScheduler) {
return setupFrontendWithConcurrencyAndServerOptions(t, reg, schedulerReplyFunc, testFrontendWorkerConcurrency)
}

func setupFrontendWithConcurrencyAndServerOptions(t *testing.T, reg prometheus.Registerer, schedulerReplyFunc func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend, concurrency int, opts ...grpc.ServerOption) (*Frontend, *mockScheduler) {
l, err := net.Listen("tcp", "")
require.NoError(t, err)

server := grpc.NewServer()
server := grpc.NewServer(opts...)

h, p, err := net.SplitHostPort(l.Addr().String())
require.NoError(t, err)
Expand All @@ -53,12 +60,11 @@ func setupFrontend(t *testing.T, reg prometheus.Registerer, schedulerReplyFunc f
cfg := Config{}
flagext.DefaultValues(&cfg)
cfg.SchedulerAddress = l.Addr().String()
cfg.WorkerConcurrency = testFrontendWorkerConcurrency
cfg.WorkerConcurrency = concurrency
cfg.Addr = h
cfg.Port = grpcPort

//logger := log.NewLogfmtLogger(os.Stdout)
logger := log.NewNopLogger()
logger := log.NewLogfmtLogger(os.Stdout)
f, err := NewFrontend(cfg, logger, reg)
require.NoError(t, err)

Expand All @@ -67,17 +73,18 @@ func setupFrontend(t *testing.T, reg prometheus.Registerer, schedulerReplyFunc f
ms := newMockScheduler(t, f, schedulerReplyFunc)
schedulerpb.RegisterSchedulerForFrontendServer(server, ms)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), f))
t.Cleanup(func() {
_ = services.StopAndAwaitTerminated(context.Background(), f)
})

go func() {
_ = server.Serve(l)
}()

t.Cleanup(func() {
_ = l.Close()
server.GracefulStop()
})

require.NoError(t, services.StartAndAwaitRunning(context.Background(), f))
t.Cleanup(func() {
_ = services.StopAndAwaitTerminated(context.Background(), f)
})

// Wait for frontend to connect to scheduler.
@@ -425,3 +432,52 @@ func TestConfig_Validate(t *testing.T) {
         })
     }
 }
+
+func TestWithClosingGrpcServer(t *testing.T) {
+    // This test is easier with single frontend worker.
+    const frontendConcurrency = 1
+    const userID = "test"
+
+    f, _ := setupFrontendWithConcurrencyAndServerOptions(t, nil, func(f *Frontend, msg *schedulerpb.FrontendToScheduler) *schedulerpb.SchedulerToFrontend {
+        return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.TOO_MANY_REQUESTS_PER_TENANT}
+    }, frontendConcurrency, grpc.KeepaliveParams(keepalive.ServerParameters{
+        MaxConnectionIdle:     100 * time.Millisecond,
+        MaxConnectionAge:      100 * time.Millisecond,
+        MaxConnectionAgeGrace: 100 * time.Millisecond,
+        Time:                  1 * time.Second,
+        Timeout:               1 * time.Second,
+    }))
+
+    // Connection will be established on the first roundtrip.
+    resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
+    require.NoError(t, err)
+    require.Equal(t, int(resp.Code), http.StatusTooManyRequests)
+
+    // Verify that there is one stream open.
+    require.Equal(t, 1, checkStreamGoroutines())
+
+    // Wait a bit, to make sure that server closes connection.
+    time.Sleep(1 * time.Second)
+
+    // Despite server closing connections, stream-related goroutines still exist.
+    require.Equal(t, 1, checkStreamGoroutines())
+
+    // Another request will work as before, because worker will recreate connection.
+    resp, err = f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
+    require.NoError(t, err)
+    require.Equal(t, int(resp.Code), http.StatusTooManyRequests)
+
+    // There should still be only one stream open, and one goroutine created for it.
+    // Previously frontend leaked goroutine because stream that received "EOF" due to server closing the connection, never stopped its goroutine.
+    require.Equal(t, 1, checkStreamGoroutines())
+}
+
+func checkStreamGoroutines() int {
+    const streamGoroutineStackFrameTrailer = "created by google.golang.org/grpc.newClientStreamWithParams"
+
+    buf := make([]byte, 1000000)
+    stacklen := runtime.Stack(buf, true)
+
+    goroutineStacks := string(buf[:stacklen])
+    return strings.Count(goroutineStacks, streamGoroutineStackFrameTrailer)
+}
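checkStreamGoroutines works because a full runtime.Stack dump ends each goroutine's trace with a "created by <function>" frame, so counting occurrences of the function grpc-go uses to spawn stream goroutines counts the live streams. A standalone demonstration of the counting technique (hypothetical example, not part of the commit):

package main

import (
    "fmt"
    "runtime"
    "strings"
    "time"
)

func worker() { time.Sleep(time.Second) }

func main() {
    for i := 0; i < 3; i++ {
        go worker()
    }
    time.Sleep(10 * time.Millisecond) // give the goroutines time to start

    buf := make([]byte, 1<<20)
    n := runtime.Stack(buf, true) // true = dump every goroutine, not just the caller

    // Each goroutine's trace ends with a "created by ..." line naming the
    // function that spawned it; counting that line counts live goroutines.
    fmt.Println(strings.Count(string(buf[:n]), "created by main.main")) // prints 3
}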
