Skip to content

Commit

Permalink
Merge remote-tracking branch 'chickenlj/router-enhance' into router-e…
Browse files Browse the repository at this point in the history
…nhance
  • Loading branch information
chickenlj committed Aug 24, 2023
2 parents 4fdf50b + 06f3369 commit eee3001
Show file tree
Hide file tree
Showing 13 changed files with 629 additions and 201 deletions.
6 changes: 4 additions & 2 deletions common/host_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
)

var localIp string
var localHostname string
var (
localIp string
localHostname string
)

func GetLocalIp() string {
if len(localIp) != 0 {
Expand Down
4 changes: 4 additions & 0 deletions common/host_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func TestGetLocalIp(t *testing.T) {
assert.NotNil(t, GetLocalIp())
}

func TestGetLocalHostName(t *testing.T) {
assert.NotNil(t, GetLocalHostName())
}

func TestHandleRegisterIPAndPort(t *testing.T) {
url := NewURLWithOptions(WithIp("1.2.3.4"), WithPort("20000"))
HandleRegisterIPAndPort(url)
Expand Down
7 changes: 6 additions & 1 deletion config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,12 @@ func (s *ServiceConfig) IsExport() bool {
func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List {
ports := list.New()
for _, proto := range protocolConfigs {
if len(proto.Port) > 0 {
if port, err := strconv.Atoi(proto.Port); err != nil {
logger.Infof(
"%s will be assgined to a random port, since the port is an invalid number",
proto.Name,
)
} else if port > 0 {
continue
}

Expand Down
51 changes: 30 additions & 21 deletions metrics/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ import (
"dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
)

var registries = make(map[string]func(*ReporterConfig) MetricRegistry)
var collectors = make([]CollectorFunc, 0)
var registry MetricRegistry
var (
registries = make(map[string]func(*ReporterConfig) MetricRegistry)
collectors = make([]CollectorFunc, 0)
registry MetricRegistry
)

// CollectorFunc used to extend more indicators
type CollectorFunc func(MetricRegistry, *ReporterConfig)
Expand Down Expand Up @@ -59,15 +61,22 @@ func AddCollector(name string, fun func(MetricRegistry, *ReporterConfig)) {

// MetricRegistry data container,data compute、expose、agg
type MetricRegistry interface {
Counter(*MetricId) CounterMetric // add or update a counter
Gauge(*MetricId) GaugeMetric // add or update a gauge
Histogram(*MetricId) HistogramMetric // add a metric num to a histogram
Summary(*MetricId) SummaryMetric // add a metric num to a summary
Export() // expose metric data, such as Prometheus http exporter
Counter(*MetricId) CounterMetric // add or update a counter
Gauge(*MetricId) GaugeMetric // add or update a gauge
Histogram(*MetricId) ObservableMetric // add a metric num to a histogram
Summary(*MetricId) ObservableMetric // add a metric num to a summary
Rt(*MetricId, *RtOpts) ObservableMetric // add a metric num to a rt
Export() // expose metric data, such as Prometheus http exporter
// GetMetrics() []*MetricSample // get all metric data
// GetMetricsString() (string, error) // get text format metric data
}

type RtOpts struct {
Aggregate bool
BucketNum int // only for aggRt
TimeWindowSeconds int64 // only for aggRt
}

// multi registry,like micrometer CompositeMeterRegistry
// type CompositeRegistry struct {
// rs []MetricRegistry
Expand Down Expand Up @@ -131,14 +140,9 @@ type GaugeMetric interface {
// Sub(float64)
}

// HistogramMetric histogram metric
type HistogramMetric interface {
Record(float64)
}

// SummaryMetric summary metric
type SummaryMetric interface {
Record(float64)
// histogram summary rt metric
type ObservableMetric interface {
Observe(float64)
}

// StatesMetrics multi metrics,include total,success num, fail num,call MetricsRegistry save data
Expand Down Expand Up @@ -194,13 +198,13 @@ type TimeMetric interface {

const (
defaultBucketNum = 10
defalutTimeWindowSeconds = 120
defaultTimeWindowSeconds = 120
)

// NewTimeMetric init and write all data to registry
func NewTimeMetric(min, max, avg, sum, last *MetricId, mr MetricRegistry) TimeMetric {
return &DefaultTimeMetric{r: mr, min: min, max: max, avg: avg, sum: sum, last: last,
agg: aggregate.NewTimeWindowAggregator(defaultBucketNum, defalutTimeWindowSeconds)}
agg: aggregate.NewTimeWindowAggregator(defaultBucketNum, defaultTimeWindowSeconds)}
}

type DefaultTimeMetric struct {
Expand Down Expand Up @@ -232,8 +236,13 @@ func ComputeIfAbsentCache(key string, supplier func() interface{}) interface{} {
} else {
metricsCacheMutex.Lock()
defer metricsCacheMutex.Unlock()
n := supplier()
metricsCache[key] = n
return n
v, ok = metricsCache[key] // double check,avoid overwriting
if ok {
return v
} else {
n := supplier()
metricsCache[key] = n
return n
}
}
}
165 changes: 57 additions & 108 deletions metrics/prometheus/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

import (
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/prometheus/common/expfmt"
)
Expand All @@ -35,97 +34,96 @@ import (

func init() {
metrics.SetRegistry("prometheus", func(rc *metrics.ReporterConfig) metrics.MetricRegistry {
return &promMetricRegistry{
cvm: make(map[string]*prom.CounterVec),
gvm: make(map[string]*prom.GaugeVec),
hvm: make(map[string]*prom.HistogramVec),
svm: make(map[string]*prom.SummaryVec),
}
return &promMetricRegistry{r: prom.DefaultRegisterer}
})
}

type promMetricRegistry struct {
mtx sync.RWMutex // Protects metrics.
cvm map[string]*prom.CounterVec // prom.CounterVec
gvm map[string]*prom.GaugeVec // prom.GaugeVec
hvm map[string]*prom.HistogramVec // prom.HistogramVec
svm map[string]*prom.SummaryVec // prom.SummaryVec
r prom.Registerer // for convenience of testing
vecs sync.Map
}

func (p *promMetricRegistry) Counter(m *metrics.MetricId) metrics.CounterMetric {
p.mtx.RLock()
vec, ok := p.cvm[m.Name]
p.mtx.RUnlock()
func (p *promMetricRegistry) getOrComputeVec(key string, supplier func() interface{}) interface{} {
v, ok := p.vecs.Load(key)
if !ok {
p.mtx.Lock()
vec = promauto.NewCounterVec(prom.CounterOpts{
v, ok = p.vecs.LoadOrStore(key, supplier())
if !ok {
p.r.MustRegister(v.(prom.Collector)) // only registe collector which stored success
}
}
return v
}

func (p *promMetricRegistry) Counter(m *metrics.MetricId) metrics.CounterMetric {
vec := p.getOrComputeVec(m.Name, func() interface{} {
return prom.NewCounterVec(prom.CounterOpts{
Name: m.Name,
Help: m.Desc,
}, m.TagKeys())
p.cvm[m.Name] = vec
p.mtx.Unlock()
}
c := vec.With(m.Tags)
return &counter{pc: c}
}).(*prom.CounterVec)
return vec.With(m.Tags)
}

func (p *promMetricRegistry) Gauge(m *metrics.MetricId) metrics.GaugeMetric {
p.mtx.RLock()
vec, ok := p.gvm[m.Name]
p.mtx.RUnlock()
if !ok {
p.mtx.Lock()
vec = promauto.NewGaugeVec(prom.GaugeOpts{
vec := p.getOrComputeVec(m.Name, func() interface{} {
return prom.NewGaugeVec(prom.GaugeOpts{
Name: m.Name,
Help: m.Desc,
}, m.TagKeys())
p.gvm[m.Name] = vec
p.mtx.Unlock()
}
g := vec.With(m.Tags)
return &gauge{pg: g}
}).(*prom.GaugeVec)
return vec.With(m.Tags)
}

func (p *promMetricRegistry) Histogram(m *metrics.MetricId) metrics.HistogramMetric {
p.mtx.RLock()
vec, ok := p.hvm[m.Name]
p.mtx.RUnlock()
if !ok {
p.mtx.Lock()
vec = promauto.NewHistogramVec(prom.HistogramOpts{
func (p *promMetricRegistry) Histogram(m *metrics.MetricId) metrics.ObservableMetric {
vec := p.getOrComputeVec(m.Name, func() interface{} {
return prom.NewHistogramVec(prom.HistogramOpts{
Name: m.Name,
Help: m.Desc,
}, m.TagKeys())
p.hvm[m.Name] = vec
p.mtx.Unlock()
}
h := vec.With(m.Tags)
return &histogram{ph: h.(prom.Histogram)}
}).(*prom.HistogramVec)
return vec.With(m.Tags)
}

func (p *promMetricRegistry) Summary(m *metrics.MetricId) metrics.SummaryMetric {
p.mtx.RLock()
vec, ok := p.svm[m.Name]
p.mtx.RUnlock()
if !ok {
p.mtx.Lock()
vec = promauto.NewSummaryVec(prom.SummaryOpts{
func (p *promMetricRegistry) Summary(m *metrics.MetricId) metrics.ObservableMetric {
vec := p.getOrComputeVec(m.Name, func() interface{} {
return prom.NewSummaryVec(prom.SummaryOpts{
Name: m.Name,
Help: m.Desc,
}, m.TagKeys())
p.svm[m.Name] = vec
p.mtx.Unlock()
}).(*prom.SummaryVec)
return vec.With(m.Tags)
}

func (p *promMetricRegistry) Rt(m *metrics.MetricId, opts *metrics.RtOpts) metrics.ObservableMetric {
var supplier func() interface{}
if opts != nil && opts.Aggregate {
supplier = func() interface{} {
// TODO set default aggregate config from config
return NewAggRtVec(&RtOpts{
Name: m.Name,
Help: m.Desc,
bucketNum: opts.BucketNum,
timeWindowSeconds: opts.TimeWindowSeconds,
}, m.TagKeys())
}
} else {
supplier = func() interface{} {
return NewRtVec(&RtOpts{
Name: m.Name,
Help: m.Desc,
}, m.TagKeys())
}
}
s := vec.With(m.Tags)
return &summary{ps: s.(prom.Summary)}
vec := p.getOrComputeVec(m.Name, supplier).(*RtVec)
return vec.With(m.Tags)
}

func (p *promMetricRegistry) Export() {

// use promauto export global, TODO move here
}

func (p *promMetricRegistry) Scrape() (string, error) {
r := prom.DefaultRegisterer.(*prom.Registry)
r := p.r.(prom.Gatherer)
gathering, err := r.Gather()
if err != nil {
return "", err
Expand All @@ -138,52 +136,3 @@ func (p *promMetricRegistry) Scrape() (string, error) {
}
return out.String(), nil
}

type counter struct {
pc prom.Counter
}

func (c *counter) Inc() {
c.pc.Inc()
}
func (c *counter) Add(v float64) {
c.pc.Add(v)
}

type gauge struct {
pg prom.Gauge
}

// func (g *gauge) Inc() {
// g.pg.Inc()
// }
//
// func (g *gauge) Dec() {
// g.pg.Dec()
// }
func (g *gauge) Set(v float64) {
g.pg.Set(v)
}

// func (g *gauge) Add(v float64) {
// g.pg.Add(v)
// }
// func (g *gauge) Sub(v float64) {
// g.pg.Sub(v)
// }

type histogram struct {
ph prom.Histogram
}

func (h *histogram) Record(v float64) {
h.ph.Observe(v)
}

type summary struct {
ps prom.Summary
}

func (s *summary) Record(v float64) {
s.ps.Observe(v)
}
Loading

0 comments on commit eee3001

Please sign in to comment.