Skip to content

Commit

Permalink
next: upd more
Browse files Browse the repository at this point in the history
  • Loading branch information
ainar-g committed Nov 12, 2024
1 parent d729aa1 commit 586b0eb
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 33 deletions.
30 changes: 14 additions & 16 deletions internal/next/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func Main(embeddedFrontend fs.FS) {
frontend, err := frontendFromOpts(ctx, baseLogger, opts, embeddedFrontend)
errors.Check(err)

startCtx, startCancel := context.WithTimeout(ctx, defaultTimeoutStart)
defer startCancel()

confMgrConf := &configmgr.Config{
BaseLogger: baseLogger,
Logger: baseLogger.With(slogutil.KeyPrefix, "configmgr"),
Expand All @@ -58,15 +61,15 @@ func Main(embeddedFrontend fs.FS) {
FileName: opts.confFile,
}

confMgr, err := newConfigMgr(confMgrConf)
confMgr, err := configmgr.New(startCtx, confMgrConf)
errors.Check(err)

web := confMgr.Web()
err = web.Start(ctx)
err = web.Start(startCtx)
errors.Check(err)

dns := confMgr.DNS()
err = dns.Start(ctx)
err = dns.Start(startCtx)
errors.Check(err)

sigHdlr := newSignalHandler(
Expand All @@ -80,21 +83,16 @@ func Main(embeddedFrontend fs.FS) {
os.Exit(sigHdlr.handle(ctx))
}

// defaultTimeout is the timeout used for some operations where another timeout
// hasn't been defined yet.
const defaultTimeout = 5 * time.Second

// ctxWithDefaultTimeout is a helper function that returns a context with
// timeout set to defaultTimeout.
func ctxWithDefaultTimeout() (ctx context.Context, cancel context.CancelFunc) {
return context.WithTimeout(context.Background(), defaultTimeout)
}
// Default timeouts.
//
// TODO(a.garipov): Make configurable.
const (
defaultTimeoutStart = 1 * time.Minute
defaultTimeoutShutdown = 5 * time.Second
)

// newConfigMgr returns a new configuration manager using defaultTimeout as the
// context timeout.
func newConfigMgr(c *configmgr.Config) (m *configmgr.Manager, err error) {
ctx, cancel := ctxWithDefaultTimeout()
defer cancel()

func newConfigMgr(ctx context.Context, c *configmgr.Config) (m *configmgr.Manager, err error) {
return configmgr.New(ctx, c)
}
9 changes: 6 additions & 3 deletions internal/next/cmd/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ func (h *signalHandler) reconfigure(ctx context.Context) (err error) {

var errs []error

confMgr, err := newConfigMgr(h.confMgrConf)
ctx, cancel := context.WithTimeout(ctx, defaultTimeoutStart)
defer cancel()

confMgr, err := newConfigMgr(ctx, h.confMgrConf)
if err != nil {
errs = append(errs, fmt.Errorf("configuration manager: %w", err))
}
Expand Down Expand Up @@ -142,7 +145,7 @@ func (h *signalHandler) reconfigure(ctx context.Context) (err error) {

// shutdown gracefully shuts down all services.
func (h *signalHandler) shutdown(ctx context.Context) (status int) {
ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
ctx, cancel := context.WithTimeout(ctx, h.shutdownTimeout)
defer cancel()

status = osutil.ExitCodeSuccess
Expand Down Expand Up @@ -173,7 +176,7 @@ func newSignalHandler(
signal: make(chan os.Signal, 1),
pidFile: pidFile,
services: svcs,
shutdownTimeout: defaultTimeout,
shutdownTimeout: defaultTimeoutShutdown,
}

notifier := osutil.DefaultSignalNotifier{}
Expand Down
17 changes: 9 additions & 8 deletions internal/next/websvc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type server struct {
initialAddr netip.AddrPort
}

// loggerKeyServer is the key used by [server] to identify itself.
const loggerKeyServer = "server"

// newServer returns a *server that is ready to serve HTTP queries. The TCP
// listener is not started. handler must not be nil.
func newServer(
Expand All @@ -55,7 +58,7 @@ func newServer(
u.Scheme = urlutil.SchemeHTTPS
}

logger := baseLogger.With("server", u)
logger := baseLogger.With(loggerKeyServer, u)

return &server{
mu: &sync.Mutex{},
Expand Down Expand Up @@ -96,11 +99,9 @@ func (s *server) localAddr() (addr net.Addr) {
func (s *server) serve(ctx context.Context, baseLogger *slog.Logger) {
l, err := net.ListenTCP("tcp", net.TCPAddrFromAddrPort(s.initialAddr))
if err != nil {
err = fmt.Errorf("listening tcp: %w", err)

s.logger.ErrorContext(ctx, "listening tcp", slogutil.KeyError, err)

panic(fmt.Errorf("websvc: %s", err))
panic(fmt.Errorf("websvc: listening tcp: %w", err))
}

func() {
Expand All @@ -111,7 +112,7 @@ func (s *server) serve(ctx context.Context, baseLogger *slog.Logger) {

// Reassign the address in case the port was zero.
s.url.Host = l.Addr().String()
s.logger = baseLogger.With("server", s.url)
s.logger = baseLogger.With(loggerKeyServer, s.url)
s.http.ErrorLog = slog.NewLogLogger(s.logger.Handler(), slog.LevelError)
}()

Expand All @@ -123,9 +124,9 @@ func (s *server) serve(ctx context.Context, baseLogger *slog.Logger) {
return
}

err = fmt.Errorf("serving: %w", err)
s.logger.ErrorContext(ctx, "serve failed", slogutil.KeyError, err)
panic(fmt.Errorf("websvc: %s", err))
s.logger.ErrorContext(ctx, "serving", slogutil.KeyError, err)

panic(fmt.Errorf("websvc: serving: %w", err))
}

// shutdown shuts s down.
Expand Down
28 changes: 22 additions & 6 deletions internal/next/websvc/websvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,22 +163,38 @@ func (svc *Service) Start(ctx context.Context) (err error) {
go svc.pprof.serve(ctx, svc.logger)
}

started := false
for !started {
return svc.wait(ctx)
}

// wait waits until either the context is canceled or all servers have started.
func (svc *Service) wait(ctx context.Context) (err error) {
for !svc.serversHaveStarted() {
select {
case <-ctx.Done():
return ctx.Err()
default:
started = true
for _, srv := range svc.servers {
started = started && srv.localAddr() != nil
}
// Wait and let the other goroutines do their job.
runtime.Gosched()
}
}

return nil
}

// serversHaveStarted returns true if all servers have started serving.
func (svc *Service) serversHaveStarted() (started bool) {
started = len(svc.servers) != 0
for _, srv := range svc.servers {
started = started && srv.localAddr() != nil
}

if svc.pprof != nil {
started = started && svc.pprof.localAddr() != nil
}

return started
}

// Shutdown implements the [agh.Service] interface for *Service. svc may be
// nil.
func (svc *Service) Shutdown(ctx context.Context) (err error) {
Expand Down

0 comments on commit 586b0eb

Please sign in to comment.