Skip to content

Commit

Permalink
Merge pull request #256 from resgateio/feature/gh-255-add-openmetrics…
Browse files Browse the repository at this point in the history
…-exporter

Feature/gh 255 add openmetrics exporter
  • Loading branch information
jirenius authored Jul 2, 2024
2 parents 94b3754 + 68a92ef commit a42f3fd
Show file tree
Hide file tree
Showing 19 changed files with 885 additions and 5 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/resgateio/resgate
go 1.13

require (
github.com/bsm/openmetrics v0.3.1
github.com/golang/protobuf v1.5.2 // indirect
github.com/gorilla/websocket v1.4.2
github.com/jirenius/timerqueue v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/bsm/openmetrics v0.3.1 h1:nhR6QgaKaDmnbnvVP9R0JyPExt8Qa+n1cJk/ouGC4FY=
github.com/bsm/openmetrics v0.3.1/go.mod h1:tabLMhjVjhdhFuwm9YenEVx0s54uvu56faEwYgD6L2g=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
Expand Down
11 changes: 11 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Server Options:
-r, --reqtimeout <milliseconds> Timeout duration for NATS requests (default: 3000)
-u, --headauth <method> Resource method for header authentication
-t, --wsheadauth <method> Resource method for WebSocket header authentication
-m, --metricsport <port> HTTP port for OpenMetrics connections (default: disabled)
--apiencoding <type> Encoding for web resources: json, jsonflat (default: json)
--putmethod <methodName> Call method name mapped to HTTP PUT requests
--deletemethod <methodName> Call method name mapped to HTTP DELETE requests
Expand Down Expand Up @@ -127,6 +128,7 @@ func (c *Config) Init(fs *flag.FlagSet, args []string) {
port uint
headauth string
wsheadauth string
metricsport uint
addr string
natsRootCAs StringSlice
debugTrace bool
Expand Down Expand Up @@ -154,6 +156,8 @@ func (c *Config) Init(fs *flag.FlagSet, args []string) {
fs.StringVar(&headauth, "headauth", "", "Resource method for header authentication.")
fs.StringVar(&wsheadauth, "t", "", "Resource method for WebSocket header authentication.")
fs.StringVar(&wsheadauth, "wsheadauth", "", "Resource method for WebSocket header authentication.")
fs.UintVar(&metricsport, "m", 0, "HTTP port for OpenMetrics connections (default: disabled)")
fs.UintVar(&metricsport, "metricsport", 0, "HTTP port for OpenMetrics connections (default: disabled)")
fs.BoolVar(&c.TLS, "tls", false, "Enable TLS for HTTP.")
fs.StringVar(&c.TLSCert, "tlscert", "", "HTTP server certificate file.")
fs.StringVar(&c.TLSKey, "tlskey", "", "Private key for HTTP server certificate.")
Expand Down Expand Up @@ -187,6 +191,10 @@ func (c *Config) Init(fs *flag.FlagSet, args []string) {
printAndDie(fmt.Sprintf(`Invalid port "%d": must be less than 65536`, port), true)
}

if metricsport >= 1<<16 {
printAndDie(fmt.Sprintf(`Invalid metrics port "%d": must be less than 65536`, metricsport), true)
}

if showHelp {
usage()
}
Expand Down Expand Up @@ -219,6 +227,9 @@ func (c *Config) Init(fs *flag.FlagSet, args []string) {
if port > 0 {
c.Port = uint16(port)
}
if metricsport > 0 {
c.MetricsPort = uint16(metricsport)
}

// Helper function to set string pointers to nil if empty.
setString := func(v string, s **string) {
Expand Down
16 changes: 16 additions & 0 deletions server/apiHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ func (s *Service) apiHandler(w http.ResponseWriter, r *http.Request) {
case "HEAD":
fallthrough
case "GET":
// Metrics
if s.metrics != nil {
s.metrics.HTTPRequestsGet.Add(1)
}

rid = PathToRID(path, r.URL.RawQuery, apiPath)
if !codec.IsValidRID(rid, true) {
notFoundHandler(w, s.enc)
Expand All @@ -107,6 +112,11 @@ func (s *Service) apiHandler(w http.ResponseWriter, r *http.Request) {
return

case "POST":
// Metrics
if s.metrics != nil {
s.metrics.HTTPRequestsPost.Add(1)
}

rid, action = PathToRIDAction(path, r.URL.RawQuery, apiPath)
default:
var m *string
Expand All @@ -129,6 +139,12 @@ func (s *Service) apiHandler(w http.ResponseWriter, r *http.Request) {
httpError(w, reserr.ErrMethodNotAllowed, s.enc)
return
}

// Metrics
if s.metrics != nil {
s.metrics.HTTPRequests.With(r.Method).Add(1)
}

rid = PathToRID(path, r.URL.RawQuery, apiPath)
action = *m
}
Expand Down
9 changes: 9 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Config struct {
Port uint16 `json:"port"`
WSPath string `json:"wsPath"`
APIPath string `json:"apiPath"`
MetricsPort uint16 `json:"metricsPort"`
APIEncoding string `json:"apiEncoding"`
HeaderAuth *string `json:"headerAuth"`
WSHeaderAuth *string `json:"wsHeaderAuth"`
Expand All @@ -39,6 +40,7 @@ type Config struct {

scheme string
netAddr string
metricsNetAddr string
headerAuthRID string
headerAuthAction string
wsHeaderAuthRID string
Expand Down Expand Up @@ -85,6 +87,10 @@ func (c *Config) prepare() error {
}
}

if c.Port == c.MetricsPort {
return fmt.Errorf(`invalid metrics port "%d": must be different from API port ("%d")`, c.MetricsPort, c.Port)
}

// Resolve network address
c.netAddr = ""
if c.Addr != nil {
Expand All @@ -105,6 +111,9 @@ func (c *Config) prepare() error {
} else {
c.netAddr = DefaultAddr
}
if c.MetricsPort != 0 {
c.metricsNetAddr = c.netAddr + fmt.Sprintf(":%d", c.MetricsPort)
}
c.netAddr += fmt.Sprintf(":%d", c.Port)

if c.HeaderAuth != nil {
Expand Down
6 changes: 6 additions & 0 deletions server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func TestConfigPrepare(t *testing.T) {
{Config{WSPath: "/", DELETEMethod: &method}, Config{Addr: nil, Port: 80, WSPath: "/", APIPath: "/", DELETEMethod: &method, scheme: "http", netAddr: "0.0.0.0:80", allowOrigin: []string{"*"}, allowMethods: "GET, HEAD, OPTIONS, POST, DELETE"}, false},
{Config{WSPath: "/", PATCHMethod: &method}, Config{Addr: nil, Port: 80, WSPath: "/", APIPath: "/", PATCHMethod: &method, scheme: "http", netAddr: "0.0.0.0:80", allowOrigin: []string{"*"}, allowMethods: "GET, HEAD, OPTIONS, POST, PATCH"}, false},
{Config{WSPath: "/", PUTMethod: &method, DELETEMethod: &method, PATCHMethod: &method}, Config{Addr: nil, Port: 80, WSPath: "/", APIPath: "/", PUTMethod: &method, DELETEMethod: &method, PATCHMethod: &method, scheme: "http", netAddr: "0.0.0.0:80", allowOrigin: []string{"*"}, allowMethods: "GET, HEAD, OPTIONS, POST, PUT, DELETE, PATCH"}, false},
// Metrics port
{Config{Addr: &emptyAddr, WSPath: "/", MetricsPort: 8090}, Config{Addr: &emptyAddr, Port: 80, WSPath: "/", APIPath: "/", scheme: "http", netAddr: ":80", metricsNetAddr: ":8090", allowOrigin: []string{"*"}, allowMethods: "GET, HEAD, OPTIONS, POST"}, false},
{Config{Addr: &localAddr, WSPath: "/", MetricsPort: 8090}, Config{Addr: &localAddr, Port: 80, WSPath: "/", APIPath: "/", scheme: "http", netAddr: "127.0.0.1:80", metricsNetAddr: "127.0.0.1:8090", allowOrigin: []string{"*"}, allowMethods: "GET, HEAD, OPTIONS, POST"}, false},
{Config{Addr: &ipv6Addr, WSPath: "/", MetricsPort: 8090}, Config{Addr: &ipv6Addr, Port: 80, WSPath: "/", APIPath: "/", scheme: "http", netAddr: "[::1]:80", metricsNetAddr: "[::1]:8090", allowOrigin: []string{"*"}, allowMethods: "GET, HEAD, OPTIONS, POST"}, false},
// Invalid config
{Config{Addr: &invalidAddr, WSPath: "/"}, Config{}, true},
{Config{HeaderAuth: &invalidHeaderAuth, WSPath: "/"}, Config{}, true},
Expand All @@ -84,6 +88,7 @@ func TestConfigPrepare(t *testing.T) {
{Config{PUTMethod: &invalidMethod, WSPath: "/"}, Config{}, true},
{Config{DELETEMethod: &invalidMethod, WSPath: "/"}, Config{}, true},
{Config{PATCHMethod: &invalidMethod, WSPath: "/"}, Config{}, true},
{Config{Addr: &defaultAddr, Port: 8080, MetricsPort: 8080, WSPath: "/"}, Config{}, true},
}

for i, r := range tbl {
Expand Down Expand Up @@ -112,6 +117,7 @@ func TestConfigPrepare(t *testing.T) {

compareString(t, "scheme", cfg.scheme, r.Expected.scheme, i)
compareString(t, "netAddr", cfg.netAddr, r.Expected.netAddr, i)
compareString(t, "metricsNetAddr", cfg.metricsNetAddr, r.Expected.metricsNetAddr, i)
compareString(t, "headerAuthAction", cfg.headerAuthAction, r.Expected.headerAuthAction, i)
compareString(t, "headerAuthRID", cfg.headerAuthRID, r.Expected.headerAuthRID, i)
compareString(t, "wsHeaderAuthAction", cfg.wsHeaderAuthAction, r.Expected.wsHeaderAuthAction, i)
Expand Down
102 changes: 102 additions & 0 deletions server/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package metrics

import (
"runtime"

"github.com/bsm/openmetrics"
)

type MetricSet struct {
m runtime.MemStats
// Memstats
MemSysBytes openmetrics.Gauge
// WebSocket connectionws
WSConnections openmetrics.Gauge
WSConnectionCount openmetrics.Counter
// WebSocket requests
WSRequestsGet openmetrics.Counter
WSRequestsSubscribe openmetrics.Counter
WSRequestsUnsubscribe openmetrics.Counter
WSRequestsCall openmetrics.Counter
WSRequestsAuth openmetrics.Counter
// Cache
CacheResources openmetrics.Gauge
CacheSubscriptions openmetrics.Gauge
// HTTP requests
HTTPRequests openmetrics.CounterFamily
HTTPRequestsGet openmetrics.Counter
HTTPRequestsPost openmetrics.Counter
}

// Scrape updates the metric set with info on current mem usage.
func (m *MetricSet) Scrape() {
runtime.ReadMemStats(&m.m)
m.MemSysBytes.Set(float64(m.m.Sys))
}

func (m *MetricSet) Register(reg *openmetrics.Registry, version string, protocolVersion string) {
// Go info
reg.Info(openmetrics.Desc{
Name: "go",
Help: "Information about the Go environment.",
Labels: []string{"version"},
}).With(runtime.Version())

// Resgate info
reg.Info(openmetrics.Desc{
Name: "resgate",
Help: "Information about resgate.",
Labels: []string{"version", "protocol"},
}).With(version, protocolVersion)

// Memory stats
m.MemSysBytes = reg.Gauge(openmetrics.Desc{
Name: "go_memstats_sys_bytes",
Help: "Number of bytes obtained from system.",
}).With()
m.MemSysBytes.Set(0)

// WebSocket connections
m.WSConnections = reg.Gauge(openmetrics.Desc{
Name: "resgate_ws_current_connections",
Help: "Current established WebSocket connections.",
}).With()
m.WSConnections.Set(0)
m.WSConnectionCount = reg.Counter(openmetrics.Desc{
Name: "resgate_ws_connections",
Help: "Total established WebSocket connections.",
}).With()

// WebSocket requests
wsRequests := reg.Counter(openmetrics.Desc{
Name: "resgate_ws_requests",
Help: "Total WebSocket client requests.",
Labels: []string{"method"},
})
m.WSRequestsGet = wsRequests.With("get")
m.WSRequestsSubscribe = wsRequests.With("subscribe")
m.WSRequestsUnsubscribe = wsRequests.With("unsubscribe")
m.WSRequestsCall = wsRequests.With("call")
m.WSRequestsAuth = wsRequests.With("auth")

// HTTP requests
m.HTTPRequests = reg.Counter(openmetrics.Desc{
Name: "resgate_http_requests",
Help: "Total HTTP client requests.",
Labels: []string{"method"},
})
m.HTTPRequestsGet = m.HTTPRequests.With("GET")
m.HTTPRequestsPost = m.HTTPRequests.With("POST")

// Cache
m.CacheResources = reg.Gauge(openmetrics.Desc{
Name: "resgate_cache_resources",
Help: "Current number of resources stored in the cache.",
}).With()
m.CacheResources.Set(0)
m.CacheSubscriptions = reg.Gauge(openmetrics.Desc{
Name: "resgate_cache_subscriptions",
Help: "Current number of subscriptions on cached resources.",
}).With()
m.CacheSubscriptions.Set(0)
}
102 changes: 102 additions & 0 deletions server/metricsServer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package server

import (
"context"
"net"
"net/http"
"time"

"github.com/bsm/openmetrics"
"github.com/bsm/openmetrics/omhttp"
"github.com/resgateio/resgate/server/metrics"
)

const MetricsPattern = "/metrics"

func (s *Service) initMetricsServer() {
if s.cfg.MetricsPort == 0 {
return
}
s.metrics = &metrics.MetricSet{}
}

// MetricsHandler returns any metrics HTTP handler for testing purposes.
func (s *Service) MetricsHandler() http.Handler {
return s.metricsh
}

// startMetricsServer initializes the server and starts a goroutine with a prometheus metrics server
func (s *Service) startMetricsServer() {
if s.cfg.MetricsPort == 0 {
return
}

reg := openmetrics.NewConsistentRegistry(func() time.Time { return time.Now() })
ms := s.metrics
ms.Register(reg, Version, ProtocolVersion)

h := omhttp.NewHandler(reg)
s.metricsh = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ms.Scrape()
h.ServeHTTP(w, r)
})

// For testing
if s.cfg.NoHTTP {
return
}

mux := http.NewServeMux()
mux.Handle(MetricsPattern, s.metricsh)

hln, err := net.Listen("tcp", s.cfg.metricsNetAddr)
if err != nil {
s.Logf("Metrics server can't listen on %s: %s", s.cfg.metricsNetAddr, err)
return
}

metricsServer := &http.Server{
Handler: mux,
}
s.m = metricsServer

s.Logf("Metrics endpoint listening on %s://%s%s", s.cfg.scheme, s.cfg.metricsNetAddr, MetricsPattern)

go func() {
var err error
if s.cfg.TLS {
err = s.m.ServeTLS(hln, s.cfg.TLSCert, s.cfg.TLSKey)
} else {
err = s.m.Serve(hln)
}

if err != nil {
s.Stop(err)
}
}()

}

// stopMetricsServer stops the Metrics server
func (s *Service) stopMetricsServer() {
s.mu.Lock()
defer s.mu.Unlock()

if s.m == nil {
return
}

s.Debugf("Stopping Metrics server...")

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

s.m.Shutdown(ctx)
s.m = nil

if ctx.Err() == context.DeadlineExceeded {
s.Errorf("Metrics server forcefully stopped after timeout")
} else {
s.Debugf("Metrics server gracefully stopped")
}
}
2 changes: 1 addition & 1 deletion server/mqClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func (s *Service) initMQClient() {
s.cache = rescache.NewCache(s.mq, CacheWorkers, s.cfg.ResetThrottle, UnsubscribeDelay, s.logger)
s.cache = rescache.NewCache(s.mq, CacheWorkers, s.cfg.ResetThrottle, UnsubscribeDelay, s.logger, s.metrics)
}

// startMQClients creates a connection to the messaging system.
Expand Down
Loading

0 comments on commit a42f3fd

Please sign in to comment.