Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server/v2): wire telemetry + server refactors #21746

Merged
merged 11 commits into from
Sep 27, 2024
9 changes: 2 additions & 7 deletions server/v2/api/grpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,9 @@ import "math"

func DefaultConfig() *Config {
return &Config{
Enable: true,
// DefaultGRPCAddress defines the default address to bind the gRPC server to.
Address: "localhost:9090",
// DefaultGRPCMaxRecvMsgSize defines the default gRPC max message size in
// bytes the server can receive.
Enable: true,
Address: "localhost:9090",
MaxRecvMsgSize: 1024 * 1024 * 10,
// DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in
// bytes the server can send.
MaxSendMsgSize: math.MaxInt32,
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/v2/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (s *Server[T]) Name() string {
}

func (s *Server[T]) Config() any {
if s.config == nil || s.config == (&Config{}) {
if s.config == nil || s.config.Address == "" {
cfg := DefaultConfig()
// overwrite the default config with the provided options
for _, opt := range s.cfgOptions {
Expand Down
7 changes: 5 additions & 2 deletions server/v2/api/grpcgateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ import (
serverv2 "cosmossdk.io/server/v2"
)

var _ serverv2.ServerComponent[transaction.Tx] = (*GRPCGatewayServer[transaction.Tx])(nil)
var (
_ serverv2.ServerComponent[transaction.Tx] = (*GRPCGatewayServer[transaction.Tx])(nil)
_ serverv2.HasConfig = (*GRPCGatewayServer[transaction.Tx])(nil)
)

const (
ServerName = "grpc-gateway"
Expand Down Expand Up @@ -69,7 +72,7 @@ func (g *GRPCGatewayServer[T]) Name() string {
}

func (s *GRPCGatewayServer[T]) Config() any {
if s.config == nil || s.config == (&Config{}) {
if s.config == nil {
cfg := DefaultConfig()
// overwrite the default config with the provided options
for _, opt := range s.cfgOptions {
Expand Down
29 changes: 24 additions & 5 deletions server/v2/api/telemetry/config.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,32 @@
package telemetry

type Config struct {
// Prefixed with keys to separate services
ServiceName string `mapstructure:"service-name" toml:"service-name" comment:"Prefixed with keys to separate services."`
func DefaultConfig() *Config {
return &Config{
Enable: true,
Address: "localhost:1338",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In v2, as discussed yesterday, the telemetry endpoint will be in another port than grpc gateway.

ServiceName: "",
EnableHostname: false,
EnableHostnameLabel: false,
EnableServiceLabel: false,
PrometheusRetentionTime: 0,
GlobalLabels: nil,
MetricsSink: "",
StatsdAddr: "",
DatadogHostname: "",
}
}

// Enabled enables the application telemetry functionality. When enabled,
type Config struct {
// Enable enables the application telemetry functionality. When enabled,
// an in-memory sink is also enabled by default. Operators may also enabled
// other sinks such as Prometheus.
Enabled bool `mapstructure:"enabled" toml:"enabled" comment:"Enabled enables the application telemetry functionality. When enabled, an in-memory sink is also enabled by default. Operators may also enabled other sinks such as Prometheus."`
Enable bool `mapstructure:"enable" toml:"enable" comment:"Enable enables the application telemetry functionality. When enabled, an in-memory sink is also enabled by default. Operators may also enabled other sinks such as Prometheus."`

// Address defines the API server to listen on
Address string `mapstructure:"address" toml:"address" comment:"Address defines the metrics server address to bind to."`

// Prefixed with keys to separate services
ServiceName string `mapstructure:"service-name" toml:"service-name" comment:"Prefixed with keys to separate services."`

// Enable prefixing gauge values with hostname
EnableHostname bool `mapstructure:"enable-hostname" toml:"enable-hostname" comment:"Enable prefixing gauge values with hostname."`
Expand Down
9 changes: 2 additions & 7 deletions server/v2/api/telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,7 @@ type GatherResponse struct {
}

// New creates a new instance of Metrics
func New(cfg Config) (_ *Metrics, rerr error) {
if !cfg.Enabled {
return nil, nil
}

func NewMetrics(cfg *Config) (*Metrics, error) {
if numGlobalLabels := len(cfg.GlobalLabels); numGlobalLabels > 0 {
parsedGlobalLabels := make([]metrics.Label, numGlobalLabels)
for i, gl := range cfg.GlobalLabels {
Expand All @@ -89,12 +85,11 @@ func New(cfg Config) (_ *Metrics, rerr error) {
sink = memSink
inMemSig := metrics.DefaultInmemSignal(memSink)
defer func() {
if rerr != nil {
if err != nil {
inMemSig.Stop()
}
}()
}

if err != nil {
return nil, err
}
Expand Down
130 changes: 105 additions & 25 deletions server/v2/api/telemetry/server.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,127 @@
package telemetry

import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"

"github.com/gorilla/mux"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
serverv2 "cosmossdk.io/server/v2"
)

func RegisterMetrics(r mux.Router, cfg Config) (*Metrics, error) {
m, err := New(cfg)
var (
_ serverv2.ServerComponent[transaction.Tx] = (*TelemetryServer[transaction.Tx])(nil)
_ serverv2.HasConfig = (*TelemetryServer[transaction.Tx])(nil)
julienrbrt marked this conversation as resolved.
Show resolved Hide resolved
)

const ServerName = "telemetry"

type TelemetryServer[T transaction.Tx] struct {
config *Config
logger log.Logger
server *http.Server
metrics *Metrics
}

// New creates a new telemetry server.
func New[T transaction.Tx]() *TelemetryServer[T] {
return &TelemetryServer[T]{}
}

// Name returns the server name.
func (s *TelemetryServer[T]) Name() string {
return ServerName
}

func (s *TelemetryServer[T]) Config() any {
if s.config == nil || s.config.Address == "" {
return DefaultConfig()
}

return s.config
}

// Init implements serverv2.ServerComponent.
func (s *TelemetryServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logger log.Logger) error {
serverCfg := s.Config().(*Config)
if len(cfg) > 0 {
if err := serverv2.UnmarshalSubConfig(cfg, s.Name(), &serverCfg); err != nil {
return fmt.Errorf("failed to unmarshal config: %w", err)
}
}
s.config = serverCfg
s.logger = logger

metrics, err := NewMetrics(s.config)
if err != nil {
return nil, err
return fmt.Errorf("failed to initialize metrics: %w", err)
}
s.metrics = metrics

metricsHandler := func(w http.ResponseWriter, r *http.Request) {
format := strings.TrimSpace(r.FormValue("format"))
return nil
}

gr, err := m.Gather(format)
if err != nil {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadRequest)
bz, err := json.Marshal(errorResponse{Code: 400, Error: fmt.Sprintf("failed to gather metrics: %s", err)})
if err != nil {
return
}
_, _ = w.Write(bz)
func (s *TelemetryServer[T]) Start(ctx context.Context) error {
if !s.config.Enable {
return nil
julienrbrt marked this conversation as resolved.
Show resolved Hide resolved
}

return
}
mux := http.NewServeMux()
mux.HandleFunc("/", s.metricsHandler)
// keeping /metrics for backwards compatibility
mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/", http.StatusMovedPermanently)
})

w.Header().Set("Content-Type", gr.ContentType)
_, _ = w.Write(gr.Metrics)
s.server = &http.Server{
Addr: s.config.Address,
Handler: mux,
}

r.HandleFunc("/metrics", metricsHandler).Methods("GET")
go func() {
s.logger.Info("Starting telemetry server", "address", s.config.Address)
if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
julienrbrt marked this conversation as resolved.
Show resolved Hide resolved
s.logger.Error("Failed to start telemetry server", "error", err)
}
}()
Fixed Show fixed Hide fixed

return m, nil
return nil
julienrbrt marked this conversation as resolved.
Show resolved Hide resolved
}

// errorResponse defines the attributes of a JSON error response.
type errorResponse struct {
Code int `json:"code,omitempty"`
Error string `json:"error"`
func (s *TelemetryServer[T]) Stop(ctx context.Context) error {
if !s.config.Enable || s.server == nil {
return nil
}

s.logger.Info("Stopping telemetry server")
return s.server.Shutdown(ctx)
}

func (s *TelemetryServer[T]) metricsHandler(w http.ResponseWriter, r *http.Request) {
format := strings.TrimSpace(r.FormValue("format"))

// errorResponse defines the attributes of a JSON error response.
type errorResponse struct {
Code int `json:"code,omitempty"`
Error string `json:"error"`
}

gr, err := s.metrics.Gather(format)
if err != nil {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadRequest)
bz, err := json.Marshal(errorResponse{Code: 400, Error: fmt.Sprintf("failed to gather metrics: %s", err)})
if err != nil {
return
}
_, _ = w.Write(bz)

return
}

w.Header().Set("Content-Type", gr.ContentType)
_, _ = w.Write(gr.Metrics)
}
2 changes: 1 addition & 1 deletion server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (s *CometBFTServer[T]) CLICommands() serverv2.CLIConfig {

// Config returns the (app.toml) server configuration.
func (s *CometBFTServer[T]) Config() any {
if s.config.AppTomlConfig == nil || s.config.AppTomlConfig == (&AppTomlConfig{}) {
if s.config.AppTomlConfig == nil || s.config.AppTomlConfig.Address == "" {
cfg := &Config{AppTomlConfig: DefaultAppTomlConfig()}
// overwrite the default config with the provided options
for _, opt := range s.cfgOptions {
Expand Down
6 changes: 3 additions & 3 deletions server/v2/store/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ func (s *StoreComponent[T]) CLICommands() serverv2.CLIConfig {
}
}

func (g *StoreComponent[T]) Config() any {
if g.config == nil || g.config == (&Config{}) {
func (s *StoreComponent[T]) Config() any {
if s.config == nil || s.config.AppDBBackend == "" {
return DefaultConfig()
}

return g.config
return s.config
}
2 changes: 2 additions & 0 deletions simapp/v2/simdv2/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
runtimev2 "cosmossdk.io/runtime/v2"
serverv2 "cosmossdk.io/server/v2"
"cosmossdk.io/server/v2/api/grpc"
"cosmossdk.io/server/v2/api/telemetry"
"cosmossdk.io/server/v2/cometbft"
"cosmossdk.io/server/v2/store"
"cosmossdk.io/simapp/v2"
Expand Down Expand Up @@ -81,6 +82,7 @@ func initRootCmd[T transaction.Tx](
),
grpc.New[T](),
store.New[T](newApp),
telemetry.New[T](),
); err != nil {
panic(err)
}
Expand Down
Loading