-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[extension/opamp]: Custom Message Support #32281
[extension/opamp]: Custom Message Support #32281
Conversation
523146c
to
780e1c1
Compare
|
6d98bd7
to
adbb28d
Compare
This looks good to me at a high level but I found the naming of type CustomCapabilityRegistry interface {
Register(capability string, listener CustomCapabilityListener) (CustomCapabilitySender, error)
}
type CustomCapabilityListener interface {
ReceiveMessage(*protobufs.CustomMessage)
Done() <-chan struct{}
}
type CustomCapabilitySender interface {
SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error)
} |
I do like how that partitions the responsibilities and makes it a little clearer, I'll take a stab at refactoring to fit that interface and see what it's like. |
f173210
to
ff81b43
Compare
@djaglowski I've refactored to fit that interface, mind taking another look? |
Based on an offline conversation I think something like the previous approach was cleaner, though I think we need a better name for the struct which is returned by registering. |
Alright, I think this is good to look at now. After discussing with @djaglowski we decided on having |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @djaglowski for giving this a review. Just a few small things so far, I want to discuss the external API before going any further, now that I'm really looking closely at this.
Regarding the custom message API, it seems to me we have two actors here:
Could we design API around this and encapsulate the functionality for each actor in separate interfaces? Here's how I would expect the API could look, and how a component may use it: type CustomCapabilityRegistry interface {
Register(capability string) (cm CustomMessageHandler err error)
}
// Open to suggestions for other names, but this is essentially a
// contact point between the component and the OpAMP connection.
type CustomMessageHandler interface {
Message() <-chan *protobufs.CustomMessage
SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error)
Unregister()
}
type myComponent struct{
cmh CustomMessageHandler
shutdown chan struct{}
}
func (m *myComponent) Start(host component.Host) {
// Get registry, cast it, etc.
registry := host.GetExtensions()["myOpAMPExtension"].(CustomCapabilityRegistry)
m.cmh, _ = registry.Register("myComponent")
}
func (m *myComponent) handleAsyncOps() {
switch {
case msg := <-m.cmh.Message():
case <-m.shutdown:
break
}
}
func (m *myComponent) doOperation() {
m.cmh.SendMessage("operation", []byte{})
}
func (m *myComponent) Shutdown() {
m.cmh.Unregister()
close(m.shutdown)
} I see a few advantages here:
|
Thanks for this feedback! I do like the extensibility here. I think this also solves this awkwardness where you want to receive a message, then respond to it. Before it would be something like: var sender opampextension.CustomMessageSender
sender, _ = registry.Register("cap", func(cm *protobufs.CustomMessage){
// Process message
sender.SendMessage("type", []byte("some response message"))
}) It's honestly not perfectly clear to me whether the above avoids a data race or not. With this interface it's a little more well ordered and less awkward, IMO: handler, _ = registry.Register("cap")
for {
cm <- handler.Message()
// Process message
handler.SendMessage("type", []byte("some response message")
} The only downside I see is what happens when a component misbehaves and ends up not listening to the message channel?
It's probably good to note that this would only be in the case where the component is misbehaving. Also, the implementation as it is now can spawn goroutines that takes up memory forever if a component ends up blocking them, so maybe unbounded memory isn't that big of a deal in these cases. Also, now that I'm thinking of it, in-order message processing isn't even guaranteed in the current design, because the spun-off go routines might not execute in the order they were spawned. I'm thinking either 2 or 4 are going to be our best options on how to handle it, and I think initially implementing 2 would be fine, then upgrading to 4 when there is a need for the messages to be well-ordered. I'll try and rework this PR to use this suggested interface, because I do think it has the potential to fix a few issues and leave us open to adding things if needed later without any breaking changes. |
Thanks for the detailed analysis.
I think that 2 and 4 are probably more powerful solutions if we could provide hard limits somewhere, but for now I would advocate for 3. We could probably offer this alongside some user-facing configuration to tune the size of the buffer. If we need to allocate limited resources somewhere, I feel that the Collector's pipelines are probably generally a more important place to put resources than custom messages, with configuration options to allow users to change this if they have different priorities. |
@evan-bradley I've refactored this to fit the interface you've suggested now. I think all your feedback from the previous iteration should be resolved now, and this is ready for another pass. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good to me.
I've incorporated your feedback @evan-bradley - One additional change I made is that I unexported the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Nice work!
This reverts commit af136737fcc6ce6082ebc91ebcbf4c909578a096.
This reverts commit ff81b43c07ea2c5b33e440e85cddc1dc41aac65b.
e1674bd
to
5166988
Compare
Description:
Link to tracking Issue: #32021
Testing:
Documentation:
Added some docs for usage to the opamp extension README