Skip to content

Commit

Permalink
Add stayalive endpoint
Browse files Browse the repository at this point in the history
This is used by Zui to prevent orphaned Zed processes by exiting when a client
stops polling the stayalive endpoint.

Closes 4863
  • Loading branch information
mattnibs committed Jan 2, 2024
1 parent ec5165f commit eb8429a
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 163 deletions.
65 changes: 0 additions & 65 deletions cmd/mockzui/main.go

This file was deleted.

30 changes: 3 additions & 27 deletions cmd/zed/serve/command.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package serve

import (
"context"
"errors"
"flag"
"io"
"net"
"os"
"os/signal"
"runtime"
"syscall"

"github.com/brimdata/zed/cli"
Expand All @@ -31,7 +29,7 @@ The serve command listens for Zed lake API requests on the provided
interface and port, executes the requests, and returns results.
Requests may be issued to this service via the "zed api" command.
`,
HiddenFlags: "brimfd,portfile",
HiddenFlags: "enablestayalive,portfile",
New: New,
}

Expand All @@ -40,9 +38,6 @@ type Command struct {
conf service.Config
logflags logflags.Flags

// brimfd is a file descriptor passed through by Zui desktop. If set the
// command will exit if the fd is closed.
brimfd int
listenAddr string
portFile string
rootContentFile string
Expand All @@ -53,12 +48,12 @@ func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) {
c.conf.Auth.SetFlags(f)
c.conf.Version = cli.Version()
c.logflags.SetFlags(f)
f.IntVar(&c.brimfd, "brimfd", -1, "pipe read fd passed by Zui to signal Zui closure")
f.Func("cors.origin", "CORS allowed origin (may be repeated)", func(s string) error {
c.conf.CORSAllowedOrigins = append(c.conf.CORSAllowedOrigins, s)
return nil
})
f.StringVar(&c.conf.DefaultResponseFormat, "defaultfmt", service.DefaultZedFormat, "default response format")
f.BoolVar(&c.conf.EnableStayAlive, "enablestayalive", false, "enable stayalive endpoint (used by Zui to prevent orphaned Zed processes)")
f.StringVar(&c.listenAddr, "l", ":9867", "[addr]:port to listen on")
f.StringVar(&c.portFile, "portfile", "", "write listen port to file")
f.StringVar(&c.rootContentFile, "rootcontentfile", "", "file to serve for GET /")
Expand Down Expand Up @@ -92,16 +87,12 @@ func (c *Command) Run(args []string) error {
return err
}
defer logger.Sync()
if c.brimfd != -1 {
if ctx, err = c.watchBrimFd(ctx, logger); err != nil {
return err
}
}
c.conf.Logger = logger
core, err := service.NewCore(ctx, c.conf)
if err != nil {
return err
}
ctx = core.StayAliveContext(ctx)
defer core.Shutdown()
sigch := make(chan os.Signal, 1)
signal.Notify(sigch, os.Interrupt)
Expand All @@ -122,21 +113,6 @@ func (c *Command) Run(args []string) error {
return srv.Wait()
}

// watchBrimFd wraps ctx in a cancellable context that is canceled once the
// file descriptor supplied via the -brimfd flag can no longer be read (EOF or
// a read error). The descriptor is the one passed through by Zui desktop, so
// its closure is treated as the signal to shut down.
//
// It returns an error on Windows, where the flag is not applicable, and when
// the flag value is not a valid file descriptor.
func (c *Command) watchBrimFd(ctx context.Context, logger *zap.Logger) (context.Context, error) {
	if runtime.GOOS == "windows" {
		return nil, errors.New("flag -brimfd not applicable to windows")
	}
	f := os.NewFile(uintptr(c.brimfd), "brimfd")
	if f == nil {
		// os.NewFile returns nil for an invalid descriptor (e.g. a negative
		// value other than the -1 sentinel); calling f.Name() would panic.
		return nil, errors.New("flag -brimfd is not a valid file descriptor")
	}
	logger.Info("Listening to Zui process pipe", zap.String("fd", f.Name()))
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		// Block until the descriptor hits EOF or a read error. The result is
		// intentionally ignored: either outcome means we should shut down.
		io.Copy(io.Discard, f)
		logger.Info("Brim fd closed, shutting down")
		cancel()
	}()
	return ctx, nil
}

func (c *Command) writePortFile(addr string) error {
_, port, err := net.SplitHostPort(addr)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/httpd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ func (s *Server) serve(ctx context.Context, ln net.Listener) {
reason = "server error"
case <-ctx.Done():
reason = "context closed"
if err := context.Cause(ctx); !errors.Is(err, context.Canceled) {
reason = err.Error()
}
}

s.logger.Info("Shutting down", zap.String("reason", reason), zap.Error(srvErr))
Expand Down
40 changes: 29 additions & 11 deletions service/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ type Config struct {
RootContent io.ReadSeeker
Version string
Logger *zap.Logger
// EnableStayAlive when true enables the stayalive endpoint. If a client
// begins polling the stayalive endpoint then exits, the Zed process will
// also exit. This is used by the Zui application to ensure that the Zed
// process exits in the event that the Zui process is terminated with
// SIGKILL.
EnableStayAlive bool
}

type Core struct {
Expand All @@ -65,6 +71,7 @@ type Core struct {
runningQueriesMu sync.Mutex
subscriptions map[chan event]struct{}
subscriptionsMu sync.RWMutex
stayAliveCancel context.CancelCauseFunc
}

func NewCore(ctx context.Context, conf Config) (*Core, error) {
Expand Down Expand Up @@ -142,17 +149,18 @@ func NewCore(ctx context.Context, conf Config) (*Core, error) {
routerAPI.Use(corsMiddleware(conf.CORSAllowedOrigins))

c := &Core{
auth: authenticator,
compiler: compiler.NewLakeCompiler(root),
conf: conf,
engine: engine,
logger: conf.Logger.Named("core"),
root: root,
registry: registry,
routerAPI: routerAPI,
routerAux: routerAux,
runningQueries: make(map[string]*queryStatus),
subscriptions: make(map[chan event]struct{}),
auth: authenticator,
compiler: compiler.NewLakeCompiler(root),
conf: conf,
engine: engine,
logger: conf.Logger.Named("core"),
root: root,
registry: registry,
routerAPI: routerAPI,
routerAux: routerAux,
runningQueries: make(map[string]*queryStatus),
subscriptions: make(map[chan event]struct{}),
stayAliveCancel: func(error) {}, // noop
}

c.addAPIServerRoutes()
Expand Down Expand Up @@ -186,6 +194,9 @@ func (c *Core) addAPIServerRoutes() {
c.authhandle("/pool/{pool}/stats", handlePoolStats).Methods("GET")
c.authhandle("/query", handleQuery).Methods("OPTIONS", "POST")
c.authhandle("/query/status/{requestID}", handleQueryStatus).Methods("GET")
if c.conf.EnableStayAlive {
c.authhandle("/stayalive", handleStayAlive).Methods("GET")
}
}

func (c *Core) handler(f func(*Core, *ResponseWriter, *Request)) http.Handler {
Expand Down Expand Up @@ -244,6 +255,13 @@ func (c *Core) Shutdown() {
c.logger.Info("Shutdown")
}

// StayAliveContext returns the context the caller should use for the
// lifetime of the service. When the stayalive endpoint is disabled in the
// configuration, ctx is returned unchanged. Otherwise a child context is
// created with context.WithCancelCause and its cancel function is stored on
// the Core so it can later be invoked with a cause.
func (c *Core) StayAliveContext(ctx context.Context) context.Context {
	if !c.conf.EnableStayAlive {
		return ctx
	}
	child, cancel := context.WithCancelCause(ctx)
	c.stayAliveCancel = cancel
	return child
}

func (c *Core) publishEvent(w *ResponseWriter, name string, data interface{}) {
marshaler := zson.NewZNGMarshaler()
marshaler.Decorate(zson.StyleSimple)
Expand Down
8 changes: 8 additions & 0 deletions service/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,3 +782,11 @@ func handleEvents(c *Core, w *ResponseWriter, r *Request) {
}
}
}

// handleStayAlive serves the stayalive endpoint. It logs entry, writes a
// 200 status, and then blocks until the request's context is done. Once that
// happens it calls the Core's stayAliveCancel with the cause
// "stay alive exited".
func handleStayAlive(c *Core, w *ResponseWriter, r *Request) {
	r.Logger.Info("Stay Alive entered")
	w.WriteHeader(http.StatusOK)

	// Hold the request open until its context is done.
	requestDone := r.Context().Done()
	<-requestDone

	cause := errors.New("stay alive exited")
	c.stayAliveCancel(cause)
}
49 changes: 0 additions & 49 deletions service/ztests/mockzui.sh

This file was deleted.

16 changes: 6 additions & 10 deletions service/ztests/orphaned-by-zui.yaml
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
script: |
source mockzui.sh
zed create test
echo ===
kill -9 $MOCKZUI_PID 2>/dev/null
awaitdeadservice
LAKE_EXTRA_FLAGS="-enablestayalive -log.level=info" source service.sh
! curl -m 0.1 $ZED_LAKE/stayalive 2> /dev/null
zq -z 'msg == "Shutting down" | drop ts' lake.log
inputs:
- name: mockzui.sh
source: mockzui.sh
- name: service.sh

outputs:
- name: stdout
regexp: |
pool created: test \w{27}
===
data: |
{level:"info",logger:"httpd",msg:"Shutting down",reason:"stay alive exited"}
2 changes: 1 addition & 1 deletion service/ztests/service.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ zed serve -l=localhost:0 -lake=$lakeroot -portfile=$portdir/lake -log.level=warn
lakepid=$!
awaitfile $portdir/lake

trap "rm -rf $portdir; kill $lakepid;" EXIT
trap "rm -rf $portdir; kill $lakepid || true" EXIT

export ZED_LAKE=http://localhost:$(cat $portdir/lake)
export LAKE_PATH=$lakeroot

0 comments on commit eb8429a

Please sign in to comment.