-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added new command message: Cancel #122
Conversation
767d994
to
7f843d9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great. I'll give this a try to see how it performs too.
7f843d9
to
2bc0bd1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, Thanks for the PR. 👍 I have few, requests, comments and suggestions.
worker/echo/main.go
Outdated
default: | ||
delete(cancelChan, id) | ||
} | ||
chanMutex.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again this looks like as something that will be duplicate in all workers. Shouldn't we create something that will help all developers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As previously mentioned, channels are specific to Golang, and this implementation can be different depending on each worker. How to handle the cancellation should be up to each worker.
2bc0bd1
to
56cb0ab
Compare
Summary of changes made from reviews:
|
d8cfbeb
to
357f77b
Compare
0f83e01
to
60350fc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updates. I have only two small requests/comments.
internal/work/dispatcher.go
Outdated
@@ -328,3 +328,26 @@ func (d *Dispatcher) senderName(sender dbus.Sender) (string, error) { | |||
|
|||
return "", fmt.Errorf("cannot get name for sender: %v", sender) | |||
} | |||
|
|||
// CancelMessage implements the dispatch of a cancel method to the worker. | |||
func (d *Dispatcher) CancelMessage(command yggdrasil.Command, id string) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following comment seems little bit off.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method signature feels odd to me. It strikes me as odd that we expect an
entire yggdrasil.Command
structure as the first parameter, but then only the
message ID from the encapsulating message as the second parameter. Perhaps the
only parameters that get passed to the dispatcher via this method are the
unpacked values from the command?
func (d *Dispatcher) CancelMessage(command yggdrasil.Command, id string) error { | |
func (d *Dispatcher) CancelMessage(directive string, message_id string, cancel_id string) error { |
This forces all the unmarshalling and parameter validation into the client and
makes it very clear which message ID is to be cancelled.
Another option is to follow the pattern of the Dispatch
method above, and expect
the entire yggdrasil.Control
struct in the method:
func (d *Dispatcher) CancelMessage(command yggdrasil.Command, id string) error { | |
func (d *Dispatcher) CancelMessage(control yggdrasil.Control) error { |
This has the downside of unmarshalling the Command struct out of the control
message a second time (the first having occurred in the client when processing
the command name). I'm less inclined to take this approach, because this
introduces the potential to scope-creep the method into a larger "control
message handler" method. It's important to preserve it's responsibility as much
as possible.
So that being said, I think we should push all the message unpacking up to the
client and change the signature of this method as follows:
func (d *Dispatcher) CancelMessage(command yggdrasil.Command, id string) error { | |
func (d *Dispatcher) Cancel(directive string, message_id string, cancel_id string) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to avoid the CancelMessage to unmarshall again the control message. I'm OK to move this logic into the client. It seems a good idea.
internal/work/dispatcher.go
Outdated
// CancelMessage implements the dispatch of a cancel method to the worker. | ||
func (d *Dispatcher) CancelMessage(command yggdrasil.Command, id string) error { | ||
// Send the message through the cancel interface | ||
directive := command.Arguments["directive"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we check the existence of directive
as we do it for messageID
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right 😄
60350fc
to
429142a
Compare
Sumary of changesAddressed all the change from @jirihnidek comments:
|
|
429142a
to
ff9f125
Compare
ff9f125
to
5c6f2c2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, this is a great approach. We're blazing a new trail here with the introduction of control message data being handled and passed to workers, so I left a few comments about how to maybe improve the API boundary between client and dispatcher. I think we could probably offer a more functional worker example too; considering how the implementation isn't obvious (using channels and sync maps immediately increases the complexity of the example significantly). Having a clear example to use as guidance and documentation is going to be important.
internal/work/dispatcher.go
Outdated
@@ -328,3 +328,26 @@ func (d *Dispatcher) senderName(sender dbus.Sender) (string, error) { | |||
|
|||
return "", fmt.Errorf("cannot get name for sender: %v", sender) | |||
} | |||
|
|||
// CancelMessage implements the dispatch of a cancel method to the worker. | |||
func (d *Dispatcher) CancelMessage(command yggdrasil.Command, id string) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method signature feels odd to me. It strikes me as odd that we expect an
entire yggdrasil.Command
structure as the first parameter, but then only the
message ID from the encapsulating message as the second parameter. Perhaps the
only parameters that get passed to the dispatcher via this method are the
unpacked values from the command?
func (d *Dispatcher) CancelMessage(command yggdrasil.Command, id string) error { | |
func (d *Dispatcher) CancelMessage(directive string, message_id string, cancel_id string) error { |
This forces all the unmarshalling and parameter validation into the client and
makes it very clear which message ID is to be cancelled.
Another option is to follow the pattern of the Dispatch
method above, and expect
the entire yggdrasil.Control
struct in the method:
func (d *Dispatcher) CancelMessage(command yggdrasil.Command, id string) error { | |
func (d *Dispatcher) CancelMessage(control yggdrasil.Control) error { |
This has the downside of unmarshalling the Command struct out of the control
message a second time (the first having occurred in the client when processing
the command name). I'm less inclined to take this approach, because this
introduces the potential to scope-creep the method into a larger "control
message handler" method. It's important to preserve it's responsibility as much
as possible.
So that being said, I think we should push all the message unpacking up to the
client and change the signature of this method as follows:
func (d *Dispatcher) CancelMessage(command yggdrasil.Command, id string) error { | |
func (d *Dispatcher) Cancel(directive string, message_id string, cancel_id string) error { |
@@ -12,16 +12,23 @@ import ( | |||
|
|||
"git.sr.ht/~spc/go-log" | |||
|
|||
"github.com/redhatinsights/yggdrasil/internal/sync" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is okay because the echo worker is part of the yggdrasil repository, but I
don't know if Go would permit this import from an external project. I think the
'internal' package has special value to the Go compiler. Might be worth testing.
We could easily solve this by moving the sync
package to the top of the
yggdrasil
project, but it seems weird for the yggdrasil
package to export a
sync
package.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this sync package might be useful for worker development, in case other workers want to use this utilty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think to move this pieces of code into the pkg directory?
yggdrasil
.
├── build-aux
├── builddir
├── cmd
├── data
├── dbus
├── dist
├── doc
├── internal
├── ipc
├── lib
├── pkg
└── worker
That way, we have such an standard way to do the same things. I'm thinking as well in the http
implementation of the client we have in yggdrasil
, it can be also be used in rhc
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we could move sync
out of internal
and into the top level of the project. I would prefer to avoid creating a pkg
package. It is unnecessary and creates an extraneous entry in import paths:
With pkg
import "github.com/redhatinsights/yggdrasil/pkg/sync"
Without pkg
import "github.com/redhatinsights/yggdrasil/sync"
func cancelEcho(w *worker.Worker, addr string, id string, cancelID string) error { | ||
log.Infof("cancelling message with id %v", cancelID) | ||
if cancelChan, exists := sycMapCancelChan.Get(cancelID); exists { | ||
close(cancelChan) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we've closed the channel because a cancellation was received, the channel
should get deleted from the map too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The channel is deleted from the map when the close message is used, that is in the echo function. The new echo worker functionality may change this.
5c6f2c2
to
8dda469
Compare
545c8c5
to
0197614
Compare
Changelog
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updates 👍 It is better and I think that we are reaching end of PR review process. I have some small suggestions and some requests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I saw a few typos as well. I'm going to take a second pass over this to experiment with the functionality next.
1d46c38
to
ec0d919
Compare
@subpop @jirihnidek I have fixed the typos and include your suggestions 😄 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found one last typo. But the functionality works great!
This commit adds: - A new command message that can be use to cancel work in progress that is done in the workers.Is send as a command to yggdrasil client. Yggdrasil will send this message to the workers Cancel method. - A new function to the worker, CancelRX. This function will handle the cancel signal in workers side. If it is null the worker doesn't support cancellation of messages. - echo worker implements this cancel function to handle cancellation when it runs in slow mode. Signed-off-by: Alba Hita Catala <ahitacat@redhat.com>
ec0d919
to
17d8850
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks for all updates.
Proposal
Add a new control message of type command that can be used to cancel a work that is already in progress in a worker.
yggd
will receive this command and forward it to the appropriate worker. If this worker implements the cancellation feature it will stop or cancel its work. But it is up to the worker to implement this functionality.This PR also provides an example of how the cancellation message can be implemented in the
echo worker
. It uses a map of synchronized channels to manage the current work, and to send the cancellation message to thegoroutines
that are handle it.This commit adds:
A new command message that can be use to cancel work in progress that is done in the workers.Is send as a command to yggdrasil client. Yggdrasil will send this message to the workers Cancel method.
A new function to the worker, CancelRX. This function will handle the cancel signal in workers side. If it is null the worker doesn't support cancellation of messages.
echo worker implements this cancel function to handle cancellation when it runs in slow mode.
How to test it
Follow the Quickstart steps to set the environment. With the following changes:
Terminal 2
Run the worker with the loop and sleep parameters, this will allow some time to launch the cancel message.
Terminal 4
Run the hello world, that will send a new job to the worker.
In the worker it will appear the
id
of that message:Use the id 67c40762-2343-48ad-9bf1-23a9b98bdacb to send the cancellation message: