From 818d654aef4f75c19609bf13d219a26e3c6ddc9b Mon Sep 17 00:00:00 2001 From: Jiri Hnidek Date: Mon, 13 Nov 2023 09:25:27 +0100 Subject: [PATCH] feat: Add more events for starting and stopping of worker * When worker is started, then it sends event "started" * It also sends event "stopped", when it is terminated and proces cannot handle message anymore * This functionality is IMHO important for developers of yggdrasil worker. When you develop new worker it is important to see that your worker "communication" with yggd during start. It is also decency to say something like "good bye" at the end. * Small refactoring of code (do not use numeric constant) * Updated doc strings * Extended documentation in com.redhat.Yggdrasil1.Worker1.xml according changes in code --- internal/work/dispatcher.go | 1 + ipc/com.redhat.Yggdrasil1.Worker1.xml | 10 +++++++ ipc/interfaces.go | 40 +++++++++++++++++++-------- worker/worker.go | 22 +++++++++++++++ 4 files changed, 61 insertions(+), 12 deletions(-) diff --git a/internal/work/dispatcher.go b/internal/work/dispatcher.go index 6414d708..51e88ce4 100644 --- a/internal/work/dispatcher.go +++ b/internal/work/dispatcher.go @@ -111,6 +111,7 @@ func (d *Dispatcher) Connect() error { go func() { for s := range signals { log.Tracef("received signal: %#v", s) + dest, err := d.senderName(dbus.Sender(s.Sender)) if err != nil { log.Errorf("cannot find sender: %v", err) diff --git a/ipc/com.redhat.Yggdrasil1.Worker1.xml b/ipc/com.redhat.Yggdrasil1.Worker1.xml index 2555eea5..cefcffad 100644 --- a/ipc/com.redhat.Yggdrasil1.Worker1.xml +++ b/ipc/com.redhat.Yggdrasil1.Worker1.xml @@ -74,6 +74,16 @@ 3 = WORKING Emitted when the worker wishes to continue to announce it is working. + + 4 = STARTED + Emitted when the worker is started, and it is ready + to handle received messages. The message_id and response_id are + empty. + + 5 = STOPPED + Emitted when the worker is stopped, and it is not able + to handle received messages anymore. The message_id and + response_id are also empty. --> diff --git a/ipc/interfaces.go b/ipc/interfaces.go index 6e9350ad..f94a81f1 100644 --- a/ipc/interfaces.go +++ b/ipc/interfaces.go @@ -2,6 +2,7 @@ package ipc import ( _ "embed" + "fmt" ) //go:embed com.redhat.Yggdrasil1.Dispatcher1.xml @@ -12,13 +13,16 @@ var InterfaceDispatcher string type DispatcherEvent uint const ( - // Emitted when the dispatcher receives the "disconnect" command. + // DispatcherEventReceivedDisconnect is emitted when the dispatcher receives + // the "disconnect" command. DispatcherEventReceivedDisconnect DispatcherEvent = 1 - // Emitted when the transport unexpected disconnects from the network. + // DispatcherEventUnexpectedDisconnect is emitted when the transport unexpected + // disconnects from the network. DispatcherEventUnexpectedDisconnect DispatcherEvent = 2 - // Emitted when the transport reconnects to the network. + // DispatcherEventConnectionRestored is emitted when the transport reconnects + // to the network. DispatcherEventConnectionRestored DispatcherEvent = 3 ) @@ -29,28 +33,40 @@ type WorkerEventName uint const ( - // Emitted when the worker "accepts" a dispatched message and begins - // "working". + // WorkerEventNameBegin is emitted when the worker "accepts" + // a dispatched message and begins "working". WorkerEventNameBegin WorkerEventName = 1 - // Emitted when the worker finishes "working". + // WorkerEventNameEnd is emitted when the worker finishes "working". WorkerEventNameEnd WorkerEventName = 2 - // Emitted when the worker wishes to continue to announce it is - // working. + // WorkerEventNameWorking is emitted when the worker wishes + // to continue to announce it is working. WorkerEventNameWorking WorkerEventName = 3 + + // WorkerEventNameStarted is emitted when worker finished starting + // process, and it can start process received messages. + WorkerEventNameStarted WorkerEventName = 4 + + // WorkerEventNameStopped is emitted when worker is stopped, + // and it cannot process any message. + WorkerEventNameStopped WorkerEventName = 5 ) func (e WorkerEventName) String() string { switch e { - case 1: + case WorkerEventNameBegin: return "BEGIN" - case 2: + case WorkerEventNameEnd: return "END" - case 3: + case WorkerEventNameWorking: return "WORKING" + case WorkerEventNameStarted: + return "STARTED" + case WorkerEventNameStopped: + return "STOPPED" } - return "" + return fmt.Sprintf("UNKNOWN (value: %d)", e) } type WorkerEvent struct { diff --git a/worker/worker.go b/worker/worker.go index cae14776..52a3783b 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -125,6 +125,17 @@ func (w *Worker) Connect(quit <-chan os.Signal) error { return fmt.Errorf("request name failed") } + // Emit a started event + err = w.EmitEvent( + ipc.WorkerEventNameStarted, + "", + "", + map[string]string{}, + ) + if err != nil { + return fmt.Errorf("cannot emit event: %w", err) + } + signals := make(chan *dbus.Signal) w.conn.Signal(signals) go func() { @@ -142,6 +153,17 @@ func (w *Worker) Connect(quit <-chan os.Signal) error { <-quit + // Emit a stopped event + err = w.EmitEvent( + ipc.WorkerEventNameStopped, + "", + "", + map[string]string{}, + ) + if err != nil { + return fmt.Errorf("cannot emit event: %w", err) + } + return nil }