Skip to content

Commit

Permalink
Add stayalive endpoint
Browse files Browse the repository at this point in the history
This is used by Zui to prevent orphaned Zed processes by exiting when a client
stops polling the stayalive endpoint.

Closes 4863
  • Loading branch information
mattnibs committed Jan 3, 2024
1 parent ec5165f commit f569eb2
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 162 deletions.
65 changes: 0 additions & 65 deletions cmd/mockzui/main.go

This file was deleted.

30 changes: 3 additions & 27 deletions cmd/zed/serve/command.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package serve

import (
"context"
"errors"
"flag"
"io"
"net"
"os"
"os/signal"
"runtime"
"syscall"

"github.com/brimdata/zed/cli"
Expand All @@ -31,7 +29,7 @@ The serve command listens for Zed lake API requests on the provided
interface and port, executes the requests, and returns results.
Requests may be issued to this service via the "zed api" command.
`,
HiddenFlags: "brimfd,portfile",
HiddenFlags: "stayalivetimeout,portfile",
New: New,
}

Expand All @@ -40,9 +38,6 @@ type Command struct {
conf service.Config
logflags logflags.Flags

// brimfd is a file descriptor passed through by Zui desktop. If set the
// command will exit if the fd is closed.
brimfd int
listenAddr string
portFile string
rootContentFile string
Expand All @@ -53,7 +48,6 @@ func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) {
c.conf.Auth.SetFlags(f)
c.conf.Version = cli.Version()
c.logflags.SetFlags(f)
f.IntVar(&c.brimfd, "brimfd", -1, "pipe read fd passed by Zui to signal Zui closure")
f.Func("cors.origin", "CORS allowed origin (may be repeated)", func(s string) error {
c.conf.CORSAllowedOrigins = append(c.conf.CORSAllowedOrigins, s)
return nil
Expand All @@ -62,6 +56,7 @@ func New(parent charm.Command, f *flag.FlagSet) (charm.Command, error) {
f.StringVar(&c.listenAddr, "l", ":9867", "[addr]:port to listen on")
f.StringVar(&c.portFile, "portfile", "", "write listen port to file")
f.StringVar(&c.rootContentFile, "rootcontentfile", "", "file to serve for GET /")
f.DurationVar(&c.conf.StayAliveTimeout, "stayalivetimeout", 0, "duration to shutdown after inactivity on stayalive endpoint (used by Zui to prevent orphaned Zed processes)")
return c, nil
}

Expand Down Expand Up @@ -92,16 +87,12 @@ func (c *Command) Run(args []string) error {
return err
}
defer logger.Sync()
if c.brimfd != -1 {
if ctx, err = c.watchBrimFd(ctx, logger); err != nil {
return err
}
}
c.conf.Logger = logger
core, err := service.NewCore(ctx, c.conf)
if err != nil {
return err
}
ctx = core.StayAliveContext(ctx)
defer core.Shutdown()
sigch := make(chan os.Signal, 1)
signal.Notify(sigch, os.Interrupt)
Expand All @@ -122,21 +113,6 @@ func (c *Command) Run(args []string) error {
return srv.Wait()
}

// watchBrimFd wraps ctx in a cancelable context and starts a goroutine that
// reads from the file descriptor c.brimfd until the read returns (EOF or
// error), at which point the returned context is canceled.
//
// It returns an error on Windows, where the flag is not applicable, and when
// c.brimfd is not a valid file descriptor.
func (c *Command) watchBrimFd(ctx context.Context, logger *zap.Logger) (context.Context, error) {
	if runtime.GOOS == "windows" {
		return nil, errors.New("flag -brimfd not applicable to windows")
	}
	f := os.NewFile(uintptr(c.brimfd), "brimfd")
	if f == nil {
		// os.NewFile returns nil for an invalid descriptor; calling
		// f.Name() below on a nil *os.File would panic.
		return nil, errors.New("flag -brimfd is not a valid file descriptor")
	}
	logger.Info("Listening to Zui process pipe", zap.String("fd", f.Name()))
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		// Release the descriptor once the pipe is no longer being watched.
		defer f.Close()
		// The result of the copy is deliberately ignored: whether the read
		// ends with EOF or with an error, the reaction is the same.
		io.Copy(io.Discard, f)
		logger.Info("Brim fd closed, shutting down")
		cancel()
	}()
	return ctx, nil
}

func (c *Command) writePortFile(addr string) error {
_, port, err := net.SplitHostPort(addr)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/httpd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ func (s *Server) serve(ctx context.Context, ln net.Listener) {
reason = "server error"
case <-ctx.Done():
reason = "context closed"
if err := context.Cause(ctx); !errors.Is(err, context.Canceled) {
reason = err.Error()
}
}

s.logger.Info("Shutting down", zap.String("reason", reason), zap.Error(srvErr))
Expand Down
63 changes: 52 additions & 11 deletions service/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http/pprof"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/brimdata/zed/api"
Expand Down Expand Up @@ -49,6 +50,12 @@ type Config struct {
RootContent io.ReadSeeker
Version string
Logger *zap.Logger
// StayAliveTimeout, when greater than 0, activates the stayalive endpoint.
// If the stayalive endpoint has not been actively polled for this duration
// the Zed process will exit. This is used by the Zui application to ensure
// that the Zed process exits in the event that the Zui process is
// terminated with SIGKILL.
StayAliveTimeout time.Duration
}

type Core struct {
Expand All @@ -65,6 +72,9 @@ type Core struct {
runningQueriesMu sync.Mutex
subscriptions map[chan event]struct{}
subscriptionsMu sync.RWMutex
stayAliveCancel context.CancelCauseFunc
stayAliveCount atomic.Int32
stayAliveNotify chan struct{}
}

func NewCore(ctx context.Context, conf Config) (*Core, error) {
Expand Down Expand Up @@ -142,17 +152,18 @@ func NewCore(ctx context.Context, conf Config) (*Core, error) {
routerAPI.Use(corsMiddleware(conf.CORSAllowedOrigins))

c := &Core{
auth: authenticator,
compiler: compiler.NewLakeCompiler(root),
conf: conf,
engine: engine,
logger: conf.Logger.Named("core"),
root: root,
registry: registry,
routerAPI: routerAPI,
routerAux: routerAux,
runningQueries: make(map[string]*queryStatus),
subscriptions: make(map[chan event]struct{}),
auth: authenticator,
compiler: compiler.NewLakeCompiler(root),
conf: conf,
engine: engine,
logger: conf.Logger.Named("core"),
root: root,
registry: registry,
routerAPI: routerAPI,
routerAux: routerAux,
runningQueries: make(map[string]*queryStatus),
subscriptions: make(map[chan event]struct{}),
stayAliveNotify: make(chan struct{}),
}

c.addAPIServerRoutes()
Expand Down Expand Up @@ -244,6 +255,36 @@ func (c *Core) Shutdown() {
c.logger.Info("Shutdown")
}

// StayAliveContext enables stayalive monitoring when the configured
// StayAliveTimeout is greater than zero. In that case it derives a
// cancel-with-cause context from ctx, stores the cancel function on the Core,
// registers the GET /stayalive route through c.authhandle, starts the
// runStayAlive goroutine, and returns the derived context. Otherwise ctx is
// returned unchanged and nothing is started.
//
// Because every enabled call registers the route and starts a goroutine, this
// should only be called once.
func (c *Core) StayAliveContext(ctx context.Context) context.Context {
	if c.conf.StayAliveTimeout <= 0 {
		return ctx
	}
	stayAliveCtx, cancel := context.WithCancelCause(ctx)
	c.stayAliveCancel = cancel
	c.authhandle("/stayalive", handleStayAlive).Methods("GET")
	go c.runStayAlive()
	return stayAliveCtx
}

// runStayAlive is the stayalive monitoring loop. It re-reads stayAliveCount
// every time a value arrives on stayAliveNotify and, whenever the count is
// zero, starts a countdown of conf.StayAliveTimeout. If the countdown expires
// before another notification arrives, it logs the event, cancels the
// stayalive context with a "stay alive timeout reached" cause, and returns.
func (c *Core) runStayAlive() {
	// There are two states here: either the stayalive endpoint is active or
	// it's not and we're counting down to shutdown.
	for {
		if c.stayAliveCount.Load() != 0 {
			// Endpoint is active; wait for the next change.
			<-c.stayAliveNotify
			continue
		}
		// Use an explicit timer rather than time.After so it can be stopped
		// as soon as a notification interrupts the countdown, instead of
		// staying allocated until it would have fired.
		timer := time.NewTimer(c.conf.StayAliveTimeout)
		select {
		case <-timer.C:
			c.logger.Info("StayAlive timeout reached, shutting down")
			c.stayAliveCancel(errors.New("stay alive timeout reached"))
			return
		case <-c.stayAliveNotify:
			timer.Stop()
		}
	}
}

func (c *Core) publishEvent(w *ResponseWriter, name string, data interface{}) {
marshaler := zson.NewZNGMarshaler()
marshaler.Decorate(zson.StyleSimple)
Expand Down
13 changes: 13 additions & 0 deletions service/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,3 +782,16 @@ func handleEvents(c *Core, w *ResponseWriter, r *Request) {
}
}
}

// handleStayAlive serves the stayalive endpoint. It writes a 200 status,
// flushes it when the underlying writer supports flushing, and then holds the
// request open until the request context is done. On entry it increments
// c.stayAliveCount and on exit it decrements it; after each change it sends on
// c.stayAliveNotify so the monitoring loop re-reads the count.
//
// NOTE(review): both sends block until something receives from
// c.stayAliveNotify. Once runStayAlive has returned there appears to be no
// receiver left, so a request arriving or finishing after that point would
// block here — confirm this is acceptable during shutdown.
func handleStayAlive(c *Core, w *ResponseWriter, r *Request) {
	w.WriteHeader(http.StatusOK)
	flusher, canFlush := w.ResponseWriter.(http.Flusher)
	if canFlush {
		flusher.Flush()
	}
	r.Logger.Info("Stay Alive entered")

	// bump adjusts the active-request count and then notifies the monitor.
	bump := func(delta int32) {
		c.stayAliveCount.Add(delta)
		c.stayAliveNotify <- struct{}{}
	}

	bump(1)
	<-r.Context().Done()
	bump(-1)
}
49 changes: 0 additions & 49 deletions service/ztests/mockzui.sh

This file was deleted.

30 changes: 21 additions & 9 deletions service/ztests/orphaned-by-zui.yaml
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
script: |
source mockzui.sh
zed create test
echo ===
kill -9 $MOCKZUI_PID 2>/dev/null
LAKE_EXTRA_FLAGS="-stayalivetimeout=100ms -log.level=info" source service.sh
source await.sh
! curl -m 0.05 $ZED_LAKE/stayalive 2> /dev/null
awaitdeadservice
zq -z 'msg == "Shutting down" | drop ts' lake.log
inputs:
- name: mockzui.sh
source: mockzui.sh
- name: service.sh
- name: await.sh
data: |
# awaitdeadservice polls every 0.1s until the process $ZED_PID no longer
# exists. After more than 50 polls it prints a message and exits with status 1.
# The poll counter is left in the global variable i.
function awaitdeadservice {
  # Succeeds while a signal can still be delivered to $ZED_PID.
  function servicealive { kill -0 $ZED_PID 2> /dev/null; }
  i=0
  until ! servicealive; do
    i=$((i + 1))
    if (( i > 50 )); then
      echo "timed out waiting for service to exit"
      exit 1
    fi
    sleep 0.1
  done
}
outputs:
- name: stdout
regexp: |
pool created: test \w{27}
===
data: |
{level:"info",logger:"httpd",msg:"Shutting down",reason:"stay alive timeout reached"}
3 changes: 2 additions & 1 deletion service/ztests/service.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ zed serve -l=localhost:0 -lake=$lakeroot -portfile=$portdir/lake -log.level=warn
lakepid=$!
awaitfile $portdir/lake

trap "rm -rf $portdir; kill $lakepid;" EXIT
trap "rm -rf $portdir; kill $lakepid || true" EXIT

export ZED_LAKE=http://localhost:$(cat $portdir/lake)
export ZED_PID=$lakepid
export LAKE_PATH=$lakeroot

0 comments on commit f569eb2

Please sign in to comment.