From 7d9993531559c3409e300dd8c082f35fe531af32 Mon Sep 17 00:00:00 2001 From: Matthew Nibecker Date: Tue, 2 Jan 2024 12:32:26 -0700 Subject: [PATCH] Add keepalive endpoint This is used by Zui to prevent orhpaned Zed processes by exiting when a client stops polling the keepalive endpoint. Closes 4863 --- cmd/mockzui/main.go | 65 ----------------------------- cmd/zed/serve/command.go | 33 +++------------ pkg/httpd/server.go | 3 ++ service/core.go | 11 +++++ service/handlers.go | 15 +++++++ service/ztests/mockzui.sh | 49 ---------------------- service/ztests/orphaned-by-zui.yaml | 29 ++++++++----- service/ztests/service.sh | 3 +- 8 files changed, 56 insertions(+), 152 deletions(-) delete mode 100644 cmd/mockzui/main.go delete mode 100644 service/ztests/mockzui.sh diff --git a/cmd/mockzui/main.go b/cmd/mockzui/main.go deleted file mode 100644 index fbdd75d5a5..0000000000 --- a/cmd/mockzui/main.go +++ /dev/null @@ -1,65 +0,0 @@ -// mockzui is a command for testing purposes only. It is designed to simulate -// the exact way Zui launches then forks a separate zed process. zed must be -// in $PATH for this to work. - -package main - -import ( - "bytes" - "flag" - "fmt" - "os" - "os/exec" -) - -func die(err error) { - if err != nil { - panic(err) - } -} - -var ( - pidfile string - portfile string - lakeroot string -) - -func init() { - flag.StringVar(&portfile, "portfile", "", "location to write zed lake serve port") - flag.StringVar(&pidfile, "pidfile", "", "location to write zed lake serve pid") - flag.StringVar(&lakeroot, "lake", "", "Zed lake location") - flag.Parse() -} - -func main() { - r, _, err := os.Pipe() - die(err) - - if portfile == "" { - fmt.Fprintln(os.Stderr, "must provide -portfile arg") - os.Exit(1) - } - if pidfile == "" { - fmt.Fprintln(os.Stderr, "must provide -pidfile arg") - os.Exit(1) - } - args := []string{ - "serve", - "-l=localhost:0", - "-lake=" + lakeroot, - "-log.level=warn", - "-portfile=" + portfile, - fmt.Sprintf("-brimfd=%d", r.Fd()), - } - stderr := bytes.NewBuffer(nil) - cmd := exec.Command("zed", args...) - cmd.Stderr = stderr - cmd.ExtraFiles = []*os.File{r} - - err = cmd.Start() - die(err) - pid := fmt.Sprintf("%d", cmd.Process.Pid) - err = os.WriteFile(pidfile, []byte(pid), 0644) - die(err) - cmd.Wait() -} diff --git a/cmd/zed/serve/command.go b/cmd/zed/serve/command.go index d6e42c8174..af88a248a3 100644 --- a/cmd/zed/serve/command.go +++ b/cmd/zed/serve/command.go @@ -1,14 +1,12 @@ package serve import ( - "context" "errors" "flag" "io" "net" "os" "os/signal" - "runtime" "syscall" "github.com/brimdata/zed/cli" @@ -31,7 +29,7 @@ The serve command listens for Zed lake API requests on the provided interface and port, executes the requests, and returns results. Requests may be issued to this service via the "zed api" command. `, - HiddenFlags: "brimfd,portfile", + HiddenFlags: "keepalive,portfile", New: New, } @@ -40,9 +38,7 @@ type Command struct { conf service.Config logflags logflags.Flags - // brimfd is a file descriptor passed through by Zui desktop. If set the - // command will exit if the fd is closed. - brimfd int + keepalive bool listenAddr string portFile string rootContentFile string @@ -53,7 +49,6 @@ func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) { c.conf.Auth.SetFlags(f) c.conf.Version = cli.Version() c.logflags.SetFlags(f) - f.IntVar(&c.brimfd, "brimfd", -1, "pipe read fd passed by Zui to signal Zui closure") f.Func("cors.origin", "CORS allowed origin (may be repeated)", func(s string) error { c.conf.CORSAllowedOrigins = append(c.conf.CORSAllowedOrigins, s) return nil @@ -62,6 +57,7 @@ func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) { f.StringVar(&c.listenAddr, "l", ":9867", "[addr]:port to listen on") f.StringVar(&c.portFile, "portfile", "", "write listen port to file") f.StringVar(&c.rootContentFile, "rootcontentfile", "", "file to serve for GET /") + f.BoolVar(&c.keepalive, "keepalive", false, "enable keepalive endpoint (used by Zui to prevent orphaned Zed processes)") return c, nil } @@ -92,16 +88,14 @@ func (c *Command) Run(args []string) error { return err } defer logger.Sync() - if c.brimfd != -1 { - if ctx, err = c.watchBrimFd(ctx, logger); err != nil { - return err - } - } c.conf.Logger = logger core, err := service.NewCore(ctx, c.conf) if err != nil { return err } + if c.keepalive { + ctx = core.EnableKeepAlive(ctx) + } defer core.Shutdown() sigch := make(chan os.Signal, 1) signal.Notify(sigch, os.Interrupt) @@ -122,21 +116,6 @@ func (c *Command) Run(args []string) error { return srv.Wait() } -func (c *Command) watchBrimFd(ctx context.Context, logger *zap.Logger) (context.Context, error) { - if runtime.GOOS == "windows" { - return nil, errors.New("flag -brimfd not applicable to windows") - } - f := os.NewFile(uintptr(c.brimfd), "brimfd") - logger.Info("Listening to Zui process pipe", zap.String("fd", f.Name())) - ctx, cancel := context.WithCancel(ctx) - go func() { - io.Copy(io.Discard, f) - logger.Info("Brim fd closed, shutting down") - cancel() - }() - return ctx, nil -} - func (c *Command) writePortFile(addr string) error { _, port, err := net.SplitHostPort(addr) if err != nil { diff --git a/pkg/httpd/server.go b/pkg/httpd/server.go index d1b7e56130..345d7ac295 100644 --- a/pkg/httpd/server.go +++ b/pkg/httpd/server.go @@ -77,6 +77,9 @@ func (s *Server) serve(ctx context.Context, ln net.Listener) { reason = "server error" case <-ctx.Done(): reason = "context closed" + if err := context.Cause(ctx); !errors.Is(err, context.Canceled) { + reason = err.Error() + } } s.logger.Info("Shutting down", zap.String("reason", reason), zap.Error(srvErr)) diff --git a/service/core.go b/service/core.go index ddfd3abed3..096b165fae 100644 --- a/service/core.go +++ b/service/core.go @@ -10,6 +10,7 @@ import ( "net/http/pprof" "strings" "sync" + "sync/atomic" "time" "github.com/brimdata/zed/api" @@ -65,6 +66,8 @@ type Core struct { runningQueriesMu sync.Mutex subscriptions map[chan event]struct{} subscriptionsMu sync.RWMutex + keepAliveCancel context.CancelCauseFunc + keepAliveRunning atomic.Bool } func NewCore(ctx context.Context, conf Config) (*Core, error) { @@ -244,6 +247,14 @@ func (c *Core) Shutdown() { c.logger.Info("Shutdown") } +// EnableKeepAlive enables the keepalive endpoint that once initially polled +// with cancel the returned context if exited. +func (c *Core) EnableKeepAlive(ctx context.Context) context.Context { + ctx, c.keepAliveCancel = context.WithCancelCause(ctx) + c.authhandle("/keepalive", handleKeepAlive).Methods("GET") + return ctx +} + func (c *Core) publishEvent(w *ResponseWriter, name string, data interface{}) { marshaler := zson.NewZNGMarshaler() marshaler.Decorate(zson.StyleSimple) diff --git a/service/handlers.go b/service/handlers.go index 7b22b8f72e..4112d78d8e 100644 --- a/service/handlers.go +++ b/service/handlers.go @@ -782,3 +782,18 @@ func handleEvents(c *Core, w *ResponseWriter, r *Request) { } } } + +func handleKeepAlive(c *Core, w *ResponseWriter, r *Request) { + if !c.keepAliveRunning.CompareAndSwap(false, true) { + w.Error(srverr.ErrInvalid("keep alive already being polled")) + return + } + w.WriteHeader(http.StatusOK) + if flusher, ok := w.ResponseWriter.(http.Flusher); ok { + flusher.Flush() + } + r.Logger.Info("Keep Alive entered") + <-r.Context().Done() + r.Logger.Info("Keep Alive exited") + c.keepAliveCancel(errors.New("keep alive exited")) +} diff --git a/service/ztests/mockzui.sh b/service/ztests/mockzui.sh deleted file mode 100644 index 3f6c6e4144..0000000000 --- a/service/ztests/mockzui.sh +++ /dev/null @@ -1,49 +0,0 @@ -#!/bin/bash - -# This file simulates a running version of Zui desktop. It forks a service process -# then sits forever on the main thread. - -function awaitdeadservice { - i=0 - function servicealive { kill -0 $LAKE_PID 2> /dev/null; } - while servicealive ; do - let i+=1 - if [ $i -gt 50 ]; then - echo "timed out waiting for service to exit" - exit 1 - fi - sleep 0.1 - done -} - -function awaitfile { - file=$1 - i=0 - until [ -f $file ]; do - let i+=1 - if [ $i -gt 50 ]; then - echo "timed out waiting for file \"$file\" to appear" - exit 1 - fi - sleep 0.1 - done -} - -mkdir -p lakeroot -lakeroot=lakeroot -tempdir=$(mktemp -d) - -mockzui -lake="$lakeroot" -portfile="$tempdir/port" -pidfile="$tempdir/pid" & -mockzuipid=$! - -# wait for service to start -awaitfile $tempdir/port -awaitfile $tempdir/pid - -export ZED_LAKE=http://localhost:$(cat $tempdir/port) -export LAKE_PID=$(cat $tempdir/pid) -export MOCKZUI_PID=$mockzuipid - -# ensure that lake service process isn't leaked -trap "kill -9 $LAKE_PID 2>/dev/null || :" EXIT -rm -rf $tempdir diff --git a/service/ztests/orphaned-by-zui.yaml b/service/ztests/orphaned-by-zui.yaml index f9ee181df5..d32c3b1602 100644 --- a/service/ztests/orphaned-by-zui.yaml +++ b/service/ztests/orphaned-by-zui.yaml @@ -1,16 +1,25 @@ script: | - source mockzui.sh - zed create test - echo === - kill -9 $MOCKZUI_PID 2>/dev/null - awaitdeadservice + LAKE_EXTRA_FLAGS="-keepalive -log.level=info" source service.sh + ! curl -m 0.1 $ZED_LAKE/keepalive 2> /dev/null + source await.sh + zq -z 'msg == "Shutting down" | drop ts' lake.log inputs: - - name: mockzui.sh - source: mockzui.sh + - name: service.sh + - name: await.sh + data: | + i=0 + function servicealive { kill -0 $ZED_PID 2> /dev/null; } + while servicealive ; do + let i+=1 + if [ $i -gt 50 ]; then + echo "timed out waiting for service to exit" + exit 1 + fi + sleep 0.1 + done outputs: - name: stdout - regexp: | - pool created: test \w{27} - === + data: | + {level:"info",logger:"httpd",msg:"Shutting down",reason:"keep alive exited"} diff --git a/service/ztests/service.sh b/service/ztests/service.sh index ee4b8f631f..8e8e4a370a 100644 --- a/service/ztests/service.sh +++ b/service/ztests/service.sh @@ -26,7 +26,8 @@ zed serve -l=localhost:0 -lake=$lakeroot -portfile=$portdir/lake -log.level=warn lakepid=$! awaitfile $portdir/lake -trap "rm -rf $portdir; kill $lakepid;" EXIT +trap "rm -rf $portdir; kill $lakepid || true" EXIT export ZED_LAKE=http://localhost:$(cat $portdir/lake) +export ZED_PID=$lakepid export LAKE_PATH=$lakeroot