Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support delaying the publishing of messages (by x seconds) #24

Merged
merged 1 commit into from
May 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,15 @@ For example the following show a minimal example of such a route output:
|`.skip`|boolean|If true, the output message will be ignored by the runner and the `.message` will not be sent via MQTT or REST|
|`.context`|boolean|Indicates if the context property `_ctx` of the `.message` should be included in the outgoing message or not. The `_ctx` is added automatically by the template engine to add message tracing|
|`.end`|boolean|The outgoing message should not be processed by any other routes. This only works if `.context` is NOT set to `false`|
|`.delay`|number|Delay in seconds to wait before publishing the message|
|`.api`|object|Object containing information about which HTTP Request should be sent. Inclusion of the `.api` property indicates that a HTTP Request will be sent instead of an MQTT message (see below for the expected properties of the object|
|`.api.method`|string|HTTP Request Method, e.g. `GET`, `POST`, `PUT`|
|`.api.path`|string|HTTP Request path, e.g. `devicecontrol/operations/12345`|
|`.raw_message`|string|String based MQTT payload (e.g. good for c8y SmartREST 2.0 messages). Note: this could be deprecated in the future once the `.message` can handle both strings and object formats|
|`.updates[]`|array of objects|Additional MQTT messages that will also be sent, however these are intended for messages that will not be processed by other routes.|
|`.updates[].topic`|string|MQTT topic for the update message|
|`.updates[].message`|string|MQTT payload for the update message. Can be a string or an object. It will not contain any reference to the context property|
|`.updates[].delay`|number|Delay in seconds to wait before publishing the message|
|`.updates[].skip`|boolean|The update message will be ignored if this is set to `true`|


Expand Down
45 changes: 35 additions & 10 deletions pkg/service/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,29 @@ import (

var TedgeBinary = "tedge"

func optionalDelay(delaySec float32, f func()) {
// Don't bother with sub second delays
if delaySec > 0.9 {
time.AfterFunc(time.Duration(int(delaySec*1000))*time.Millisecond, f)
} else {
f()
}
}

func WithMQTTPublisher(client mqtt.Client, topic string, message any) func() {
return func() {
client.Publish(topic, 0, false, message)
}
}

func WithRESTRequest(client *APIClient, host, method, path string, message any) func() {
return func() {
if err := SendAPIRequest(client, host, method, path, message); err != nil {
slog.Warn("Failed to send api request.", "error", err)
}
}
}

func NewStreamFactory(client mqtt.Client, apiClient *APIClient, route routes.Route, maxDepth int, postDelay time.Duration, opts ...jsonnet.TemplateOption) MessageHandler {

if maxDepth <= 0 {
Expand Down Expand Up @@ -87,7 +110,7 @@ func NewStreamFactory(client mqtt.Client, apiClient *APIClient, route routes.Rou
case string:
slog.Info("Publishing update message.", "topic", m.Topic, "message", m.Message)
if client != nil && !engine.DryRun() {
client.Publish(m.Topic, 0, false, m.Message)
optionalDelay(m.Delay, WithMQTTPublisher(client, m.Topic, m.Message))
}
default:
preMsg, preErr := json.Marshal(m.Message)
Expand All @@ -96,7 +119,7 @@ func NewStreamFactory(client mqtt.Client, apiClient *APIClient, route routes.Rou
} else {
slog.Info("Publishing update message.", "topic", m.Topic, "message", string(preMsg))
if client != nil && !engine.DryRun() {
client.Publish(m.Topic, 0, false, preMsg)
optionalDelay(m.Delay, WithMQTTPublisher(client, m.Topic, preMsg))
}
}
}
Expand Down Expand Up @@ -135,14 +158,14 @@ func NewStreamFactory(client mqtt.Client, apiClient *APIClient, route routes.Rou
// TODO: Switch to using the .MessageString() method
if sm.IsMQTTMessage() {
if sm.RawMessage != "" {
slog.Info("Publishing new message.", "topic", sm.Topic, "message", sm.RawMessage)
slog.Info("Publishing new message.", "topic", sm.Topic, "message", sm.RawMessage, "delay", sm.Delay)
if client != nil && !engine.DryRun() {
client.Publish(sm.Topic, 0, false, sm.RawMessage)
optionalDelay(sm.Delay, WithMQTTPublisher(client, sm.Topic, sm.RawMessage))
}
} else {
slog.Info("Publishing new message.", "topic", sm.Topic, "message", string(output))
slog.Info("Publishing new message.", "topic", sm.Topic, "message", string(output), "delay", sm.Delay)
if client != nil && !engine.DryRun() {
client.Publish(sm.Topic, 0, false, output)
optionalDelay(sm.Delay, WithMQTTPublisher(client, sm.Topic, output))
}
}
}
Expand All @@ -153,9 +176,7 @@ func NewStreamFactory(client mqtt.Client, apiClient *APIClient, route routes.Rou
return nil, err
}
if !engine.DryRun() {
if err := SendAPIRequest(apiClient, sm.API.Host, sm.API.Method, sm.API.Path, sm.Message); err != nil {
slog.Warn("Failed to send api request.", "error", err)
}
optionalDelay(sm.Delay, WithRESTRequest(apiClient, sm.API.Host, sm.API.Method, sm.API.Path, sm.Message))
}
}

Expand Down Expand Up @@ -349,7 +370,11 @@ func DisplayMessage(name string, in, out *streamer.OutputMessage, w io.Writer, c
fmt.Fprintf(w, "\nOutput Updates\n")
for _, update := range out.Updates {
if !update.Skip {
fmt.Fprintf(w, " %-10s%v\n", "topic:", update.Topic)
if update.Delay > 0 {
fmt.Fprintf(w, " %-10s%v (delayed: %.1fs)\n", "topic:", update.Topic, update.Delay)
} else {
fmt.Fprintf(w, " %-10s%v\n", "topic:", update.Topic)
}
displayJsonMessage(w, update.Message, compact, useColor)
}
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/streamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ type Streamer struct {
}

type SimpleOutputMessage struct {
Topic string `json:"topic"`
Message any `json:"message"`
Skip bool `json:"skip"`
Topic string `json:"topic"`
Message any `json:"message"`
Skip bool `json:"skip"`
Delay float32 `json:"delay"`
}

func (m *SimpleOutputMessage) MessageString() string {
Expand Down Expand Up @@ -56,6 +57,7 @@ type OutputMessage struct {
Topic string `json:"topic"`
Message any `json:"message,omitempty"`
RawMessage string `json:"raw_message,omitempty"`
Delay float32 `json:"delay"`
Updates []SimpleOutputMessage `json:"updates"`
API *RestRequest `json:"api,omitempty"`
Skip bool `json:"skip"`
Expand Down