From f569eb22a55b44e8fd6f2ea7570a1f292adafa45 Mon Sep 17 00:00:00 2001 From: Matthew Nibecker Date: Tue, 2 Jan 2024 12:32:26 -0700 Subject: [PATCH] Add stayalive endpoint This is used by Zui to prevent orphaned Zed processes by exiting when a client stops polling the stayalive endpoint. Closes 4863 --- cmd/mockzui/main.go | 65 ----------------------------- cmd/zed/serve/command.go | 30 ++----------- pkg/httpd/server.go | 3 ++ service/core.go | 63 +++++++++++++++++++++++----- service/handlers.go | 13 ++++++ service/ztests/mockzui.sh | 49 ---------------------- service/ztests/orphaned-by-zui.yaml | 30 +++++++++---- service/ztests/service.sh | 3 +- 8 files changed, 94 insertions(+), 162 deletions(-) delete mode 100644 cmd/mockzui/main.go delete mode 100644 service/ztests/mockzui.sh diff --git a/cmd/mockzui/main.go b/cmd/mockzui/main.go deleted file mode 100644 index fbdd75d5a5..0000000000 --- a/cmd/mockzui/main.go +++ /dev/null @@ -1,65 +0,0 @@ -// mockzui is a command for testing purposes only. It is designed to simulate -// the exact way Zui launches then forks a separate zed process. zed must be -// in $PATH for this to work. 
- -package main - -import ( - "bytes" - "flag" - "fmt" - "os" - "os/exec" -) - -func die(err error) { - if err != nil { - panic(err) - } -} - -var ( - pidfile string - portfile string - lakeroot string -) - -func init() { - flag.StringVar(&portfile, "portfile", "", "location to write zed lake serve port") - flag.StringVar(&pidfile, "pidfile", "", "location to write zed lake serve pid") - flag.StringVar(&lakeroot, "lake", "", "Zed lake location") - flag.Parse() -} - -func main() { - r, _, err := os.Pipe() - die(err) - - if portfile == "" { - fmt.Fprintln(os.Stderr, "must provide -portfile arg") - os.Exit(1) - } - if pidfile == "" { - fmt.Fprintln(os.Stderr, "must provide -pidfile arg") - os.Exit(1) - } - args := []string{ - "serve", - "-l=localhost:0", - "-lake=" + lakeroot, - "-log.level=warn", - "-portfile=" + portfile, - fmt.Sprintf("-brimfd=%d", r.Fd()), - } - stderr := bytes.NewBuffer(nil) - cmd := exec.Command("zed", args...) - cmd.Stderr = stderr - cmd.ExtraFiles = []*os.File{r} - - err = cmd.Start() - die(err) - pid := fmt.Sprintf("%d", cmd.Process.Pid) - err = os.WriteFile(pidfile, []byte(pid), 0644) - die(err) - cmd.Wait() -} diff --git a/cmd/zed/serve/command.go b/cmd/zed/serve/command.go index d6e42c8174..e9f92eebcc 100644 --- a/cmd/zed/serve/command.go +++ b/cmd/zed/serve/command.go @@ -1,14 +1,12 @@ package serve import ( - "context" "errors" "flag" "io" "net" "os" "os/signal" - "runtime" "syscall" "github.com/brimdata/zed/cli" @@ -31,7 +29,7 @@ The serve command listens for Zed lake API requests on the provided interface and port, executes the requests, and returns results. Requests may be issued to this service via the "zed api" command. `, - HiddenFlags: "brimfd,portfile", + HiddenFlags: "stayalivetimeout,portfile", New: New, } @@ -40,9 +38,6 @@ type Command struct { conf service.Config logflags logflags.Flags - // brimfd is a file descriptor passed through by Zui desktop. If set the - // command will exit if the fd is closed. 
- brimfd int listenAddr string portFile string rootContentFile string @@ -53,7 +48,6 @@ func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) { c.conf.Auth.SetFlags(f) c.conf.Version = cli.Version() c.logflags.SetFlags(f) - f.IntVar(&c.brimfd, "brimfd", -1, "pipe read fd passed by Zui to signal Zui closure") f.Func("cors.origin", "CORS allowed origin (may be repeated)", func(s string) error { c.conf.CORSAllowedOrigins = append(c.conf.CORSAllowedOrigins, s) return nil @@ -62,6 +56,7 @@ func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) { f.StringVar(&c.listenAddr, "l", ":9867", "[addr]:port to listen on") f.StringVar(&c.portFile, "portfile", "", "write listen port to file") f.StringVar(&c.rootContentFile, "rootcontentfile", "", "file to serve for GET /") + f.DurationVar(&c.conf.StayAliveTimeout, "stayalivetimeout", 0, "duration to shutdown after inactivity on stayalive endpoint (used by Zui to prevent orphaned Zed processes)") return c, nil } @@ -92,16 +87,12 @@ func (c *Command) Run(args []string) error { return err } defer logger.Sync() - if c.brimfd != -1 { - if ctx, err = c.watchBrimFd(ctx, logger); err != nil { - return err - } - } c.conf.Logger = logger core, err := service.NewCore(ctx, c.conf) if err != nil { return err } + ctx = core.StayAliveContext(ctx) defer core.Shutdown() sigch := make(chan os.Signal, 1) signal.Notify(sigch, os.Interrupt) @@ -122,21 +113,6 @@ func (c *Command) Run(args []string) error { return srv.Wait() } -func (c *Command) watchBrimFd(ctx context.Context, logger *zap.Logger) (context.Context, error) { - if runtime.GOOS == "windows" { - return nil, errors.New("flag -brimfd not applicable to windows") - } - f := os.NewFile(uintptr(c.brimfd), "brimfd") - logger.Info("Listening to Zui process pipe", zap.String("fd", f.Name())) - ctx, cancel := context.WithCancel(ctx) - go func() { - io.Copy(io.Discard, f) - logger.Info("Brim fd closed, shutting down") - cancel() - }() - return ctx, nil -} - func (c 
*Command) writePortFile(addr string) error { _, port, err := net.SplitHostPort(addr) if err != nil { diff --git a/pkg/httpd/server.go b/pkg/httpd/server.go index d1b7e56130..345d7ac295 100644 --- a/pkg/httpd/server.go +++ b/pkg/httpd/server.go @@ -77,6 +77,9 @@ func (s *Server) serve(ctx context.Context, ln net.Listener) { reason = "server error" case <-ctx.Done(): reason = "context closed" + if err := context.Cause(ctx); !errors.Is(err, context.Canceled) { + reason = err.Error() + } } s.logger.Info("Shutting down", zap.String("reason", reason), zap.Error(srvErr)) diff --git a/service/core.go b/service/core.go index ddfd3abed3..9c843f5366 100644 --- a/service/core.go +++ b/service/core.go @@ -10,6 +10,7 @@ import ( "net/http/pprof" "strings" "sync" + "sync/atomic" "time" "github.com/brimdata/zed/api" @@ -49,6 +50,12 @@ type Config struct { RootContent io.ReadSeeker Version string Logger *zap.Logger + // StayAliveTimeout when greater than 0 activates the stayalive endpoint. + // If the stayalive endpoint has not been actively polled for this duration + // the Zed process will exit. This is used by the Zui application to ensure + // that the Zed process exits in the event that the Zui process is + // terminated with SIGKILL. 
+ StayAliveTimeout time.Duration } type Core struct { @@ -65,6 +72,9 @@ type Core struct { runningQueriesMu sync.Mutex subscriptions map[chan event]struct{} subscriptionsMu sync.RWMutex + stayAliveCancel context.CancelCauseFunc + stayAliveCount atomic.Int32 + stayAliveNotify chan struct{} } func NewCore(ctx context.Context, conf Config) (*Core, error) { @@ -142,17 +152,18 @@ func NewCore(ctx context.Context, conf Config) (*Core, error) { routerAPI.Use(corsMiddleware(conf.CORSAllowedOrigins)) c := &Core{ - auth: authenticator, - compiler: compiler.NewLakeCompiler(root), - conf: conf, - engine: engine, - logger: conf.Logger.Named("core"), - root: root, - registry: registry, - routerAPI: routerAPI, - routerAux: routerAux, - runningQueries: make(map[string]*queryStatus), - subscriptions: make(map[chan event]struct{}), + auth: authenticator, + compiler: compiler.NewLakeCompiler(root), + conf: conf, + engine: engine, + logger: conf.Logger.Named("core"), + root: root, + registry: registry, + routerAPI: routerAPI, + routerAux: routerAux, + runningQueries: make(map[string]*queryStatus), + subscriptions: make(map[chan event]struct{}), + stayAliveNotify: make(chan struct{}), } c.addAPIServerRoutes() @@ -244,6 +255,36 @@ func (c *Core) Shutdown() { c.logger.Info("Shutdown") } +// StayAliveContext returns a context that will be canceled if the +// StayAliveTimeout is reached. This call also enables the stayalive monitoring +// goroutine and should only be called once. +func (c *Core) StayAliveContext(ctx context.Context) context.Context { + if c.conf.StayAliveTimeout > 0 { + ctx, c.stayAliveCancel = context.WithCancelCause(ctx) + c.authhandle("/stayalive", handleStayAlive).Methods("GET") + go c.runStayAlive() + } + return ctx +} + +func (c *Core) runStayAlive() { + // There's two states here: either the stayalive endpoint is active or it's + // not and we're counting down to shutdown. 
+ for { + if c.stayAliveCount.Load() == 0 { + select { + case <-time.After(c.conf.StayAliveTimeout): + c.logger.Info("StayAlive timeout reached, shutting down") + c.stayAliveCancel(errors.New("stay alive timeout reached")) + return + case <-c.stayAliveNotify: + } + } else { + <-c.stayAliveNotify + } + } +} + func (c *Core) publishEvent(w *ResponseWriter, name string, data interface{}) { marshaler := zson.NewZNGMarshaler() marshaler.Decorate(zson.StyleSimple) diff --git a/service/handlers.go b/service/handlers.go index 7b22b8f72e..0ab2972ce2 100644 --- a/service/handlers.go +++ b/service/handlers.go @@ -782,3 +782,16 @@ func handleEvents(c *Core, w *ResponseWriter, r *Request) { } } } + +func handleStayAlive(c *Core, w *ResponseWriter, r *Request) { + w.WriteHeader(http.StatusOK) + if flusher, ok := w.ResponseWriter.(http.Flusher); ok { + flusher.Flush() + } + r.Logger.Info("Stay Alive entered") + c.stayAliveCount.Add(1) + c.stayAliveNotify <- struct{}{} + <-r.Context().Done() + c.stayAliveCount.Add(-1) + c.stayAliveNotify <- struct{}{} +} diff --git a/service/ztests/mockzui.sh b/service/ztests/mockzui.sh deleted file mode 100644 index 3f6c6e4144..0000000000 --- a/service/ztests/mockzui.sh +++ /dev/null @@ -1,49 +0,0 @@ -#!/bin/bash - -# This file simulates a running version of Zui desktop. It forks a service process -# then sits forever on the main thread. - -function awaitdeadservice { - i=0 - function servicealive { kill -0 $LAKE_PID 2> /dev/null; } - while servicealive ; do - let i+=1 - if [ $i -gt 50 ]; then - echo "timed out waiting for service to exit" - exit 1 - fi - sleep 0.1 - done -} - -function awaitfile { - file=$1 - i=0 - until [ -f $file ]; do - let i+=1 - if [ $i -gt 50 ]; then - echo "timed out waiting for file \"$file\" to appear" - exit 1 - fi - sleep 0.1 - done -} - -mkdir -p lakeroot -lakeroot=lakeroot -tempdir=$(mktemp -d) - -mockzui -lake="$lakeroot" -portfile="$tempdir/port" -pidfile="$tempdir/pid" & -mockzuipid=$! 
- -# wait for service to start -awaitfile $tempdir/port -awaitfile $tempdir/pid - -export ZED_LAKE=http://localhost:$(cat $tempdir/port) -export LAKE_PID=$(cat $tempdir/pid) -export MOCKZUI_PID=$mockzuipid - -# ensure that lake service process isn't leaked -trap "kill -9 $LAKE_PID 2>/dev/null || :" EXIT -rm -rf $tempdir diff --git a/service/ztests/orphaned-by-zui.yaml b/service/ztests/orphaned-by-zui.yaml index f9ee181df5..e5c7fa1aac 100644 --- a/service/ztests/orphaned-by-zui.yaml +++ b/service/ztests/orphaned-by-zui.yaml @@ -1,16 +1,28 @@ script: | - source mockzui.sh - zed create test - echo === - kill -9 $MOCKZUI_PID 2>/dev/null + LAKE_EXTRA_FLAGS="-stayalivetimeout=100ms -log.level=info" source service.sh + source await.sh + ! curl -m 0.05 $ZED_LAKE/stayalive 2> /dev/null awaitdeadservice + zq -z 'msg == "Shutting down" | drop ts' lake.log inputs: - - name: mockzui.sh - source: mockzui.sh + - name: service.sh + - name: await.sh + data: | + function awaitdeadservice { + i=0 + function servicealive { kill -0 $ZED_PID 2> /dev/null; } + while servicealive ; do + let i+=1 + if [ $i -gt 50 ]; then + echo "timed out waiting for service to exit" + exit 1 + fi + sleep 0.1 + done + } outputs: - name: stdout - regexp: | - pool created: test \w{27} - === + data: | + {level:"info",logger:"httpd",msg:"Shutting down",reason:"stay alive timeout reached"} diff --git a/service/ztests/service.sh b/service/ztests/service.sh index ee4b8f631f..8e8e4a370a 100644 --- a/service/ztests/service.sh +++ b/service/ztests/service.sh @@ -26,7 +26,8 @@ zed serve -l=localhost:0 -lake=$lakeroot -portfile=$portdir/lake -log.level=warn lakepid=$! awaitfile $portdir/lake -trap "rm -rf $portdir; kill $lakepid;" EXIT +trap "rm -rf $portdir; kill $lakepid || true" EXIT export ZED_LAKE=http://localhost:$(cat $portdir/lake) +export ZED_PID=$lakepid export LAKE_PATH=$lakeroot