From 480365c6cb7c86da7670cd5fa49e7f9890dca665 Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Thu, 10 Jun 2021 17:32:15 +0300 Subject: [PATCH] [Release-v0.22.0][Backport] Make Channel metrics generic enough for eventing-kafka (#1311) * make eventing metrics generic * Expose dispatch result to be used by other implementations (#5481) * expose dispatch result to be used by other implementations * expose fields * add getters etc * less verbose --- pkg/channel/fanout/fanout_message_handler.go | 32 +++++++++++++++----- pkg/channel/stats_reporter.go | 12 ++++---- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/pkg/channel/fanout/fanout_message_handler.go b/pkg/channel/fanout/fanout_message_handler.go index b59b99a7f9b..fcb1409837f 100644 --- a/pkg/channel/fanout/fanout_message_handler.go +++ b/pkg/channel/fanout/fanout_message_handler.go @@ -190,7 +190,7 @@ func createMessageReceiverFunction(f *FanoutMessageHandler) func(context.Context ctx = trace.NewContext(context.Background(), s) // Any returned error is already logged in f.dispatch(). dispatchResultForFanout := f.dispatch(ctx, subs, m, h) - _ = parseFanoutResultAndReportMetrics(dispatchResultForFanout, *r, *args) + _ = ParseDispatchResultAndReportMetrics(dispatchResultForFanout, *r, *args) }(bufferedMessage, additionalHeaders, parentSpan, &f.reporter, &reportArgs) return nil } @@ -217,7 +217,7 @@ func createMessageReceiverFunction(f *FanoutMessageHandler) func(context.Context reportArgs.EventType = string(te) reportArgs.Ns = ref.Namespace dispatchResultForFanout := f.dispatch(ctx, subs, bufferedMessage, additionalHeaders) - return parseFanoutResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs) + return ParseDispatchResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs) } } @@ -225,7 +225,8 @@ func (f *FanoutMessageHandler) ServeHTTP(response nethttp.ResponseWriter, reques f.receiver.ServeHTTP(response, request) } -func parseFanoutResultAndReportMetrics(result dispatchResult, reporter channel.StatsReporter, reportArgs channel.ReportArgs) error { +// ParseDispatchResultAndReportMetric processes the dispatch result and records the related channel metrics with the appropriate context +func ParseDispatchResultAndReportMetrics(result DispatchResult, reporter channel.StatsReporter, reportArgs channel.ReportArgs) error { if result.info != nil && result.info.Time > channel.NoDuration { if result.info.ResponseCode > channel.NoResponse { _ = reporter.ReportEventDispatchTime(&reportArgs, result.info.ResponseCode, result.info.Time) @@ -244,20 +245,20 @@ func parseFanoutResultAndReportMetrics(result dispatchResult, reporter channel.S // dispatch takes the event, fans it out to each subscription in subs. If all the fanned out // events return successfully, then return nil. Else, return an error. -func (f *FanoutMessageHandler) dispatch(ctx context.Context, subs []Subscription, bufferedMessage binding.Message, additionalHeaders nethttp.Header) dispatchResult { +func (f *FanoutMessageHandler) dispatch(ctx context.Context, subs []Subscription, bufferedMessage binding.Message, additionalHeaders nethttp.Header) DispatchResult { // Bind the lifecycle of the buffered message to the number of subs bufferedMessage = buffering.WithAcksBeforeFinish(bufferedMessage, len(subs)) - errorCh := make(chan dispatchResult, len(subs)) + errorCh := make(chan DispatchResult, len(subs)) for _, sub := range subs { go func(s Subscription) { dispatchedResultPerSub, err := f.makeFanoutRequest(ctx, bufferedMessage, additionalHeaders, s) - errorCh <- dispatchResult{err: err, info: dispatchedResultPerSub} + errorCh <- DispatchResult{err: err, info: dispatchedResultPerSub} }(sub) } var totalDispatchTimeForFanout time.Duration = channel.NoDuration - dispatchResultForFanout := dispatchResult{ + dispatchResultForFanout := DispatchResult{ info: &channel.DispatchExecutionInfo{ Time: channel.NoDuration, ResponseCode: channel.NoResponse, @@ -306,7 +307,22 @@ func (f *FanoutMessageHandler) makeFanoutRequest(ctx context.Context, message bi ) } -type dispatchResult struct { +type DispatchResult struct { err error info *channel.DispatchExecutionInfo } + +func (d DispatchResult) Error() error { + return d.err +} + +func (d DispatchResult) Info() *channel.DispatchExecutionInfo { + return d.info +} + +func NewDispatchResult(err error, info *channel.DispatchExecutionInfo) DispatchResult { + return DispatchResult{ + err: err, + info: info, + } +} diff --git a/pkg/channel/stats_reporter.go b/pkg/channel/stats_reporter.go index ae923210844..d5e6399522e 100644 --- a/pkg/channel/stats_reporter.go +++ b/pkg/channel/stats_reporter.go @@ -30,19 +30,19 @@ import ( ) var ( - // eventCountM is a counter which records the number of events received - // by the in-memory Channel. + // eventCountM is a counter which records the number of events dispatched + // by the channel. eventCountM = stats.Int64( "event_count", - "Number of events dispatched by the in-memory channel", + "Number of events dispatched by the channel", stats.UnitDimensionless, ) - // dispatchTimeInMsecM records the Time spent dispatching an event to - // a Channel, in milliseconds. + // dispatchTimeInMsecM records the time spent by the channel dispatching an event to + // to subscribers, in milliseconds. dispatchTimeInMsecM = stats.Float64( "event_dispatch_latencies", - "The Time spent dispatching an event from a in-memoryChannel", + "The time spent by the channel dispatching an event", stats.UnitMilliseconds, )