diff --git a/common/constant/key.go b/common/constant/key.go index f200528785..79289094e0 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -404,6 +404,10 @@ const ( // metrics key const ( + MetadataEnabledKey = "metrics.metadata.enabled" + RegistryEnabledKey = "metrics.registry.enabled" + ConfigCenterEnabledKey = "metrics.config-center.enabled" + RpcEnabledKey = "metrics.rpc.enabled" AggregationEnabledKey = "aggregation.enabled" AggregationBucketNumKey = "aggregation.bucket.num" AggregationTimeWindowSecondsKey = "aggregation.time.window.seconds" diff --git a/config/metric_config.go b/config/metric_config.go index 0859b57ccd..af41eb8532 100644 --- a/config/metric_config.go +++ b/config/metric_config.go @@ -35,13 +35,17 @@ import ( // MetricConfig This is the config struct for all metrics implementation type MetricConfig struct { - Enable *bool `default:"false" yaml:"enable" json:"enable,omitempty" property:"enable"` - Port string `default:"9090" yaml:"port" json:"port,omitempty" property:"port"` - Path string `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"` - Protocol string `default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` - Prometheus *PrometheusConfig `yaml:"prometheus" json:"prometheus" property:"prometheus"` - Aggregation *AggregateConfig `yaml:"aggregation" json:"aggregation" property:"aggregation"` - rootConfig *RootConfig + Enable *bool `default:"false" yaml:"enable" json:"enable,omitempty" property:"enable"` + Port string `default:"9090" yaml:"port" json:"port,omitempty" property:"port"` + Path string `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"` + Protocol string `default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"` + EnableMetadata *bool `default:"true" yaml:"enable-metadata" json:"enable-metadata,omitempty" property:"enable-metadata"` + EnableRegistry *bool `default:"true" yaml:"enable-registry" json:"enable-registry,omitempty" property:"enable-registry"` + EnableConfigCenter *bool `default:"true" yaml:"enable-config-center" json:"enable-config-center,omitempty" property:"enable-config-center"` + EnableRpc *bool `default:"true" yaml:"enable-rpc" json:"enable-rpc,omitempty" property:"enable-rpc"` + Prometheus *PrometheusConfig `yaml:"prometheus" json:"prometheus" property:"prometheus"` + Aggregation *AggregateConfig `yaml:"aggregation" json:"aggregation" property:"aggregation"` + rootConfig *RootConfig } type AggregateConfig struct { @@ -101,6 +105,26 @@ func NewMetricConfigBuilder() *MetricConfigBuilder { return &MetricConfigBuilder{metricConfig: &MetricConfig{}} } +func (mcb *MetricConfigBuilder) SetMetadataEnabled(enabled bool) *MetricConfigBuilder { + mcb.metricConfig.EnableMetadata = &enabled + return mcb +} + +func (mcb *MetricConfigBuilder) SetRegistryEnabled(enabled bool) *MetricConfigBuilder { + mcb.metricConfig.EnableRegistry = &enabled + return mcb +} + +func (mcb *MetricConfigBuilder) SetConfigCenterEnabled(enabled bool) *MetricConfigBuilder { + mcb.metricConfig.EnableConfigCenter = &enabled + return mcb +} + +func (mcb *MetricConfigBuilder) SetRpcEnabled(enabled bool) *MetricConfigBuilder { + mcb.metricConfig.EnableRpc = &enabled + return mcb +} + func (mcb *MetricConfigBuilder) Build() *MetricConfig { return mcb.metricConfig } @@ -113,11 +137,15 @@ func (mc *MetricConfig) DynamicUpdateProperties(newMetricConfig *MetricConfig) { // prometheus://localhost:9090?&histogram.enabled=false&prometheus.exporter.enabled=false func (mc *MetricConfig) toURL() *common.URL { url, _ := common.NewURL("localhost", common.WithProtocol(mc.Protocol)) - url.SetParam(constant.PrometheusExporterEnabledKey, strconv.FormatBool(*mc.Enable)) + url.SetParam(constant.PrometheusExporterEnabledKey, strconv.FormatBool(*mc.Enable)) // for compatibility url.SetParam(constant.PrometheusExporterMetricsPortKey, mc.Port) url.SetParam(constant.PrometheusExporterMetricsPathKey, mc.Path) url.SetParam(constant.ApplicationKey, mc.rootConfig.Application.Name) url.SetParam(constant.AppVersionKey, mc.rootConfig.Application.Version) + url.SetParam(constant.MetadataEnabledKey, strconv.FormatBool(*mc.EnableMetadata)) + url.SetParam(constant.RegistryEnabledKey, strconv.FormatBool(*mc.EnableRegistry)) + url.SetParam(constant.ConfigCenterEnabledKey, strconv.FormatBool(*mc.EnableConfigCenter)) + url.SetParam(constant.RpcEnabledKey, strconv.FormatBool(*mc.EnableRpc)) if mc.Aggregation != nil { url.SetParam(constant.AggregationEnabledKey, strconv.FormatBool(*mc.Aggregation.Enabled)) url.SetParam(constant.AggregationBucketNumKey, strconv.Itoa(mc.Aggregation.BucketNum)) diff --git a/config/metric_config_test.go b/config/metric_config_test.go index 70dce11b1e..31a0ac6b7b 100644 --- a/config/metric_config_test.go +++ b/config/metric_config_test.go @@ -26,9 +26,17 @@ import ( ) func TestMetricConfigBuilder(t *testing.T) { - config := NewMetricConfigBuilder().Build() - err := config.Init(&RootConfig{Application: &ApplicationConfig{Name: "dubbo", Version: "1.0.0"}}) - assert.NoError(t, err) - reporterConfig := config.ToReporterConfig() - assert.Equal(t, string(reporterConfig.Mode), "pull") + config := NewMetricConfigBuilder(). + SetConfigCenterEnabled(false). + SetMetadataEnabled(false). + SetRegistryEnabled(false). + SetRpcEnabled(false). + Build() + enable := false + assert.Equal(t, &MetricConfig{ + EnableConfigCenter: &enable, + EnableMetadata: &enable, + EnableRegistry: &enable, + EnableRpc: &enable, + }, config) } diff --git a/metrics/config_center/collector.go b/metrics/config_center/collector.go index 9ae551f0e5..3b4bb1e498 100644 --- a/metrics/config_center/collector.go +++ b/metrics/config_center/collector.go @@ -30,9 +30,11 @@ var ch = make(chan metrics.MetricsEvent, 10) var info = metrics.NewMetricKey("dubbo_configcenter_total", "Config Changed Total") func init() { - metrics.AddCollector("config_center", func(mr metrics.MetricRegistry, _ *common.URL) { - c := &configCenterCollector{r: mr} - c.start() + metrics.AddCollector("config_center", func(mr metrics.MetricRegistry, url *common.URL) { + if url.GetParamBool(constant.ConfigCenterEnabledKey, true) { + c := &configCenterCollector{r: mr} + c.start() + } }) } diff --git a/metrics/metadata/collector.go b/metrics/metadata/collector.go index 7125fb1f14..8a08e0ff47 100644 --- a/metrics/metadata/collector.go +++ b/metrics/metadata/collector.go @@ -32,9 +32,11 @@ const eventType = constant.MetricsMetadata var ch = make(chan metrics.MetricsEvent, 10) func init() { - metrics.AddCollector("metadata", func(mr metrics.MetricRegistry, _ *common.URL) { - l := &MetadataMetricCollector{metrics.BaseCollector{R: mr}} - l.start() + metrics.AddCollector("metadata", func(mr metrics.MetricRegistry, url *common.URL) { + if url.GetParamBool(constant.MetadataEnabledKey, true) { + l := &MetadataMetricCollector{metrics.BaseCollector{R: mr}} + l.start() + } }) } diff --git a/metrics/prometheus/registry.go b/metrics/prometheus/registry.go index f84f2ad552..d108f8f9f6 100644 --- a/metrics/prometheus/registry.go +++ b/metrics/prometheus/registry.go @@ -143,56 +143,62 @@ func (p *promMetricRegistry) Rt(m *metrics.MetricId, opts *metrics.RtOpts) metri func (p *promMetricRegistry) Export() { if p.url.GetParamBool(constant.PrometheusExporterEnabledKey, false) { - go func() { - mux := http.NewServeMux() - path := p.url.GetParam(constant.PrometheusDefaultMetricsPath, constant.PrometheusDefaultMetricsPath) - port := p.url.GetParam(constant.PrometheusExporterMetricsPortKey, constant.PrometheusDefaultMetricsPort) - mux.Handle(path, promhttp.InstrumentMetricHandler(p.r, promhttp.HandlerFor(p.gather, promhttp.HandlerOpts{}))) - srv := &http.Server{Addr: ":" + port, Handler: mux} - extension.AddCustomShutdownCallback(func() { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - if err := srv.Shutdown(ctx); nil != err { - logger.Fatalf("prometheus server shutdown failed, err: %v", err) - } else { - logger.Info("prometheus server gracefully shutdown success") - } - }) - logger.Infof("prometheus endpoint :%s%s", port, path) - if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { // except Shutdown or Close - logger.Errorf("new prometheus server with error = %v", err) - } - }() + go p.exportHttp() } if p.url.GetParamBool(constant.PrometheusPushgatewayEnabledKey, false) { - baseUrl, exist := p.url.GetNonDefaultParam(constant.PrometheusPushgatewayBaseUrlKey) - if !exist { - logger.Error("no pushgateway url found in config path: metrics.prometheus.pushgateway.bash-url, please check your config file") - return - } - username := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "") - password := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "") - job := p.url.GetParam(constant.PrometheusPushgatewayJobKey, constant.PrometheusDefaultJobName) - pushInterval := p.url.GetParamByIntValue(constant.PrometheusPushgatewayPushIntervalKey, constant.PrometheusDefaultPushInterval) - pusher := push.New(baseUrl, job).Gatherer(p.gather) - if len(username) != 0 { - pusher.BasicAuth(username, password) + p.exportPushgateway() + } +} + +func (p *promMetricRegistry) exportHttp() { + mux := http.NewServeMux() + path := p.url.GetParam(constant.PrometheusDefaultMetricsPath, constant.PrometheusDefaultMetricsPath) + port := p.url.GetParam(constant.PrometheusExporterMetricsPortKey, constant.PrometheusDefaultMetricsPort) + mux.Handle(path, promhttp.InstrumentMetricHandler(p.r, promhttp.HandlerFor(p.gather, promhttp.HandlerOpts{}))) + srv := &http.Server{Addr: ":" + port, Handler: mux} + extension.AddCustomShutdownCallback(func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := srv.Shutdown(ctx); nil != err { + logger.Fatalf("prometheus server shutdown failed, err: %v", err) + } else { + logger.Info("prometheus server gracefully shutdown success") } - logger.Infof("prometheus pushgateway will push to %s every %d seconds", baseUrl, pushInterval) - ticker := time.NewTicker(time.Duration(pushInterval) * time.Second) - go func() { - for range ticker.C { - err := pusher.Add() - if err != nil { - logger.Errorf("push metric data to prometheus push gateway error", err) - } else { - logger.Debugf("prometheus pushgateway push to %s success", baseUrl) - } - } - }() + }) + logger.Infof("prometheus endpoint :%s%s", port, path) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { // except Shutdown or Close + logger.Errorf("new prometheus server with error: %v", err) } } +func (p *promMetricRegistry) exportPushgateway() { + baseUrl, exist := p.url.GetNonDefaultParam(constant.PrometheusPushgatewayBaseUrlKey) + if !exist { + logger.Error("no pushgateway base url found in config path: metrics.prometheus.pushgateway.base-url, please check your config") + return + } + username := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "") + password := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "") + job := p.url.GetParam(constant.PrometheusPushgatewayJobKey, constant.PrometheusDefaultJobName) + pushInterval := p.url.GetParamByIntValue(constant.PrometheusPushgatewayPushIntervalKey, constant.PrometheusDefaultPushInterval) + pusher := push.New(baseUrl, job).Gatherer(p.gather) + if len(username) != 0 { + pusher.BasicAuth(username, password) + } + logger.Infof("prometheus pushgateway will push to %s every %d seconds", baseUrl, pushInterval) + ticker := time.NewTicker(time.Duration(pushInterval) * time.Second) + go func() { + for range ticker.C { + err := pusher.Add() + if err != nil { + logger.Errorf("push metric data to prometheus pushgateway error: %v", err) + } else { + logger.Debugf("prometheus pushgateway push to %s success", baseUrl) + } + } + }() +} + func (p *promMetricRegistry) Scrape() (string, error) { gathering, err := p.gather.Gather() if err != nil { diff --git a/metrics/registry/collector.go b/metrics/registry/collector.go index 53a5d71b30..871dd469be 100644 --- a/metrics/registry/collector.go +++ b/metrics/registry/collector.go @@ -28,9 +28,11 @@ var ( ) func init() { - metrics.AddCollector("registry", func(m metrics.MetricRegistry, _ *common.URL) { - rc := ®istryCollector{metrics.BaseCollector{R: m}} - go rc.start() + metrics.AddCollector("registry", func(m metrics.MetricRegistry, url *common.URL) { + if url.GetParamBool(constant.RegistryEnabledKey, true) { + rc := ®istryCollector{metrics.BaseCollector{R: m}} + go rc.start() + } }) } diff --git a/metrics/rpc/collector.go b/metrics/rpc/collector.go index dc9fb53347..d9e9f0551a 100644 --- a/metrics/rpc/collector.go +++ b/metrics/rpc/collector.go @@ -33,12 +33,14 @@ var ( // init will add the rpc collectorFunc to metrics.collectors slice, and lazy start the rpc collector goroutine func init() { - collectorFunc := func(registry metrics.MetricRegistry, c *common.URL) { - rc := &rpcCollector{ - registry: registry, - metricSet: buildMetricSet(registry), + collectorFunc := func(registry metrics.MetricRegistry, url *common.URL) { + if url.GetParamBool(constant.RpcEnabledKey, true) { + rc := &rpcCollector{ + registry: registry, + metricSet: buildMetricSet(registry), + } + go rc.start() } - go rc.start() } metrics.AddCollector("rpc", collectorFunc)