Skip to content

Commit

Permalink
feat: Add more events for starting and stopping of worker
Browse files Browse the repository at this point in the history
* When worker is started, then it sends event "started"
* It also sends event "stopped", when it is terminated and
  proces cannot handle message anymore
* This functionality is IMHO important for developers of
  yggdrasil worker. When you develop new worker it is
  important to see that your worker "communication" with
  yggd during start. It is also decency to say something
  like "good bye" at the end.
* Small refactoring of code (do not use numeric constant)
* Updated doc strings
* Extended documentation in com.redhat.Yggdrasil1.Worker1.xml
  according changes in code
  • Loading branch information
jirihnidek authored and subpop committed Nov 15, 2023
1 parent cc61d41 commit 818d654
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 12 deletions.
1 change: 1 addition & 0 deletions internal/work/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (d *Dispatcher) Connect() error {
go func() {
for s := range signals {
log.Tracef("received signal: %#v", s)

dest, err := d.senderName(dbus.Sender(s.Sender))
if err != nil {
log.Errorf("cannot find sender: %v", err)
Expand Down
10 changes: 10 additions & 0 deletions ipc/com.redhat.Yggdrasil1.Worker1.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@
3 = WORKING
Emitted when the worker wishes to continue to announce it is
working.
4 = STARTED
Emitted when the worker is started, and it is ready
to handle received messages. The message_id and response_id are
empty.
5 = STOPPED
Emitted when the worker is stopped, and it is not able
to handle received messages anymore. The message_id and
response_id are also empty.
-->
<signal name="Event">
<arg type="u" name="name" />
Expand Down
40 changes: 28 additions & 12 deletions ipc/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ipc

import (
_ "embed"
"fmt"
)

//go:embed com.redhat.Yggdrasil1.Dispatcher1.xml
Expand All @@ -12,13 +13,16 @@ var InterfaceDispatcher string
type DispatcherEvent uint

const (
// Emitted when the dispatcher receives the "disconnect" command.
// DispatcherEventReceivedDisconnect is emitted when the dispatcher receives
// the "disconnect" command.
DispatcherEventReceivedDisconnect DispatcherEvent = 1

// Emitted when the transport unexpected disconnects from the network.
// DispatcherEventUnexpectedDisconnect is emitted when the transport unexpected
// disconnects from the network.
DispatcherEventUnexpectedDisconnect DispatcherEvent = 2

// Emitted when the transport reconnects to the network.
// DispatcherEventConnectionRestored is emitted when the transport reconnects
// to the network.
DispatcherEventConnectionRestored DispatcherEvent = 3
)

Expand All @@ -29,28 +33,40 @@ type WorkerEventName uint

const (

// Emitted when the worker "accepts" a dispatched message and begins
// "working".
// WorkerEventNameBegin is emitted when the worker "accepts"
// a dispatched message and begins "working".
WorkerEventNameBegin WorkerEventName = 1

// Emitted when the worker finishes "working".
// WorkerEventNameEnd is emitted when the worker finishes "working".
WorkerEventNameEnd WorkerEventName = 2

// Emitted when the worker wishes to continue to announce it is
// working.
// WorkerEventNameWorking is emitted when the worker wishes
// to continue to announce it is working.
WorkerEventNameWorking WorkerEventName = 3

// WorkerEventNameStarted is emitted when worker finished starting
// process, and it can start process received messages.
WorkerEventNameStarted WorkerEventName = 4

// WorkerEventNameStopped is emitted when worker is stopped,
// and it cannot process any message.
WorkerEventNameStopped WorkerEventName = 5
)

func (e WorkerEventName) String() string {
switch e {
case 1:
case WorkerEventNameBegin:
return "BEGIN"
case 2:
case WorkerEventNameEnd:
return "END"
case 3:
case WorkerEventNameWorking:
return "WORKING"
case WorkerEventNameStarted:
return "STARTED"
case WorkerEventNameStopped:
return "STOPPED"
}
return ""
return fmt.Sprintf("UNKNOWN (value: %d)", e)
}

type WorkerEvent struct {
Expand Down
22 changes: 22 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ func (w *Worker) Connect(quit <-chan os.Signal) error {
return fmt.Errorf("request name failed")
}

// Emit a started event
err = w.EmitEvent(
ipc.WorkerEventNameStarted,
"",
"",
map[string]string{},
)
if err != nil {
return fmt.Errorf("cannot emit event: %w", err)
}

signals := make(chan *dbus.Signal)
w.conn.Signal(signals)
go func() {
Expand All @@ -142,6 +153,17 @@ func (w *Worker) Connect(quit <-chan os.Signal) error {

<-quit

// Emit a stopped event
err = w.EmitEvent(
ipc.WorkerEventNameStopped,
"",
"",
map[string]string{},
)
if err != nil {
return fmt.Errorf("cannot emit event: %w", err)
}

return nil
}

Expand Down

0 comments on commit 818d654

Please sign in to comment.