Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend message struct to include a raw interface #205

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

danielcanessa
Copy link

@danielcanessa danielcanessa commented Nov 5, 2020

Add a new field to the message struct in order to pass user data such as a struct of pointers.
Despite the data can be stored on the message payload it has two disadvantages:

  1. It forces the user to marshall/unmarshall the data each time that the message is passed in the pipeline.
  2. Data such as signaling channels or pointers can not be cast to a []byte.

The user data approach is valid for routers running inside the same process.

Signed-off-by: Daniel Canessa daniel.canessa@hpe.com

Add a new field to the message struct in order to pass
user data such as a struct of pointers.
Despite of the data can be stored on the message payload
it have two dissavantages:
1) It forces the user to marshall/unmarshall the data each time
that the message is passed in the pipeline
2) Data such as signaling channels or pointers can not  be casted to
a []byte.

Signed-off-by: Daniel Canessa <daniel.canessa@hpe.com>
@m110
Copy link
Member

m110 commented Nov 5, 2020

Hey @danielcanessa, thanks for the proposal.

The disadvantages that you list are actually there by design, as Pub/Subs are distributed by nature. I guess you would like to use this with go-channel implementation, which is the only Pub/Sub that would support it. However, go-channel is an exception here and I think we shouldn't change the public API just to take advantage of its features. Note that the UserData field would be useless for any other Pub/Sub implementation (which are actually used more often on production).

Can you share how do you use watermill and what's your use case for this?

@danielcanessa
Copy link
Author

danielcanessa commented Nov 5, 2020

thanks for the quick response @m110

  • Basically, I have a single router instance with around 5 handlers which compose the pipeline.
  • Also, I have an RPC2 request listener running in a different goroutine.

Each time that a request is queued I create a message and published it to the pipeline. I'm passing a structure with pointers in the message in order to modify the data in the different stages of the router. Besides, a signaling channel is included in the struct because I need to block the request to the RPC2 listener until the pipeline modifies all the necessary data.

Right now I have two issues with the current message implementation:

  1. Cast my struct to a slice of bytes and marshalling/unmarshalling in each stage of the pipeline adds an overhead.
  2. I can't marshall the types *interface{} and signaling channels to include them in the payload field.

Can you please suggest an alternative given the current message structure, please?

@m110
Copy link
Member

m110 commented Nov 11, 2020

Hey @danielcanessa.

So I have another proposal that should allow you to achieve the same thing without modifying the Message. We could change the GoChannel implementation so that it passes the values from message.Context() when copying it to the subscriber: https://github.com/ThreeDotsLabs/watermill/blob/master/pubsub/gochannel/pubsub.go#L340

Preferably, this would be enabled by a config boolean flag, as this is changing the current behavior.

Then, you would be able to send it like this:

someStruct := myStruct{/* ... */}
msg := message.NewMessage(watermill.NewUUID(), nil)
ctx := context.WithValue(msg.Context(), "my_key", someStruct)
msg.SetContext(ctx)

publisher.Publish("topic", msg)

// Then in subscriber:

someStruct := msg.Context().Value("my_key").(myStruct)

It doesn't look that good, but the UserData would need similar casting anyway. I guess you could define your own message constructors, like NewMessage(myStruct) message.Message and ParseMessage(message.Message) myStruct that would handle this.

Would that solve your use case?

@danielcanessa
Copy link
Author

Hi @m110 !
I took a look at your suggestion and it could work since what I need is pass my own data structure.

I'm wondering if changing the message context in pubsub will cause issues in other parts of watermill.
Also, I took a look at the code and I'm not sure where should I include the boolean config flag.

Could you please give me more details about the changes required on watermill side?

@m110
Copy link
Member

m110 commented Nov 12, 2020

Hey @danielcanessa.

The implementation should be straightforward - in the line I've mentioned, in the gochannel Pub/Sub, we could copy the message's Context(), wrap it with cancel and pass to the message, instead of the subscriber's context.

This shouldn't have any impact on other parts of watermill. The config I mentioned is this struct: https://github.com/ThreeDotsLabs/watermill/blob/master/pubsub/gochannel/pubsub.go#L15
We could add a boolean field there, that needs to be switched to true to enable the new behavior.

I'm happy to review a PR, or I could look into this during this weekend. :)

@danielcanessa
Copy link
Author

hi @m110!
I tried using your suggestion, but unfortunately, the context of the message on the function sendMessageToSubscriber is nil. I took a look at the pubsub code and the message is getting copied before call sendMessage which calls sendMessageToSubscriber, so I'm thinking that I'm losing the message context on the publish function.
I can check if that is the reason for the nil context on sendMessageToSubscriber, but I'm still wondering if this approach could hide future issues.
If the copy of the message causes the issue I think that the easiest solution is to copy the context on the copy message function, however, I wouldn't have access to the config struct from pubsub in order to use the bool flag and do not change the current behavior, I think that it is pretty similar to add the void struct to the message.
Please, let me know your thoughts :).

@roblaszczak
Copy link
Member

Hello @danielcanessa!

I took a look at the pubsub code and the message is getting copied before call sendMessage which calls sendMessageToSubscriber, so I'm thinking that I'm losing the message context on the publish function.

Maybe the solution here may be adding a configuration that will explicitly copy the context in this Pub/Sub (with false as default, to not break default Pub/Sub behavior)?

@smoya
Copy link

smoya commented Sep 28, 2021

Any plan to retake this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants