Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: include message id in emitted worker events #115

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions cmd/yggctl/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,20 @@ func listenAction(ctx *cli.Context) error {
}
name, ok := s.Body[1].(uint32)
if !ok {
return cli.Exit(fmt.Errorf("cannot cat %T as uint32", s.Body[1]), 1)
return cli.Exit(fmt.Errorf("cannot cast %T as uint32", s.Body[1]), 1)
}
messageID, ok := s.Body[2].(string)
if !ok {
return cli.Exit(fmt.Errorf("cannot cast %T as string", s.Body[2]), 1)
}
var message string
if len(s.Body) > 2 {
message, ok = s.Body[2].(string)
if len(s.Body) > 3 {
message, ok = s.Body[3].(string)
if !ok {
return cli.Exit(fmt.Errorf("cannot cast %T as string", s.Body[0]), 1)
return cli.Exit(fmt.Errorf("cannot cast %T as string", s.Body[3]), 1)
}
}
log.Printf("%v: %v: %v", worker, ipc.WorkerEventName(name), message)
log.Printf("%v: %v: %v: %v", worker, messageID, ipc.WorkerEventName(name), message)

}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/yggd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (c *Client) Connect() error {
// channel, emitting a D-Bus "WorkerEvent" signal for each.
go func() {
for e := range c.dispatcher.WorkerEvents {
args := []interface{}{e.Worker, e.Name}
args := []interface{}{e.Worker, e.Name, e.MessageID}
switch e.Name {
case ipc.WorkerEventNameWorking:
args = append(args, e.Message)
Expand Down
2 changes: 2 additions & 0 deletions dbus/com.redhat.Yggdrasil1.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
WorkerEvent:
@worker: Name of the worker emitting the event.
@name: Name of the event.
@message_id: The id associated with the worker message.
@message: Optional message included with the event.

Emitted by a worker when certain conditions arise, such as beginning
Expand All @@ -60,6 +61,7 @@
<signal name="WorkerEvent">
<arg type="s" name="worker" />
<arg type="u" name="name" />
<arg type="s" name="message_id" />
<arg type="s" name="message" />
</signal>
</interface>
Expand Down
13 changes: 11 additions & 2 deletions internal/work/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,24 @@ func (d *Dispatcher) Connect() error {
}
event.Name = ipc.WorkerEventName(eventName)
event.Worker = strings.TrimPrefix(dest, "com.redhat.Yggdrasil1.Worker1.")

eventMessageID, ok := s.Body[1].(string)
if !ok {
log.Errorf("cannot convert %T to string", s.Body[1])
continue
}
event.MessageID = eventMessageID

switch ipc.WorkerEventName(eventName) {
case ipc.WorkerEventNameWorking:
eventMessage, ok := s.Body[1].(string)
eventMessage, ok := s.Body[2].(string)
if !ok {
log.Errorf("cannot convert %T to string", s.Body[1])
log.Errorf("cannot convert %T to string", s.Body[2])
continue
}
event.Message = eventMessage
}

d.WorkerEvents <- event
}
}
Expand Down
2 changes: 2 additions & 0 deletions ipc/com.redhat.Yggdrasil1.Worker1.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
<!--
Event:
@name: Name of the event.
@message_id: The id associated with the worker message.
@message: Optional message included with the event.

Emitted by a worker when certain conditions arise, such as beginning
Expand All @@ -65,6 +66,7 @@
-->
<signal name="Event">
<arg type="u" name="name" />
<arg type="s" name="message_id" />
<arg type="s" name="message" />
</signal>
</interface>
Expand Down
7 changes: 4 additions & 3 deletions ipc/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func (e WorkerEventName) String() string {
}

type WorkerEvent struct {
Worker string
Name WorkerEventName
Message string
Worker string
Name WorkerEventName
MessageID string
Message string
}
6 changes: 3 additions & 3 deletions worker/echo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
var sleepTime time.Duration

// echo opens a new dbus connection and calls the
// com.redhat.Yggdrasil1.Dispatcher1.Transmit method, returning the metadata and
// data it received.
// com.redhat.Yggdrasil1.Dispatcher1.Transmit method, returning the
// metadata, data, and the message id it received.
func echo(w *worker.Worker, addr string, id string, responseTo string, metadata map[string]string, data []byte) error {
if err := w.EmitEvent(ipc.WorkerEventNameWorking, fmt.Sprintf("echoing %v", data)); err != nil {
if err := w.EmitEvent(ipc.WorkerEventNameWorking, id, fmt.Sprintf("echoing %v", data)); err != nil {
return fmt.Errorf("cannot call EmitEvent: %w", err)
}

Expand Down
10 changes: 5 additions & 5 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ func (w *Worker) Transmit(addr string, id string, responseTo string, metadata ma
return
}

// EmitEvent emits a WorkerEvent, including an optional message.
func (w *Worker) EmitEvent(event ipc.WorkerEventName, message string) error {
args := []interface{}{event}
// EmitEvent emits a WorkerEvent, worker message id, and an optional message.
func (w *Worker) EmitEvent(event ipc.WorkerEventName, messageID string, message string) error {
args := []interface{}{event, messageID}
if message != "" {
args = append(args, message)
}
Expand All @@ -178,15 +178,15 @@ func (w *Worker) dispatch(addr string, id string, responseTo string, metadata ma
log.Tracef("metadata = %#v", metadata)
log.Tracef("data = %v", data)

if err := w.EmitEvent(ipc.WorkerEventNameBegin, ""); err != nil {
if err := w.EmitEvent(ipc.WorkerEventNameBegin, id, ""); err != nil {
return dbus.NewError("com.redhat.Yggdrasil1.Worker1.EventError", []interface{}{err.Error()})
}

go func() {
if err := w.rx(w, addr, id, responseTo, metadata, data); err != nil {
log.Errorf("cannot call rx: %v", err)
}
if err := w.EmitEvent(ipc.WorkerEventNameEnd, ""); err != nil {
if err := w.EmitEvent(ipc.WorkerEventNameEnd, id, ""); err != nil {
log.Errorf("cannot emit event: %v", err)
}
}()
Expand Down