Skip to content
This repository has been archived by the owner on Nov 28, 2022. It is now read-only.

Commit

Permalink
[Release-v0.22.0][Backport] Make Channel metrics generic enough for e…
Browse files Browse the repository at this point in the history
…venting-kafka (#1311)

* make eventing metrics generic

* Expose dispatch result to be used by other implementations (knative#5481)

* expose dispatch result to be used by other implementations

* expose fields

* add getters etc

* less verbose
  • Loading branch information
skonto authored Jun 10, 2021
1 parent 7043d7e commit 480365c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 14 deletions.
32 changes: 24 additions & 8 deletions pkg/channel/fanout/fanout_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func createMessageReceiverFunction(f *FanoutMessageHandler) func(context.Context
ctx = trace.NewContext(context.Background(), s)
// Any returned error is already logged in f.dispatch().
dispatchResultForFanout := f.dispatch(ctx, subs, m, h)
_ = parseFanoutResultAndReportMetrics(dispatchResultForFanout, *r, *args)
_ = ParseDispatchResultAndReportMetrics(dispatchResultForFanout, *r, *args)
}(bufferedMessage, additionalHeaders, parentSpan, &f.reporter, &reportArgs)
return nil
}
Expand All @@ -217,15 +217,16 @@ func createMessageReceiverFunction(f *FanoutMessageHandler) func(context.Context
reportArgs.EventType = string(te)
reportArgs.Ns = ref.Namespace
dispatchResultForFanout := f.dispatch(ctx, subs, bufferedMessage, additionalHeaders)
return parseFanoutResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs)
return ParseDispatchResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs)
}
}

func (f *FanoutMessageHandler) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Request) {
f.receiver.ServeHTTP(response, request)
}

func parseFanoutResultAndReportMetrics(result dispatchResult, reporter channel.StatsReporter, reportArgs channel.ReportArgs) error {
// ParseDispatchResultAndReportMetric processes the dispatch result and records the related channel metrics with the appropriate context
func ParseDispatchResultAndReportMetrics(result DispatchResult, reporter channel.StatsReporter, reportArgs channel.ReportArgs) error {
if result.info != nil && result.info.Time > channel.NoDuration {
if result.info.ResponseCode > channel.NoResponse {
_ = reporter.ReportEventDispatchTime(&reportArgs, result.info.ResponseCode, result.info.Time)
Expand All @@ -244,20 +245,20 @@ func parseFanoutResultAndReportMetrics(result dispatchResult, reporter channel.S

// dispatch takes the event, fans it out to each subscription in subs. If all the fanned out
// events return successfully, then return nil. Else, return an error.
func (f *FanoutMessageHandler) dispatch(ctx context.Context, subs []Subscription, bufferedMessage binding.Message, additionalHeaders nethttp.Header) dispatchResult {
func (f *FanoutMessageHandler) dispatch(ctx context.Context, subs []Subscription, bufferedMessage binding.Message, additionalHeaders nethttp.Header) DispatchResult {
// Bind the lifecycle of the buffered message to the number of subs
bufferedMessage = buffering.WithAcksBeforeFinish(bufferedMessage, len(subs))

errorCh := make(chan dispatchResult, len(subs))
errorCh := make(chan DispatchResult, len(subs))
for _, sub := range subs {
go func(s Subscription) {
dispatchedResultPerSub, err := f.makeFanoutRequest(ctx, bufferedMessage, additionalHeaders, s)
errorCh <- dispatchResult{err: err, info: dispatchedResultPerSub}
errorCh <- DispatchResult{err: err, info: dispatchedResultPerSub}
}(sub)
}

var totalDispatchTimeForFanout time.Duration = channel.NoDuration
dispatchResultForFanout := dispatchResult{
dispatchResultForFanout := DispatchResult{
info: &channel.DispatchExecutionInfo{
Time: channel.NoDuration,
ResponseCode: channel.NoResponse,
Expand Down Expand Up @@ -306,7 +307,22 @@ func (f *FanoutMessageHandler) makeFanoutRequest(ctx context.Context, message bi
)
}

type dispatchResult struct {
type DispatchResult struct {
err error
info *channel.DispatchExecutionInfo
}

func (d DispatchResult) Error() error {
return d.err
}

func (d DispatchResult) Info() *channel.DispatchExecutionInfo {
return d.info
}

func NewDispatchResult(err error, info *channel.DispatchExecutionInfo) DispatchResult {
return DispatchResult{
err: err,
info: info,
}
}
12 changes: 6 additions & 6 deletions pkg/channel/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@ import (
)

var (
// eventCountM is a counter which records the number of events received
// by the in-memory Channel.
// eventCountM is a counter which records the number of events dispatched
// by the channel.
eventCountM = stats.Int64(
"event_count",
"Number of events dispatched by the in-memory channel",
"Number of events dispatched by the channel",
stats.UnitDimensionless,
)

// dispatchTimeInMsecM records the Time spent dispatching an event to
// a Channel, in milliseconds.
// dispatchTimeInMsecM records the time spent by the channel dispatching an event to
// to subscribers, in milliseconds.
dispatchTimeInMsecM = stats.Float64(
"event_dispatch_latencies",
"The Time spent dispatching an event from a in-memoryChannel",
"The time spent by the channel dispatching an event",
stats.UnitMilliseconds,
)

Expand Down

0 comments on commit 480365c

Please sign in to comment.