From bacae248590d634c1a5dc99870697d051b84331d Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Wed, 27 Nov 2024 21:05:47 +0100 Subject: [PATCH] Expose node component management --- cmd/node.go | 64 ++++++++++++++---- cmd/node_builder.go | 17 +++-- cmd/scaffold.go | 137 ++++++++++++++++++++------------------- cmd/scaffold_test.go | 2 +- module/metrics/server.go | 65 ++++++++++++++++--- 5 files changed, 188 insertions(+), 97 deletions(-) diff --git a/cmd/node.go b/cmd/node.go index f17b8181f5c..02ff4aad9ef 100644 --- a/cmd/node.go +++ b/cmd/node.go @@ -29,18 +29,53 @@ type Node interface { // The Run function starts all the components, and is blocked until either a termination // signal is received or a irrecoverable error is encountered. type FlowNodeImp struct { - component.Component + NodeImp *NodeConfig +} + +// NodeImp can be used to create a node instance from: +// - a logger: to be used during startup and shutdown +// - a component: that will be started with Run +// - a cleanup function: that will be called after the component has been stopped +// - a fatal error handler: to handle any error received from the component +type NodeImp struct { + component.Component logger zerolog.Logger postShutdown func() error fatalHandler func(error) } // NewNode returns a new node instance -func NewNode(component component.Component, cfg *NodeConfig, logger zerolog.Logger, cleanup func() error, handleFatal func(error)) Node { +func NewNode( + component component.Component, + cfg *NodeConfig, + logger zerolog.Logger, + cleanup func() error, + handleFatal func(error), +) Node { return &FlowNodeImp{ + NodeConfig: cfg, + NodeImp: NewBaseNode( + component, + logger.With(). + Str("node_role", cfg.BaseConfig.NodeRole). + Hex("spork_id", logging.ID(cfg.SporkID)). 
+ Logger(), + cleanup, + handleFatal, + ), + } +} + +// NewBaseNode returns a new base node instance +func NewBaseNode( + component component.Component, + logger zerolog.Logger, + cleanup func() error, + handleFatal func(error), +) NodeImp { + return NodeImp{ Component: component, - NodeConfig: cfg, logger: logger, postShutdown: cleanup, fatalHandler: handleFatal, @@ -51,13 +86,11 @@ func NewNode(component component.Component, cfg *NodeConfig, logger zerolog.Logg // which point it gracefully shuts down. // Any unhandled irrecoverable errors thrown in child components will propagate up to here and // result in a fatal error. -func (node *FlowNodeImp) Run() { - // Cancelling this context notifies all child components that it's time to shutdown - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func (node *NodeImp) Run() { + ctx := context.Background() // Block until node is shutting down - err := node.run(ctx, cancel) + err := node.run(ctx) // Any error received is considered fatal. if err != nil { @@ -73,14 +106,18 @@ func (node *FlowNodeImp) Run() { node.logger.Error().Err(err).Msg("error encountered during cleanup") } - node.logger.Info().Msgf("%s node shutdown complete", node.BaseConfig.NodeRole) + node.logger.Info().Msg("node shutdown complete") } // run starts the node and blocks until a SIGINT/SIGTERM is received or an error is encountered. // It returns: // - nil if a termination signal is received, and all components have been gracefully stopped. 
-// - error if a irrecoverable error is received -func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) error { +// - error if an irrecoverable error is received +func (node *NodeImp) run(ctx context.Context) error { + // Cancelling this context notifies all child components that it's time to shut down + ctx, shutdown := context.WithCancel(ctx) + defer shutdown() + // Components will pass unhandled irrecoverable errors to this channel via signalerCtx (or a // child context). Any errors received on this channel should halt the node. signalerCtx, errChan := irrecoverable.WithSignaler(ctx) @@ -97,8 +134,7 @@ func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) e select { case <-node.Ready(): node.logger.Info(). - Hex("spork_id", logging.ID(node.SporkID)). - Msgf("%s node startup complete", node.BaseConfig.NodeRole) + Msg("node startup complete") case <-ctx.Done(): } }() @@ -118,7 +154,7 @@ func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) e // 3: Shut down // Send shutdown signal to components - node.logger.Info().Msgf("%s node shutting down", node.BaseConfig.NodeRole) + node.logger.Info().Msg("node shutting down") shutdown() // Block here until all components have stopped or an irrecoverable error is received. 
diff --git a/cmd/node_builder.go b/cmd/node_builder.go index 4c8aeaeb263..dc203f66ad4 100644 --- a/cmd/node_builder.go +++ b/cmd/node_builder.go @@ -33,7 +33,10 @@ import ( const NotSet = "not set" type BuilderFunc func(nodeConfig *NodeConfig) error -type ReadyDoneFactory func(node *NodeConfig) (module.ReadyDoneAware, error) + +// ReadyDoneFactory is a function that returns a ReadyDoneAware component or an error if +// the factory cannot create the component +type ReadyDoneFactory[Input any] func(input Input) (module.ReadyDoneAware, error) // NodeBuilder declares the initialization methods needed to bootstrap up a Flow node type NodeBuilder interface { @@ -73,7 +76,7 @@ type NodeBuilder interface { // The ReadyDoneFactory may return either a `Component` or `ReadyDoneAware` instance. // In both cases, the object is started according to its interface when the node is run, // and the node will wait for the component to exit gracefully. - Component(name string, f ReadyDoneFactory) NodeBuilder + Component(name string, f ReadyDoneFactory[*NodeConfig]) NodeBuilder // DependableComponent adds a new component to the node that conforms to the ReadyDoneAware // interface. The builder will wait until all of the components in the dependencies list are ready @@ -86,7 +89,7 @@ type NodeBuilder interface { // IMPORTANT: Dependable components are started in parallel with no guaranteed run order, so all // dependencies must be initialized outside of the ReadyDoneFactory, and their `Ready()` method // MUST be idempotent. - DependableComponent(name string, f ReadyDoneFactory, dependencies *DependencyList) NodeBuilder + DependableComponent(name string, f ReadyDoneFactory[*NodeConfig], dependencies *DependencyList) NodeBuilder // RestartableComponent adds a new component to the node that conforms to the ReadyDoneAware // interface, and calls the provided error handler when an irrecoverable error is encountered. 
@@ -94,7 +97,7 @@ type NodeBuilder interface { // can/should be independently restarted when an irrecoverable error is encountered. // // Any irrecoverable errors thrown by the component will be passed to the provided error handler. - RestartableComponent(name string, f ReadyDoneFactory, errorHandler component.OnError) NodeBuilder + RestartableComponent(name string, f ReadyDoneFactory[*NodeConfig], errorHandler component.OnError) NodeBuilder // ShutdownFunc adds a callback function that is called after all components have exited. // All shutdown functions are called regardless of errors returned by previous callbacks. Any @@ -299,16 +302,16 @@ func DefaultBaseConfig() *BaseConfig { // DependencyList is a slice of ReadyDoneAware implementations that are used by DependableComponent // to define the list of dependencies that must be ready before starting the component. type DependencyList struct { - components []module.ReadyDoneAware + Components []module.ReadyDoneAware } func NewDependencyList(components ...module.ReadyDoneAware) *DependencyList { return &DependencyList{ - components: components, + Components: components, } } // Add adds a new ReadyDoneAware implementation to the list of dependencies. 
func (d *DependencyList) Add(component module.ReadyDoneAware) { - d.components = append(d.components, component) + d.Components = append(d.Components, component) } diff --git a/cmd/scaffold.go b/cmd/scaffold.go index a3a95cfff15..c55ba5e51fd 100644 --- a/cmd/scaffold.go +++ b/cmd/scaffold.go @@ -108,12 +108,17 @@ type namedModuleFunc struct { name string } -type namedComponentFunc struct { - fn ReadyDoneFactory - name string +// NamedComponentFunc is a wrapper for ReadyDoneFactory with additional fields: +// Name - name of the component +// ErrorHandler - error handler for the component +// Dependencies - list of dependencies for the component that should be ready before +// the component is started +type NamedComponentFunc[Input any] struct { + FN ReadyDoneFactory[Input] + Name string - errorHandler component.OnError - dependencies *DependencyList + ErrorHandler component.OnError + Dependencies *DependencyList } // FlowNodeBuilder is the default builder struct used for all flow nodes @@ -129,7 +134,7 @@ type FlowNodeBuilder struct { *NodeConfig flags *pflag.FlagSet modules []namedModuleFunc - components []namedComponentFunc + components []NamedComponentFunc[*NodeConfig] postShutdownFns []func() error preInitFns []BuilderFunc postInitFns []BuilderFunc @@ -1573,10 +1578,20 @@ func (fnb *FlowNodeBuilder) handleModules() error { return nil } -// handleComponents registers the component's factory method with the ComponentManager to be run +func (fnb *FlowNodeBuilder) handleComponents() error { + AddWorkersFromComponents(fnb.Logger, fnb.NodeConfig, fnb.componentBuilder, fnb.components) + return nil +} + +// AddWorkersFromComponents registers the component's factory method with the ComponentManager to be run // when the node starts. // It uses signal channels to ensure that components are started serially.
-func (fnb *FlowNodeBuilder) handleComponents() error { +func AddWorkersFromComponents[Input any]( + log zerolog.Logger, + input Input, + componentBuilder component.ComponentManagerBuilder, + components []NamedComponentFunc[Input], +) { // The parent/started channels are used to enforce serial startup. // - parent is the started channel of the previous component. // - when a component is ready, it closes its started channel by calling the provided callback. @@ -1587,27 +1602,22 @@ func (fnb *FlowNodeBuilder) handleComponents() error { parent := make(chan struct{}) close(parent) - var err error - asyncComponents := []namedComponentFunc{} + asyncComponents := []NamedComponentFunc[Input]{} // Run all components - for _, f := range fnb.components { + for _, f := range components { // Components with explicit dependencies are not started serially - if f.dependencies != nil { + if f.Dependencies != nil { asyncComponents = append(asyncComponents, f) continue } started := make(chan struct{}) - if f.errorHandler != nil { - err = fnb.handleRestartableComponent(f, parent, func() { close(started) }) + if f.ErrorHandler != nil { + componentBuilder.AddWorker(WorkerFromRestartableComponent(log, input, f, parent, func() { close(started) })) } else { - err = fnb.handleComponent(f, parent, func() { close(started) }) - } - - if err != nil { - return fmt.Errorf("could not handle component %s: %w", f.name, err) + componentBuilder.AddWorker(WorkerFromComponent(log, input, f, parent, func() { close(started) })) } parent = started @@ -1616,17 +1626,12 @@ func (fnb *FlowNodeBuilder) handleComponents() error { // Components with explicit dependencies are run asynchronously, which means dependencies in // the dependency list must be initialized outside of the component factory. 
for _, f := range asyncComponents { - fnb.Logger.Debug().Str("component", f.name).Int("dependencies", len(f.dependencies.components)).Msg("handling component asynchronously") - err = fnb.handleComponent(f, util.AllReady(f.dependencies.components...), func() {}) - if err != nil { - return fmt.Errorf("could not handle dependable component %s: %w", f.name, err) - } + log.Debug().Str("component", f.Name).Int("dependencies", len(f.Dependencies.Components)).Msg("handling component asynchronously") + componentBuilder.AddWorker(WorkerFromComponent(log, input, f, util.AllReady(f.Dependencies.Components...), func() {})) } - - return nil } -// handleComponent constructs a component using the provided ReadyDoneFactory, and registers a +// WorkerFromComponent constructs a component using the provided ReadyDoneFactory, and registers a // worker with the ComponentManager to be run when the node is started. // // The ComponentManager starts all workers in parallel. Since some components have non-idempotent @@ -1639,27 +1644,27 @@ func (fnb *FlowNodeBuilder) handleComponents() error { // using their ReadyDoneAware interface. After components are updated to use the idempotent // ReadyDoneAware interface and explicitly wait for their dependencies to be ready, we can remove // this channel chaining. -func (fnb *FlowNodeBuilder) handleComponent(v namedComponentFunc, dependencies <-chan struct{}, started func()) error { +func WorkerFromComponent[Input any](log zerolog.Logger, input Input, v NamedComponentFunc[Input], dependencies <-chan struct{}, started func()) component.ComponentWorker { // Add a closure that starts the component when the node is started, and then waits for it to exit // gracefully. // Startup for all components will happen in parallel, and components can use their dependencies' // ReadyDoneAware interface to wait until they are ready. 
- fnb.componentBuilder.AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + return func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { // wait for the dependencies to be ready before starting if err := util.WaitClosed(ctx, dependencies); err != nil { return } - logger := fnb.Logger.With().Str("component", v.name).Logger() + logger := log.With().Str("component", v.Name).Logger() logger.Info().Msg("component initialization started") // First, build the component using the factory method. - readyAware, err := v.fn(fnb.NodeConfig) + readyAware, err := v.FN(input) if err != nil { - ctx.Throw(fmt.Errorf("component %s initialization failed: %w", v.name, err)) + ctx.Throw(fmt.Errorf("component %s initialization failed: %w", v.Name, err)) } if readyAware == nil { - ctx.Throw(fmt.Errorf("component %s initialization failed: nil component", v.name)) + ctx.Throw(fmt.Errorf("component %s initialization failed: nil component", v.Name)) } logger.Info().Msg("component initialization complete") @@ -1695,20 +1700,24 @@ func (fnb *FlowNodeBuilder) handleComponent(v namedComponentFunc, dependencies < // Finally, wait until component has finished shutting down. <-readyAware.Done() logger.Info().Msg("component shutdown complete") - }) - - return nil + } } -// handleRestartableComponent constructs a component using the provided ReadyDoneFactory, and +// WorkerFromRestartableComponent constructs a component using the provided ReadyDoneFactory, and // registers a worker with the ComponentManager to be run when the node is started. // // Restartable Components are components that can be restarted after successfully handling // an irrecoverable error. // // Any irrecoverable errors thrown by the component will be passed to the provided error handler. 
-func (fnb *FlowNodeBuilder) handleRestartableComponent(v namedComponentFunc, parentReady <-chan struct{}, started func()) error { - fnb.componentBuilder.AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { +func WorkerFromRestartableComponent[Input any]( + log zerolog.Logger, + input Input, + v NamedComponentFunc[Input], + parentReady <-chan struct{}, + started func(), +) component.ComponentWorker { + return func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { // wait for the previous component to be ready before starting if err := util.WaitClosed(ctx, parentReady); err != nil { return @@ -1723,12 +1732,12 @@ func (fnb *FlowNodeBuilder) handleRestartableComponent(v namedComponentFunc, par // from within the componentFactory started() - log := fnb.Logger.With().Str("component", v.name).Logger() + log := log.With().Str("component", v.Name).Logger() // This may be called multiple times if the component is restarted componentFactory := func() (component.Component, error) { log.Info().Msg("component initialization started") - c, err := v.fn(fnb.NodeConfig) + c, err := v.FN(input) if err != nil { return nil, err } @@ -1747,15 +1756,13 @@ func (fnb *FlowNodeBuilder) handleRestartableComponent(v namedComponentFunc, par return c.(component.Component), nil } - err := component.RunComponent(ctx, componentFactory, v.errorHandler) + err := component.RunComponent(ctx, componentFactory, v.ErrorHandler) if err != nil && !errors.Is(err, ctx.Err()) { - ctx.Throw(fmt.Errorf("component %s encountered an unhandled irrecoverable error: %w", v.name, err)) + ctx.Throw(fmt.Errorf("component %s encountered an unhandled irrecoverable error: %w", v.Name, err)) } log.Info().Msg("component shutdown complete") - }) - - return nil + } } // ExtraFlags enables binding additional flags beyond those defined in BaseConfig. 
@@ -1790,10 +1797,10 @@ func (fnb *FlowNodeBuilder) AdminCommand(command string, f func(config *NodeConf // The ReadyDoneFactory may return either a `Component` or `ReadyDoneAware` instance. // In both cases, the object is started when the node is run, and the node will wait for the // component to exit gracefully. -func (fnb *FlowNodeBuilder) Component(name string, f ReadyDoneFactory) NodeBuilder { - fnb.components = append(fnb.components, namedComponentFunc{ - fn: f, - name: name, +func (fnb *FlowNodeBuilder) Component(name string, f ReadyDoneFactory[*NodeConfig]) NodeBuilder { + fnb.components = append(fnb.components, NamedComponentFunc[*NodeConfig]{ + FN: f, + Name: name, }) return fnb } @@ -1809,26 +1816,26 @@ func (fnb *FlowNodeBuilder) Component(name string, f ReadyDoneFactory) NodeBuild // IMPORTANT: Dependable components are started in parallel with no guaranteed run order, so all // dependencies must be initialized outside of the ReadyDoneFactory, and their `Ready()` method // MUST be idempotent. -func (fnb *FlowNodeBuilder) DependableComponent(name string, f ReadyDoneFactory, dependencies *DependencyList) NodeBuilder { +func (fnb *FlowNodeBuilder) DependableComponent(name string, f ReadyDoneFactory[*NodeConfig], dependencies *DependencyList) NodeBuilder { // Note: dependencies are passed as a struct to allow updating the list after calling this method. // Passing a slice instead would result in out of sync metadata since slices are passed by reference - fnb.components = append(fnb.components, namedComponentFunc{ - fn: f, - name: name, - dependencies: dependencies, + fnb.components = append(fnb.components, NamedComponentFunc[*NodeConfig]{ + FN: f, + Name: name, + Dependencies: dependencies, }) return fnb } // OverrideComponent adds given builder function to the components set of the node builder. If a builder function with that name // already exists, it will be overridden. 
-func (fnb *FlowNodeBuilder) OverrideComponent(name string, f ReadyDoneFactory) NodeBuilder { +func (fnb *FlowNodeBuilder) OverrideComponent(name string, f ReadyDoneFactory[*NodeConfig]) NodeBuilder { for i := 0; i < len(fnb.components); i++ { - if fnb.components[i].name == name { + if fnb.components[i].Name == name { // found component with the name, override it. - fnb.components[i] = namedComponentFunc{ - fn: f, - name: name, + fnb.components[i] = NamedComponentFunc[*NodeConfig]{ + FN: f, + Name: name, } return fnb @@ -1852,11 +1859,11 @@ func (fnb *FlowNodeBuilder) OverrideComponent(name string, f ReadyDoneFactory) N // Note: The ReadyDoneFactory method may be called multiple times if the component is restarted. // // Any irrecoverable errors thrown by the component will be passed to the provided error handler. -func (fnb *FlowNodeBuilder) RestartableComponent(name string, f ReadyDoneFactory, errorHandler component.OnError) NodeBuilder { - fnb.components = append(fnb.components, namedComponentFunc{ - fn: f, - name: name, - errorHandler: errorHandler, +func (fnb *FlowNodeBuilder) RestartableComponent(name string, f ReadyDoneFactory[*NodeConfig], errorHandler component.OnError) NodeBuilder { + fnb.components = append(fnb.components, NamedComponentFunc[*NodeConfig]{ + FN: f, + Name: name, + ErrorHandler: errorHandler, }) return fnb } diff --git a/cmd/scaffold_test.go b/cmd/scaffold_test.go index a37994356cc..d23663ec3b3 100644 --- a/cmd/scaffold_test.go +++ b/cmd/scaffold_test.go @@ -284,7 +284,7 @@ func TestOverrideModules(t *testing.T) { type testComponentDefinition struct { name string - factory ReadyDoneFactory + factory ReadyDoneFactory[*NodeConfig] errorHandler component.OnError } diff --git a/module/metrics/server.go b/module/metrics/server.go index cd8187b1fbd..a14cdcef653 100644 --- a/module/metrics/server.go +++ b/module/metrics/server.go @@ -3,20 +3,31 @@ package metrics import ( "context" "errors" + "net" "net/http" "strconv" "time" 
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog" + + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" ) +// metricsServerShutdownTimeout is the time to wait for the server to shut down gracefully +const metricsServerShutdownTimeout = 5 * time.Second + // Server is the http server that will be serving the /metrics request for prometheus type Server struct { server *http.Server log zerolog.Logger + + startupCompleted chan struct{} } +var _ component.Component = (*Server)(nil) + // NewServer creates a new server that will start on the specified port, // and responds to only the `/metrics` endpoint func NewServer(log zerolog.Logger, port uint) *Server { @@ -28,16 +39,23 @@ func NewServer(log zerolog.Logger, port uint) *Server { log.Info().Str("address", addr).Str("endpoint", endpoint).Msg("metrics server started") m := &Server{ - server: &http.Server{Addr: addr, Handler: mux}, - log: log, + server: &http.Server{Addr: addr, Handler: mux}, + log: log, + startupCompleted: make(chan struct{}), } return m } -// Ready returns a channel that will close when the network stack is ready. 
-func (m *Server) Ready() <-chan struct{} { - ready := make(chan struct{}) +func (m *Server) Start(signalerContext irrecoverable.SignalerContext) { + defer close(m.startupCompleted) + + // pass the signaler context to the server so that the signaler context + // can control the server's lifetime + m.server.BaseContext = func(_ net.Listener) context.Context { + return signalerContext + } + go func() { if err := m.server.ListenAndServe(); err != nil { // http.ErrServerClosed is returned when Close or Shutdown is called @@ -45,13 +63,21 @@ func (m *Server) Ready() <-chan struct{} { if errors.Is(err, http.ErrServerClosed) { m.log.Debug().Err(err).Msg("metrics server shutdown") } else { - m.log.Err(err).Msg("error shutting down metrics server") + m.log.Err(err).Msg("error running metrics server") } } }() +} + +// Ready returns a channel that is closed once Start has returned; the HTTP listener is started asynchronously and may not be accepting connections yet. +func (m *Server) Ready() <-chan struct{} { + ready := make(chan struct{}) + go func() { + <-m.startupCompleted close(ready) }() + return ready } @@ -59,10 +85,29 @@ func (m *Server) Ready() <-chan struct{} { func (m *Server) Done() <-chan struct{} { done := make(chan struct{}) go func() { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - _ = m.server.Shutdown(ctx) - cancel() - close(done) + <-m.startupCompleted + defer close(done) + + ctx, cancel := context.WithTimeout(context.Background(), metricsServerShutdownTimeout) + defer cancel() + + // shutdown the server gracefully + err := m.server.Shutdown(ctx) + if err == nil { + m.log.Info().Msg("metrics server graceful shutdown completed") + return + } + + if errors.Is(err, ctx.Err()) { + m.log.Warn().Msg("metrics server graceful shutdown timed out") + // shutdown the server forcefully + err := m.server.Close() + if err != nil { + m.log.Err(err).Msg("error closing metrics server") + } + } else { + m.log.Err(err).Msg("error shutting down metrics server") + } }() return done }