diff --git a/pkg/util/asyncworker/async_workers.go b/pkg/util/asyncworker/async_workers.go index c6ae2d8bd..9ade5c68f 100644 --- a/pkg/util/asyncworker/async_workers.go +++ b/pkg/util/asyncworker/async_workers.go @@ -116,6 +116,17 @@ func (aws *AsyncWorkers) handleWork(ctx context.Context, workName string, work * defer func() { if r := recover(); r != nil { handleErr = fmt.Errorf("recover from %v", r) + + metricErr := EmitCustomizedAsyncedMetrics(ctx, + metricNameAsyncWorkPanic, 1, + metrics.ConvertMapToTags(map[string]string{ + "workName": workName, + })...) + + if metricErr != nil { + general.Errorf("emit metric(%s:%d) failed with err: %v", + metricNameAsyncWorkDurationMs, 1, metricErr) + } } aws.completeWork(workName, work, handleErr) @@ -136,7 +147,10 @@ func (aws *AsyncWorkers) handleWork(ctx context.Context, workName string, work * paramValues = append(paramValues, reflect.ValueOf(param)) } + startTime := time.Now() funcRets := funcValue.Call(paramValues) + workDurationMs := time.Since(startTime).Milliseconds() + if len(funcRets) != 1 { handleErr = fmt.Errorf("work Fn returns invalid number: %d of return values", len(funcRets)) } else if funcRets[0].Interface() != nil { @@ -147,6 +161,17 @@ func (aws *AsyncWorkers) handleWork(ctx context.Context, workName string, work * handleErr = fmt.Errorf("work Fn returns return value: %v of invalid type", funcRets[0].Interface()) } } + + metricErr := EmitCustomizedAsyncedMetrics(ctx, + metricNameAsyncWorkDurationMs, workDurationMs, + metrics.ConvertMapToTags(map[string]string{ + "workName": workName, + })...) + + if metricErr != nil { + general.Errorf("emit metric(%s:%d) failed with err: %v", + metricNameAsyncWorkDurationMs, workDurationMs, metricErr) + } } func (aws *AsyncWorkers) completeWork(workName string, completedWork *Work, workErr error) { diff --git a/pkg/util/asyncworker/helpers.go b/pkg/util/asyncworker/helpers.go index 553dc153d..253cc72f7 100644 --- a/pkg/util/asyncworker/helpers.go +++ b/pkg/util/asyncworker/helpers.go @@ -80,15 +80,34 @@ func parseContextMetricName(ctx context.Context) (string, bool) { func EmitAsyncedMetrics(ctx context.Context, tags ...metrics.MetricTag) error { emitter, ok := parseContextMetricEmitter(ctx) if !ok { - general.InfofV(4, "failed to get metrics-emitter") + general.Errorf("failed to get metrics-emitter") return nil } name, ok := parseContextMetricName(ctx) if !ok { - general.InfofV(4, "failed to get metrics-name") + general.Errorf("failed to get metrics-name") return nil } return emitter.StoreInt64(name, 1, metrics.MetricTypeNameRaw, tags...) } + +// EmitAsyncedMetrics emit metrics through metricEmitter parsed from context and provided metricName & metricValue & tags +func EmitCustomizedAsyncedMetrics(ctx context.Context, metricName string, metricValue int64, tags ...metrics.MetricTag) error { + emitter, ok := parseContextMetricEmitter(ctx) + if !ok { + general.Errorf("failed to get metrics-emitter") + return nil + } + + briefWorkName, ok := parseContextMetricName(ctx) + if ok { + tags = append(tags, metrics.MetricTag{ + Key: "briefWorkName", + Val: briefWorkName, + }) + } + + return emitter.StoreInt64(metricName, metricValue, metrics.MetricTypeNameRaw, tags...) +} diff --git a/pkg/util/asyncworker/types.go b/pkg/util/asyncworker/types.go index 8555acb13..153f4fdbc 100644 --- a/pkg/util/asyncworker/types.go +++ b/pkg/util/asyncworker/types.go @@ -35,6 +35,10 @@ type contextKey string const ( contextKeyMetricEmitter contextKey = "metric_emitter" contextKeyMetricName contextKey = "metric_name" + + // metrics for asyncworker + metricNameAsyncWorkDurationMs = "async_work_duration_ms" + metricNameAsyncWorkPanic = "async_work_panic" ) // workStatus tracks worker is working or not