Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(v2): drain inflight requests #3639

Merged
merged 3 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 1 addition & 69 deletions pkg/experiment/ingester/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"flag"
"fmt"
"sync"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -74,7 +73,6 @@ type SegmentWriterService struct {
logger log.Logger
reg prometheus.Registerer

requests requests
lifecycler *ring.Lifecycler
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
Expand Down Expand Up @@ -142,11 +140,7 @@ func New(
}

func (i *SegmentWriterService) starting(ctx context.Context) error {
if err := services.StartManagerAndAwaitHealthy(ctx, i.subservices); err != nil {
return err
}
i.requests.open()
return nil
return services.StartManagerAndAwaitHealthy(ctx, i.subservices)
}

func (i *SegmentWriterService) running(ctx context.Context) error {
Expand All @@ -159,19 +153,13 @@ func (i *SegmentWriterService) running(ctx context.Context) error {
}

func (i *SegmentWriterService) stopping(_ error) error {
i.requests.drain()
errs := multierror.New()
errs.Add(services.StopManagerAndAwaitStopped(context.Background(), i.subservices))
errs.Add(i.segmentWriter.Stop())
return errs.Err()
}

func (i *SegmentWriterService) Push(ctx context.Context, req *segmentwriterv1.PushRequest) (*segmentwriterv1.PushResponse, error) {
if !i.requests.add() {
return nil, status.Error(codes.Unavailable, "service is unavailable")
} else {
defer i.requests.done()
}
if req.TenantId == "" {
return nil, status.Error(codes.InvalidArgument, tenant.ErrNoTenantID.Error())
}
Expand Down Expand Up @@ -238,59 +226,3 @@ func (i *SegmentWriterService) CheckReady(ctx context.Context) error {
}
return i.lifecycler.CheckReady(ctx)
}

// requests tracks in-flight requests so that they can be drained at the
// service level during shutdown.
//
// Ideally the server using the service would be responsible for this.
// However, because the server is a dependency of the service and is only
// shut down after the service stops, requests may still arrive after the
// Stop call — a consequence of how we initialize modules.
//
// Elsewhere, draining could be handled at a higher level, such as a load
// balancer or a service discovery mechanism, by no longer routing
// requests to an instance that is about to shut down.
//
// In our case, segment writer instances are not exposed directly to the
// outside world but are discovered through the ring (see lifecycler).
// There is no _reliable_ way to guarantee that every ring member has
// learned that the instance is leaving, so requests can keep arriving
// for a short while until the membership state converges.
type requests struct {
	mu      sync.RWMutex
	wg      sync.WaitGroup
	allowed bool // whether new requests may be admitted
}

// open marks the service as ready to admit new requests.
func (r *requests) open() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.allowed = true
}

// add registers a new in-flight request. It reports whether the request
// was admitted; callers must invoke done for every admitted request.
func (r *requests) add() bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if !r.allowed {
		return false
	}
	r.wg.Add(1)
	return true
}

// done marks a previously admitted request as finished.
func (r *requests) done() { r.wg.Done() }

// drain stops admitting new requests and blocks until every in-flight
// request has completed.
func (r *requests) drain() {
	r.mu.Lock()
	r.allowed = false
	r.mu.Unlock()
	r.wg.Wait()
}
5 changes: 3 additions & 2 deletions pkg/phlare/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ const (
CompactionWorker string = "compaction-worker"
PlacementAgent string = "placement-agent"
PlacementManager string = "placement-manager"
ShutdownHelper string = "shutdown-helper"
)

var objectStoreTypeStats = usagestats.NewString("store_object_type")
Expand Down Expand Up @@ -753,9 +754,9 @@ func NewServerService(serv *server.Server, servicesToWaitFor func() []services.S
return nil
case err := <-serverDone:
if err != nil {
return err
return fmt.Errorf("server stopped unexpectedly: %w", err)
}
return fmt.Errorf("server stopped unexpectedly")
return nil
}
}

Expand Down
30 changes: 30 additions & 0 deletions pkg/phlare/modules_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"slices"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -198,3 +199,32 @@ func (f *Phlare) adaptivePlacementStore() adaptiveplacement.Store {
}
return adaptiveplacement.NewStore(f.storageBucket)
}

// The shutdown helper exists to handle request draining at the server
// level.
//
// Since the server is a dependency of many services that handle requests
// and is only shut down after those services have stopped, there's a
// possibility that a de-initialized component may receive requests,
// which causes undefined behaviour.
//
// In other scenarios, request draining could be managed at a higher
// level, such as in a load balancer or the service discovery mechanism.
// However, there's no _reliable_ mechanism to ensure that all the
// clients are informed of the server's shutdown and confirmed that they
// have stopped sending requests to this specific instance.
//
// The helper should be de-initialized first in the dependency chain;
// immediately, it drains the gRPC server, thereby preventing any further
// requests from being processed. The helper does not affect the HTTP
// server that serves metrics and profiles.
func (f *Phlare) initShutdownHelper() (services.Service, error) {
	stopFn := func(error) error {
		if f.Server.GRPC == nil {
			return nil
		}
		level.Info(f.logger).Log("msg", "shutting down gRPC server")
		f.Server.GRPC.GracefulStop()
		return nil
	}
	return services.NewIdleService(nil, stopFn), nil
}
9 changes: 6 additions & 3 deletions pkg/phlare/phlare.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,25 +415,28 @@ func (f *Phlare) setupModuleManager() error {
SegmentWriterClient: {Overrides, API, SegmentWriterRing, PlacementAgent},
PlacementAgent: {Overrides, API, Storage},
PlacementManager: {Overrides, API, Storage},
ShutdownHelper: {Distributor, SegmentWriter, Metastore, QueryBackend},
}
for k, v := range experimentalModules {
deps[k] = v
}

deps[All] = append(deps[All], SegmentWriter, Metastore, CompactionWorker, QueryBackend)
deps[All] = append(deps[All], SegmentWriter, Metastore, CompactionWorker, QueryBackend, ShutdownHelper)
deps[QueryFrontend] = append(deps[QueryFrontend], MetastoreClient, QueryBackendClient)
deps[Distributor] = append(deps[Distributor], SegmentWriterClient)

mm.RegisterModule(SegmentWriter, f.initSegmentWriter)
mm.RegisterModule(SegmentWriterRing, f.initSegmentWriterRing, modules.UserInvisibleModule)
mm.RegisterModule(SegmentWriterClient, f.initSegmentWriterClient, modules.UserInvisibleModule)
mm.RegisterModule(Metastore, f.initMetastore)
mm.RegisterModule(CompactionWorker, f.initCompactionWorker)
mm.RegisterModule(QueryBackend, f.initQueryBackend)

mm.RegisterModule(SegmentWriterRing, f.initSegmentWriterRing, modules.UserInvisibleModule)
mm.RegisterModule(SegmentWriterClient, f.initSegmentWriterClient, modules.UserInvisibleModule)
mm.RegisterModule(MetastoreClient, f.initMetastoreClient, modules.UserInvisibleModule)
mm.RegisterModule(QueryBackendClient, f.initQueryBackendClient, modules.UserInvisibleModule)
mm.RegisterModule(PlacementAgent, f.initPlacementAgent, modules.UserInvisibleModule)
mm.RegisterModule(PlacementManager, f.initPlacementManager, modules.UserInvisibleModule)
mm.RegisterModule(ShutdownHelper, f.initShutdownHelper, modules.UserInvisibleModule)
}

for mod, targets := range deps {
Expand Down
Loading