diff --git a/engine/executor/http.go b/engine/executor/http.go index 0df6886c73b..ba9303a8614 100644 --- a/engine/executor/http.go +++ b/engine/executor/http.go @@ -20,7 +20,7 @@ import ( "github.com/pingcap/tiflow/dm/dm/common" "github.com/pingcap/tiflow/dm/pkg/log" - "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/pingcap/tiflow/engine/pkg/promutil" ) func httpHandler(lis net.Listener) error { @@ -31,7 +31,7 @@ func httpHandler(lis net.Listener) error { mux.HandleFunc("/debug/pprof/profile", pprof.Profile) mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) mux.HandleFunc("/debug/pprof/trace", pprof.Trace) - mux.Handle("/metrics", promhttp.Handler()) + mux.Handle("/metrics", promutil.HTTPHandlerForMetric()) httpS := &http.Server{ Handler: mux, diff --git a/engine/executor/metrics.go b/engine/executor/metrics.go deleted file mode 100644 index 49a1291f2d0..00000000000 --- a/engine/executor/metrics.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package executor - -import "github.com/prometheus/client_golang/prometheus" - -// registerMetrics registers metrics for executor server -func registerMetrics() { - registry := prometheus.NewRegistry() - registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) - registry.MustRegister(prometheus.NewGoCollector()) - - initServerMetrics(registry) - - prometheus.DefaultGatherer = registry -} diff --git a/engine/executor/metrics_server.go b/engine/executor/metrics_server.go index f9fe0ee0cd3..180eefecec1 100644 --- a/engine/executor/metrics_server.go +++ b/engine/executor/metrics_server.go @@ -14,18 +14,17 @@ package executor import ( + "github.com/pingcap/tiflow/engine/pkg/promutil" "github.com/prometheus/client_golang/prometheus" ) -var executorTaskNumGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "dataflow", - Subsystem: "executor", - Name: "task_num", - Help: "number of task in this executor", - }, []string{"status"}) - -// initServerMetrics registers statistics of executor server -func initServerMetrics(registry *prometheus.Registry) { - registry.MustRegister(executorTaskNumGauge) -} +var ( + executorFactory = promutil.NewFactory4Framework() + executorTaskNumGauge = executorFactory.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "dataflow", + Subsystem: "executor", + Name: "task_num", + Help: "number of task in this executor", + }, []string{"status"}) +) diff --git a/engine/executor/server.go b/engine/executor/server.go index 9269f552ee3..fe90db98dd4 100644 --- a/engine/executor/server.go +++ b/engine/executor/server.go @@ -325,8 +325,6 @@ func (s *Server) Run(ctx context.Context) error { return s.startForTest(ctx) } - registerMetrics() - wg, ctx := errgroup.WithContext(ctx) s.taskRunner = worker.NewTaskRunner(defaultRuntimeIncomingQueueLen, defaultRuntimeInitConcurrency) s.taskCommitter = worker.NewTaskCommitter(s.taskRunner, defaultTaskPreDispatchRequestTTL) diff --git a/engine/executor/server_test.go b/engine/executor/server_test.go index 2aa5f304692..8b9ceacb396 100644 --- a/engine/executor/server_test.go +++ b/engine/executor/server_test.go @@ -53,7 +53,6 @@ func TestStartTCPSrv(t *testing.T) { s := NewServer(cfg, nil) s.grpcSrv = grpc.NewServer() - registerMetrics() wg, ctx := errgroup.WithContext(context.Background()) err = s.startTCPService(ctx, wg) require.Nil(t, err) @@ -114,7 +113,6 @@ func TestCollectMetric(t *testing.T) { s.taskRunner = worker.NewTaskRunner(defaultRuntimeIncomingQueueLen, defaultRuntimeInitConcurrency) s.grpcSrv = grpc.NewServer() - registerMetrics() err = s.startTCPService(ctx, wg) require.Nil(t, err) diff --git a/engine/lib/base_jobmaster.go b/engine/lib/base_jobmaster.go index bc2ca45e49e..30ed120e1bd 100644 --- a/engine/lib/base_jobmaster.go +++ b/engine/lib/base_jobmaster.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/dm/pkg/log" + "go.uber.org/zap" "github.com/pingcap/tiflow/engine/executor/worker" libModel "github.com/pingcap/tiflow/engine/lib/model" @@ -179,13 +180,16 @@ func (d *DefaultBaseJobMaster) GetWorkers() map[libModel.WorkerID]WorkerHandle { // Close implements BaseJobMaster.Close func (d *DefaultBaseJobMaster) Close(ctx context.Context) error { - if err := d.impl.CloseImpl(ctx); err != nil { - return errors.Trace(err) + err := d.impl.CloseImpl(ctx) + // We don't return here if CloseImpl return error to ensure + // that we can close inner resources of the framework + if err != nil { + log.L().Error("Failed to close JobMasterImpl", zap.Error(err)) } d.master.doClose() d.worker.doClose() - return nil + return errors.Trace(err) } // OnError implements BaseJobMaster.OnError diff --git a/engine/lib/master.go b/engine/lib/master.go index 82d2b8ed816..b26e14dab56 100644 --- a/engine/lib/master.go +++ b/engine/lib/master.go @@ -435,16 +435,20 @@ func (m *DefaultBaseMaster) doClose() { log.L().Warn("Failed to clean up message handlers", zap.String("master-id", m.id)) } + promutil.UnregisterWorkerMetrics(m.id) } // Close implements BaseMaster.Close func (m *DefaultBaseMaster) Close(ctx context.Context) error { - if err := m.Impl.CloseImpl(ctx); err != nil { - return errors.Trace(err) + err := m.Impl.CloseImpl(ctx) + // We don't return here if CloseImpl return error to ensure + // that we can close inner resources of the framework + if err != nil { + log.L().Error("Failed to close MasterImpl", zap.Error(err)) } m.doClose() - return nil + return errors.Trace(err) } // OnError implements BaseMaster.OnError diff --git a/engine/lib/worker.go b/engine/lib/worker.go index ce5f4f591f6..5b64557f0b8 100644 --- a/engine/lib/worker.go +++ b/engine/lib/worker.go @@ -342,17 +342,20 @@ func (w *DefaultBaseWorker) doClose() { } w.wg.Wait() + promutil.UnregisterWorkerMetrics(w.id) } // Close implements BaseWorker.Close func (w *DefaultBaseWorker) Close(ctx context.Context) error { - if err := w.Impl.CloseImpl(ctx); err != nil { + err := w.Impl.CloseImpl(ctx) + // We don't return here if CloseImpl return error to ensure + // that we can close inner resources of the framework + if err != nil { log.L().Error("Failed to close WorkerImpl", zap.Error(err)) - return errors.Trace(err) } w.doClose() - return nil + return errors.Trace(err) } // ID implements BaseWorker.ID diff --git a/engine/pkg/promutil/implement.go b/engine/pkg/promutil/implement.go index 2d809b5b45e..b88bf5ada52 100644 --- a/engine/pkg/promutil/implement.go +++ b/engine/pkg/promutil/implement.go @@ -87,16 +87,25 @@ func (f *wrappingFactory) NewHistogramVec(opts prometheus.HistogramOpts, labelNa } func wrapCounterOpts(prefix string, constLabels prometheus.Labels, opts *prometheus.CounterOpts) *prometheus.CounterOpts { + if opts.ConstLabels == nil && constLabels != nil { + opts.ConstLabels = make(prometheus.Labels) + } wrapOptsCommon(prefix, constLabels, &opts.Namespace, opts.ConstLabels) return opts } func wrapGaugeOpts(prefix string, constLabels prometheus.Labels, opts *prometheus.GaugeOpts) *prometheus.GaugeOpts { + if opts.ConstLabels == nil && constLabels != nil { + opts.ConstLabels = make(prometheus.Labels) + } wrapOptsCommon(prefix, constLabels, &opts.Namespace, opts.ConstLabels) return opts } func wrapHistogramOpts(prefix string, constLabels prometheus.Labels, opts *prometheus.HistogramOpts) *prometheus.HistogramOpts { + if opts.ConstLabels == nil && constLabels != nil { + opts.ConstLabels = make(prometheus.Labels) + } wrapOptsCommon(prefix, constLabels, &opts.Namespace, opts.ConstLabels) return opts } diff --git a/engine/pkg/promutil/implement_test.go b/engine/pkg/promutil/implement_test.go index 7a2198b48de..a951ed7a3f4 100644 --- a/engine/pkg/promutil/implement_test.go +++ b/engine/pkg/promutil/implement_test.go @@ -69,6 +69,17 @@ func TestWrapCounterOpts(t *testing.T) { }, }, }, + { + constLabels: prometheus.Labels{ + "k2": "v2", + }, + inputOpts: &prometheus.CounterOpts{}, + outputOpts: &prometheus.CounterOpts{ + ConstLabels: prometheus.Labels{ + "k2": "v2", + }, + }, + }, } for _, c := range cases { diff --git a/engine/pkg/promutil/util.go b/engine/pkg/promutil/util.go index 96293e3f58e..8c270835ea2 100644 --- a/engine/pkg/promutil/util.go +++ b/engine/pkg/promutil/util.go @@ -72,3 +72,10 @@ func NewFactory4Worker(info tenant.ProjectInfo, jobType libModel.JobType, jobID func NewFactory4Framework() Factory { return NewFactory4FrameworkImpl(globalMetricRegistry) } + +// UnregisterWorkerMetrics unregisters all metrics of workerID +// IF 'worker' is a job master, use job id as workerID +// IF 'worker' is a worker, use worker id as workerID +func UnregisterWorkerMetrics(workerID libModel.WorkerID) { + globalMetricRegistry.Unregister(workerID) +} diff --git a/engine/servermaster/metrics.go b/engine/servermaster/metrics.go deleted file mode 100644 index 81d21b80f10..00000000000 --- a/engine/servermaster/metrics.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package servermaster - -import "github.com/prometheus/client_golang/prometheus" - -// registerMetrics registers metrics for server master -func registerMetrics() { - registry := prometheus.NewRegistry() - registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) - registry.MustRegister(prometheus.NewGoCollector()) - - initServerMetrics(registry) - - prometheus.DefaultGatherer = registry -} diff --git a/engine/servermaster/metrics_server.go b/engine/servermaster/metrics_server.go index da260a29646..a598ce09abd 100644 --- a/engine/servermaster/metrics_server.go +++ b/engine/servermaster/metrics_server.go @@ -14,18 +14,20 @@ package servermaster import ( + "github.com/pingcap/tiflow/engine/pkg/promutil" "github.com/prometheus/client_golang/prometheus" ) var ( - serverExecutorNumGauge = prometheus.NewGaugeVec( + serverFactory = promutil.NewFactory4Framework() + serverExecutorNumGauge = serverFactory.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dataflow", Subsystem: "server_master", Name: "executor_num", Help: "number of executor servers in this cluster", }, []string{"status"}) - serverJobNumGauge = prometheus.NewGaugeVec( + serverJobNumGauge = serverFactory.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "dataflow", Subsystem: "server_master", @@ -33,9 +35,3 @@ var ( Help: "number of jobs in this cluster", }, []string{"status"}) ) - -// initServerMetrics registers statistics of server -func initServerMetrics(registry *prometheus.Registry) { - registry.MustRegister(serverExecutorNumGauge) - registry.MustRegister(serverJobNumGauge) -} diff --git a/engine/servermaster/server.go b/engine/servermaster/server.go index 02d815eaf8e..044ecc6427b 100644 --- a/engine/servermaster/server.go +++ b/engine/servermaster/server.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tiflow/dm/pkg/log" p2pProtocol "github.com/pingcap/tiflow/proto/p2p" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" "go.etcd.io/etcd/server/v3/embed" @@ -56,6 +55,7 @@ import ( "github.com/pingcap/tiflow/engine/pkg/meta/metaclient" pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm" "github.com/pingcap/tiflow/engine/pkg/p2p" + "github.com/pingcap/tiflow/engine/pkg/promutil" "github.com/pingcap/tiflow/engine/pkg/rpcutil" "github.com/pingcap/tiflow/engine/pkg/serverutils" "github.com/pingcap/tiflow/engine/pkg/tenant" @@ -447,8 +447,6 @@ func (s *Server) Run(ctx context.Context) (err error) { return s.startForTest(ctx) } - registerMetrics() - err = s.registerMetaStore() if err != nil { return err @@ -570,7 +568,7 @@ func (s *Server) startGrpcSrv(ctx context.Context) (err error) { httpHandlers := map[string]http.Handler{ "/debug/": getDebugHandler(), - "/metrics": promhttp.Handler(), + "/metrics": promutil.HTTPHandlerForMetric(), } // generate grpcServer diff --git a/engine/servermaster/server_test.go b/engine/servermaster/server_test.go index 038cb0e8d3b..e9f88c995da 100644 --- a/engine/servermaster/server_test.go +++ b/engine/servermaster/server_test.go @@ -87,7 +87,6 @@ func TestStartGrpcSrv(t *testing.T) { defer cleanup() s := &Server{cfg: cfg} - registerMetrics() ctx := context.Background() err := s.startGrpcSrv(ctx) require.Nil(t, err) @@ -320,7 +319,6 @@ func TestCollectMetric(t *testing.T) { masterAddr, cfg, cleanup := prepareServerEnv(t, "test-collect-metric") defer cleanup() - registerMetrics() s := &Server{ cfg: cfg, metrics: newServerMasterMetric(),