diff --git a/api/nodecontrol/client.go b/api/nodecontrol/client.go index 522b2ede..619d8f69 100644 --- a/api/nodecontrol/client.go +++ b/api/nodecontrol/client.go @@ -11,9 +11,7 @@ import ( "github.com/synadia-io/nex/models" ) -var ( - DefaultRequestTimeout = 5 * time.Second -) +var DefaultRequestTimeout = 5 * time.Second type ControlAPIClient struct { nc *nats.Conn diff --git a/api/nodecontrol/gen/start_workload_response.go b/api/nodecontrol/gen/start_workload_response.go index fb0fd158..edd866a0 100644 --- a/api/nodecontrol/gen/start_workload_response.go +++ b/api/nodecontrol/gen/start_workload_response.go @@ -12,6 +12,9 @@ type StartWorkloadResponseJson struct { // Issuer corresponds to the JSON schema field "issuer". Issuer string `json:"issuer"` + // Message corresponds to the JSON schema field "message". + Message string `json:"message"` + // Name corresponds to the JSON schema field "name". Name string `json:"name"` @@ -31,6 +34,9 @@ func (j *StartWorkloadResponseJson) UnmarshalJSON(b []byte) error { if _, ok := raw["issuer"]; raw != nil && !ok { return fmt.Errorf("field issuer in StartWorkloadResponseJson: required") } + if _, ok := raw["message"]; raw != nil && !ok { + return fmt.Errorf("field message in StartWorkloadResponseJson: required") + } if _, ok := raw["name"]; raw != nil && !ok { return fmt.Errorf("field name in StartWorkloadResponseJson: required") } diff --git a/api/nodecontrol/gen/stop_workload_response.go b/api/nodecontrol/gen/stop_workload_response.go index 9d40fa57..b0c5d76a 100644 --- a/api/nodecontrol/gen/stop_workload_response.go +++ b/api/nodecontrol/gen/stop_workload_response.go @@ -12,6 +12,9 @@ type StopWorkloadResponseJson struct { // Issuer corresponds to the JSON schema field "issuer". Issuer string `json:"issuer"` + // Message corresponds to the JSON schema field "message". + Message string `json:"message"` + // Stopped corresponds to the JSON schema field "stopped". Stopped bool `json:"stopped"` } @@ -28,6 +31,9 @@ func (j *StopWorkloadResponseJson) UnmarshalJSON(b []byte) error { if _, ok := raw["issuer"]; raw != nil && !ok { return fmt.Errorf("field issuer in StopWorkloadResponseJson: required") } + if _, ok := raw["message"]; raw != nil && !ok { + return fmt.Errorf("field message in StopWorkloadResponseJson: required") + } if _, ok := raw["stopped"]; raw != nil && !ok { return fmt.Errorf("field stopped in StopWorkloadResponseJson: required") } diff --git a/api/nodecontrol/start-workload-response.json b/api/nodecontrol/start-workload-response.json index 47530000..e38e298e 100644 --- a/api/nodecontrol/start-workload-response.json +++ b/api/nodecontrol/start-workload-response.json @@ -15,13 +15,17 @@ }, "started": { "type": "boolean" + }, + "message": { + "type": "string" } }, "required": [ "id", "issuer", "name", - "started" + "started", + "message" ], "definitions": {}, "additionalProperties": false diff --git a/api/nodecontrol/stop-workload-response.json b/api/nodecontrol/stop-workload-response.json index 2e198e05..ff74049f 100644 --- a/api/nodecontrol/stop-workload-response.json +++ b/api/nodecontrol/stop-workload-response.json @@ -12,12 +12,16 @@ }, "stopped": { "type": "boolean" + }, + "message": { + "type": "string" } }, "required": [ "id", "issuer", - "stopped" + "stopped", + "message" ], "definitions": {}, "additionalProperties": false diff --git a/cmd/nex/node.go b/cmd/nex/node.go index 813bdff5..136877a1 100644 --- a/cmd/nex/node.go +++ b/cmd/nex/node.go @@ -348,6 +348,8 @@ type Up struct { HideWorkloadLogs bool `name:"hide-workload-logs" help:"Hide logs from workloads" default:"false"` ShowSystemLogs bool `name:"system-logs" help:"Show verbose level logs from inside actor framework" default:"false" hidden:""` OCICache string `name:"oci-cache" help:"Path to OCI cache registry" placeholder:"localhost:5000"` + StartMessage string `name:"start-message" help:"Message to display on the successful start of a workload" placeholder:"Great job!"` + StopMessage string `name:"stop-message" help:"Message to display on the successful stop of a workload" placeholder:"Goodbye!"` HostServicesConfig HostServicesConfig `embed:"" prefix:"hostservices." group:"Host Services Configuration"` OtelConfig OtelConfig `embed:"" prefix:"otel." group:"OpenTelemetry Configuration"` @@ -435,6 +437,8 @@ func (u Up) Run(ctx context.Context, globals *Globals, n *Node) error { options.WithValidIssuers(u.ValidIssuers), options.WithOCICacheRegistry(u.OCICache), options.WithDevMode(globals.DevMode), + options.WithStartWorkloadMessage(u.StartMessage), + options.WithStopWorkloadMessage(u.StopMessage), options.WithOTelOptions(options.OTelOptions{ MetricsEnabled: u.OtelConfig.OtelMetrics, MetricsPort: u.OtelConfig.OtelMetricsPort, diff --git a/cmd/nex/workload.go b/cmd/nex/workload.go index f614569e..3134b6c9 100644 --- a/cmd/nex/workload.go +++ b/cmd/nex/workload.go @@ -17,9 +17,8 @@ import ( "net/http" "os" "path/filepath" - "time" - "strings" + "time" "github.com/jedib0t/go-pretty/v6/table" "github.com/synadia-io/nex/api/nodecontrol" @@ -239,8 +238,14 @@ func (r RunWorkload) Run(ctx context.Context, globals *Globals, w *Workload) err if resp.Started { if r.NodeId != "" { fmt.Printf("Workload %s [%s] started on node %s\n", r.WorkloadName, resp.Id, r.NodeId) + if resp.Message != "" { + fmt.Println(resp.Message) + } } else { fmt.Printf("Workload %s [%s] started\n", r.WorkloadName, resp.Id) + if resp.Message != "" { + fmt.Println(resp.Message) + } } } else { fmt.Printf("Workload %s failed to start\n", r.WorkloadName) @@ -289,6 +294,9 @@ func (s StopWorkload) Run(ctx context.Context, globals *Globals, w *Workload) er if resp.Stopped { fmt.Printf("Workload %s stopped\n", s.WorkloadId) + if resp.Message != "" { + fmt.Println(resp.Message) + } } else { fmt.Printf("Workload %s failed to stop\n", s.WorkloadId) } @@ -779,7 +787,6 @@ func (b BundleWorkload) Run(ctx context.Context, globals *Globals) error { _, err = io.Copy(tw, file) return err }) - if err != nil { return err } diff --git a/models/node_options.go b/models/node_options.go index 3e12b266..6215e675 100644 --- a/models/node_options.go +++ b/models/node_options.go @@ -26,6 +26,9 @@ type NodeOptions struct { OCICacheRegistry string DevMode bool + StartWorkloadMessage string + StopWorkloadMessage string + Errs error } @@ -136,6 +139,18 @@ func WithDevMode(b bool) NodeOption { } } +func WithStartWorkloadMessage(s string) NodeOption { + return func(n *NodeOptions) { + n.StartWorkloadMessage = s + } +} + +func WithStopWorkloadMessage(s string) NodeOption { + return func(n *NodeOptions) { + n.StopWorkloadMessage = s + } +} + type OTelOptions struct { MetricsEnabled bool MetricsExporter string diff --git a/node/internal/actors/control_api.go b/node/internal/actors/control_api.go index df37d6c6..322048ec 100644 --- a/node/internal/actors/control_api.go +++ b/node/internal/actors/control_api.go @@ -23,8 +23,10 @@ import ( actorproto "github.com/synadia-io/nex/node/internal/actors/pb" ) -const ControlAPIActorName = "control_api" -const DefaultAskDuration = 10 * time.Second +const ( + ControlAPIActorName = "control_api" + DefaultAskDuration = 10 * time.Second +) const ( AuctionResponseType = "io.nats.nex.v2.auction_response" @@ -59,6 +61,8 @@ type NodeCallback interface { EncryptPayload([]byte, string) ([]byte, string, error) DecryptPayload([]byte) ([]byte, error) EmitEvent(string, json.RawMessage) error + StartWorkloadMessage() string + StopWorkloadMessage() string } type StateCallback interface { @@ -95,7 +99,6 @@ func (a *ControlAPI) PreStart(ctx context.Context) error { } func (a *ControlAPI) PostStop(ctx context.Context) error { - return nil } @@ -362,8 +365,9 @@ func (api *ControlAPI) handleDeploy(m *nats.Msg) { models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("failed to unmarshal workload started response: %s", err)) return } - - models.RespondEnvelope(m, RunResponseType, 200, startResponseFromProto(&workloadStarted), "") + resp := startResponseFromProto(&workloadStarted) + resp.Message = api.nodeCallback.StartWorkloadMessage() + models.RespondEnvelope(m, RunResponseType, 200, resp, "") } func (api *ControlAPI) handleUndeploy(m *nats.Msg) { @@ -386,7 +390,7 @@ findWorkload: for _, grandchild := range child.Children() { // iterate over all workloads if grandchild.Name() == workloadId { askResp, err = api.self.Ask(context.Background(), child, &actorproto.StopWorkload{Namespace: namespace, WorkloadId: workloadId}, DefaultAskDuration) - //err = api.self.Tell(context.Background(), child, &actorproto.StopWorkload{Namespace: namespace, WorkloadId: workloadId}) + // err = api.self.Tell(context.Background(), child, &actorproto.StopWorkload{Namespace: namespace, WorkloadId: workloadId}) if err != nil { api.logger.Error("Failed to stop workload", slog.Any("error", err)) models.RespondEnvelope(m, StopResponseType, 500, "", fmt.Sprintf("Failed to stop workload: %s", err)) @@ -418,7 +422,9 @@ findWorkload: models.RespondEnvelope(m, StopResponseType, 500, "", fmt.Sprintf("failed to unmarshal workload started response: %s", err)) return } - models.RespondEnvelope(m, StopResponseType, 200, stopResponseFromProto(&workloadStopped), "") + resp := stopResponseFromProto(&workloadStopped) + resp.Message = api.nodeCallback.StopWorkloadMessage() + models.RespondEnvelope(m, StopResponseType, 200, resp, "") } func (api *ControlAPI) handleInfo(m *nats.Msg) { @@ -477,7 +483,7 @@ func (api *ControlAPI) handleLameDuck(m *nats.Msg) { } ticker := time.NewTicker(100 * time.Millisecond) - for _ = range ticker.C { + for range ticker.C { if agentSuper.ChildrenCount() == 0 { ticker.Stop() cancel() diff --git a/node/node.go b/node/node.go index 07f86f46..bab9c174 100644 --- a/node/node.go +++ b/node/node.go @@ -94,8 +94,10 @@ func NewNexNode(serverKey nkeys.KeyPair, nc *nats.Conn, opts ...models.NodeOptio HostServiceOptions: models.HostServiceOptions{ Services: make(map[string]models.ServiceConfig), }, - OCICacheRegistry: "", - DevMode: false, + OCICacheRegistry: "", + DevMode: false, + StartWorkloadMessage: "", + StopWorkloadMessage: "", }, } @@ -209,9 +211,9 @@ func (nn *nexNode) initializeSupervisionTree() error { goakt.WithPassivationDisabled(), // In the non-v2 version of goakt, these functions were supported. // TODO: figure out why they're gone or how we can plug in our own impls - //goakt.WithTelemetry(telemetry), - //goakt.WithTracing(), - //goakt.WithSupervisorDirective(restartDirective), + // goakt.WithTelemetry(telemetry), + // goakt.WithTracing(), + // goakt.WithSupervisorDirective(restartDirective), goakt.WithActorInitMaxRetries(3)) if err != nil { return err @@ -230,7 +232,6 @@ func (nn *nexNode) initializeSupervisionTree() error { agentSuper, err := nn.actorSystem.Spawn(nn.ctx, actors.AgentSupervisorActorName, actors.CreateAgentSupervisor(nn.actorSystem, *nn.options), goakt.WithSupervisorStrategies(goakt.NewSupervisorStrategy(nil, restartDirective)), ) - if err != nil { return err } @@ -248,7 +249,6 @@ func (nn *nexNode) initializeSupervisionTree() error { _, err = nn.actorSystem.Spawn(nn.ctx, actors.InternalNatsServerActorName, inats, goakt.WithSupervisorStrategies(goakt.NewSupervisorStrategy(nil, restartDirective)), ) - if err != nil { return err } @@ -274,7 +274,6 @@ func (nn *nexNode) initializeSupervisionTree() error { _, err = agentSuper.SpawnChild(nn.ctx, models.DirectStartActorName, actors.CreateDirectStartAgent(nn.ctx, nn.nc, pk, *nn.options, nn.options.Logger.WithGroup(models.DirectStartActorName), nn), goakt.WithSupervisorStrategies(goakt.NewSupervisorStrategy(nil, restartDirective)), ) - if err != nil { return err } @@ -320,7 +319,7 @@ func (nn *nexNode) initializeSupervisionTree() error { } if len(wl) > 0 { - //direct-start_wdhGT117n7TOHpsasG2lRP + // direct-start_wdhGT117n7TOHpsasG2lRP nn.options.Logger.Info("Existing state detected, Restoring now") for _, c := range agentSuper.Children() { wl := getWorkloads(c.Name()) @@ -477,8 +476,8 @@ func (nn *nexNode) GetInfo(namespace string) (*actorproto.NodeInfo, error) { } resp := &actorproto.NodeInfo{ Id: pk, - //FINDME - //TargetXkey: nn.options. + // FINDME + // TargetXkey: nn.options. Tags: nn.options.Tags, Uptime: time.Since(nn.startedAt).String(), Version: VERSION, @@ -729,3 +728,11 @@ func (nn *nexNode) getState() (map[string]*actorproto.StartWorkload, error) { return reqs, nil } + +func (nn *nexNode) StartWorkloadMessage() string { + return nn.options.StartWorkloadMessage +} + +func (nn *nexNode) StopWorkloadMessage() string { + return nn.options.StopWorkloadMessage +} diff --git a/test/copy_workload_test.go b/test/copy_workload_test.go index d06c5612..ca65e223 100644 --- a/test/copy_workload_test.go +++ b/test/copy_workload_test.go @@ -64,7 +64,7 @@ func TestCopyWorkload(t *testing.T) { be.Equal(t, "ok", string(msg.Data)) - re := regexp.MustCompile(`^Workload tester \[(?P[A-Za-z0-9]+)\] started$`) + re := regexp.MustCompile(`^Workload tester \[(?P[A-Za-z0-9]+)\] started\n.*$`) match := re.FindStringSubmatch(strings.TrimSpace(origStdOut.String())) be.Equal(t, 2, len(match)) origWorkloadId := match[1] @@ -134,7 +134,7 @@ func TestMultipleCopyWorkload(t *testing.T) { origDeploy.Stdout = origStdOut be.NilErr(t, origDeploy.Run()) - re := regexp.MustCompile(`^Workload tester \[(?P[A-Za-z0-9]+)\] started$`) + re := regexp.MustCompile(`^Workload tester \[(?P[A-Za-z0-9]+)\] started\n.*$`) match := re.FindStringSubmatch(strings.TrimSpace(origStdOut.String())) be.Equal(t, 2, len(match)) origWorkloadId := match[1] diff --git a/test/helpers.go b/test/helpers.go index 4a510929..b4600f91 100644 --- a/test/helpers.go +++ b/test/helpers.go @@ -162,6 +162,6 @@ func startNexNodeCmd(t testing.TB, workingDir, nodeSeed, xkeySeed, natsServer, n xkeySeed = string(xSeed) } - cmd := exec.Command(cli, "node", "up", "--logger.level", "debug", "--logger.short", "-s", natsServer, "--resource-directory", workingDir, "--node-name", name, "--nexus", nexus, "--node-seed", nodeSeed, "--node-xkey-seed", xkeySeed) + cmd := exec.Command(cli, "node", "up", "--logger.level", "debug", "--logger.short", "-s", natsServer, "--resource-directory", workingDir, "--node-name", name, "--nexus", nexus, "--node-seed", nodeSeed, "--node-xkey-seed", xkeySeed, "--start-message", "test workload started", "--stop-message", "test workload stopped") return cmd } diff --git a/test/workload_test.go b/test/workload_test.go index ca438a88..d0ac3d8b 100644 --- a/test/workload_test.go +++ b/test/workload_test.go @@ -83,7 +83,7 @@ func TestDirectStartService(t *testing.T) { be.Equal(t, 0, len(cmdstderr.Bytes())) - re := regexp.MustCompile(`^Workload tester \[(?P[A-Za-z0-9]+)\] started on node (?P[A-Z0-9]+)$`) + re := regexp.MustCompile(`^Workload tester \[(?P[A-Za-z0-9]+)\] started on node (?P[A-Z0-9]+)\n.*$`) match := re.FindStringSubmatch(strings.TrimSpace(cmdstdout.String())) be.Equal(t, 3, len(match)) be.Equal(t, pub, match[2]) @@ -143,7 +143,7 @@ func TestDirectStartFunction(t *testing.T) { be.Equal(t, 0, len(cmdstderr.Bytes())) - re := regexp.MustCompile(`^Workload tester \[(?P[A-Za-z0-9]+)\] started$`) + re := regexp.MustCompile(`^Workload tester \[(?P[A-Za-z0-9]+)\] started\n.*$`) match := re.FindStringSubmatch(strings.TrimSpace(cmdstdout.String())) be.Equal(t, 2, len(match)) time.Sleep(500 * time.Millisecond) @@ -244,7 +244,7 @@ func TestDirectStop(t *testing.T) { be.Equal(t, 0, len(cmdstderr.Bytes())) - re := regexp.MustCompile(`^Workload tester \[(?P[A-Za-z0-9]+)\] started$`) + re := regexp.MustCompile(`^Workload tester \[(?P[A-Za-z0-9]+)\] started\n.*$`) match := re.FindStringSubmatch(strings.TrimSpace(cmdstdout.String())) be.Equal(t, 2, len(match)) @@ -259,6 +259,7 @@ func TestDirectStop(t *testing.T) { be.NilErr(t, cmd.Run()) be.In(t, "Workload "+workloadID+" stopped", cmdstdout.String()) + be.In(t, "test workload stopped", cmdstdout.String()) cancel() }() @@ -297,6 +298,7 @@ func TestDirectStopNoWorkload(t *testing.T) { be.NilErr(t, cmd.Run()) be.In(t, "Workload not found", cmdstdout.String()) + be.NotIn(t, "test workload stopped", cmdstdout.String()) cancel() }()