Skip to content

Commit 92e4094

Browse files
committed
a minor change
Signed-off-by: Genady Gurevich <genadyg@il.ibm.com>
1 parent b17682e commit 92e4094

File tree

329 files changed

+51613
-2067
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

329 files changed

+51613
-2067
lines changed

common/utils/monitoring/monitor.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package monitoring
8+
9+
import (
10+
"context"
11+
"fmt"
12+
"sync/atomic"
13+
"time"
14+
15+
"github.com/hyperledger/fabric-x-orderer/common/types"
16+
"github.com/prometheus/client_golang/prometheus"
17+
)
18+
19+
type MonitoringMetric struct {
20+
Interval time.Duration
21+
Value *uint64
22+
Labels []string
23+
}
24+
25+
func (m *MonitoringMetric) monitorBuilder(counter prometheus.Counter) func(ctx context.Context) {
26+
return func(ctx context.Context) {
27+
ticker := time.NewTicker(m.Interval)
28+
defer ticker.Stop()
29+
30+
for {
31+
select {
32+
case <-ctx.Done():
33+
return
34+
case <-ticker.C:
35+
v := atomic.LoadUint64(m.Value)
36+
atomic.StoreUint64(m.Value, 0)
37+
counter.Add(float64(v))
38+
}
39+
}
40+
}
41+
}
42+
43+
type Monitor struct {
44+
monitor *Provider
45+
logger types.Logger
46+
host string
47+
endpoint Endpoint
48+
monitors []func(context.Context)
49+
// stop is used to stop the monitoring service
50+
stop context.CancelFunc
51+
}
52+
53+
type MonitorOpts struct {
54+
Host string
55+
prometheus.CounterOpts
56+
Labels []string
57+
Metrics []MonitoringMetric
58+
}
59+
60+
// NewMonitor creates and returns a new Monitor instance using the provided MonitorOpts and logger.
61+
// It initializes a metrics provider, sets up a CounterVec with the specified options and labels,
62+
// and constructs monitor functions for each metric defined in the MonitorOpts.
63+
// The returned Monitor is configured to track and log metrics as specified.
64+
//
65+
// Parameters:
66+
// - monitorOpts: Pointer to MonitorOpts containing configuration for metrics, labels, and host.
67+
// - logger: Logger instance for logging within the Monitor.
68+
//
69+
// Returns:
70+
// - Pointer to the initialized Monitor.
71+
func NewMonitor(monitorOpts *MonitorOpts, logger types.Logger) *Monitor {
72+
provider := NewProvider()
73+
counterVec := provider.NewCounterVec(monitorOpts.CounterOpts, monitorOpts.Labels)
74+
75+
monitors := make([]func(context.Context), len(monitorOpts.Metrics))
76+
for i := range monitorOpts.Metrics {
77+
monitors[i] = monitorOpts.Metrics[i].monitorBuilder(counterVec.WithLabelValues(monitorOpts.Metrics[i].Labels...))
78+
}
79+
80+
return &Monitor{monitor: provider, host: monitorOpts.Host, logger: logger, monitors: monitors}
81+
}
82+
83+
func (m *Monitor) Start() {
84+
m.endpoint = Endpoint{Host: m.host, Port: 2112}
85+
ctx, cancel := context.WithCancel(context.Background())
86+
m.stop = cancel
87+
serverConfig := ServerConfig{Endpoint: m.endpoint}
88+
89+
go func() {
90+
m.monitor.StartPrometheusServer(ctx, &serverConfig, m.monitors...)
91+
}()
92+
}
93+
94+
func (m *Monitor) Stop() {
95+
if m.stop != nil {
96+
m.stop()
97+
}
98+
}
99+
100+
func (m *Monitor) Address() string {
101+
return fmt.Sprintf("http://%s/metrics", m.endpoint.Address())
102+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package promutil
8+
9+
import (
10+
"time"
11+
12+
"github.com/prometheus/client_golang/prometheus"
13+
)
14+
15+
// AddToCounter adds a value to a prometheus counter.
16+
func AddToCounter(c prometheus.Counter, n int) {
17+
c.Add(float64(n))
18+
}
19+
20+
// AddToCounterVec add a value to given labels on a prometheus counter vector.
21+
func AddToCounterVec(c *prometheus.CounterVec, labels []string, n int) {
22+
c.WithLabelValues(labels...).Add(float64(n))
23+
}
24+
25+
// AddToGauge adds a value to a prometheus gauge.
26+
func AddToGauge(g prometheus.Gauge, n int) {
27+
g.Add(float64(n))
28+
}
29+
30+
// SubFromGauge subtracts a given value from a prometheus gauge.
31+
func SubFromGauge(g prometheus.Gauge, n int) {
32+
g.Sub(float64(n))
33+
}
34+
35+
// SetGaugeVec sets a value to given labels on the prometheus gauge vector.
36+
func SetGaugeVec(c *prometheus.GaugeVec, labels []string, n int) {
37+
c.WithLabelValues(labels...).Set(float64(n))
38+
}
39+
40+
// SetGauge sets a value to a prometheus gauge.
41+
func SetGauge(queue prometheus.Gauge, n int) {
42+
queue.Set(float64(n))
43+
}
44+
45+
// SetUint64Gauge sets a uint64 value to a prometheus gauge.
46+
func SetUint64Gauge(queue prometheus.Gauge, n uint64) {
47+
queue.Set(float64(n))
48+
}
49+
50+
// Observe observes a prometheus histogram.
51+
func Observe(h prometheus.Observer, d time.Duration) {
52+
h.Observe(d.Seconds())
53+
}
54+
55+
// ObserveSize observes a prometheus histogram size.
56+
func ObserveSize(h prometheus.Observer, size int) {
57+
h.Observe(float64(size))
58+
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package monitoring
8+
9+
import (
10+
"context"
11+
"net/http"
12+
"net/url"
13+
"time"
14+
15+
"github.com/hyperledger/fabric-lib-go/common/flogging"
16+
"github.com/pkg/errors"
17+
"github.com/prometheus/client_golang/prometheus"
18+
"github.com/prometheus/client_golang/prometheus/promhttp"
19+
"golang.org/x/sync/errgroup"
20+
)
21+
22+
const (
23+
scheme = "http://"
24+
metricsSubPath = "/metrics"
25+
)
26+
27+
// Provider is a prometheus metrics provider.
28+
type Provider struct {
29+
registry *prometheus.Registry
30+
url string
31+
}
32+
33+
var logger = flogging.MustGetLogger("monitoring")
34+
35+
// NewProvider creates a new prometheus metrics provider.
36+
func NewProvider() *Provider {
37+
return &Provider{registry: prometheus.NewRegistry()}
38+
}
39+
40+
// StartPrometheusServer starts a prometheus server.
41+
// It also starts the given monitoring methods. Their context will cancel once the server is cancelled.
42+
// This method returns once the server is shutdown and all monitoring methods returns.
43+
func (p *Provider) StartPrometheusServer(
44+
ctx context.Context, serverConfig *ServerConfig, monitor ...func(context.Context),
45+
) error {
46+
logger.Debugf("Creating prometheus server")
47+
mux := http.NewServeMux()
48+
mux.Handle(
49+
metricsSubPath,
50+
promhttp.HandlerFor(
51+
p.Registry(),
52+
promhttp.HandlerOpts{
53+
Registry: p.Registry(),
54+
},
55+
),
56+
)
57+
server := &http.Server{
58+
ReadTimeout: 30 * time.Second,
59+
Handler: mux,
60+
}
61+
62+
l, err := serverConfig.Listener()
63+
if err != nil {
64+
return errors.Wrap(err, "failed to start prometheus server")
65+
}
66+
defer l.Close()
67+
68+
p.url, err = MakeMetricsURL(l.Addr().String())
69+
if err != nil {
70+
return errors.Wrap(err, "failed formatting URL")
71+
}
72+
73+
g, gCtx := errgroup.WithContext(ctx)
74+
g.Go(func() error {
75+
logger.Infof("Prometheus serving on URL: %s", p.url)
76+
defer logger.Info("Prometheus stopped serving")
77+
return server.Serve(l)
78+
})
79+
80+
// The following ensures the method does not return before all monitor methods return.
81+
for _, m := range monitor {
82+
g.Go(func() error {
83+
m(gCtx)
84+
return nil
85+
})
86+
}
87+
88+
// The following ensures the method does not return before the close procedure is complete.
89+
stopAfter := context.AfterFunc(ctx, func() {
90+
g.Go(func() error {
91+
if errClose := server.Close(); err != nil {
92+
return errors.Wrap(errClose, "failed to close prometheus server")
93+
}
94+
return nil
95+
})
96+
})
97+
defer stopAfter()
98+
99+
if err = g.Wait(); !errors.Is(err, http.ErrServerClosed) {
100+
return errors.Wrap(err, "prometheus server stopped with an error")
101+
}
102+
return nil
103+
}
104+
105+
// URL returns the prometheus server URL.
106+
func (p *Provider) URL() string {
107+
return p.url
108+
}
109+
110+
// NewCounter creates a new prometheus counter.
111+
func (p *Provider) NewCounter(opts prometheus.CounterOpts) prometheus.Counter {
112+
c := prometheus.NewCounter(opts)
113+
p.registry.MustRegister(c)
114+
return c
115+
}
116+
117+
// NewCounterVec creates a new prometheus counter vector.
118+
func (p *Provider) NewCounterVec(opts prometheus.CounterOpts, labels []string) *prometheus.CounterVec {
119+
cv := prometheus.NewCounterVec(opts, labels)
120+
p.registry.MustRegister(cv)
121+
return cv
122+
}
123+
124+
// NewGauge creates a new prometheus gauge.
125+
func (p *Provider) NewGauge(opts prometheus.GaugeOpts) prometheus.Gauge {
126+
g := prometheus.NewGauge(opts)
127+
p.registry.MustRegister(g)
128+
return g
129+
}
130+
131+
// NewGaugeVec creates a new prometheus gauge vector.
132+
func (p *Provider) NewGaugeVec(opts prometheus.GaugeOpts, labels []string) *prometheus.GaugeVec {
133+
gv := prometheus.NewGaugeVec(opts, labels)
134+
p.registry.MustRegister(gv)
135+
return gv
136+
}
137+
138+
// NewHistogram creates a new prometheus histogram.
139+
func (p *Provider) NewHistogram(opts prometheus.HistogramOpts) prometheus.Histogram {
140+
h := prometheus.NewHistogram(opts)
141+
p.registry.MustRegister(h)
142+
return h
143+
}
144+
145+
// NewHistogramVec creates a new prometheus histogram vector.
146+
func (p *Provider) NewHistogramVec(opts prometheus.HistogramOpts, labels []string) *prometheus.HistogramVec {
147+
hv := prometheus.NewHistogramVec(opts, labels)
148+
p.registry.MustRegister(hv)
149+
return hv
150+
}
151+
152+
// Registry returns the prometheus registry.
153+
func (p *Provider) Registry() *prometheus.Registry {
154+
return p.registry
155+
}
156+
157+
// MakeMetricsURL construct the Prometheus metrics URL.
158+
func MakeMetricsURL(address string) (string, error) {
159+
return url.JoinPath(scheme, address, metricsSubPath)
160+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package monitoring
8+
9+
import (
10+
"net"
11+
"os/exec"
12+
"strconv"
13+
"strings"
14+
15+
"github.com/pkg/errors"
16+
)
17+
18+
const grpcProtocol = "tcp"
19+
20+
type Endpoint struct {
21+
Host string
22+
Port int
23+
}
24+
25+
// Address returns a string representation of the endpoint's address.
26+
func (e *Endpoint) Address() string {
27+
return net.JoinHostPort(e.Host, strconv.Itoa(e.Port))
28+
}
29+
30+
type ServerConfig struct {
31+
Endpoint Endpoint
32+
preAllocatedListener net.Listener
33+
}
34+
35+
// Listener instantiate a [net.Listener] and updates the config port with the effective port.
36+
func (c *ServerConfig) Listener() (net.Listener, error) {
37+
if c.preAllocatedListener != nil {
38+
return c.preAllocatedListener, nil
39+
}
40+
listener, err := net.Listen(grpcProtocol, c.Endpoint.Address())
41+
if err != nil {
42+
return nil, errors.Wrap(err, "failed to listen")
43+
}
44+
45+
addr := listener.Addr()
46+
tcpAddress, ok := addr.(*net.TCPAddr)
47+
if !ok {
48+
return nil, errors.New(strings.Join([]string{"failed to cast to TCP address", listener.Close().Error()}, "; "))
49+
}
50+
c.Endpoint.Port = tcpAddress.Port
51+
52+
logger.Infof("Listening on: %s://%s", grpcProtocol, c.Endpoint.Address())
53+
return listener, nil
54+
}
55+
56+
// PreAllocateListener is used to allocate a port and bind to ahead of the server initialization.
57+
// It stores the listener object internally to be reused on subsequent calls to Listener().
58+
func (c *ServerConfig) PreAllocateListener() (net.Listener, error) {
59+
listener, err := c.Listener()
60+
if err != nil {
61+
return nil, err
62+
}
63+
c.preAllocatedListener = listener
64+
return listener, nil
65+
}
66+
67+
func FQDN() (string, error) {
68+
out, err := exec.Command("hostname", "--fqdn").Output()
69+
if err != nil {
70+
return "", err
71+
}
72+
return string(out), nil
73+
}

0 commit comments

Comments
 (0)