Skip to content

Commit

Permalink
fix(v2): drain inflight requests (#3639)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae authored Oct 21, 2024
1 parent 49dbcb5 commit 09b5ad1
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 74 deletions.
70 changes: 1 addition & 69 deletions pkg/experiment/ingester/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"flag"
"fmt"
"sync"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -74,7 +73,6 @@ type SegmentWriterService struct {
logger log.Logger
reg prometheus.Registerer

requests requests
lifecycler *ring.Lifecycler
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
Expand Down Expand Up @@ -142,11 +140,7 @@ func New(
}

// starting launches the subservices via services.StartManagerAndAwaitHealthy
// and propagates its error, if any.
//
// NOTE(review): this span is rendered from a diff and appears to show two
// alternative bodies back to back: the longer form, which additionally calls
// i.requests.open() once the subservices are up, and the single-return form
// on the last line. As written the final return is unreachable — confirm
// which variant belongs to the revision you are working with.
func (i *SegmentWriterService) starting(ctx context.Context) error {
if err := services.StartManagerAndAwaitHealthy(ctx, i.subservices); err != nil {
return err
}
// Subservices started without error: begin admitting requests.
i.requests.open()
return nil
return services.StartManagerAndAwaitHealthy(ctx, i.subservices)
}

func (i *SegmentWriterService) running(ctx context.Context) error {
Expand All @@ -159,19 +153,13 @@ func (i *SegmentWriterService) running(ctx context.Context) error {
}

// stopping tears the service down. It first drains the request tracker, then
// stops the subservices and the segment writer, collecting both results into
// a single multierror. The error argument is ignored.
func (i *SegmentWriterService) stopping(_ error) error {
// Drain before anything else: new requests are refused and in-flight ones
// are awaited, so nothing is being served while the components below stop.
i.requests.drain()
errs := multierror.New()
// A fresh background context is used here rather than a caller-supplied one.
errs.Add(services.StopManagerAndAwaitStopped(context.Background(), i.subservices))
errs.Add(i.segmentWriter.Stop())
return errs.Err()
}

func (i *SegmentWriterService) Push(ctx context.Context, req *segmentwriterv1.PushRequest) (*segmentwriterv1.PushResponse, error) {
if !i.requests.add() {
return nil, status.Error(codes.Unavailable, "service is unavailable")
} else {
defer i.requests.done()
}
if req.TenantId == "" {
return nil, status.Error(codes.InvalidArgument, tenant.ErrNoTenantID.Error())
}
Expand Down Expand Up @@ -238,59 +226,3 @@ func (i *SegmentWriterService) CheckReady(ctx context.Context) error {
}
return i.lifecycler.CheckReady(ctx)
}

// The requests utility emerged due to the need to handle request draining at
// the service level.
//
// Ideally, this should be the responsibility of the server using the service.
// However, since the server is a dependency of the service and is only
// shut down after the service is stopped, requests may still arrive
// after the Stop call. This issue arises from how we initialize modules.
//
// In other scenarios, request draining could be managed at a higher level,
// such as in a load balancer or service discovery mechanism. The goal would
// be to stop routing requests to an instance that is about to shut down.
//
// In our case, segment writer service instances are not directly exposed to
// the outside world but are discoverable via the ring (see lifecycler).
// There's no _reliable_ mechanism to ensure that all the ring members are
// aware of the fact that the instance is leaving, so requests may continue to
// arrive within a short period of time, until the membership state converges.
type requests struct {
	mu      sync.RWMutex   // guards allowed; add holds it for reading while registering
	wg      sync.WaitGroup // one count per request currently in flight
	allowed bool           // true while new requests may be admitted
}

// setAllowed flips the admission flag under the write lock, so the change
// cannot interleave with an add that is between its check and its wg.Add.
func (r *requests) setAllowed(v bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.allowed = v
}

// open starts admitting new requests.
func (r *requests) open() { r.setAllowed(true) }

// add registers a new in-flight request when admission is open.
// It reports whether the request was admitted; every admitted request
// must be paired with a call to done.
func (r *requests) add() bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if r.allowed {
		r.wg.Add(1)
		return true
	}
	return false
}

// done marks a previously admitted request as finished.
func (r *requests) done() { r.wg.Done() }

// drain closes admission and then blocks until every admitted request
// has called done.
func (r *requests) drain() {
	r.setAllowed(false)
	r.wg.Wait()
}
5 changes: 3 additions & 2 deletions pkg/phlare/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ const (
CompactionWorker string = "compaction-worker"
PlacementAgent string = "placement-agent"
PlacementManager string = "placement-manager"
ShutdownHelper string = "shutdown-helper"
)

var objectStoreTypeStats = usagestats.NewString("store_object_type")
Expand Down Expand Up @@ -753,9 +754,9 @@ func NewServerService(serv *server.Server, servicesToWaitFor func() []services.S
return nil
case err := <-serverDone:
if err != nil {
return err
return fmt.Errorf("server stopped unexpectedly: %w", err)
}
return fmt.Errorf("server stopped unexpectedly")
return nil
}
}

Expand Down
30 changes: 30 additions & 0 deletions pkg/phlare/modules_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"slices"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -198,3 +199,32 @@ func (f *Phlare) adaptivePlacementStore() adaptiveplacement.Store {
}
return adaptiveplacement.NewStore(f.storageBucket)
}

// The shutdown helper utility emerged due to the need to handle request
// draining at the server level.
//
// Since the server is a dependency of many services that handle requests
// and is only shut down after the services have stopped, there's a possibility
// that a de-initialized component may receive requests, which causes undefined
// behaviour.
//
// In other scenarios, request draining could be managed at a higher level,
// such as in a load balancer or the service discovery mechanism. However,
// there's no _reliable_ mechanism to ensure that all the clients are informed
// of the server's shutdown and have confirmed that they have stopped sending
// requests to this specific instance.
//
// The helper should be de-initialized first in the dependency chain;
// immediately, it drains the gRPC server, thereby preventing any further
// requests from being processed. The helper does not affect the HTTP
// server that serves metrics and profiles.
func (f *Phlare) initShutdownHelper() (services.Service, error) {
	// drainGRPC is the only hook handed to the idle service below; the
	// error it receives is ignored and it never fails.
	drainGRPC := func(error) error {
		if f.Server.GRPC == nil {
			// No gRPC server configured: nothing to drain.
			return nil
		}
		level.Info(f.logger).Log("msg", "shutting down gRPC server")
		f.Server.GRPC.GracefulStop()
		return nil
	}
	// The first (nil) argument means the service has no work of its own
	// besides the hook above.
	return services.NewIdleService(nil, drainGRPC), nil
}
9 changes: 6 additions & 3 deletions pkg/phlare/phlare.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,25 +415,28 @@ func (f *Phlare) setupModuleManager() error {
SegmentWriterClient: {Overrides, API, SegmentWriterRing, PlacementAgent},
PlacementAgent: {Overrides, API, Storage},
PlacementManager: {Overrides, API, Storage},
ShutdownHelper: {Distributor, SegmentWriter, Metastore, QueryBackend},
}
for k, v := range experimentalModules {
deps[k] = v
}

deps[All] = append(deps[All], SegmentWriter, Metastore, CompactionWorker, QueryBackend)
deps[All] = append(deps[All], SegmentWriter, Metastore, CompactionWorker, QueryBackend, ShutdownHelper)
deps[QueryFrontend] = append(deps[QueryFrontend], MetastoreClient, QueryBackendClient)
deps[Distributor] = append(deps[Distributor], SegmentWriterClient)

mm.RegisterModule(SegmentWriter, f.initSegmentWriter)
mm.RegisterModule(SegmentWriterRing, f.initSegmentWriterRing, modules.UserInvisibleModule)
mm.RegisterModule(SegmentWriterClient, f.initSegmentWriterClient, modules.UserInvisibleModule)
mm.RegisterModule(Metastore, f.initMetastore)
mm.RegisterModule(CompactionWorker, f.initCompactionWorker)
mm.RegisterModule(QueryBackend, f.initQueryBackend)

mm.RegisterModule(SegmentWriterRing, f.initSegmentWriterRing, modules.UserInvisibleModule)
mm.RegisterModule(SegmentWriterClient, f.initSegmentWriterClient, modules.UserInvisibleModule)
mm.RegisterModule(MetastoreClient, f.initMetastoreClient, modules.UserInvisibleModule)
mm.RegisterModule(QueryBackendClient, f.initQueryBackendClient, modules.UserInvisibleModule)
mm.RegisterModule(PlacementAgent, f.initPlacementAgent, modules.UserInvisibleModule)
mm.RegisterModule(PlacementManager, f.initPlacementManager, modules.UserInvisibleModule)
mm.RegisterModule(ShutdownHelper, f.initShutdownHelper, modules.UserInvisibleModule)
}

for mod, targets := range deps {
Expand Down

0 comments on commit 09b5ad1

Please sign in to comment.