Skip to content

Commit 9fcaa61

Browse files
committed
refactored metrics and counters
Signed-off-by: Genady Gurevich <genadyg@il.ibm.com>
1 parent 078812f commit 9fcaa61

File tree

338 files changed

+51838
-2124
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

338 files changed

+51838
-2124
lines changed

common/monitoring/monitor.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package monitoring
8+
9+
import (
10+
"context"
11+
"fmt"
12+
"net"
13+
14+
"github.com/hyperledger/fabric-lib-go/common/flogging"
15+
"github.com/hyperledger/fabric-x-orderer/common/types"
16+
)
17+
18+
type Monitor struct {
19+
Provider *Provider
20+
logger types.Logger
21+
endpoint Endpoint
22+
// stop is used to stop the monitoring service
23+
stop context.CancelFunc
24+
listener net.Listener
25+
}
26+
27+
func NewMonitor(endpoint Endpoint, prefix string) *Monitor {
28+
logger := flogging.MustGetLogger(fmt.Sprintf("%s.monitoring", prefix))
29+
return &Monitor{Provider: NewProvider(logger), endpoint: endpoint, logger: logger}
30+
}
31+
32+
func (m *Monitor) Start() {
33+
ctx, cancel := context.WithCancel(context.Background())
34+
m.stop = cancel
35+
36+
var err error
37+
serverConfig := ServerConfig{Endpoint: &m.endpoint, logger: m.logger}
38+
m.listener, err = serverConfig.Listener()
39+
if err != nil {
40+
m.logger.Panicf("%v", err)
41+
}
42+
m.endpoint.Port = serverConfig.Endpoint.Port
43+
44+
go func() {
45+
m.Provider.StartPrometheusServer(ctx, m.listener)
46+
}()
47+
}
48+
49+
func (m *Monitor) Stop() {
50+
if m.stop != nil {
51+
m.stop()
52+
}
53+
if m.listener != nil {
54+
m.listener.Close()
55+
}
56+
}
57+
58+
func (m *Monitor) Address() string {
59+
return fmt.Sprintf("http://%s/metrics", m.endpoint.Address())
60+
}

common/monitoring/provider.go

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package monitoring
8+
9+
import (
10+
"context"
11+
"net"
12+
"net/http"
13+
"net/url"
14+
"time"
15+
16+
"github.com/hyperledger/fabric-lib-go/common/metrics"
17+
"github.com/hyperledger/fabric-x-orderer/common/types"
18+
"github.com/pkg/errors"
19+
"github.com/prometheus/client_golang/prometheus"
20+
"github.com/prometheus/client_golang/prometheus/promhttp"
21+
"golang.org/x/sync/errgroup"
22+
)
23+
24+
// scheme is the URL scheme prefix used when building the metrics URL.
const scheme = "http://"

// metricsSubPath is the HTTP path under which the metrics handler is mounted.
const metricsSubPath = "/metrics"
28+
29+
// Provider is a prometheus metrics provider.
30+
type Provider struct {
31+
logger types.Logger
32+
registry *prometheus.Registry
33+
url string
34+
}
35+
36+
// NewProvider creates a new prometheus metrics provider.
37+
func NewProvider(logger types.Logger) *Provider {
38+
return &Provider{logger: logger, registry: prometheus.NewRegistry()}
39+
}
40+
41+
// StartPrometheusServer starts a prometheus server.
42+
// It also starts the given monitoring methods. Their context will cancel once the server is cancelled.
43+
// This method returns once the server is shutdown and all monitoring methods returns.
44+
func (p *Provider) StartPrometheusServer(
45+
ctx context.Context, listener net.Listener, monitor ...func(context.Context),
46+
) error {
47+
p.logger.Debugf("Creating prometheus server")
48+
mux := http.NewServeMux()
49+
mux.Handle(
50+
metricsSubPath,
51+
promhttp.HandlerFor(
52+
p.Registry(),
53+
promhttp.HandlerOpts{
54+
Registry: p.Registry(),
55+
},
56+
),
57+
)
58+
server := &http.Server{
59+
ReadTimeout: 30 * time.Second,
60+
Handler: mux,
61+
}
62+
63+
var err error
64+
p.url, err = MakeMetricsURL(listener.Addr().String())
65+
if err != nil {
66+
return errors.Wrap(err, "failed formatting URL")
67+
}
68+
69+
g, gCtx := errgroup.WithContext(ctx)
70+
g.Go(func() error {
71+
p.logger.Infof("Prometheus serving on URL: %s", p.url)
72+
defer p.logger.Infof("Prometheus stopped serving")
73+
return server.Serve(listener)
74+
})
75+
76+
// The following ensures the method does not return before all monitor methods return.
77+
for _, m := range monitor {
78+
g.Go(func() error {
79+
m(gCtx)
80+
return nil
81+
})
82+
}
83+
84+
// The following ensures the method does not return before the close procedure is complete.
85+
stopAfter := context.AfterFunc(ctx, func() {
86+
g.Go(func() error {
87+
if errClose := server.Close(); err != nil {
88+
return errors.Wrap(errClose, "failed to close prometheus server")
89+
}
90+
return nil
91+
})
92+
})
93+
defer stopAfter()
94+
95+
if err = g.Wait(); !errors.Is(err, http.ErrServerClosed) {
96+
return errors.Wrap(err, "prometheus server stopped with an error")
97+
}
98+
return nil
99+
}
100+
101+
// URL returns the prometheus server URL.
102+
func (p *Provider) URL() string {
103+
return p.url
104+
}
105+
106+
// MakeMetricsURL construct the Prometheus metrics URL.
107+
func MakeMetricsURL(address string) (string, error) {
108+
return url.JoinPath(scheme, address, metricsSubPath)
109+
}
110+
111+
func (p *Provider) NewCounter(o metrics.CounterOpts) metrics.Counter {
112+
c := &Counter{
113+
cv: prometheus.NewCounterVec(
114+
prometheus.CounterOpts{
115+
Namespace: o.Namespace,
116+
Subsystem: o.Subsystem,
117+
Name: o.Name,
118+
Help: o.Help,
119+
},
120+
o.LabelNames,
121+
),
122+
}
123+
124+
p.registry.MustRegister(c.cv)
125+
return c
126+
}
127+
128+
func (p *Provider) NewGauge(o metrics.GaugeOpts) metrics.Gauge {
129+
g := &Gauge{
130+
gv: prometheus.NewGaugeVec(
131+
prometheus.GaugeOpts{
132+
Namespace: o.Namespace,
133+
Subsystem: o.Subsystem,
134+
Name: o.Name,
135+
Help: o.Help,
136+
},
137+
o.LabelNames,
138+
),
139+
}
140+
141+
p.registry.MustRegister(g.gv)
142+
return g
143+
}
144+
145+
func (p *Provider) NewHistogram(o metrics.HistogramOpts) metrics.Histogram {
146+
h := &Histogram{
147+
hv: prometheus.NewHistogramVec(
148+
prometheus.HistogramOpts{
149+
Namespace: o.Namespace,
150+
Subsystem: o.Subsystem,
151+
Name: o.Name,
152+
Help: o.Help,
153+
Buckets: o.Buckets,
154+
},
155+
o.LabelNames,
156+
),
157+
}
158+
159+
p.registry.MustRegister(h.hv)
160+
return h
161+
}
162+
163+
type Counter struct {
164+
prometheus.Counter
165+
cv *prometheus.CounterVec
166+
}
167+
168+
func (c *Counter) With(labelValues ...string) metrics.Counter {
169+
return &Counter{Counter: c.cv.WithLabelValues(labelValues...)}
170+
}
171+
172+
type Gauge struct {
173+
prometheus.Gauge
174+
gv *prometheus.GaugeVec
175+
}
176+
177+
func (g *Gauge) With(labelValues ...string) metrics.Gauge {
178+
return &Gauge{Gauge: g.gv.WithLabelValues(labelValues...)}
179+
}
180+
181+
type Histogram struct {
182+
prometheus.Observer
183+
hv *prometheus.HistogramVec
184+
}
185+
186+
func (h *Histogram) With(labelValues ...string) metrics.Histogram {
187+
return &Histogram{Observer: h.hv.WithLabelValues(labelValues...)}
188+
}
189+
190+
// Registry returns the prometheus registry.
191+
func (p *Provider) Registry() *prometheus.Registry {
192+
return p.registry
193+
}

common/monitoring/server_util.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package monitoring
8+
9+
import (
10+
"net"
11+
"os/exec"
12+
"strconv"
13+
"strings"
14+
15+
"github.com/hyperledger/fabric-x-orderer/common/types"
16+
"github.com/pkg/errors"
17+
)
18+
19+
// protocol is the network type passed to net.Listen for the monitoring listener.
const protocol = "tcp"
20+
21+
// Endpoint is a network endpoint described by a host and a port.
type Endpoint struct {
	Host string
	Port int
}

// Address formats the endpoint as "host:port" using net.JoinHostPort, which
// brackets hosts that contain a colon (such as IPv6 literals).
func (e *Endpoint) Address() string {
	port := strconv.FormatInt(int64(e.Port), 10)
	return net.JoinHostPort(e.Host, port)
}
30+
31+
type ServerConfig struct {
32+
Endpoint *Endpoint
33+
preAllocatedListener net.Listener
34+
logger types.Logger
35+
}
36+
37+
// Listener instantiate a [net.Listener] and updates the config port with the effective port.
38+
func (s *ServerConfig) Listener() (net.Listener, error) {
39+
if s.preAllocatedListener != nil {
40+
return s.preAllocatedListener, nil
41+
}
42+
listener, err := net.Listen(protocol, s.Endpoint.Address())
43+
if err != nil {
44+
return nil, errors.Wrap(err, "failed to listen")
45+
}
46+
47+
addr := listener.Addr()
48+
tcpAddress, ok := addr.(*net.TCPAddr)
49+
if !ok {
50+
return nil, errors.New(strings.Join([]string{"failed to cast to TCP address", listener.Close().Error()}, "; "))
51+
}
52+
s.Endpoint.Port = tcpAddress.Port
53+
54+
s.logger.Infof("Listening on: %s://%s", protocol, s.Endpoint.Address())
55+
return listener, nil
56+
}
57+
58+
// PreAllocateListener is used to allocate a port and bind to ahead of the server initialization.
59+
// It stores the listener object internally to be reused on subsequent calls to Listener().
60+
func (c *ServerConfig) PreAllocateListener() (net.Listener, error) {
61+
listener, err := c.Listener()
62+
if err != nil {
63+
return nil, err
64+
}
65+
c.preAllocatedListener = listener
66+
return listener, nil
67+
}
68+
69+
// FQDN returns the fully qualified domain name of the local host as reported
// by the external command `hostname --fqdn`, with surrounding whitespace
// (including the trailing newline the command prints) removed.
// It returns an error if the command cannot be run or exits unsuccessfully.
// NOTE(review): the `--fqdn` flag is presumably not available in every
// platform's `hostname` implementation — confirm for the supported targets.
func FQDN() (string, error) {
	out, err := exec.Command("hostname", "--fqdn").Output()
	if err != nil {
		return "", err
	}
	return strings.TrimSpace(string(out)), nil
}

config/config.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,8 @@ func (config *Configuration) ExtractRouterConfig(configBlock *common.Block) *nod
256256
RequestMaxBytes: config.SharedConfig.BatchingConfig.RequestMaxBytes,
257257
ClientSignatureVerificationRequired: config.LocalConfig.NodeLocalConfig.GeneralConfig.ClientSignatureVerificationRequired,
258258
Bundle: bundle,
259+
MonitoringListenAddress: config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress + ":" + strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenPort)),
260+
MonitoringInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringInterval,
259261
}
260262
return routerConfig
261263
}
@@ -284,8 +286,9 @@ func (config *Configuration) ExtractBatcherConfig(configBlock *common.Block) *no
284286
RequestMaxBytes: config.SharedConfig.BatchingConfig.RequestMaxBytes,
285287
SubmitTimeout: config.LocalConfig.NodeLocalConfig.BatcherParams.SubmitTimeout,
286288
BatchSequenceGap: types.BatchSequence(config.LocalConfig.NodeLocalConfig.BatcherParams.BatchSequenceGap),
289+
MonitoringListenAddress: config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress + ":" + strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenPort)),
290+
MonitoringInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringInterval,
287291
ClientSignatureVerificationRequired: config.LocalConfig.NodeLocalConfig.GeneralConfig.ClientSignatureVerificationRequired,
288-
MetricsLogInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MetricsLogInterval,
289292
}
290293

291294
if batcherConfig.FirstStrikeThreshold, err = time.ParseDuration(config.SharedConfig.BatchingConfig.BatchTimeouts.FirstStrikeThreshold); err != nil {
@@ -317,17 +320,19 @@ func (config *Configuration) ExtractConsenterConfig() *nodeconfig.ConsenterNodeC
317320
panic(fmt.Sprintf("error launching consenter, failed extracting consenter config: %s", err))
318321
}
319322
consenterConfig := &nodeconfig.ConsenterNodeConfig{
320-
Shards: config.ExtractShards(),
321-
Consenters: config.ExtractConsenters(),
322-
Directory: config.LocalConfig.NodeLocalConfig.FileStore.Path,
323-
ListenAddress: config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress + ":" + strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenPort)),
324-
PartyId: config.LocalConfig.NodeLocalConfig.PartyID,
325-
TLSPrivateKeyFile: config.LocalConfig.TLSConfig.PrivateKey,
326-
TLSCertificateFile: config.LocalConfig.TLSConfig.Certificate,
327-
SigningPrivateKey: signingPrivateKey,
328-
WALDir: DefaultConsenterNodeConfigParams(config.LocalConfig.NodeLocalConfig.FileStore.Path).WALDir,
329-
BFTConfig: BFTConfig,
330-
MetricsLogInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MetricsLogInterval,
323+
Shards: config.ExtractShards(),
324+
Consenters: config.ExtractConsenters(),
325+
Directory: config.LocalConfig.NodeLocalConfig.FileStore.Path,
326+
ListenAddress: config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress + ":" + strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenPort)),
327+
PartyId: config.LocalConfig.NodeLocalConfig.PartyID,
328+
TLSPrivateKeyFile: config.LocalConfig.TLSConfig.PrivateKey,
329+
TLSCertificateFile: config.LocalConfig.TLSConfig.Certificate,
330+
SigningPrivateKey: signingPrivateKey,
331+
WALDir: DefaultConsenterNodeConfigParams(config.LocalConfig.NodeLocalConfig.FileStore.Path).WALDir,
332+
BFTConfig: BFTConfig,
333+
MonitoringListenAddress: config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress + ":" + strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenPort)),
334+
MonitoringInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringInterval,
335+
MetricsLogInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MetricsLogInterval,
331336
}
332337
return consenterConfig
333338
}
@@ -358,6 +363,8 @@ func (config *Configuration) ExtractAssemblerConfig() *nodeconfig.AssemblerNodeC
358363
Consenter: consenterFromMyParty,
359364
UseTLS: config.LocalConfig.TLSConfig.Enabled,
360365
ClientAuthRequired: config.LocalConfig.TLSConfig.ClientAuthRequired,
366+
MonitoringListenAddress: config.LocalConfig.NodeLocalConfig.GeneralConfig.ListenAddress + ":" + strconv.Itoa(int(config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringListenPort)),
367+
MonitoringInterval: config.LocalConfig.NodeLocalConfig.GeneralConfig.MonitoringInterval,
361368
}
362369
return assemblerConfig
363370
}

0 commit comments

Comments
 (0)