Skip to content

Commit b0a8d71

Browse files
committed
fixed a bug
Signed-off-by: Genady Gurevich <genadyg@il.ibm.com>
1 parent 9342865 commit b0a8d71

File tree

345 files changed

+52422
-2130
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

345 files changed

+52422
-2130
lines changed

common/monitoring/monitor.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package monitoring
8+
9+
import (
10+
"context"
11+
"fmt"
12+
"net"
13+
"time"
14+
15+
"github.com/hyperledger/fabric-x-orderer/common/types"
16+
)
17+
18+
type MonitoringMetric struct {
19+
Interval time.Duration
20+
Value *uint64
21+
Labels []string
22+
}
23+
24+
type Monitor struct {
25+
Provider *Provider
26+
logger types.Logger
27+
endpoint Endpoint
28+
// stop is used to stop the monitoring service
29+
stop context.CancelFunc
30+
listener net.Listener
31+
}
32+
33+
func NewMonitor(endpoint Endpoint, logger types.Logger) *Monitor {
34+
provider := NewProvider()
35+
return &Monitor{Provider: provider, endpoint: endpoint, logger: logger}
36+
}
37+
38+
func (m *Monitor) Start() {
39+
ctx, cancel := context.WithCancel(context.Background())
40+
m.stop = cancel
41+
42+
var err error
43+
serverConfig := ServerConfig{Endpoint: &m.endpoint}
44+
m.listener, err = serverConfig.Listener()
45+
if err != nil {
46+
m.logger.Panicf("%v", err)
47+
}
48+
m.endpoint.Port = serverConfig.Endpoint.Port
49+
50+
go func() {
51+
m.Provider.StartPrometheusServer(ctx, m.listener)
52+
}()
53+
}
54+
55+
func (m *Monitor) Stop() {
56+
if m.stop != nil {
57+
m.stop()
58+
}
59+
if m.listener != nil {
60+
m.listener.Close()
61+
}
62+
}
63+
64+
func (m *Monitor) Address() string {
65+
return fmt.Sprintf("http://%s/metrics", m.endpoint.Address())
66+
}

common/monitoring/provider.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package monitoring
8+
9+
import (
10+
"context"
11+
"net"
12+
"net/http"
13+
"net/url"
14+
"time"
15+
16+
kitmetrics "github.com/go-kit/kit/metrics"
17+
"github.com/go-kit/kit/metrics/prometheus"
18+
"github.com/hyperledger/fabric-lib-go/common/flogging"
19+
"github.com/hyperledger/fabric-lib-go/common/metrics"
20+
"github.com/pkg/errors"
21+
prom "github.com/prometheus/client_golang/prometheus"
22+
"github.com/prometheus/client_golang/prometheus/promhttp"
23+
"golang.org/x/sync/errgroup"
24+
)
25+
26+
const (
27+
scheme = "http://"
28+
metricsSubPath = "/metrics"
29+
)
30+
31+
// Provider is a prometheus metrics provider.
32+
type Provider struct {
33+
// registry *prom.Registry
34+
url string
35+
}
36+
37+
var logger = flogging.MustGetLogger("monitoring")
38+
39+
// NewProvider creates a new prometheus metrics provider.
40+
func NewProvider() *Provider {
41+
return &Provider{}
42+
}
43+
44+
// StartPrometheusServer starts a prometheus server.
45+
// It also starts the given monitoring methods. Their context will cancel once the server is cancelled.
46+
// This method returns once the server is shutdown and all monitoring methods returns.
47+
func (p *Provider) StartPrometheusServer(
48+
ctx context.Context, listener net.Listener, monitor ...func(context.Context),
49+
) error {
50+
logger.Debugf("Creating prometheus server")
51+
mux := http.NewServeMux()
52+
mux.Handle(
53+
metricsSubPath,
54+
promhttp.Handler(),
55+
)
56+
server := &http.Server{
57+
ReadTimeout: 30 * time.Second,
58+
Handler: mux,
59+
}
60+
61+
// l, err := serverConfig.Listener()
62+
// if err != nil {
63+
// return errors.Wrap(err, "failed to start prometheus server")
64+
// }
65+
// defer l.Close()
66+
67+
var err error
68+
p.url, err = MakeMetricsURL(listener.Addr().String())
69+
if err != nil {
70+
return errors.Wrap(err, "failed formatting URL")
71+
}
72+
73+
g, gCtx := errgroup.WithContext(ctx)
74+
g.Go(func() error {
75+
logger.Infof("Prometheus serving on URL: %s", p.url)
76+
defer logger.Info("Prometheus stopped serving")
77+
return server.Serve(listener)
78+
})
79+
80+
// The following ensures the method does not return before all monitor methods return.
81+
for _, m := range monitor {
82+
g.Go(func() error {
83+
m(gCtx)
84+
return nil
85+
})
86+
}
87+
88+
// The following ensures the method does not return before the close procedure is complete.
89+
stopAfter := context.AfterFunc(ctx, func() {
90+
g.Go(func() error {
91+
if errClose := server.Close(); err != nil {
92+
return errors.Wrap(errClose, "failed to close prometheus server")
93+
}
94+
return nil
95+
})
96+
})
97+
defer stopAfter()
98+
99+
if err = g.Wait(); !errors.Is(err, http.ErrServerClosed) {
100+
return errors.Wrap(err, "prometheus server stopped with an error")
101+
}
102+
return nil
103+
}
104+
105+
// URL returns the prometheus server URL.
106+
func (p *Provider) URL() string {
107+
return p.url
108+
}
109+
110+
// MakeMetricsURL construct the Prometheus metrics URL.
111+
func MakeMetricsURL(address string) (string, error) {
112+
return url.JoinPath(scheme, address, metricsSubPath)
113+
}
114+
115+
func (p *Provider) NewCounter(o metrics.CounterOpts) metrics.Counter {
116+
return &Counter{
117+
Counter: prometheus.NewCounterFrom(
118+
prom.CounterOpts{
119+
Namespace: o.Namespace,
120+
Subsystem: o.Subsystem,
121+
Name: o.Name,
122+
Help: o.Help,
123+
},
124+
o.LabelNames,
125+
),
126+
}
127+
}
128+
129+
func (p *Provider) NewGauge(o metrics.GaugeOpts) metrics.Gauge {
130+
return &Gauge{
131+
Gauge: prometheus.NewGaugeFrom(
132+
prom.GaugeOpts{
133+
Namespace: o.Namespace,
134+
Subsystem: o.Subsystem,
135+
Name: o.Name,
136+
Help: o.Help,
137+
},
138+
o.LabelNames,
139+
),
140+
}
141+
}
142+
143+
func (p *Provider) NewHistogram(o metrics.HistogramOpts) metrics.Histogram {
144+
return &Histogram{
145+
Histogram: prometheus.NewHistogramFrom(
146+
prom.HistogramOpts{
147+
Namespace: o.Namespace,
148+
Subsystem: o.Subsystem,
149+
Name: o.Name,
150+
Help: o.Help,
151+
Buckets: o.Buckets,
152+
},
153+
o.LabelNames,
154+
),
155+
}
156+
}
157+
158+
type Counter struct{ kitmetrics.Counter }
159+
160+
func (c *Counter) With(labelValues ...string) metrics.Counter {
161+
return &Counter{Counter: c.Counter.With(labelValues...)}
162+
}
163+
164+
type Gauge struct{ kitmetrics.Gauge }
165+
166+
func (g *Gauge) With(labelValues ...string) metrics.Gauge {
167+
return &Gauge{Gauge: g.Gauge.With(labelValues...)}
168+
}
169+
170+
type Histogram struct{ kitmetrics.Histogram }
171+
172+
func (h *Histogram) With(labelValues ...string) metrics.Histogram {
173+
return &Histogram{Histogram: h.Histogram.With(labelValues...)}
174+
}

common/monitoring/server_util.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package monitoring
8+
9+
import (
10+
"net"
11+
"os/exec"
12+
"strconv"
13+
"strings"
14+
15+
"github.com/pkg/errors"
16+
)
17+
18+
const protocol = "tcp"
19+
20+
type Endpoint struct {
21+
Host string
22+
Port int
23+
}
24+
25+
// Address returns a string representation of the endpoint's address.
26+
func (e *Endpoint) Address() string {
27+
return net.JoinHostPort(e.Host, strconv.Itoa(e.Port))
28+
}
29+
30+
type ServerConfig struct {
31+
Endpoint *Endpoint
32+
preAllocatedListener net.Listener
33+
}
34+
35+
// Listener instantiate a [net.Listener] and updates the config port with the effective port.
36+
func (s *ServerConfig) Listener() (net.Listener, error) {
37+
if s.preAllocatedListener != nil {
38+
return s.preAllocatedListener, nil
39+
}
40+
listener, err := net.Listen(protocol, s.Endpoint.Address())
41+
if err != nil {
42+
return nil, errors.Wrap(err, "failed to listen")
43+
}
44+
45+
addr := listener.Addr()
46+
tcpAddress, ok := addr.(*net.TCPAddr)
47+
if !ok {
48+
return nil, errors.New(strings.Join([]string{"failed to cast to TCP address", listener.Close().Error()}, "; "))
49+
}
50+
s.Endpoint.Port = tcpAddress.Port
51+
52+
logger.Infof("Listening on: %s://%s", protocol, s.Endpoint.Address())
53+
return listener, nil
54+
}
55+
56+
// PreAllocateListener is used to allocate a port and bind to ahead of the server initialization.
57+
// It stores the listener object internally to be reused on subsequent calls to Listener().
58+
func (c *ServerConfig) PreAllocateListener() (net.Listener, error) {
59+
listener, err := c.Listener()
60+
if err != nil {
61+
return nil, err
62+
}
63+
c.preAllocatedListener = listener
64+
return listener, nil
65+
}
66+
67+
func FQDN() (string, error) {
68+
out, err := exec.Command("hostname", "--fqdn").Output()
69+
if err != nil {
70+
return "", err
71+
}
72+
return string(out), nil
73+
}

0 commit comments

Comments
 (0)