-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Backport][Release-v0.22.0] Support eventing metrics #234
[Backport][Release-v0.22.0] Support eventing metrics #234
Conversation
* support eventing metrics * lint * imports * update with latest deps
@matzew could you approve this one too? |
_, err := c.dispatcher.DispatchMessageWithRetries( | ||
te := kncloudevents.TypeExtractorTransformer("") | ||
|
||
bufferedMessage, err := buffering.CopyMessage(ctx, message, &te) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are you doing this? You also don't free that memory invoking Message.Finish
, so this is going to leak memory
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just followed what we did in the past https://github.com/knative/eventing/blob/main/pkg/channel/fanout/fanout_message_handler.go#L177
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My assumption is bufferedMessage is managed internally.
DispatchWithRetries has:
defer func() {
for _, msg := range messagesToFinish {
_ = msg.Finish(nil)
}
}()
We discussed this in the past and here is just a copy of that idea. Transform the msg to get the type. If there is a better way (with no mem overhead) I will adapt (also upstream).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope, please check the godocs for that method, it explains how it works
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@slinkydeveloper Why not? The buffered msg is finished afaik. Have a pointer, what method?
type Message interface {
MessageReader
// Finish *must* be called when message from a Receiver can be forgotten by
// the receiver. A QoS 1 sender should not call Finish() until it gets an acknowledgment of
// receipt on the underlying transport. For QoS 2 see ExactlyOnceMessage.
//
// Note that, depending on the Message implementation, forgetting to Finish the message
// could produce memory/resources leaks!
//
// Passing a non-nil err indicates sending or processing failed.
// A non-nil return indicates that the message was not accepted
// by the receivers peer.
Finish(error) error
}
For the CopyMessage it says:
// CopyMessage reads m once and creates an in-memory copy depending on the encoding of m.
// The returned copy is not dependent on any transport and can be visited many times.
// When the copy can be forgot, the copied message must be finished with Finish() message to release the memory.
// transformers can be nil and this function guarantees that they are invoked only once during the encoding process.
We discussed this in the past and this is the API you mentioned to use. What else should I do here? Have an example? All msgs are finished as DispatchWithRetries
does that for the buffered one, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok I think you're right, this seems correct, sorry for blocking this. Although I would love to explore if there's any way we can avoid this buffering...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@slinkydeveloper np, yeah copying is not something I want to do either, especially on this path. One solution could be to leak the eventype in the kafka msg header (a bit of a hack). The other option would be to get back some useful info when we write the http request since there is the last time we touch the msg.
executeRequest
has a transformers parameter we never use. That method is called by DispatchWithRetries. When we write the request toEvent is called anyway and that method applies the transformers. I think there it is more efficient to pass the transformer.
In detail, the call here will eventually call http.WriteRequest which calls binding.Write which then calls this Write. Finally ToEvent is called and msg is transformed.
I think it should work if I either expand DispatchWithRetries or add a new method so I dont break dependent projects. WDYTH?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah maybe you can just pass that transformer to executeRequest then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me try it and see if it works.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@slinkydeveloper It does work as expected so I will do the update upstream and also here. :)
I updated this PR.
/hold |
@slinkydeveloper One thing though that may not work is that if reply is nil and destination is not defined then error is returned and there is no request written (other cases do exist). I dont think that all cases are covered if we rely on the request written but maybe good enough middle-ground solution, I am still thinking about it. @matzew wdyth? I dont see many options here, either we keep the copy approach or have incomplete data for some errors. Even adding the field in kafka headers also requires a copy. |
@slinkydeveloper gentle ping. |
/unhold |
/assign @slinkydeveloper |
@slinkydeveloper should we get this in (gentle ping)? |
/lgtm |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: skonto, slinkydeveloper The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
* patch vendor * Support eventing metrics (knative-extensions#688) * support eventing metrics * lint * imports * update with latest deps * use transformers in dispatchWithRetries instead of copying * fix use of transformers * updates * pass transformers to executeRequest
Backport: knative-extensions#688
To be completely aligned opened: openshift/knative-eventing#1311
My goal is to add visualizations for channel metrics at the S-O side.
/cc @aliok @matzew
I am also planning to backport for 0.23. Didnt go upstream because afaik we depend on the release tag there.