Skip to content

Commit

Permalink
operator: print callstack when exiting before preparing done (pingcap…
Browse files Browse the repository at this point in the history
…#51371) (pingcap#46) (pingcap#61)

close pingcap#51370

Co-authored-by: 山岚 <36239017+YuJuncen@users.noreply.github.com>
  • Loading branch information
2 people authored and GitHub Enterprise committed Apr 25, 2024
1 parent bfd813b commit 7967b0b
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 25 deletions.
26 changes: 2 additions & 24 deletions br/cmd/br/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,17 @@ package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/spf13/cobra"
"go.uber.org/zap"
)

func main() {
gCtx := context.Background()
ctx, cancel := context.WithCancel(gCtx)
defer cancel()

sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

go func() {
sig := <-sc
fmt.Printf("\nGot signal [%v] to exit.\n", sig)
log.Warn("received signal to exit", zap.Stringer("signal", sig))
cancel()
fmt.Fprintln(os.Stderr, "gracefully shuting down, press ^C again to force exit")
<-sc
// Even user use SIGTERM to exit, there isn't any checkpoint for resuming,
// hence returning fail exit code.
os.Exit(1)
}()
ctx, cancel := utils.StartExitSingleListener(gCtx)

rootCmd := &cobra.Command{
Use: "br",
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/backup/prepare_snap/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,9 @@ func (p *Preparer) workOnPendingRanges(ctx context.Context) error {
}

func (p *Preparer) sendWaitApply(ctx context.Context, reqs pendingRequests) error {
logutil.CL(ctx).Info("about to send wait apply to stores", zap.Int("to-stores", len(reqs)))
for store, req := range reqs {
logutil.CL(ctx).Info("sending wait apply requests to store", zap.Uint64("store", store), zap.Int("regions", len(req.Regions)))
stream, err := p.streamOf(ctx, store)
if err != nil {
return errors.Annotatef(err, "failed to dial the store %d", store)
Expand All @@ -379,7 +381,6 @@ func (p *Preparer) sendWaitApply(ctx context.Context, reqs pendingRequests) erro
if err != nil {
return errors.Annotatef(err, "failed to send message to the store %d", store)
}
logutil.CL(ctx).Info("sent wait apply requests to store", zap.Uint64("store", store), zap.Int("regions", len(req.Regions)))
}
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/task/operator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func hintAllReady() {
// AdaptEnvForSnapshotBackup blocks the current goroutine and pause the GC safepoint and remove the scheduler by the config.
// This function will block until the context being canceled.
func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
utils.DumpGoroutineWhenExit.Store(true)
mgr, err := dialPD(ctx, &cfg.Config)
if err != nil {
return errors.Annotate(err, "failed to dial PD")
Expand Down Expand Up @@ -143,6 +144,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
if cfg.OnAllReady != nil {
cfg.OnAllReady()
}
utils.DumpGoroutineWhenExit.Store(false)
hintAllReady()
}()
defer func() {
Expand Down
57 changes: 57 additions & 0 deletions br/pkg/utils/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,16 @@ package utils
import (
"context"
"crypto/tls"
"fmt"
"os"
"os/signal"
"runtime"
"strings"
"sync/atomic"
"syscall"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -155,3 +163,52 @@ func WithCleanUp(errOut *error, timeout time.Duration, fn func(context.Context)
log.Warn("Encountered but ignored error while cleaning up.", zap.Error(err))
}
}

func AllStackInfo() []byte {
res := make([]byte, 256*units.KiB)
for {
n := runtime.Stack(res, true)
if n < len(res) {
return res[:n]
}
res = make([]byte, len(res)*2)
}
}

var (
DumpGoroutineWhenExit atomic.Bool
)

func StartExitSingleListener(ctx context.Context) (context.Context, context.CancelFunc) {
cx, cancel := context.WithCancel(ctx)
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
go func() {
sig := <-sc
dumpGoroutine := DumpGoroutineWhenExit.Load()
padding := strings.Repeat("=", 8)
printDelimate := func(s string) {
fmt.Printf("%s[ %s ]%s\n", padding, s, padding)
}
fmt.Println()
printDelimate(fmt.Sprintf("Got signal %v to exit.", sig))
printDelimate(fmt.Sprintf("Required Goroutine Dump = %v", dumpGoroutine))
if dumpGoroutine {
printDelimate("Start Dumping Goroutine")
_, _ = os.Stdout.Write(AllStackInfo())
printDelimate("End of Dumping Goroutine")
}
log.Warn("received signal to exit", zap.Stringer("signal", sig))
cancel()
fmt.Fprintln(os.Stderr, "gracefully shutting down, press ^C again to force exit")
<-sc
// Even user use SIGTERM to exit, there isn't any checkpoint for resuming,
// hence returning fail exit code.
os.Exit(1)
}()
return cx, cancel
}

0 comments on commit 7967b0b

Please sign in to comment.