Skip to content

Commit

Permalink
refactor: rewrite RPC metrics into the event pattern (#2392)
Browse files Browse the repository at this point in the history
  • Loading branch information
ev1lQuark authored Aug 30, 2023
1 parent bdf5b70 commit 0a02805
Show file tree
Hide file tree
Showing 19 changed files with 681 additions and 1,058 deletions.
1 change: 1 addition & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ const (

// metrics key
const (
MetricsRpc = "dubbo.metrics.rpc"
MetricsRegistry = "dubbo.metrics.registry"
MetricsMetadata = "dubbo.metrics.metadata"
MetricApp = "dubbo.metrics.app"
Expand Down
7 changes: 2 additions & 5 deletions common/extension/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package extension

import (
"context"
"testing"
"time"
)

import (
Expand All @@ -29,7 +27,6 @@ import (

import (
"dubbo.apache.org/dubbo-go/v3/metrics"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

func TestGetMetricReporter(t *testing.T) {
Expand All @@ -45,8 +42,8 @@ func TestGetMetricReporter(t *testing.T) {
type mockReporter struct{}

// implement the interface of Reporter
func (m *mockReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
func (m *mockReporter) StartServer(config *metrics.ReporterConfig) {
}

func (m *mockReporter) ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) {
func (m *mockReporter) ShutdownServer() {
}
42 changes: 14 additions & 28 deletions filter/metrics/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* limitations under the License.
*/

// Package metrics provides metrics collection filter.
package metrics

import (
Expand All @@ -28,56 +27,43 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/metrics"
"dubbo.apache.org/dubbo-go/v3/metrics/rpc"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

// must initialize before using the filter and after loading configuration
var metricFilterInstance *Filter
var metricFilterInstance *metricsFilter

func init() {
extension.SetFilter(constant.MetricsFilterKey, newFilter)
}

// Filter will calculate the invocation's duration and the report to the reporters
// more info please take a look at dubbo-samples projects
type Filter struct {
reporters []metrics.Reporter
}
// metricsFilter will report RPC metrics to the metrics bus and implements the filter.Filter interface
type metricsFilter struct{}

// Invoke collect the duration of invocation and then report the duration by using goroutine
func (p *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
go func() {
for _, reporter := range p.reporters {
reporter.ReportBeforeInvocation(ctx, invoker, invocation)
}
}()
// Invoke publish the BeforeInvokeEvent and AfterInvokeEvent to metrics bus
func (mf *metricsFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
metrics.Publish(rpc.NewBeforeInvokeEvent(invoker, invocation))
start := time.Now()
res := invoker.Invoke(ctx, invocation)
end := time.Now()
duration := end.Sub(start)
go func() {
for _, reporter := range p.reporters {
reporter.ReportAfterInvocation(ctx, invoker, invocation, duration, res)
}
}()
metrics.Publish(rpc.NewAfterInvokeEvent(invoker, invocation, duration, res))
return res
}

// OnResponse do nothing and return the result
func (p *Filter) OnResponse(ctx context.Context, res protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
func (mf *metricsFilter) OnResponse(ctx context.Context, res protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return res
}

// newFilter the Filter is singleton.
// it's lazy initialization
// make sure that the configuration had been loaded before invoking this method.
// newFilter creates a new metricsFilter instance.
//
// It's lazy initialization,
// and make sure that the configuration had been loaded before invoking this method.
func newFilter() filter.Filter {
if metricFilterInstance == nil {
reporters := make([]metrics.Reporter, 0, 1)
reporters = append(reporters, extension.GetMetricReporter("prometheus", metrics.NewReporterConfig()))
metricFilterInstance = &Filter{
reporters: reporters,
}
metricFilterInstance = &metricsFilter{}
}
return metricFilterInstance
}
43 changes: 9 additions & 34 deletions filter/metrics/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,25 @@ package metrics

import (
"context"
"sync"
"testing"
"time"
)

import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/metrics"
_ "dubbo.apache.org/dubbo-go/v3/metrics/prometheus"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)

func TestMetricsFilterInvoke(t *testing.T) {
// prepare the mock reporter
mk := &mockReporter{}
extension.SetMetricReporter("mock", func(config *metrics.ReporterConfig) metrics.Reporter {
return mk
})

instance := newFilter()
mockChan := make(chan metrics.MetricsEvent, 10)
defer close(mockChan)
metrics.Subscribe(constant.MetricsRpc, mockChan)

url, _ := common.NewURL(
"dubbo://:20000/UserProvider?app.version=0.0.1&application=BDTService&bean.name=UserProvider" +
Expand All @@ -57,31 +49,14 @@ func TestMetricsFilterInvoke(t *testing.T) {

attach := make(map[string]interface{}, 10)
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach)

ctx := context.Background()

mk.On("Report", ctx, invoker, inv).Return(true, nil)

mk.wg.Add(1)
result := instance.Invoke(ctx, invoker, inv)
filter := newFilter()
result := filter.Invoke(ctx, invoker, inv)
assert.NotNil(t, result)
mk.AssertNotCalled(t, "Report", 1)
// it will do nothing
result = instance.OnResponse(ctx, nil, invoker, inv)
result = filter.OnResponse(ctx, nil, invoker, inv)
assert.Nil(t, result)
}

type mockReporter struct {
mock.Mock
wg sync.WaitGroup
}

func (m *mockReporter) ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
m.Called(ctx, invoker, invocation)
m.wg.Done()
}

func (m *mockReporter) ReportBeforeInvocation(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) {
m.Called(ctx, invoker, invocation)
m.wg.Done()
assert.Equal(t, 2, len(mockChan))
assert.Equal(t, constant.MetricsRpc, (<-mockChan).Type())
}
Loading

0 comments on commit 0a02805

Please sign in to comment.