diff --git a/integration/helpers.go b/integration/helpers.go index d21becf30b98a..eea278af97910 100644 --- a/integration/helpers.go +++ b/integration/helpers.go @@ -1103,26 +1103,15 @@ func (i *TeleInstance) Start() error { // Build a list of expected events to wait for before unblocking based off // the configuration passed in. expectedEvents := []string{} - if i.Config.Auth.Enabled { - expectedEvents = append(expectedEvents, service.AuthTLSReady) - } + // Always wait for TeleportReadyEvent. + expectedEvents = append(expectedEvents, service.TeleportReadyEvent) if i.Config.Proxy.Enabled { expectedEvents = append(expectedEvents, service.ProxyReverseTunnelReady) - expectedEvents = append(expectedEvents, service.ProxySSHReady) expectedEvents = append(expectedEvents, service.ProxyAgentPoolReady) if !i.Config.Proxy.DisableWebService { expectedEvents = append(expectedEvents, service.ProxyWebServerReady) } } - if i.Config.SSH.Enabled { - expectedEvents = append(expectedEvents, service.NodeSSHReady) - } - if i.Config.Apps.Enabled { - expectedEvents = append(expectedEvents, service.AppsReady) - } - if i.Config.Databases.Enabled { - expectedEvents = append(expectedEvents, service.DatabasesReady) - } // Start the process and block until the expected events have arrived. receivedEvents, err := startAndWait(i.Process, expectedEvents) diff --git a/integration/integration_test.go b/integration/integration_test.go index 1fe08ec18544c..a075b27fe501c 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -4359,6 +4359,17 @@ func waitForProcessStart(serviceC chan *service.TeleportProcess) (*service.Telep dumpGoroutineProfile() return nil, trace.BadParameter("timeout waiting for service to start") } + + eventC := make(chan service.Event, 1) + svc.WaitForEvent(context.TODO(), service.TeleportReadyEvent, eventC) + select { + case <-eventC: + + case <-time.After(20 * time.Second): + dumpGoroutineProfile() + return nil, trace.BadParameter("timeout waiting for service to broadcast ready status") + } + return svc, nil } diff --git a/lib/service/desktop.go b/lib/service/desktop.go index 8263c7d9f9fed..ff0c9ae40de1d 100644 --- a/lib/service/desktop.go +++ b/lib/service/desktop.go @@ -248,6 +248,10 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus. "Windows desktop service %s:%s is starting on %v.", teleport.Version, teleport.Gitref, listener.Addr()) } + + // since srv.Serve is a blocking call, we emit this event right before + // the service has started + process.BroadcastEvent(Event{Name: WindowsDesktopReady, Payload: nil}) err := srv.Serve(listener) if err != nil { if err == http.ErrServerClosed { diff --git a/lib/service/kubernetes.go b/lib/service/kubernetes.go index 70d4cca899d50..c5c4aea33c0c9 100644 --- a/lib/service/kubernetes.go +++ b/lib/service/kubernetes.go @@ -271,6 +271,8 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C "Kubernetes service %s:%s is starting on %v.", teleport.Version, teleport.Gitref, listener.Addr()) } + // since kubeServer.Serve is a blocking call, we emit this event right before + // the service has started process.BroadcastEvent(Event{Name: KubernetesReady, Payload: nil}) err := kubeServer.Serve(listener) if err != nil { diff --git a/lib/service/service.go b/lib/service/service.go index 6c66f8565016d..749f9fe4645d2 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -173,10 +173,6 @@ const ( // is ready to start accepting connections. DatabasesReady = "DatabasesReady" - // MetricsReady is generated when the Teleport metrics service is ready to - // start accepting connections. - MetricsReady = "MetricsReady" - // WindowsDesktopReady is generated when the Teleport windows desktop // service is ready to start accepting connections. WindowsDesktopReady = "WindowsDesktopReady" @@ -513,6 +509,20 @@ func Run(ctx context.Context, cfg Config, newTeleport NewProcess) error { if err := srv.Start(); err != nil { return trace.Wrap(err, "startup failed") } + + // Wait for the service to report that it has started. + startTimeoutCtx, startCancel := context.WithTimeout(ctx, signalPipeTimeout) + defer startCancel() + eventC := make(chan Event, 1) + srv.WaitForEvent(startTimeoutCtx, TeleportReadyEvent, eventC) + select { + case <-eventC: + cfg.Log.Infof("Service has started successfully.") + case <-startTimeoutCtx.Done(): + warnOnErr(srv.Close(), cfg.Log) + return trace.BadParameter("service has failed to start") + } + // Wait and reload until called exit. for { srv, err = waitAndReload(ctx, cfg, srv, newTeleport) @@ -761,36 +771,8 @@ func NewTeleport(cfg *Config) (*TeleportProcess, error) { cfg.Keygen = native.New(process.ExitContext(), native.PrecomputeKeys(precomputeCount)) } - // Produce global TeleportReadyEvent - // when all components have started - eventMapping := EventMapping{ - Out: TeleportReadyEvent, - } - if cfg.Auth.Enabled { - eventMapping.In = append(eventMapping.In, AuthTLSReady) - } - if cfg.SSH.Enabled { - eventMapping.In = append(eventMapping.In, NodeSSHReady) - } - if cfg.Proxy.Enabled { - eventMapping.In = append(eventMapping.In, ProxySSHReady) - } - if cfg.Kube.Enabled { - eventMapping.In = append(eventMapping.In, KubernetesReady) - } - if cfg.Apps.Enabled { - eventMapping.In = append(eventMapping.In, AppsReady) - } - if cfg.Databases.Enabled { - eventMapping.In = append(eventMapping.In, DatabasesReady) - } - if cfg.Metrics.Enabled { - eventMapping.In = append(eventMapping.In, MetricsReady) - } - if cfg.WindowsDesktop.Enabled { - eventMapping.In = append(eventMapping.In, WindowsDesktopReady) - } - process.RegisterEventMapping(eventMapping) + // Produce global TeleportReadyEvent when all components have started + process.registerTeleportReadyEvent(cfg) if cfg.Auth.Enabled { if err := process.initAuthService(); err != nil { @@ -1381,7 +1363,7 @@ func (process *TeleportProcess) initAuthService() error { utils.Consolef(cfg.Console, log, teleport.ComponentAuth, "Auth service %s:%s is starting on %v.", teleport.Version, teleport.Gitref, authAddr) - // since tlsServer.Serve is a blocking call, we emit this even right before + // since tlsServer.Serve is a blocking call, we emit this event right before // the service has started process.BroadcastEvent(Event{Name: AuthTLSReady, Payload: nil}) err := tlsServer.Serve() @@ -3464,6 +3446,44 @@ func (process *TeleportProcess) waitForAppDepend() { } } +// registerTeleportReadyEvent ensures that a TeleportReadyEvent is produced +// when all components have started. +func (process *TeleportProcess) registerTeleportReadyEvent(cfg *Config) { + eventMapping := EventMapping{ + Out: TeleportReadyEvent, + } + + if cfg.Auth.Enabled { + eventMapping.In = append(eventMapping.In, AuthTLSReady) + } + + if cfg.SSH.Enabled { + eventMapping.In = append(eventMapping.In, NodeSSHReady) + } + + if cfg.Proxy.Enabled { + eventMapping.In = append(eventMapping.In, ProxySSHReady) + } + + if cfg.Kube.Enabled { + eventMapping.In = append(eventMapping.In, KubernetesReady) + } + + if cfg.Apps.Enabled { + eventMapping.In = append(eventMapping.In, AppsReady) + } + + if cfg.Databases.Enabled { + eventMapping.In = append(eventMapping.In, DatabasesReady) + } + + if cfg.WindowsDesktop.Enabled { + eventMapping.In = append(eventMapping.In, WindowsDesktopReady) + } + + process.RegisterEventMapping(eventMapping) +} + // appDependEvents is a list of events that the application service depends on. var appDependEvents = []string{ AuthTLSReady, diff --git a/lib/service/signals.go b/lib/service/signals.go index 21fd8db5fe66e..de8799e029acd 100644 --- a/lib/service/signals.go +++ b/lib/service/signals.go @@ -153,7 +153,7 @@ func (process *TeleportProcess) WaitForSignals(ctx context.Context) error { } // ErrTeleportReloading is returned when signal waiter exits -// because the teleport process has initiaded shutdown +// because the teleport process has initiated shutdown var ErrTeleportReloading = &trace.CompareFailedError{Message: "teleport process is reloading"} // ErrTeleportExited means that teleport has exited diff --git a/lib/service/supervisor.go b/lib/service/supervisor.go index a059f80766bb7..f744c175491cf 100644 --- a/lib/service/supervisor.go +++ b/lib/service/supervisor.go @@ -55,7 +55,7 @@ type Supervisor interface { Wait() error // Run starts and waits for the service to complete - // it's a combinatioin Start() and Wait() + // it's a combination Start() and Wait() Run() error // Services returns list of running services @@ -362,33 +362,26 @@ func (s *LocalSupervisor) BroadcastEvent(event Event) { s.signalReload() } - s.events[event.Name] = event + sendEvent := func(e Event) { + select { + case s.eventsC <- e: + case <-s.closeContext.Done(): + } + } + s.events[event.Name] = event + go sendEvent(event) // Log all events other than recovered events to prevent the logs from // being flooded. if event.String() != TeleportOKEvent { s.log.WithField("event", event.String()).Debug("Broadcasting event.") } - go func() { - select { - case s.eventsC <- event: - case <-s.closeContext.Done(): - return - } - }() - for _, m := range s.eventMappings { if m.matches(event.Name, s.events) { mappedEvent := Event{Name: m.Out} s.events[mappedEvent.Name] = mappedEvent - go func(e Event) { - select { - case s.eventsC <- e: - case <-s.closeContext.Done(): - return - } - }(mappedEvent) + go sendEvent(mappedEvent) s.log.WithFields(logrus.Fields{ "in": event.String(), "out": m.String(), @@ -416,7 +409,7 @@ func (s *LocalSupervisor) WaitForEvent(ctx context.Context, name string, eventC waiter := &waiter{eventC: eventC, context: ctx} event, ok := s.events[name] if ok { - go s.notifyWaiter(waiter, event) + go waiter.notify(event) return } s.eventWaiters[name] = append(s.eventWaiters[name], waiter) @@ -432,20 +425,13 @@ func (s *LocalSupervisor) getWaiters(name string) []*waiter { return out } -func (s *LocalSupervisor) notifyWaiter(w *waiter, event Event) { - select { - case w.eventC <- event: - case <-w.context.Done(): - } -} - func (s *LocalSupervisor) fanOut() { for { select { case event := <-s.eventsC: waiters := s.getWaiters(event.Name) for _, waiter := range waiters { - go s.notifyWaiter(waiter, event) + go waiter.notify(event) } case <-s.closeContext.Done(): return @@ -458,6 +444,13 @@ type waiter struct { context context.Context } +func (w *waiter) notify(event Event) { + select { + case w.eventC <- event: + case <-w.context.Done(): + } +} + // Service is a running teleport service function type Service interface { // Serve starts the function