Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use goakt testing framework #478

Merged
merged 11 commits into from
Jan 4, 2025
Merged
5 changes: 0 additions & 5 deletions api/nodecontrol/auction-response.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@
"type": "string",
"description": "A one-time identifier used to target deployments"
},
"nexus": {
"type": "string",
"description": "The name of the nexus"
},
"version": {
"type": "string",
"description": "The version of the node"
Expand Down Expand Up @@ -50,7 +46,6 @@
},
"required": [
"bidder_id",
"nexus",
"version",
"uptime",
"target_xkey",
Expand Down
6 changes: 0 additions & 6 deletions api/nodecontrol/gen/auction_response.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions node/internal/actors/agent_supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package actors

import (
"context"
"io"
"log/slog"

"github.com/tochemey/goakt/v2/goaktpb"
Expand All @@ -16,8 +17,8 @@ const AgentSupervisorActorName = "agent_supervisor"

var _ goakt.Actor = (*AgentSupervisor)(nil)

func CreateAgentSupervisor(system goakt.ActorSystem, options models.NodeOptions) *AgentSupervisor {
return &AgentSupervisor{nodeOptions: options}
func CreateAgentSupervisor(options models.NodeOptions) *AgentSupervisor {
return &AgentSupervisor{logger: slog.New(slog.NewTextHandler(io.Discard, nil)), nodeOptions: options}
}

// Agent manager is responsible for starting one agent per workload type, supplying it with
Expand All @@ -30,7 +31,6 @@ type AgentSupervisor struct {
}

func (s *AgentSupervisor) PreStart(ctx context.Context) error {

return nil
}

Expand Down
50 changes: 50 additions & 0 deletions node/internal/actors/agent_supervisor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package actors

import (
"context"
"io"
"log/slog"
"testing"
"time"

"github.com/synadia-io/nex/models"
actorproto "github.com/synadia-io/nex/node/internal/actors/pb"

"github.com/tochemey/goakt/v2/testkit"
)

func TestAgentSupervisor(t *testing.T) {
ctx := context.Background()

as := &AgentSupervisor{
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
nodeOptions: models.NodeOptions{},
}

tk := testkit.New(ctx, t)
t.Run("Send QueryWorkloads Message", func(t *testing.T) {
tk.Spawn(ctx, AgentSupervisorActorName, as)
probe := tk.NewProbe(ctx)
msg := new(actorproto.QueryWorkloads)
probe.SendSync(AgentSupervisorActorName, msg, time.Second)
resp := &actorproto.WorkloadList{
Workloads: []*actorproto.WorkloadSummary{},
}
probe.ExpectMessage(resp)
probe.ExpectNoMessage()
probe.Stop()
})

t.Run("Send SetLameDuck Message", func(t *testing.T) {
tk.Spawn(ctx, AgentSupervisorActorName, as)
probe := tk.NewProbe(ctx)
msg := new(actorproto.SetLameDuck)
probe.Send(AgentSupervisorActorName, msg)
probe.ExpectNoMessage()
probe.Stop()
})

t.Cleanup(func() {
tk.Shutdown(ctx)
})
}
30 changes: 11 additions & 19 deletions node/internal/actors/artifacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,21 @@ import (
"testing"
"time"

"github.com/carlmjohnson/be"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

func startNatsServer(t testing.TB) (*server.Server, error) {
func startNatsServer(t testing.TB, workingDir string) *server.Server {
t.Helper()

s, err := server.NewServer(&server.Options{
Port: -1,
JetStream: true,
StoreDir: workingDir,
})

if err != nil {
return nil, err
}
be.NilErr(t, err)

go func() {
if err := server.Run(s); err != nil {
Expand All @@ -40,22 +39,18 @@ func startNatsServer(t testing.TB) (*server.Server, error) {
time.Sleep(1 * time.Second)

nc, err := nats.Connect(s.ClientURL())
if err != nil {
return nil, err
}
be.NilErr(t, err)

jsCtx, err := jetstream.New(nc)
if err != nil {
return nil, err
}
be.NilErr(t, err)

_, err = jsCtx.CreateKeyValue(context.TODO(), jetstream.KeyValueConfig{
Bucket: "registry",
})
if err != nil {
return nil, err
}
be.NilErr(t, err)
nc.Close()

return s, nil
return s
}

func createTestBinary(t testing.TB, tmpDir string) (string, string, int) {
Expand Down Expand Up @@ -102,10 +97,7 @@ func createTestBinary(t testing.TB, tmpDir string) (string, string, int) {
}

func prepNatsObjStoreArtifact(t testing.TB, workingDir, binPath string) (string, *nats.Conn, error) {
s, err := startNatsServer(t)
if err != nil {
return "", nil, err
}
s := startNatsServer(t, t.TempDir())

nc, err := nats.Connect(s.ClientURL())
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions node/internal/actors/control_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
actorproto "github.com/synadia-io/nex/node/internal/actors/pb"
)

const ControlAPIActorName = "control_api"
const DefaultAskDuration = 10 * time.Second
const (
ControlAPIActorName = "control_api"
DefaultAskDuration = 10 * time.Second
)

const (
AuctionResponseType = "io.nats.nex.v2.auction_response"
Expand Down Expand Up @@ -95,7 +97,6 @@ func (a *ControlAPI) PreStart(ctx context.Context) error {
}

func (a *ControlAPI) PostStop(ctx context.Context) error {

return nil
}

Expand Down Expand Up @@ -386,7 +387,7 @@ findWorkload:
for _, grandchild := range child.Children() { // iterate over all workloads
if grandchild.Name() == workloadId {
askResp, err = api.self.Ask(context.Background(), child, &actorproto.StopWorkload{Namespace: namespace, WorkloadId: workloadId}, DefaultAskDuration)
//err = api.self.Tell(context.Background(), child, &actorproto.StopWorkload{Namespace: namespace, WorkloadId: workloadId})
// err = api.self.Tell(context.Background(), child, &actorproto.StopWorkload{Namespace: namespace, WorkloadId: workloadId})
if err != nil {
api.logger.Error("Failed to stop workload", slog.Any("error", err))
models.RespondEnvelope(m, StopResponseType, 500, "", fmt.Sprintf("Failed to stop workload: %s", err))
Expand Down Expand Up @@ -477,7 +478,7 @@ func (api *ControlAPI) handleLameDuck(m *nats.Msg) {
}

ticker := time.NewTicker(100 * time.Millisecond)
for _ = range ticker.C {
for range ticker.C {
if agentSuper.ChildrenCount() == 0 {
ticker.Stop()
cancel()
Expand Down
1 change: 0 additions & 1 deletion node/internal/actors/control_api_conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ func auctionResponseFromProto(response *actorproto.AuctionResponse) *api.Auction
}

return &api.AuctionResponseJson{
Nexus: response.Nexus,
BidderId: response.BidderId,
Status: gen.AuctionResponseJsonStatus{Status: convertedStatus},
Tags: api.AuctionResponseJsonTags{Tags: response.Tags},
Expand Down
105 changes: 105 additions & 0 deletions node/internal/actors/control_api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package actors

import (
"bytes"
"context"
"encoding/json"
"log/slog"
"os"
"testing"
"time"

"github.com/carlmjohnson/be"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
"github.com/synadia-io/nex/api/nodecontrol/gen"
"github.com/synadia-io/nex/models"
"github.com/tochemey/goakt/v2/testkit"
)

func TestControlApiAgent(t *testing.T) {
workingDir := t.TempDir()

s := startNatsServer(t, workingDir)
defer s.Shutdown()

nc, err := nats.Connect(s.ClientURL())
be.NilErr(t, err)

kp, err := nkeys.CreateServer()
be.NilErr(t, err)

pubKey, err := kp.PublicKey()
be.NilErr(t, err)

ctlApi := CreateControlAPI(nc, slog.New(slog.NewTextHandler(os.Stdout, nil)), pubKey, NewMockNode())
as := CreateAgentSupervisor(models.NodeOptions{})

ctx := context.Background()
tk := testkit.New(ctx, t)
tk.Spawn(ctx, AgentSupervisorActorName, as)
tk.Spawn(ctx, ControlAPIActorName, ctlApi)
time.Sleep(250 * time.Millisecond)
defer tk.Shutdown(ctx)

t.Run("TestAuctionHandler", func(t *testing.T) {
req := gen.AuctionRequestJson{
AgentType: []gen.NexWorkload{"test"},
AuctionId: "foo",
Tags: gen.AuctionRequestJsonTags{
Tags: make(map[string]string),
},
}

reqB, err := json.Marshal(req)
be.NilErr(t, err)

respInbox := nats.NewInbox()
respStream := new(bytes.Buffer)
resp, err := nc.Subscribe(respInbox, func(m *nats.Msg) {
respStream.Write(m.Data)
})
be.NilErr(t, err)

msg := nats.NewMsg(models.AuctionRequestSubject("system"))
msg.Reply = respInbox
msg.Data = reqB
msg.Sub = resp
msg.Subject = models.AuctionRequestSubject("system")

ctlApi.handleAuction(msg)
time.Sleep(time.Second)

be.Nonzero(t, respStream.Bytes())
auctionReply := new(models.Envelope[gen.AuctionResponseJson])
be.NilErr(t, json.Unmarshal(respStream.Bytes(), auctionReply))

be.Equal(t, auctionReply.Data.BidderId, "abc123")
be.NilErr(t, resp.Unsubscribe())
})

t.Run("TestPingHandler", func(t *testing.T) {
respInbox := nats.NewInbox()
respStream := new(bytes.Buffer)
resp, err := nc.Subscribe(respInbox, func(m *nats.Msg) {
respStream.Write(m.Data)
})
be.NilErr(t, err)

msg := nats.NewMsg(models.PingSubject())
msg.Reply = respInbox
msg.Sub = resp
msg.Subject = models.PingSubject()

ctlApi.handlePing(msg)
time.Sleep(time.Second)

be.Nonzero(t, respStream.Bytes())
nodePingReply := new(models.Envelope[gen.NodePingResponseJson])
be.NilErr(t, json.Unmarshal(respStream.Bytes(), nodePingReply))
be.Equal(t, "NNODE123", nodePingReply.Data.NodeId)
be.Equal(t, "TESTNEXUS", nodePingReply.Data.Tags.Tags[models.TagNexus])
be.Equal(t, "0.0.0", nodePingReply.Data.Version)
be.Equal(t, "mock-node", nodePingReply.Data.Tags.Tags[models.TagNodeName])
})
}
Loading
Loading