Skip to content

Commit

Permalink
Merge branch 'main' into test/actors
Browse files Browse the repository at this point in the history
  • Loading branch information
jordan-rash authored Jan 3, 2025
2 parents 5dd61eb + c7a4459 commit bffa94a
Show file tree
Hide file tree
Showing 14 changed files with 83 additions and 21 deletions.
4 changes: 1 addition & 3 deletions api/nodecontrol/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import (
"github.com/synadia-io/nex/models"
)

var (
DefaultRequestTimeout = 5 * time.Second
)
var DefaultRequestTimeout = 5 * time.Second

type ControlAPIClient struct {
nc *nats.Conn
Expand Down
6 changes: 6 additions & 0 deletions api/nodecontrol/gen/start_workload_response.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions api/nodecontrol/gen/stop_workload_response.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion api/nodecontrol/start-workload-response.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
},
"started": {
"type": "boolean"
},
"message": {
"type": "string"
}
},
"required": [
"id",
"issuer",
"name",
"started"
"started",
"message"
],
"definitions": {},
"additionalProperties": false
Expand Down
6 changes: 5 additions & 1 deletion api/nodecontrol/stop-workload-response.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
},
"stopped": {
"type": "boolean"
},
"message": {
"type": "string"
}
},
"required": [
"id",
"issuer",
"stopped"
"stopped",
"message"
],
"definitions": {},
"additionalProperties": false
Expand Down
5 changes: 3 additions & 2 deletions cmd/nex/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"disorder.dev/shandler"
"github.com/nats-io/nats.go"
"github.com/synadia-io/nex/models"
)

func configureLogger(cfg *Globals, nc *nats.Conn, serverPublicKey string, showSystemLogs, hideWorkloadLogs bool) *slog.Logger {
Expand Down Expand Up @@ -85,8 +86,8 @@ func configureLogger(cfg *Globals, nc *nats.Conn, serverPublicKey string, showSy
}
}
if slices.Contains(cfg.Target, "nats") {
natsLogSubject := fmt.Sprintf("$NEX.logs.%s.stdout", serverPublicKey)
natsErrLogSubject := fmt.Sprintf("$NEX.logs.%s.stderr", serverPublicKey)
natsLogSubject := fmt.Sprintf("%s.%s.%s.stdout", models.LogAPIPrefix, models.NodeSystemNamespace, serverPublicKey)
natsErrLogSubject := fmt.Sprintf("%s.%s.%s.stderr", models.LogAPIPrefix, models.NodeSystemNamespace, serverPublicKey)
stdoutWriters = append(stdoutWriters, NewNatsLogger(nc, natsLogSubject))
stderrWriters = append(stderrWriters, NewNatsLogger(nc, natsErrLogSubject))
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/nex/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ type Up struct {
HideWorkloadLogs bool `name:"hide-workload-logs" help:"Hide logs from workloads" default:"false"`
ShowSystemLogs bool `name:"system-logs" help:"Show verbose level logs from inside actor framework" default:"false" hidden:""`
OCICache string `name:"oci-cache" help:"Path to OCI cache registry" placeholder:"localhost:5000"`
StartMessage string `name:"start-message" help:"Message to display on the successful start of a workload" placeholder:"Great job!"`
StopMessage string `name:"stop-message" help:"Message to display on the successful stop of a workload" placeholder:"Goodbye!"`

HostServicesConfig HostServicesConfig `embed:"" prefix:"hostservices." group:"Host Services Configuration"`
OtelConfig OtelConfig `embed:"" prefix:"otel." group:"OpenTelemetry Configuration"`
Expand Down Expand Up @@ -435,6 +437,8 @@ func (u Up) Run(ctx context.Context, globals *Globals, n *Node) error {
options.WithValidIssuers(u.ValidIssuers),
options.WithOCICacheRegistry(u.OCICache),
options.WithDevMode(globals.DevMode),
options.WithStartWorkloadMessage(u.StartMessage),
options.WithStopWorkloadMessage(u.StopMessage),
options.WithOTelOptions(options.OTelOptions{
MetricsEnabled: u.OtelConfig.OtelMetrics,
MetricsPort: u.OtelConfig.OtelMetricsPort,
Expand Down
13 changes: 10 additions & 3 deletions cmd/nex/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ import (
"net/http"
"os"
"path/filepath"
"time"

"strings"
"time"

"github.com/jedib0t/go-pretty/v6/table"
"github.com/synadia-io/nex/api/nodecontrol"
Expand Down Expand Up @@ -239,8 +238,14 @@ func (r RunWorkload) Run(ctx context.Context, globals *Globals, w *Workload) err
if resp.Started {
if r.NodeId != "" {
fmt.Printf("Workload %s [%s] started on node %s\n", r.WorkloadName, resp.Id, r.NodeId)
if resp.Message != "" {
fmt.Println(resp.Message)
}
} else {
fmt.Printf("Workload %s [%s] started\n", r.WorkloadName, resp.Id)
if resp.Message != "" {
fmt.Println(resp.Message)
}
}
} else {
fmt.Printf("Workload %s failed to start\n", r.WorkloadName)
Expand Down Expand Up @@ -289,6 +294,9 @@ func (s StopWorkload) Run(ctx context.Context, globals *Globals, w *Workload) er

if resp.Stopped {
fmt.Printf("Workload %s stopped\n", s.WorkloadId)
if resp.Message != "" {
fmt.Println(resp.Message)
}
} else {
fmt.Printf("Workload %s failed to stop\n", s.WorkloadId)
}
Expand Down Expand Up @@ -779,7 +787,6 @@ func (b BundleWorkload) Run(ctx context.Context, globals *Globals) error {
_, err = io.Copy(tw, file)
return err
})

if err != nil {
return err
}
Expand Down
15 changes: 15 additions & 0 deletions models/node_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type NodeOptions struct {
OCICacheRegistry string
DevMode bool

StartWorkloadMessage string
StopWorkloadMessage string

Errs error
}

Expand Down Expand Up @@ -136,6 +139,18 @@ func WithDevMode(b bool) NodeOption {
}
}

func WithStartWorkloadMessage(s string) NodeOption {
return func(n *NodeOptions) {
n.StartWorkloadMessage = s
}
}

func WithStopWorkloadMessage(s string) NodeOption {
return func(n *NodeOptions) {
n.StopWorkloadMessage = s
}
}

type OTelOptions struct {
MetricsEnabled bool
MetricsExporter string
Expand Down
11 changes: 8 additions & 3 deletions node/internal/actors/control_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type NodeCallback interface {
EncryptPayload([]byte, string) ([]byte, string, error)
DecryptPayload([]byte) ([]byte, error)
EmitEvent(string, json.RawMessage) error
StartWorkloadMessage() string
StopWorkloadMessage() string
}

type StateCallback interface {
Expand Down Expand Up @@ -363,8 +365,9 @@ func (api *ControlAPI) handleDeploy(m *nats.Msg) {
models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("failed to unmarshal workload started response: %s", err))
return
}

models.RespondEnvelope(m, RunResponseType, 200, startResponseFromProto(&workloadStarted), "")
resp := startResponseFromProto(&workloadStarted)
resp.Message = api.nodeCallback.StartWorkloadMessage()
models.RespondEnvelope(m, RunResponseType, 200, resp, "")
}

func (api *ControlAPI) handleUndeploy(m *nats.Msg) {
Expand Down Expand Up @@ -419,7 +422,9 @@ findWorkload:
models.RespondEnvelope(m, StopResponseType, 500, "", fmt.Sprintf("failed to unmarshal workload started response: %s", err))
return
}
models.RespondEnvelope(m, StopResponseType, 200, stopResponseFromProto(&workloadStopped), "")
resp := stopResponseFromProto(&workloadStopped)
resp.Message = api.nodeCallback.StopWorkloadMessage()
models.RespondEnvelope(m, StopResponseType, 200, resp, "")
}

func (api *ControlAPI) handleInfo(m *nats.Msg) {
Expand Down
14 changes: 12 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ func NewNexNode(serverKey nkeys.KeyPair, nc *nats.Conn, opts ...models.NodeOptio
HostServiceOptions: models.HostServiceOptions{
Services: make(map[string]models.ServiceConfig),
},
OCICacheRegistry: "",
DevMode: false,
OCICacheRegistry: "",
DevMode: false,
StartWorkloadMessage: "",
StopWorkloadMessage: "",
},
}

Expand Down Expand Up @@ -719,3 +721,11 @@ func (nn *nexNode) getState() (map[string]*actorproto.StartWorkload, error) {

return reqs, nil
}

func (nn *nexNode) StartWorkloadMessage() string {
return nn.options.StartWorkloadMessage
}

func (nn *nexNode) StopWorkloadMessage() string {
return nn.options.StopWorkloadMessage
}
4 changes: 2 additions & 2 deletions test/copy_workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestCopyWorkload(t *testing.T) {

be.Equal(t, "ok", string(msg.Data))

re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started$`)
re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started\n.*$`)
match := re.FindStringSubmatch(strings.TrimSpace(origStdOut.String()))
be.Equal(t, 2, len(match))
origWorkloadId := match[1]
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestMultipleCopyWorkload(t *testing.T) {
origDeploy.Stdout = origStdOut
be.NilErr(t, origDeploy.Run())

re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started$`)
re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started\n.*$`)
match := re.FindStringSubmatch(strings.TrimSpace(origStdOut.String()))
be.Equal(t, 2, len(match))
origWorkloadId := match[1]
Expand Down
2 changes: 1 addition & 1 deletion test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,6 @@ func startNexNodeCmd(t testing.TB, workingDir, nodeSeed, xkeySeed, natsServer, n
xkeySeed = string(xSeed)
}

cmd := exec.Command(cli, "node", "up", "--logger.level", "debug", "--logger.short", "-s", natsServer, "--resource-directory", workingDir, "--node-name", name, "--nexus", nexus, "--node-seed", nodeSeed, "--node-xkey-seed", xkeySeed)
cmd := exec.Command(cli, "node", "up", "--logger.level", "debug", "--logger.short", "-s", natsServer, "--resource-directory", workingDir, "--node-name", name, "--nexus", nexus, "--node-seed", nodeSeed, "--node-xkey-seed", xkeySeed, "--start-message", "test workload started", "--stop-message", "test workload stopped")
return cmd
}
8 changes: 5 additions & 3 deletions test/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestDirectStartService(t *testing.T) {

be.Equal(t, 0, len(cmdstderr.Bytes()))

re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started on node (?P<node>[A-Z0-9]+)$`)
re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started on node (?P<node>[A-Z0-9]+)\n.*$`)
match := re.FindStringSubmatch(strings.TrimSpace(cmdstdout.String()))
be.Equal(t, 3, len(match))
be.Equal(t, pub, match[2])
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestDirectStartFunction(t *testing.T) {

be.Equal(t, 0, len(cmdstderr.Bytes()))

re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started$`)
re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started\n.*$`)
match := re.FindStringSubmatch(strings.TrimSpace(cmdstdout.String()))
be.Equal(t, 2, len(match))
time.Sleep(500 * time.Millisecond)
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestDirectStop(t *testing.T) {

be.Equal(t, 0, len(cmdstderr.Bytes()))

re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started$`)
re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started\n.*$`)
match := re.FindStringSubmatch(strings.TrimSpace(cmdstdout.String()))
be.Equal(t, 2, len(match))

Expand All @@ -259,6 +259,7 @@ func TestDirectStop(t *testing.T) {
be.NilErr(t, cmd.Run())

be.In(t, "Workload "+workloadID+" stopped", cmdstdout.String())
be.In(t, "test workload stopped", cmdstdout.String())
cancel()
}()

Expand Down Expand Up @@ -297,6 +298,7 @@ func TestDirectStopNoWorkload(t *testing.T) {
be.NilErr(t, cmd.Run())

be.In(t, "Workload not found", cmdstdout.String())
be.NotIn(t, "test workload stopped", cmdstdout.String())
cancel()
}()

Expand Down

0 comments on commit bffa94a

Please sign in to comment.