From d8b1c86ac823b67edd7cfb7e9a2b3bc6a2bbbc57 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Mon, 16 Sep 2024 11:20:23 +0200 Subject: [PATCH 1/8] feat(server/v2): wire telemetry --- server/v2/api/grpcgateway/server.go | 5 +- server/v2/api/telemetry/config.go | 15 ++++++ server/v2/api/telemetry/metrics.go | 4 +- server/v2/api/telemetry/register.go | 48 ++++++++++++++++++ server/v2/api/telemetry/server.go | 75 +++++++++++++++++------------ simapp/v2/simdv2/cmd/commands.go | 2 + 6 files changed, 116 insertions(+), 33 deletions(-) create mode 100644 server/v2/api/telemetry/register.go diff --git a/server/v2/api/grpcgateway/server.go b/server/v2/api/grpcgateway/server.go index 05a6bf1aa829..129d6bfdc913 100644 --- a/server/v2/api/grpcgateway/server.go +++ b/server/v2/api/grpcgateway/server.go @@ -17,7 +17,10 @@ import ( serverv2 "cosmossdk.io/server/v2" ) -var _ serverv2.ServerComponent[transaction.Tx] = (*GRPCGatewayServer[transaction.Tx])(nil) +var ( + _ serverv2.ServerComponent[transaction.Tx] = (*GRPCGatewayServer[transaction.Tx])(nil) + _ serverv2.HasConfig = (*GRPCGatewayServer[transaction.Tx])(nil) +) const ( ServerName = "grpc-gateway" diff --git a/server/v2/api/telemetry/config.go b/server/v2/api/telemetry/config.go index 63a37ed1f39d..5e2bfed338df 100644 --- a/server/v2/api/telemetry/config.go +++ b/server/v2/api/telemetry/config.go @@ -1,5 +1,20 @@ package telemetry +func DefaultConfig() *Config { + return &Config{ + ServiceName: "", + Enabled: true, + EnableHostname: false, + EnableHostnameLabel: false, + EnableServiceLabel: false, + PrometheusRetentionTime: 0, + GlobalLabels: nil, + MetricsSink: "", + StatsdAddr: "", + DatadogHostname: "", + } +} + type Config struct { // Prefixed with keys to separate services ServiceName string `mapstructure:"service-name" toml:"service-name" comment:"Prefixed with keys to separate services."` diff --git a/server/v2/api/telemetry/metrics.go b/server/v2/api/telemetry/metrics.go index 39055af6739b..d9da4bf110fc 100644 --- a/server/v2/api/telemetry/metrics.go +++ b/server/v2/api/telemetry/metrics.go @@ -58,7 +58,7 @@ type GatherResponse struct { } // New creates a new instance of Metrics -func New(cfg Config) (_ *Metrics, rerr error) { +func NewMetrics(cfg *Config) (*Metrics, error) { if !cfg.Enabled { return nil, nil } @@ -89,7 +89,7 @@ func New(cfg Config) (_ *Metrics, rerr error) { sink = memSink inMemSig := metrics.DefaultInmemSignal(memSink) defer func() { - if rerr != nil { + if err != nil { inMemSig.Stop() } }() diff --git a/server/v2/api/telemetry/register.go b/server/v2/api/telemetry/register.go new file mode 100644 index 000000000000..23efe4b4516d --- /dev/null +++ b/server/v2/api/telemetry/register.go @@ -0,0 +1,48 @@ +package telemetry + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/gorilla/mux" +) + +// RegisterMetrics registers the metrics handler to the provided router. +func (s TelemetryServer[T]) RegisterMetrics(r mux.Router) (*Metrics, error) { + m, err := NewMetrics(s.config) + if err != nil { + return nil, err + } + + metricsHandler := func(w http.ResponseWriter, r *http.Request) { + format := strings.TrimSpace(r.FormValue("format")) + + gr, err := m.Gather(format) + if err != nil { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + bz, err := json.Marshal(errorResponse{Code: 400, Error: fmt.Sprintf("failed to gather metrics: %s", err)}) + if err != nil { + return + } + _, _ = w.Write(bz) + + return + } + + w.Header().Set("Content-Type", gr.ContentType) + _, _ = w.Write(gr.Metrics) + } + + r.HandleFunc("/metrics", metricsHandler).Methods("GET") + + return m, nil +} + +// errorResponse defines the attributes of a JSON error response. +type errorResponse struct { + Code int `json:"code,omitempty"` + Error string `json:"error"` +} diff --git a/server/v2/api/telemetry/server.go b/server/v2/api/telemetry/server.go index a944fc7f4f03..d455b57153ab 100644 --- a/server/v2/api/telemetry/server.go +++ b/server/v2/api/telemetry/server.go @@ -1,47 +1,62 @@ package telemetry import ( - "encoding/json" + "context" "fmt" - "net/http" - "strings" - "github.com/gorilla/mux" + "cosmossdk.io/core/transaction" + "cosmossdk.io/log" + serverv2 "cosmossdk.io/server/v2" ) -func RegisterMetrics(r mux.Router, cfg Config) (*Metrics, error) { - m, err := New(cfg) - if err != nil { - return nil, err - } +var ( + _ serverv2.ServerComponent[transaction.Tx] = (*TelemetryServer[transaction.Tx])(nil) + _ serverv2.HasConfig = (*TelemetryServer[transaction.Tx])(nil) +) - metricsHandler := func(w http.ResponseWriter, r *http.Request) { - format := strings.TrimSpace(r.FormValue("format")) +const ServerName = "telemetry" - gr, err := m.Gather(format) - if err != nil { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusBadRequest) - bz, err := json.Marshal(errorResponse{Code: 400, Error: fmt.Sprintf("failed to gather metrics: %s", err)}) - if err != nil { - return - } - _, _ = w.Write(bz) +type TelemetryServer[T transaction.Tx] struct { + config *Config + logger log.Logger +} - return - } +// New creates a new telemtry server. +func New[T transaction.Tx]() *TelemetryServer[T] { + return &TelemetryServer[T]{} +} + +// Name returns the server name. +func (s *TelemetryServer[T]) Name() string { + return ServerName +} - w.Header().Set("Content-Type", gr.ContentType) - _, _ = w.Write(gr.Metrics) +func (s *TelemetryServer[T]) Config() any { + if s.config == nil || s.config == (&Config{}) { + return DefaultConfig() } - r.HandleFunc("/metrics", metricsHandler).Methods("GET") + return s.config +} + +// Init implements serverv2.ServerComponent. +func (s *TelemetryServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.Logger) error { + serverCfg := s.Config().(*Config) + if len(cfg) > 0 { + if err := serverv2.UnmarshalSubConfig(cfg, s.Name(), &serverCfg); err != nil { + return fmt.Errorf("failed to unmarshal config: %w", err) + } + } + s.config = serverCfg + s.logger = logger + + return nil +} - return m, nil +func (s *TelemetryServer[T]) Start(context.Context) error { + return nil } -// errorResponse defines the attributes of a JSON error response. -type errorResponse struct { - Code int `json:"code,omitempty"` - Error string `json:"error"` +func (s *TelemetryServer[T]) Stop(context.Context) error { + return nil } diff --git a/simapp/v2/simdv2/cmd/commands.go b/simapp/v2/simdv2/cmd/commands.go index d58619170e0d..059a2d272b66 100644 --- a/simapp/v2/simdv2/cmd/commands.go +++ b/simapp/v2/simdv2/cmd/commands.go @@ -15,6 +15,7 @@ import ( runtimev2 "cosmossdk.io/runtime/v2" serverv2 "cosmossdk.io/server/v2" "cosmossdk.io/server/v2/api/grpc" + "cosmossdk.io/server/v2/api/telemetry" "cosmossdk.io/server/v2/cometbft" "cosmossdk.io/server/v2/store" "cosmossdk.io/simapp/v2" @@ -77,6 +78,7 @@ func initRootCmd[T transaction.Tx]( cometbft.New(&genericTxDecoder[T]{txConfig}, cometbft.DefaultServerOptions[T]()), grpc.New[T](), store.New[T](newApp), + telemetry.New[T](), ); err != nil { panic(err) } From 9203f8fd3ba4de033871f85bd9be387665bf4e85 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 27 Sep 2024 11:51:32 +0200 Subject: [PATCH 2/8] updates --- server/v2/api/grpc/config.go | 9 +--- server/v2/api/telemetry/config.go | 16 +++--- server/v2/api/telemetry/metrics.go | 5 -- server/v2/api/telemetry/register.go | 48 ------------------ server/v2/api/telemetry/server.go | 75 +++++++++++++++++++++++++++-- 5 files changed, 82 insertions(+), 71 deletions(-) delete mode 100644 server/v2/api/telemetry/register.go diff --git a/server/v2/api/grpc/config.go b/server/v2/api/grpc/config.go index 86fb514e70d8..4e9cabc7a418 100644 --- a/server/v2/api/grpc/config.go +++ b/server/v2/api/grpc/config.go @@ -4,14 +4,9 @@ import "math" func DefaultConfig() *Config { return &Config{ - Enable: true, - // DefaultGRPCAddress defines the default address to bind the gRPC server to. - Address: "localhost:9090", - // DefaultGRPCMaxRecvMsgSize defines the default gRPC max message size in - // bytes the server can receive. + Enable: true, + Address: "localhost:9090", MaxRecvMsgSize: 1024 * 1024 * 10, - // DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in - // bytes the server can send. MaxSendMsgSize: math.MaxInt32, } } diff --git a/server/v2/api/telemetry/config.go b/server/v2/api/telemetry/config.go index 5e2bfed338df..ba652fb6056a 100644 --- a/server/v2/api/telemetry/config.go +++ b/server/v2/api/telemetry/config.go @@ -2,8 +2,9 @@ package telemetry func DefaultConfig() *Config { return &Config{ + Enable: true, + Address: "localhost:1338", ServiceName: "", - Enabled: true, EnableHostname: false, EnableHostnameLabel: false, EnableServiceLabel: false, @@ -16,13 +17,16 @@ func DefaultConfig() *Config { } type Config struct { - // Prefixed with keys to separate services - ServiceName string `mapstructure:"service-name" toml:"service-name" comment:"Prefixed with keys to separate services."` - - // Enabled enables the application telemetry functionality. When enabled, + // Enable enables the application telemetry functionality. When enabled, // an in-memory sink is also enabled by default. Operators may also enabled // other sinks such as Prometheus. - Enabled bool `mapstructure:"enabled" toml:"enabled" comment:"Enabled enables the application telemetry functionality. When enabled, an in-memory sink is also enabled by default. Operators may also enabled other sinks such as Prometheus."` + Enable bool `mapstructure:"enable" toml:"enable" comment:"Enable enables the application telemetry functionality. When enabled, an in-memory sink is also enabled by default. Operators may also enabled other sinks such as Prometheus."` + + // Address defines the API server to listen on + Address string `mapstructure:"address" toml:"address" comment:"Address defines the metrics server address to bind to."` + + // Prefixed with keys to separate services + ServiceName string `mapstructure:"service-name" toml:"service-name" comment:"Prefixed with keys to separate services."` // Enable prefixing gauge values with hostname EnableHostname bool `mapstructure:"enable-hostname" toml:"enable-hostname" comment:"Enable prefixing gauge values with hostname."` diff --git a/server/v2/api/telemetry/metrics.go b/server/v2/api/telemetry/metrics.go index f15b30d3470c..86218630c880 100644 --- a/server/v2/api/telemetry/metrics.go +++ b/server/v2/api/telemetry/metrics.go @@ -59,10 +59,6 @@ type GatherResponse struct { // New creates a new instance of Metrics func NewMetrics(cfg *Config) (*Metrics, error) { - if !cfg.Enabled { - return nil, nil - } - if numGlobalLabels := len(cfg.GlobalLabels); numGlobalLabels > 0 { parsedGlobalLabels := make([]metrics.Label, numGlobalLabels) for i, gl := range cfg.GlobalLabels { @@ -94,7 +90,6 @@ func NewMetrics(cfg *Config) (*Metrics, error) { } }() } - if err != nil { return nil, err } diff --git a/server/v2/api/telemetry/register.go b/server/v2/api/telemetry/register.go deleted file mode 100644 index 23efe4b4516d..000000000000 --- a/server/v2/api/telemetry/register.go +++ /dev/null @@ -1,48 +0,0 @@ -package telemetry - -import ( - "encoding/json" - "fmt" - "net/http" - "strings" - - "github.com/gorilla/mux" -) - -// RegisterMetrics registers the metrics handler to the provided router. -func (s TelemetryServer[T]) RegisterMetrics(r mux.Router) (*Metrics, error) { - m, err := NewMetrics(s.config) - if err != nil { - return nil, err - } - - metricsHandler := func(w http.ResponseWriter, r *http.Request) { - format := strings.TrimSpace(r.FormValue("format")) - - gr, err := m.Gather(format) - if err != nil { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusBadRequest) - bz, err := json.Marshal(errorResponse{Code: 400, Error: fmt.Sprintf("failed to gather metrics: %s", err)}) - if err != nil { - return - } - _, _ = w.Write(bz) - - return - } - - w.Header().Set("Content-Type", gr.ContentType) - _, _ = w.Write(gr.Metrics) - } - - r.HandleFunc("/metrics", metricsHandler).Methods("GET") - - return m, nil -} - -// errorResponse defines the attributes of a JSON error response. -type errorResponse struct { - Code int `json:"code,omitempty"` - Error string `json:"error"` -} diff --git a/server/v2/api/telemetry/server.go b/server/v2/api/telemetry/server.go index d455b57153ab..f2eb07ca4f30 100644 --- a/server/v2/api/telemetry/server.go +++ b/server/v2/api/telemetry/server.go @@ -2,7 +2,10 @@ package telemetry import ( "context" + "encoding/json" "fmt" + "net/http" + "strings" "cosmossdk.io/core/transaction" "cosmossdk.io/log" @@ -17,8 +20,10 @@ var ( const ServerName = "telemetry" type TelemetryServer[T transaction.Tx] struct { - config *Config - logger log.Logger + config *Config + logger log.Logger + server *http.Server + metrics *Metrics } // New creates a new telemtry server. @@ -50,13 +55,73 @@ func (s *TelemetryServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, log s.config = serverCfg s.logger = logger + metrics, err := NewMetrics(s.config) + if err != nil { + return fmt.Errorf("failed to initialize metrics: %w", err) + } + s.metrics = metrics + return nil } -func (s *TelemetryServer[T]) Start(context.Context) error { +func (s *TelemetryServer[T]) Start(ctx context.Context) error { + if !s.config.Enable { + return nil + } + + mux := http.NewServeMux() + mux.HandleFunc("/", s.metricsHandler) + // keeping /metrics for backwards compatibility + mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, "/", http.StatusMovedPermanently) + }) + + s.server = &http.Server{ + Addr: s.config.Address, + Handler: mux, + } + + go func() { + s.logger.Info("Starting telemetry server", "address", s.config.Address) + if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + s.logger.Error("Failed to start telemetry server", "error", err) + } + }() + return nil } -func (s *TelemetryServer[T]) Stop(context.Context) error { - return nil +func (s *TelemetryServer[T]) Stop(ctx context.Context) error { + if !s.config.Enable || s.server == nil { + return nil + } + + s.logger.Info("Stopping telemetry server") + return s.server.Shutdown(ctx) +} + +func (s *TelemetryServer[T]) metricsHandler(w http.ResponseWriter, r *http.Request) { + format := strings.TrimSpace(r.FormValue("format")) + + // errorResponse defines the attributes of a JSON error response. + type errorResponse struct { + Code int `json:"code,omitempty"` + Error string `json:"error"` + } + + gr, err := s.metrics.Gather(format) + if err != nil { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + bz, err := json.Marshal(errorResponse{Code: 400, Error: fmt.Sprintf("failed to gather metrics: %s", err)}) + if err != nil { + return + } + _, _ = w.Write(bz) + + return + } + + w.Header().Set("Content-Type", gr.ContentType) + _, _ = w.Write(gr.Metrics) } From 11b33bb4de16523e1d904733c1b08ab1bd828f7e Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 27 Sep 2024 13:13:02 +0200 Subject: [PATCH 3/8] fix typo --- server/v2/api/telemetry/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/v2/api/telemetry/server.go b/server/v2/api/telemetry/server.go index f2eb07ca4f30..5f88388865b7 100644 --- a/server/v2/api/telemetry/server.go +++ b/server/v2/api/telemetry/server.go @@ -26,7 +26,7 @@ type TelemetryServer[T transaction.Tx] struct { metrics *Metrics } -// New creates a new telemtry server. +// New creates a new telemetry server. func New[T transaction.Tx]() *TelemetryServer[T] { return &TelemetryServer[T]{} } From e422f5d6b827484085ee1f9f182cf99d42998974 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 27 Sep 2024 15:09:52 +0200 Subject: [PATCH 4/8] proper empy config check --- server/v2/api/grpc/server.go | 2 +- server/v2/api/grpcgateway/server.go | 2 +- server/v2/api/telemetry/server.go | 2 +- server/v2/cometbft/server.go | 2 +- server/v2/store/server.go | 6 +++--- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/server/v2/api/grpc/server.go b/server/v2/api/grpc/server.go index 0003f2343222..20319d971957 100644 --- a/server/v2/api/grpc/server.go +++ b/server/v2/api/grpc/server.go @@ -148,7 +148,7 @@ func (s *Server[T]) Name() string { } func (s *Server[T]) Config() any { - if s.config == nil || s.config == (&Config{}) { + if s.config == nil || s.config.Address == "" { cfg := DefaultConfig() // overwrite the default config with the provided options for _, opt := range s.cfgOptions { diff --git a/server/v2/api/grpcgateway/server.go b/server/v2/api/grpcgateway/server.go index 129d6bfdc913..f3a7a7a7aa8d 100644 --- a/server/v2/api/grpcgateway/server.go +++ b/server/v2/api/grpcgateway/server.go @@ -72,7 +72,7 @@ func (g *GRPCGatewayServer[T]) Name() string { } func (s *GRPCGatewayServer[T]) Config() any { - if s.config == nil || s.config == (&Config{}) { + if s.config == nil { cfg := DefaultConfig() // overwrite the default config with the provided options for _, opt := range s.cfgOptions { diff --git a/server/v2/api/telemetry/server.go b/server/v2/api/telemetry/server.go index 5f88388865b7..f1230526439f 100644 --- a/server/v2/api/telemetry/server.go +++ b/server/v2/api/telemetry/server.go @@ -37,7 +37,7 @@ func (s *TelemetryServer[T]) Name() string { } func (s *TelemetryServer[T]) Config() any { - if s.config == nil || s.config == (&Config{}) { + if s.config == nil || s.config.Address == "" { return DefaultConfig() } diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index 2f402522c099..21b6d59ddc72 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -271,7 +271,7 @@ func (s *CometBFTServer[T]) CLICommands() serverv2.CLIConfig { // Config returns the (app.toml) server configuration. func (s *CometBFTServer[T]) Config() any { - if s.config.AppTomlConfig == nil || s.config.AppTomlConfig == (&AppTomlConfig{}) { + if s.config.AppTomlConfig == nil || s.config.AppTomlConfig.Address == "" { cfg := &Config{AppTomlConfig: DefaultAppTomlConfig()} // overwrite the default config with the provided options for _, opt := range s.cfgOptions { diff --git a/server/v2/store/server.go b/server/v2/store/server.go index a5f464c964a5..524869a18aa3 100644 --- a/server/v2/store/server.go +++ b/server/v2/store/server.go @@ -68,10 +68,10 @@ func (s *StoreComponent[T]) CLICommands() serverv2.CLIConfig { } } -func (g *StoreComponent[T]) Config() any { - if g.config == nil || g.config == (&Config{}) { +func (s *StoreComponent[T]) Config() any { + if s.config == nil || s.config.AppDBBackend == "" { return DefaultConfig() } - return g.config + return s.config } From 64068e29cc9fb9328bab6f7b81e3bdc2a877f55d Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 27 Sep 2024 15:16:04 +0200 Subject: [PATCH 5/8] log if servers are disabled via config --- server/v2/api/grpc/server.go | 1 + server/v2/api/grpcgateway/server.go | 1 + server/v2/api/telemetry/server.go | 1 + 3 files changed, 3 insertions(+) diff --git a/server/v2/api/grpc/server.go b/server/v2/api/grpc/server.go index 20319d971957..3bde835a449e 100644 --- a/server/v2/api/grpc/server.go +++ b/server/v2/api/grpc/server.go @@ -163,6 +163,7 @@ func (s *Server[T]) Config() any { func (s *Server[T]) Start(ctx context.Context) error { if !s.config.Enable { + s.logger.Info(fmt.Sprintf("%s server is disabled via config", s.Name())) return nil } diff --git a/server/v2/api/grpcgateway/server.go b/server/v2/api/grpcgateway/server.go index f3a7a7a7aa8d..60d18309e258 100644 --- a/server/v2/api/grpcgateway/server.go +++ b/server/v2/api/grpcgateway/server.go @@ -104,6 +104,7 @@ func (s *GRPCGatewayServer[T]) Init(appI serverv2.AppI[transaction.Tx], cfg map[ func (s *GRPCGatewayServer[T]) Start(ctx context.Context) error { if !s.config.Enable { + s.logger.Info(fmt.Sprintf("%s server is disabled via config", s.Name())) return nil } diff --git a/server/v2/api/telemetry/server.go b/server/v2/api/telemetry/server.go index f1230526439f..2b8b0bdb9c8c 100644 --- a/server/v2/api/telemetry/server.go +++ b/server/v2/api/telemetry/server.go @@ -66,6 +66,7 @@ func (s *TelemetryServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, log func (s *TelemetryServer[T]) Start(ctx context.Context) error { if !s.config.Enable { + s.logger.Info(fmt.Sprintf("%s server is disabled via config", s.Name())) return nil } From 9117d8526c7d7f51699e2291fdae3bb34e6cfbe4 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 27 Sep 2024 15:17:58 +0200 Subject: [PATCH 6/8] no go rountine --- server/v2/api/telemetry/server.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/server/v2/api/telemetry/server.go b/server/v2/api/telemetry/server.go index 2b8b0bdb9c8c..3d846c8c2637 100644 --- a/server/v2/api/telemetry/server.go +++ b/server/v2/api/telemetry/server.go @@ -82,12 +82,9 @@ func (s *TelemetryServer[T]) Start(ctx context.Context) error { Handler: mux, } - go func() { - s.logger.Info("Starting telemetry server", "address", s.config.Address) - if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - s.logger.Error("Failed to start telemetry server", "error", err) - } - }() + if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + return fmt.Errorf("failed to start telemetry server: %w", err) + } return nil } From 907123e21ee8c1a96c84da7496d79c744699db31 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 27 Sep 2024 15:20:35 +0200 Subject: [PATCH 7/8] no double log --- server/v2/api/telemetry/server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/v2/api/telemetry/server.go b/server/v2/api/telemetry/server.go index 3d846c8c2637..7eccbceb9ebc 100644 --- a/server/v2/api/telemetry/server.go +++ b/server/v2/api/telemetry/server.go @@ -94,7 +94,6 @@ func (s *TelemetryServer[T]) Stop(ctx context.Context) error { return nil } - s.logger.Info("Stopping telemetry server") return s.server.Shutdown(ctx) } From c80099b9a18a7752873ee2d0972e9db613ad9975 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Fri, 27 Sep 2024 15:55:13 +0200 Subject: [PATCH 8/8] updates --- server/v2/api/grpc/server.go | 26 ++++------ server/v2/api/grpcgateway/config.go | 6 ++- server/v2/api/grpcgateway/server.go | 74 ++++++++++++++--------------- server/v2/api/telemetry/config.go | 2 +- server/v2/api/telemetry/server.go | 26 +++++----- server/v2/go.mod | 1 - server/v2/go.sum | 2 - server/v2/store/commands.go | 2 +- server/v2/store/server.go | 30 ++++++------ server/v2/store/snapshot.go | 12 ++--- 10 files changed, 87 insertions(+), 94 deletions(-) diff --git a/server/v2/api/grpc/server.go b/server/v2/api/grpc/server.go index 3bde835a449e..170343fd512c 100644 --- a/server/v2/api/grpc/server.go +++ b/server/v2/api/grpc/server.go @@ -172,24 +172,12 @@ func (s *Server[T]) Start(ctx context.Context) error { return fmt.Errorf("failed to listen on address %s: %w", s.config.Address, err) } - errCh := make(chan error) - - // Start the gRPC in an external goroutine as Serve is blocking and will return - // an error upon failure, which we'll send on the error channel that will be - // consumed by the for block below. - go func() { - s.logger.Info("starting gRPC server...", "address", s.config.Address) - errCh <- s.grpcSrv.Serve(listener) - }() - - // Start a blocking select to wait for an indication to stop the server or that - // the server failed to start properly. - err = <-errCh - if err != nil { - s.logger.Error("failed to start gRPC server", "err", err) + s.logger.Info("starting gRPC server...", "address", s.config.Address) + if err := s.grpcSrv.Serve(listener); err != nil { + return fmt.Errorf("failed to start gRPC server: %w", err) } - return err + return nil } func (s *Server[T]) Stop(ctx context.Context) error { @@ -199,6 +187,10 @@ func (s *Server[T]) Stop(ctx context.Context) error { s.logger.Info("stopping gRPC server...", "address", s.config.Address) s.grpcSrv.GracefulStop() - return nil } + +// GetGRPCServer returns the underlying gRPC server. +func (s *Server[T]) GetGRPCServer() *grpc.Server { + return s.grpcSrv +} diff --git a/server/v2/api/grpcgateway/config.go b/server/v2/api/grpcgateway/config.go index c5ccb3bfe2d1..773eeb4d58ad 100644 --- a/server/v2/api/grpcgateway/config.go +++ b/server/v2/api/grpcgateway/config.go @@ -2,13 +2,17 @@ package grpcgateway func DefaultConfig() *Config { return &Config{ - Enable: true, + Enable: true, + Address: "localhost:1317", } } type Config struct { // Enable defines if the gRPC-gateway should be enabled. Enable bool `mapstructure:"enable" toml:"enable" comment:"Enable defines if the gRPC-gateway should be enabled."` + + // Address defines the address the gRPC-gateway server binds to. + Address string `mapstructure:"address" toml:"address" comment:"Address defines the address the gRPC-gateway server binds to."` } type CfgOption func(*Config) diff --git a/server/v2/api/grpcgateway/server.go b/server/v2/api/grpcgateway/server.go index 60d18309e258..412c4e4ad81d 100644 --- a/server/v2/api/grpcgateway/server.go +++ b/server/v2/api/grpcgateway/server.go @@ -8,7 +8,6 @@ import ( gateway "github.com/cosmos/gogogateway" "github.com/cosmos/gogoproto/jsonpb" - "github.com/gorilla/mux" "github.com/grpc-ecosystem/grpc-gateway/runtime" "google.golang.org/grpc" @@ -18,28 +17,24 @@ import ( ) var ( - _ serverv2.ServerComponent[transaction.Tx] = (*GRPCGatewayServer[transaction.Tx])(nil) - _ serverv2.HasConfig = (*GRPCGatewayServer[transaction.Tx])(nil) + _ serverv2.ServerComponent[transaction.Tx] = (*Server[transaction.Tx])(nil) + _ serverv2.HasConfig = (*Server[transaction.Tx])(nil) ) -const ( - ServerName = "grpc-gateway" +const ServerName = "grpc-gateway" - // GRPCBlockHeightHeader is the gRPC header for block height. - GRPCBlockHeightHeader = "x-cosmos-block-height" -) - -type GRPCGatewayServer[T transaction.Tx] struct { +type Server[T transaction.Tx] struct { logger log.Logger config *Config cfgOptions []CfgOption - GRPCSrv *grpc.Server - GRPCGatewayRouter *runtime.ServeMux + server *http.Server + gRPCSrv *grpc.Server + gRPCGatewayRouter *runtime.ServeMux } // New creates a new gRPC-gateway server. -func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptions ...CfgOption) *GRPCGatewayServer[T] { +func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptions ...CfgOption) *Server[T] { // The default JSON marshaller used by the gRPC-Gateway is unable to marshal non-nullable non-scalar fields. // Using the gogo/gateway package with the gRPC-Gateway WithMarshaler option fixes the scalar field marshaling issue. marshalerOption := &gateway.JSONPb{ @@ -49,9 +44,9 @@ func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptio AnyResolver: ir, } - return &GRPCGatewayServer[T]{ - GRPCSrv: grpcSrv, - GRPCGatewayRouter: runtime.NewServeMux( + return &Server[T]{ + gRPCSrv: grpcSrv, + gRPCGatewayRouter: runtime.NewServeMux( // Custom marshaler option is required for gogo proto runtime.WithMarshalerOption(runtime.MIMEWildcard, marshalerOption), @@ -67,12 +62,12 @@ func New[T transaction.Tx](grpcSrv *grpc.Server, ir jsonpb.AnyResolver, cfgOptio } } -func (g *GRPCGatewayServer[T]) Name() string { +func (s *Server[T]) Name() string { return ServerName } -func (s *GRPCGatewayServer[T]) Config() any { - if s.config == nil { +func (s *Server[T]) Config() any { + if s.config == nil || s.config.Address == "" { cfg := DefaultConfig() // overwrite the default config with the provided options for _, opt := range s.cfgOptions { @@ -85,7 +80,7 @@ func (s *GRPCGatewayServer[T]) Config() any { return s.config } -func (s *GRPCGatewayServer[T]) Init(appI serverv2.AppI[transaction.Tx], cfg map[string]any, logger log.Logger) error { +func (s *Server[T]) Init(appI serverv2.AppI[transaction.Tx], cfg map[string]any, logger log.Logger) error { serverCfg := s.Config().(*Config) if len(cfg) > 0 { if err := serverv2.UnmarshalSubConfig(cfg, s.Name(), &serverCfg); err != nil { @@ -93,43 +88,43 @@ func (s *GRPCGatewayServer[T]) Init(appI serverv2.AppI[transaction.Tx], cfg map[ } } - // Register the gRPC-Gateway server. - // appI.RegisterGRPCGatewayRoutes(s.GRPCGatewayRouter, s.GRPCSrv) + // TODO: register the gRPC-Gateway routes - s.logger = logger + s.logger = logger.With(log.ModuleKey, s.Name()) s.config = serverCfg return nil } -func (s *GRPCGatewayServer[T]) Start(ctx context.Context) error { +func (s *Server[T]) Start(ctx context.Context) error { if !s.config.Enable { s.logger.Info(fmt.Sprintf("%s server is disabled via config", s.Name())) return nil } - // TODO start a normal Go http server (and do not leverage comet's like https://github.com/cosmos/cosmos-sdk/blob/9df6019de6ee7999fe9864bac836deb2f36dd44a/server/api/server.go#L98) + mux := http.NewServeMux() + mux.Handle("/", s.gRPCGatewayRouter) - return nil -} + s.server = &http.Server{ + Addr: s.config.Address, + Handler: mux, + } -func (s *GRPCGatewayServer[T]) Stop(ctx context.Context) error { - if !s.config.Enable { - return nil + s.logger.Info("starting gRPC-Gateway server...", "address", s.config.Address) + if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + return fmt.Errorf("failed to start gRPC-Gateway server: %w", err) } return nil } -// Register implements registers a grpc-gateway server -func (s *GRPCGatewayServer[T]) Register(r mux.Router) error { - // configure grpc-gatway server - r.PathPrefix("/").Handler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - // Fall back to grpc gateway server. - s.GRPCGatewayRouter.ServeHTTP(w, req) - })) +func (s *Server[T]) Stop(ctx context.Context) error { + if !s.config.Enable { + return nil + } - return nil + s.logger.Info("stopping gRPC-Gateway server...", "address", s.config.Address) + return s.server.Shutdown(ctx) } // CustomGRPCHeaderMatcher for mapping request headers to @@ -138,6 +133,9 @@ func (s *GRPCGatewayServer[T]) Register(r mux.Router) error { // gRPC metadata after removing prefix 'Grpc-Metadata-'. We can use this // CustomGRPCHeaderMatcher if headers don't start with `Grpc-Metadata-` func CustomGRPCHeaderMatcher(key string) (string, bool) { + // GRPCBlockHeightHeader is the gRPC header for block height. + const GRPCBlockHeightHeader = "x-cosmos-block-height" + switch strings.ToLower(key) { case GRPCBlockHeightHeader: return GRPCBlockHeightHeader, true diff --git a/server/v2/api/telemetry/config.go b/server/v2/api/telemetry/config.go index ba652fb6056a..2619665d1844 100644 --- a/server/v2/api/telemetry/config.go +++ b/server/v2/api/telemetry/config.go @@ -3,7 +3,7 @@ package telemetry func DefaultConfig() *Config { return &Config{ Enable: true, - Address: "localhost:1338", + Address: "localhost:1318", ServiceName: "", EnableHostname: false, EnableHostnameLabel: false, diff --git a/server/v2/api/telemetry/server.go b/server/v2/api/telemetry/server.go index 7eccbceb9ebc..411b1bda797d 100644 --- a/server/v2/api/telemetry/server.go +++ b/server/v2/api/telemetry/server.go @@ -13,13 +13,13 @@ import ( ) var ( - _ serverv2.ServerComponent[transaction.Tx] = (*TelemetryServer[transaction.Tx])(nil) - _ serverv2.HasConfig = (*TelemetryServer[transaction.Tx])(nil) + _ serverv2.ServerComponent[transaction.Tx] = (*Server[transaction.Tx])(nil) + _ serverv2.HasConfig = (*Server[transaction.Tx])(nil) ) const ServerName = "telemetry" -type TelemetryServer[T transaction.Tx] struct { +type Server[T transaction.Tx] struct { config *Config logger log.Logger server *http.Server @@ -27,16 +27,16 @@ type TelemetryServer[T transaction.Tx] struct { } // New creates a new telemetry server. -func New[T transaction.Tx]() *TelemetryServer[T] { - return &TelemetryServer[T]{} +func New[T transaction.Tx]() *Server[T] { + return &Server[T]{} } // Name returns the server name. -func (s *TelemetryServer[T]) Name() string { +func (s *Server[T]) Name() string { return ServerName } -func (s *TelemetryServer[T]) Config() any { +func (s *Server[T]) Config() any { if s.config == nil || s.config.Address == "" { return DefaultConfig() } @@ -45,7 +45,7 @@ func (s *TelemetryServer[T]) Config() any { } // Init implements serverv2.ServerComponent. -func (s *TelemetryServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.Logger) error { +func (s *Server[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.Logger) error { serverCfg := s.Config().(*Config) if len(cfg) > 0 { if err := serverv2.UnmarshalSubConfig(cfg, s.Name(), &serverCfg); err != nil { @@ -53,7 +53,7 @@ func (s *TelemetryServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, log } } s.config = serverCfg - s.logger = logger + s.logger = logger.With(log.ModuleKey, s.Name()) metrics, err := NewMetrics(s.config) if err != nil { @@ -64,7 +64,7 @@ func (s *TelemetryServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, log return nil } -func (s *TelemetryServer[T]) Start(ctx context.Context) error { +func (s *Server[T]) Start(ctx context.Context) error { if !s.config.Enable { s.logger.Info(fmt.Sprintf("%s server is disabled via config", s.Name())) return nil @@ -82,6 +82,7 @@ func (s *TelemetryServer[T]) Start(ctx context.Context) error { Handler: mux, } + s.logger.Info("starting telemetry server...", "address", s.config.Address) if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { return fmt.Errorf("failed to start telemetry server: %w", err) } @@ -89,15 +90,16 @@ func (s *TelemetryServer[T]) Start(ctx context.Context) error { return nil } -func (s *TelemetryServer[T]) Stop(ctx context.Context) error { +func (s *Server[T]) Stop(ctx context.Context) error { if !s.config.Enable || s.server == nil { return nil } + s.logger.Info("stopping telemetry server...", "address", s.config.Address) return s.server.Shutdown(ctx) } -func (s *TelemetryServer[T]) metricsHandler(w http.ResponseWriter, r *http.Request) { +func (s *Server[T]) metricsHandler(w http.ResponseWriter, r *http.Request) { format := strings.TrimSpace(r.FormValue("format")) // errorResponse defines the attributes of a JSON error response. diff --git a/server/v2/go.mod b/server/v2/go.mod index 7ccfca6f628c..d996ab3d3681 100644 --- a/server/v2/go.mod +++ b/server/v2/go.mod @@ -22,7 +22,6 @@ require ( github.com/cosmos/gogogateway v1.2.0 github.com/cosmos/gogoproto v1.7.0 github.com/golang/protobuf v1.5.4 - github.com/gorilla/mux v1.8.1 github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/hashicorp/go-hclog v1.6.3 github.com/hashicorp/go-metrics v0.5.3 diff --git a/server/v2/go.sum b/server/v2/go.sum index 748913b517f3..1a64fd341f6c 100644 --- a/server/v2/go.sum +++ b/server/v2/go.sum @@ -150,8 +150,6 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= -github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= diff --git a/server/v2/store/commands.go b/server/v2/store/commands.go index 7c50995f923e..8b518724aae4 100644 --- a/server/v2/store/commands.go +++ b/server/v2/store/commands.go @@ -17,7 +17,7 @@ import ( ) // QueryBlockResultsCmd implements the default command for a BlockResults query. -func (s *StoreComponent[T]) PrunesCmd() *cobra.Command { +func (s *Server[T]) PrunesCmd() *cobra.Command { cmd := &cobra.Command{ Use: "prune [pruning-method]", Short: "Prune app history states by keeping the recent heights and deleting old heights", diff --git a/server/v2/store/server.go b/server/v2/store/server.go index 524869a18aa3..93f9cf436648 100644 --- a/server/v2/store/server.go +++ b/server/v2/store/server.go @@ -12,49 +12,49 @@ import ( ) var ( - _ serverv2.ServerComponent[transaction.Tx] = (*StoreComponent[transaction.Tx])(nil) - _ serverv2.HasConfig = (*StoreComponent[transaction.Tx])(nil) - _ serverv2.HasCLICommands = (*StoreComponent[transaction.Tx])(nil) + _ serverv2.ServerComponent[transaction.Tx] = (*Server[transaction.Tx])(nil) + _ serverv2.HasConfig = (*Server[transaction.Tx])(nil) + _ serverv2.HasCLICommands = (*Server[transaction.Tx])(nil) ) const ServerName = "store" -// StoreComponent manages store config -// and contains prune & snapshot commands -type StoreComponent[T transaction.Tx] struct { +// StoreComponent manages store config and contains prune & snapshot commands +type Server[T transaction.Tx] struct { config *Config // saving appCreator for only RestoreSnapshotCmd appCreator serverv2.AppCreator[T] } -func New[T transaction.Tx](appCreator serverv2.AppCreator[T]) *StoreComponent[T] { - return &StoreComponent[T]{appCreator: appCreator} +func New[T transaction.Tx](appCreator serverv2.AppCreator[T]) *Server[T] { + return &Server[T]{appCreator: appCreator} } -func (s *StoreComponent[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.Logger) error { - serverCfg := DefaultConfig() +func (s *Server[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.Logger) error { + serverCfg := s.Config().(*Config) if len(cfg) > 0 { if err := serverv2.UnmarshalSubConfig(cfg, s.Name(), &serverCfg); err != nil { return fmt.Errorf("failed to unmarshal config: %w", err) } } + s.config = serverCfg return nil } -func (s *StoreComponent[T]) Name() string { +func (s *Server[T]) Name() string { return ServerName } -func (s *StoreComponent[T]) Start(ctx context.Context) error { +func (s *Server[T]) Start(ctx context.Context) error { return nil } -func (s *StoreComponent[T]) Stop(ctx context.Context) error { +func (s *Server[T]) Stop(ctx context.Context) error { return nil } -func (s *StoreComponent[T]) CLICommands() serverv2.CLIConfig { +func (s *Server[T]) CLICommands() serverv2.CLIConfig { return serverv2.CLIConfig{ Commands: []*cobra.Command{ s.PrunesCmd(), @@ -68,7 +68,7 @@ func (s *StoreComponent[T]) CLICommands() serverv2.CLIConfig { } } -func (s *StoreComponent[T]) Config() any { +func (s *Server[T]) Config() any { if s.config == nil || s.config.AppDBBackend == "" { return DefaultConfig() } diff --git a/server/v2/store/snapshot.go b/server/v2/store/snapshot.go index 2c106f85fe69..db0ed9ad93dd 100644 --- a/server/v2/store/snapshot.go +++ b/server/v2/store/snapshot.go @@ -25,7 +25,7 @@ import ( const SnapshotFileName = "_snapshot" // QueryBlockResultsCmd implements the default command for a BlockResults query. -func (s *StoreComponent[T]) ExportSnapshotCmd() *cobra.Command { +func (s *Server[T]) ExportSnapshotCmd() *cobra.Command { cmd := &cobra.Command{ Use: "export", Short: "Export app state to snapshot store", @@ -76,7 +76,7 @@ func (s *StoreComponent[T]) ExportSnapshotCmd() *cobra.Command { } // RestoreSnapshotCmd returns a command to restore a snapshot -func (s *StoreComponent[T]) RestoreSnapshotCmd(newApp serverv2.AppCreator[T]) *cobra.Command { +func (s *Server[T]) RestoreSnapshotCmd(newApp serverv2.AppCreator[T]) *cobra.Command { cmd := &cobra.Command{ Use: "restore ", Short: "Restore app state from local snapshot", @@ -113,7 +113,7 @@ func (s *StoreComponent[T]) RestoreSnapshotCmd(newApp serverv2.AppCreator[T]) *c } // ListSnapshotsCmd returns the command to list local snapshots -func (s *StoreComponent[T]) ListSnapshotsCmd() *cobra.Command { +func (s *Server[T]) ListSnapshotsCmd() *cobra.Command { cmd := &cobra.Command{ Use: "list", Short: "List local snapshots", @@ -139,7 +139,7 @@ func (s *StoreComponent[T]) ListSnapshotsCmd() *cobra.Command { } // DeleteSnapshotCmd returns the command to delete a local snapshot -func (s *StoreComponent[T]) DeleteSnapshotCmd() *cobra.Command { +func (s *Server[T]) DeleteSnapshotCmd() *cobra.Command { return &cobra.Command{ Use: "delete ", Short: "Delete a local snapshot", @@ -167,7 +167,7 @@ func (s *StoreComponent[T]) DeleteSnapshotCmd() *cobra.Command { } // DumpArchiveCmd returns a command to dump the snapshot as portable archive format -func (s *StoreComponent[T]) DumpArchiveCmd() *cobra.Command { +func (s *Server[T]) DumpArchiveCmd() *cobra.Command { cmd := &cobra.Command{ Use: "dump ", Short: "Dump the snapshot as portable archive format", @@ -260,7 +260,7 @@ func (s *StoreComponent[T]) DumpArchiveCmd() *cobra.Command { } // LoadArchiveCmd load a portable archive format snapshot into snapshot store -func (s *StoreComponent[T]) LoadArchiveCmd() *cobra.Command { +func (s *Server[T]) LoadArchiveCmd() *cobra.Command { return &cobra.Command{ Use: "load ", Short: "Load a snapshot archive file (.tar.gz) into snapshot store",