Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metric[engine]:Replace framework metric #5526

Merged
merged 10 commits into from
May 24, 2022
4 changes: 2 additions & 2 deletions engine/executor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

"github.com/pingcap/tiflow/dm/dm/common"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/pingcap/tiflow/engine/pkg/promutil"
)

func httpHandler(lis net.Listener) error {
Expand All @@ -31,7 +31,7 @@ func httpHandler(lis net.Listener) error {
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
mux.Handle("/metrics", promhttp.Handler())
mux.Handle("/metrics", promutil.HTTPHandlerForMetric())

httpS := &http.Server{
Handler: mux,
Expand Down
27 changes: 0 additions & 27 deletions engine/executor/metrics.go

This file was deleted.

23 changes: 11 additions & 12 deletions engine/executor/metrics_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@
package executor

import (
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/prometheus/client_golang/prometheus"
)

var executorTaskNumGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dataflow",
Subsystem: "executor",
Name: "task_num",
Help: "number of task in this executor",
}, []string{"status"})

// initServerMetrics registers statistics of executor server
func initServerMetrics(registry *prometheus.Registry) {
registry.MustRegister(executorTaskNumGauge)
}
var (
executorFactory = promutil.NewFactory4Framework()
executorTaskNumGauge = executorFactory.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dataflow",
Subsystem: "executor",
Name: "task_num",
Help: "number of task in this executor",
}, []string{"status"})
)
2 changes: 0 additions & 2 deletions engine/executor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,6 @@ func (s *Server) Run(ctx context.Context) error {
return s.startForTest(ctx)
}

registerMetrics()

wg, ctx := errgroup.WithContext(ctx)
s.taskRunner = worker.NewTaskRunner(defaultRuntimeIncomingQueueLen, defaultRuntimeInitConcurrency)
s.taskCommitter = worker.NewTaskCommitter(s.taskRunner, defaultTaskPreDispatchRequestTTL)
Expand Down
2 changes: 0 additions & 2 deletions engine/executor/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func TestStartTCPSrv(t *testing.T) {
s := NewServer(cfg, nil)

s.grpcSrv = grpc.NewServer()
registerMetrics()
wg, ctx := errgroup.WithContext(context.Background())
err = s.startTCPService(ctx, wg)
require.Nil(t, err)
Expand Down Expand Up @@ -114,7 +113,6 @@ func TestCollectMetric(t *testing.T) {
s.taskRunner = worker.NewTaskRunner(defaultRuntimeIncomingQueueLen, defaultRuntimeInitConcurrency)

s.grpcSrv = grpc.NewServer()
registerMetrics()
err = s.startTCPService(ctx, wg)
require.Nil(t, err)

Expand Down
8 changes: 5 additions & 3 deletions engine/lib/base_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/dm/pkg/log"
"go.uber.org/zap"

"github.com/pingcap/tiflow/engine/executor/worker"
libModel "github.com/pingcap/tiflow/engine/lib/model"
Expand Down Expand Up @@ -180,13 +181,14 @@ func (d *DefaultBaseJobMaster) GetWorkers() map[libModel.WorkerID]WorkerHandle {

// Close implements BaseJobMaster.Close
func (d *DefaultBaseJobMaster) Close(ctx context.Context) error {
if err := d.impl.CloseImpl(ctx); err != nil {
return errors.Trace(err)
err := d.impl.CloseImpl(ctx)
if err != nil {
log.L().Error("Failed to close JobMasterImpl", zap.Error(err))
}

d.master.doClose()
d.worker.doClose()
return nil
return errors.Trace(err)
maxshuang marked this conversation as resolved.
Show resolved Hide resolved
}

// OnError implements BaseJobMaster.OnError
Expand Down
8 changes: 5 additions & 3 deletions engine/lib/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,16 +435,18 @@ func (m *DefaultBaseMaster) doClose() {
log.L().Warn("Failed to clean up message handlers",
zap.String("master-id", m.id))
}
promutil.UnregisterWorkerMetrics(m.id)
}

// Close implements BaseMaster.Close
func (m *DefaultBaseMaster) Close(ctx context.Context) error {
if err := m.Impl.CloseImpl(ctx); err != nil {
return errors.Trace(err)
err := m.Impl.CloseImpl(ctx)
if err != nil {
log.L().Error("Failed to close MasterImpl", zap.Error(err))
}

m.doClose()
return nil
return errors.Trace(err)
maxshuang marked this conversation as resolved.
Show resolved Hide resolved
}

// OnError implements BaseMaster.OnError
Expand Down
7 changes: 4 additions & 3 deletions engine/lib/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,17 +350,18 @@ func (w *DefaultBaseWorker) doClose() {
}

w.wg.Wait()
promutil.UnregisterWorkerMetrics(w.id)
}

// Close implements BaseWorker.Close
func (w *DefaultBaseWorker) Close(ctx context.Context) error {
if err := w.Impl.CloseImpl(ctx); err != nil {
err := w.Impl.CloseImpl(ctx)
if err != nil {
log.L().Error("Failed to close WorkerImpl", zap.Error(err))
return errors.Trace(err)
}

w.doClose()
return nil
return errors.Trace(err)
maxshuang marked this conversation as resolved.
Show resolved Hide resolved
}

// ID implements BaseWorker.ID
Expand Down
9 changes: 9 additions & 0 deletions engine/pkg/promutil/implement.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,25 @@ func (f *wrappingFactory) NewHistogramVec(opts prometheus.HistogramOpts, labelNa
}

func wrapCounterOpts(prefix string, constLabels prometheus.Labels, opts *prometheus.CounterOpts) *prometheus.CounterOpts {
if opts.ConstLabels == nil && constLabels != nil {
opts.ConstLabels = make(prometheus.Labels)
}
wrapOptsCommon(prefix, constLabels, &opts.Namespace, opts.ConstLabels)
return opts
}

func wrapGaugeOpts(prefix string, constLabels prometheus.Labels, opts *prometheus.GaugeOpts) *prometheus.GaugeOpts {
if opts.ConstLabels == nil && constLabels != nil {
opts.ConstLabels = make(prometheus.Labels)
}
wrapOptsCommon(prefix, constLabels, &opts.Namespace, opts.ConstLabels)
return opts
}

func wrapHistogramOpts(prefix string, constLabels prometheus.Labels, opts *prometheus.HistogramOpts) *prometheus.HistogramOpts {
if opts.ConstLabels == nil && constLabels != nil {
opts.ConstLabels = make(prometheus.Labels)
}
wrapOptsCommon(prefix, constLabels, &opts.Namespace, opts.ConstLabels)
return opts
}
Expand Down
11 changes: 11 additions & 0 deletions engine/pkg/promutil/implement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ func TestWrapCounterOpts(t *testing.T) {
},
},
},
{
constLabels: prometheus.Labels{
"k2": "v2",
},
inputOpts: &prometheus.CounterOpts{},
outputOpts: &prometheus.CounterOpts{
ConstLabels: prometheus.Labels{
"k2": "v2",
},
},
},
}

for _, c := range cases {
Expand Down
7 changes: 7 additions & 0 deletions engine/pkg/promutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,10 @@ func NewFactory4Worker(info tenant.ProjectInfo, jobType libModel.JobType, jobID
func NewFactory4Framework() Factory {
return NewFactory4FrameworkImpl(globalMetricRegistry)
}

// UnregisterWorkerMetrics unregisters all metric of workerID
maxshuang marked this conversation as resolved.
Show resolved Hide resolved
// IF 'worker' is a job master, use job id as workerID
// IF 'worker' is a worker, use worke id as workerID
maxshuang marked this conversation as resolved.
Show resolved Hide resolved
func UnregisterWorkerMetrics(workerID libModel.WorkerID) {
globalMetricRegistry.Unregister(workerID)
}
27 changes: 0 additions & 27 deletions engine/servermaster/metrics.go

This file was deleted.

12 changes: 4 additions & 8 deletions engine/servermaster/metrics_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,24 @@
package servermaster

import (
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/prometheus/client_golang/prometheus"
)

var (
serverExecutorNumGauge = prometheus.NewGaugeVec(
serverFactory = promutil.NewFactory4Framework()
serverExecutorNumGauge = serverFactory.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dataflow",
Subsystem: "server_master",
Name: "executor_num",
Help: "number of executor servers in this cluster",
}, []string{"status"})
serverJobNumGauge = prometheus.NewGaugeVec(
serverJobNumGauge = serverFactory.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dataflow",
Subsystem: "server_master",
Name: "job_num",
Help: "number of jobs in this cluster",
}, []string{"status"})
)

// initServerMetrics registers statistics of server
func initServerMetrics(registry *prometheus.Registry) {
registry.MustRegister(serverExecutorNumGauge)
registry.MustRegister(serverJobNumGauge)
}
6 changes: 2 additions & 4 deletions engine/servermaster/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tiflow/dm/pkg/log"
p2pProtocol "github.com/pingcap/tiflow/proto/p2p"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/embed"
Expand Down Expand Up @@ -56,6 +55,7 @@ import (
"github.com/pingcap/tiflow/engine/pkg/meta/metaclient"
pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm"
"github.com/pingcap/tiflow/engine/pkg/p2p"
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/pingcap/tiflow/engine/pkg/rpcutil"
"github.com/pingcap/tiflow/engine/pkg/serverutils"
"github.com/pingcap/tiflow/engine/pkg/tenant"
Expand Down Expand Up @@ -447,8 +447,6 @@ func (s *Server) Run(ctx context.Context) (err error) {
return s.startForTest(ctx)
}

registerMetrics()

err = s.registerMetaStore()
if err != nil {
return err
Expand Down Expand Up @@ -570,7 +568,7 @@ func (s *Server) startGrpcSrv(ctx context.Context) (err error) {

httpHandlers := map[string]http.Handler{
"/debug/": getDebugHandler(),
"/metrics": promhttp.Handler(),
"/metrics": promutil.HTTPHandlerForMetric(),
}

// generate grpcServer
Expand Down
2 changes: 0 additions & 2 deletions engine/servermaster/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ func TestStartGrpcSrv(t *testing.T) {
defer cleanup()

s := &Server{cfg: cfg}
registerMetrics()
ctx := context.Background()
err := s.startGrpcSrv(ctx)
require.Nil(t, err)
Expand Down Expand Up @@ -313,7 +312,6 @@ func TestCollectMetric(t *testing.T) {
masterAddr, cfg, cleanup := prepareServerEnv(t, "test-collect-metric")
defer cleanup()

registerMetrics()
s := &Server{
cfg: cfg,
metrics: newServerMasterMetric(),
Expand Down