fix: reduce spammy logs for pings (#3150)
We call Ping over gRPC at a high frequency while waiting for services to become
ready, and each gRPC failure is automatically logged at debug level.
Soon, language plugins will also use gRPC, so the volume of ping logs at
startup will roughly double.
This PR reduces the noise.

### Changes:
- Ping logs:
  - Constant ping call failures are silenced (a no-op log sink is passed into the ping call's context)
- Refactor: moved more logic into `rpc.Wait()` so that each caller doesn't need to write extra code (see the sketch below), including:
  - deadline handling
  - returning the relevant error (the latest ping error if the deadline was reached, otherwise the context error)
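
For illustration, here is a minimal before/after sketch of the caller-side change, lifted from the `cmd_bench.go` hunk further down (the `c.Wait`, `client`, and backoff values are the ones that appear in that diff, not new API):

```go
// Before: every caller wrapped the context in its own timeout around rpc.Wait.
ctx, cancel := context.WithTimeout(ctx, c.Wait)
defer cancel()
if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, client); err != nil {
	return fmt.Errorf("FTL cluster did not become ready: %w", err)
}

// After: the deadline is passed straight into rpc.Wait, which also silences
// per-ping failure logs and returns the last ping error if the deadline is hit.
if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, c.Wait, client); err != nil {
	return fmt.Errorf("FTL cluster did not become ready: %w", err)
}
```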

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
matt2e and github-actions[bot] authored Oct 29, 2024
1 parent a22b133 commit 3daf151
Showing 9 changed files with 67 additions and 53 deletions.
15 changes: 2 additions & 13 deletions common/plugin/spawn.go
@@ -161,32 +161,21 @@ func Spawn[Client PingableClient](
 		return nil, nil, err
 	}
 
-	dialCtx, cancel := context.WithTimeout(ctx, opts.startTimeout)
-	defer cancel()
-
 	// Wait for the plugin to start.
 	client := rpc.Dial(makeClient, pluginEndpoint.String(), log.Trace)
 	pingErr := make(chan error)
 	go func() {
 		retry := backoff.Backoff{Min: pluginRetryDelay, Max: pluginRetryDelay}
-		err := rpc.Wait(dialCtx, retry, client)
-		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
-			// Deliberately don't close pingErr because the select loop below
-			// will catch dialCtx closing and return a better error.
-			return
-		}
+		err := rpc.Wait(ctx, retry, opts.startTimeout, client)
 		pingErr <- err
 		close(pingErr)
 	}()
 
 	select {
-	case <-dialCtx.Done():
-		return nil, nil, fmt.Errorf("plugin timed out while starting: %w", dialCtx.Err())
-
 	case <-cmdCtx.Done():
 		return nil, nil, fmt.Errorf("plugin process died: %w", cmdCtx.Err())
 
-	case err := <-pingErr:
+	case err = <-pingErr:
 		if err != nil {
 			return nil, nil, fmt.Errorf("plugin failed to respond to ping: %w", err)
 		}
4 changes: 1 addition & 3 deletions frontend/cli/cmd_bench.go
@@ -29,9 +29,7 @@ type benchCmd struct {
 }
 
 func (c *benchCmd) Run(ctx context.Context, client ftlv1connect.VerbServiceClient) error {
-	ctx, cancel := context.WithTimeout(ctx, c.Wait)
-	defer cancel()
-	if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, client); err != nil {
+	if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, c.Wait, client); err != nil {
 		return fmt.Errorf("FTL cluster did not become ready: %w", err)
 	}
 	logger := log.FromContext(ctx)
4 changes: 1 addition & 3 deletions frontend/cli/cmd_box_run.go
@@ -69,9 +69,7 @@ func (b *boxRunCmd) Run(ctx context.Context, projConfig projectconfig.Config) er
 
 	// Wait for the controller to come up.
 	client := ftlv1connect.NewControllerServiceClient(rpc.GetHTTPClient(b.Bind.String()), b.Bind.String())
-	waitCtx, cancel := context.WithTimeout(ctx, b.ControllerTimeout)
-	defer cancel()
-	if err := rpc.Wait(waitCtx, backoff.Backoff{}, client); err != nil {
+	if err := rpc.Wait(ctx, backoff.Backoff{}, b.ControllerTimeout, client); err != nil {
 		return fmt.Errorf("controller failed to start: %w", err)
 	}
 
4 changes: 1 addition & 3 deletions frontend/cli/cmd_call.go
@@ -29,9 +29,7 @@ type callCmd struct {
 }
 
 func (c *callCmd) Run(ctx context.Context, client ftlv1connect.VerbServiceClient, ctlCli ftlv1connect.ControllerServiceClient) error {
-	ctx, cancel := context.WithTimeout(ctx, c.Wait)
-	defer cancel()
-	if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, client); err != nil {
+	if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, c.Wait, client); err != nil {
 		return err
 	}
 
4 changes: 1 addition & 3 deletions frontend/cli/cmd_ping.go
@@ -15,7 +15,5 @@ type pingCmd struct {
 }
 
 func (c *pingCmd) Run(ctx context.Context, controller ftlv1connect.ControllerServiceClient) error {
-	ctx, cancel := context.WithTimeout(ctx, c.Wait)
-	defer cancel()
-	return rpc.Wait(ctx, backoff.Backoff{Max: time.Second}, controller)
+	return rpc.Wait(ctx, backoff.Backoff{Max: time.Second}, c.Wait, controller) //nolint:wrapcheck
 }
9 changes: 5 additions & 4 deletions frontend/cli/cmd_replay.go
@@ -26,14 +26,15 @@ type replayCmd struct {
 }
 
 func (c *replayCmd) Run(ctx context.Context, client ftlv1connect.VerbServiceClient, ctlCli ftlv1connect.ControllerServiceClient) error {
-	ctx, cancel := context.WithTimeout(ctx, c.Wait)
-	defer cancel()
-	if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, client); err != nil {
+	// Wait timeout is for both pings to complete, not each ping individually
+	startTime := time.Now()
+
+	if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, c.Wait, client); err != nil {
 		return fmt.Errorf("failed to wait for client: %w", err)
 	}
 
 	consoleServiceClient := rpc.Dial(pbconsoleconnect.NewConsoleServiceClient, cli.Endpoint.String(), log.Error)
-	if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, consoleServiceClient); err != nil {
+	if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, c.Wait-time.Since(startTime), consoleServiceClient); err != nil {
 		return fmt.Errorf("failed to wait for console service client: %w", err)
 	}
 
4 changes: 2 additions & 2 deletions frontend/cli/cmd_serve.go
@@ -83,7 +83,7 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini
 		return err
 	}
 	if s.Provisioners > 0 {
-		if err := rpc.Wait(ctx, backoff.Backoff{Max: s.StartupTimeout}, provisionerClient); err != nil {
+		if err := rpc.Wait(ctx, backoff.Backoff{Max: s.StartupTimeout}, s.StartupTimeout, provisionerClient); err != nil {
 			return fmt.Errorf("provisioner failed to start: %w", err)
 		}
 	}
@@ -244,7 +244,7 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini
 		return fmt.Errorf("controller failed to start: %w", err)
 	}
 	if s.Provisioners > 0 {
-		if err := rpc.Wait(ctx, backoff.Backoff{Max: s.StartupTimeout}, provisionerClient); err != nil {
+		if err := rpc.Wait(ctx, backoff.Backoff{Max: s.StartupTimeout}, s.StartupTimeout, provisionerClient); err != nil {
 			return fmt.Errorf("provisioner failed to start: %w", err)
 		}
 	}
@@ -109,10 +109,7 @@ func (p *externalPluginImpl) start(ctx context.Context, bind *url.URL, language
 }
 
 func (p *externalPluginImpl) ping(ctx context.Context) error {
-	retry := backoff.Backoff{}
-	heartbeatCtx, cancel := context.WithTimeout(ctx, launchTimeout)
-	defer cancel()
-	err := rpc.Wait(heartbeatCtx, retry, p.client)
+	err := rpc.Wait(ctx, backoff.Backoff{}, launchTimeout, p.client)
 	if err != nil {
 		return connect.NewError(connect.CodeUnavailable, fmt.Errorf("failed to connect to runner: %w", err))
 	}
71 changes: 53 additions & 18 deletions internal/rpc/rpc.go
@@ -140,31 +140,66 @@ func (m mergedContext) Value(key any) any {
 	return m.values.Value(key)
 }
 
+type noopLogSync struct{}
+
+var _ log.Sink = noopLogSync{}
+
+func (noopLogSync) Log(entry log.Entry) error { return nil }
+
 // Wait for a client to become available.
 //
-// This will repeatedly call Ping() every 100ms until the service becomes
-// ready. TODO: This will probably need to be smarter at some point.
+// This will repeatedly call Ping() according to the retry policy until the client is
+// ready or the deadline is reached.
 //
 // If "ctx" is cancelled this will return ctx.Err()
-func Wait(ctx context.Context, retry backoff.Backoff, client Pingable) error {
-	logger := log.FromContext(ctx)
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-		}
-		resp, err := client.Ping(ctx, connect.NewRequest(&ftlv1.PingRequest{}))
-		if err == nil {
-			if resp.Msg.NotReady == nil {
-				return nil
+//
+// Usually rpc errors are logged, but this function will silence ping call errors, and
+// returns the last error if the deadline is reached.
+func Wait(ctx context.Context, retry backoff.Backoff, deadline time.Duration, client Pingable) error {
+	errChan := make(chan error)
+	ctx, cancel := context.WithTimeout(ctx, deadline)
+	defer cancel()
+
+	go func() {
+		logger := log.FromContext(ctx)
+		// create a context logger with a new one that does not log debug messages (which include each ping call failures)
+		silencedCtx := log.ContextWithLogger(ctx, log.New(log.Error, noopLogSync{}))
+
+		start := time.Now()
+		// keep track of the last ping error
+		var err error
+		for {
+			select {
+			case <-ctx.Done():
+				if err != nil && errors.Is(ctx.Err(), context.DeadlineExceeded) {
+					errChan <- err
+				} else {
+					errChan <- ctx.Err()
+				}
+				return
+			default:
+			}
+			var resp *connect.Response[ftlv1.PingResponse]
+			resp, err = client.Ping(silencedCtx, connect.NewRequest(&ftlv1.PingRequest{}))
+			if err == nil {
+				if resp.Msg.NotReady == nil {
+					logger.Debugf("Ping succeeded in %.2fs", time.Since(start).Seconds())
+					errChan <- nil
+					return
+				}
+				err = fmt.Errorf("service is not ready: %s", *resp.Msg.NotReady)
 			}
-			err = fmt.Errorf("service is not ready: %s", *resp.Msg.NotReady)
+			delay := retry.Duration()
+			logger.Tracef("Ping failed waiting %s for client: %+v", delay, err)
+			time.Sleep(delay)
 		}
-		delay := retry.Duration()
-		logger.Tracef("Ping failed waiting %s for client: %+v", delay, err)
-		time.Sleep(delay)
+	}()
+
+	err := <-errChan
+	if err != nil {
+		return err
 	}
+	return nil
 }
 
 // RetryStreamingClientStream will repeatedly call handler with the stream
