diff --git a/api/nodecontrol/client_test.go b/api/nodecontrol/client_test.go index d042ef26..ce839280 100644 --- a/api/nodecontrol/client_test.go +++ b/api/nodecontrol/client_test.go @@ -5,123 +5,31 @@ import ( "context" "encoding/base64" "encoding/json" - "fmt" "log/slog" "os" - "os/exec" "path/filepath" - "runtime" "strconv" "testing" "time" "disorder.dev/shandler" "github.com/carlmjohnson/be" - "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" "github.com/synadia-io/nex/api/nodecontrol/gen" "github.com/synadia-io/nex/models" - "github.com/synadia-io/nex/node" + "github.com/synadia-io/nex/test" ) -const ( - Node1ServerSeed string = "SNAB2T3VG2363NDA2JK7NT5O3FN5VCXI2MYJHOPFO2NIDXQU6DIWQTBQC4" - Node1ServerPublicKey string = "NCUU2YIYXEPGTCDXDKQR7LL5PXDHIDG7SDFLWKE3WY63ZGCZL2HKIAJT" - Node1XKeySeed string = "SXAOUP7RZFW5QPE2GDWTPABUDM5UIAK6BPULJPWZQAFFL2RZ5K3UYWHYY4" - Node1XkeyPublicKey string = "XAL54S5FE6SRPONXRNVE4ZDAOHOT44GFIY2ZW33DHLR2U3H2HJSXXRKY" -) - -func buildTestBinary(t testing.TB, binMain string, workingDir string) string { - t.Helper() - binPath := func() string { - binName := "test" - if runtime.GOOS == "windows" { - binName = "test.exe" - } - return filepath.Join(workingDir, binName) - }() - - if _, err := os.Stat(binPath); err == nil { - return binPath - } - - cmd := exec.Command("go", "build", "-o", binPath, binMain) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - be.NilErr(t, cmd.Run()) - - _, err := os.Stat(binPath) - be.NilErr(t, err) - - return binPath -} - -func startNatsServer(t testing.TB, workingDir string) *server.Server { - t.Helper() - - s := server.New(&server.Options{ - Port: -1, - JetStream: true, - StoreDir: workingDir, - }) - - s.Start() - - go s.WaitForShutdown() - - return s -} - -func startNexus(t testing.TB, ctx context.Context, logger *slog.Logger, workingDir, natsUrl string, numNodes int) { - t.Helper() - - nc, err := nats.Connect(natsUrl) - be.NilErr(t, err) - - for i := 0; i < numNodes; i++ { - var kp, xkp nkeys.KeyPair - if i == 0 { - kp, err = nkeys.FromSeed([]byte(Node1ServerSeed)) - be.NilErr(t, err) - xkp, err = nkeys.FromSeed([]byte(Node1XKeySeed)) - be.NilErr(t, err) - } else { - kp, err = nkeys.CreateServer() - be.NilErr(t, err) - xkp, err = nkeys.CreateCurveKeys() - be.NilErr(t, err) - } - nn, err := node.NewNexNode(kp, nc, - models.WithContext(ctx), - models.WithLogger(logger), - models.WithXKeyKeyPair(xkp), - models.WithNodeName(fmt.Sprintf("node-%d", i+1)), - models.WithNexus("testnexus"), - models.WithResourceDirectory(workingDir), - ) - be.NilErr(t, err) - - err = nn.Validate() - be.NilErr(t, err) - - go func() { - err = nn.Start() - be.NilErr(t, err) - }() - } -} - func TestAuction(t *testing.T) { workingDir := t.TempDir() - natsServer := startNatsServer(t, workingDir) + natsServer := test.StartNatsServer(t, workingDir) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) t.Cleanup(func() { - os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+Node1ServerPublicKey)) + os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+test.Node1ServerPublicKey)) cancel() natsServer.Shutdown() }) @@ -135,7 +43,7 @@ func TestAuction(t *testing.T) { shandler.WithStdErr(stderr), )) - startNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) + test.StartNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) time.Sleep(1000 * time.Millisecond) nc, err := nats.Connect(natsServer.ClientURL()) @@ -157,12 +65,12 @@ func TestAuction(t *testing.T) { func TestPing(t *testing.T) { workingDir := t.TempDir() - natsServer := startNatsServer(t, workingDir) + natsServer := test.StartNatsServer(t, workingDir) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) t.Cleanup(func() { - os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+Node1ServerPublicKey)) + os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+test.Node1ServerPublicKey)) cancel() natsServer.Shutdown() }) @@ -176,7 +84,7 @@ func TestPing(t *testing.T) { shandler.WithStdErr(stderr), )) - startNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 5) + test.StartNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 5) time.Sleep(1000 * time.Millisecond) nc, err := nats.Connect(natsServer.ClientURL()) @@ -193,12 +101,12 @@ func TestPing(t *testing.T) { func TestDirectPing(t *testing.T) { workingDir := t.TempDir() - natsServer := startNatsServer(t, workingDir) + natsServer := test.StartNatsServer(t, workingDir) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) t.Cleanup(func() { - os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+Node1ServerPublicKey)) + os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+test.Node1ServerPublicKey)) cancel() natsServer.Shutdown() }) @@ -212,7 +120,7 @@ func TestDirectPing(t *testing.T) { shandler.WithStdErr(stderr), )) - startNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) + test.StartNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) time.Sleep(1000 * time.Millisecond) nc, err := nats.Connect(natsServer.ClientURL()) @@ -221,20 +129,20 @@ func TestDirectPing(t *testing.T) { control, err := NewControlApiClient(nc, logger) be.NilErr(t, err) - resp, err := control.DirectPing(Node1ServerPublicKey) + resp, err := control.DirectPing(test.Node1ServerPublicKey) be.NilErr(t, err) - be.Equal(t, Node1ServerPublicKey, resp.NodeId) + be.Equal(t, test.Node1ServerPublicKey, resp.NodeId) } func TestAuctionDeployAndFindWorkload(t *testing.T) { workingDir := t.TempDir() - natsServer := startNatsServer(t, workingDir) + natsServer := test.StartNatsServer(t, workingDir) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) t.Cleanup(func() { - os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+Node1ServerPublicKey)) + os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+test.Node1ServerPublicKey)) cancel() natsServer.Shutdown() }) @@ -248,7 +156,7 @@ func TestAuctionDeployAndFindWorkload(t *testing.T) { shandler.WithStdErr(stderr), )) - startNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) + test.StartNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) time.Sleep(1000 * time.Millisecond) nc, err := nats.Connect(natsServer.ClientURL()) @@ -273,7 +181,7 @@ func TestAuctionDeployAndFindWorkload(t *testing.T) { encEnv, err := tAKey.Seal(envB, auctionResp[0].TargetXkey) be.NilErr(t, err) - binPath := buildTestBinary(t, "../../test/testdata/forever/main.go", workingDir) + binPath := test.BuildTestBinary(t, "../../test/testdata/forever/main.go", workingDir) resp, err := control.AuctionDeployWorkload(models.NodeSystemNamespace, auctionResp[0].BidderId, gen.StartWorkloadRequestJson{ Description: "Test Workload", @@ -304,12 +212,12 @@ func TestAuctionDeployAndFindWorkload(t *testing.T) { func TestDirectDeployAndListWorkloads(t *testing.T) { workingDir := t.TempDir() - natsServer := startNatsServer(t, workingDir) + natsServer := test.StartNatsServer(t, workingDir) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) t.Cleanup(func() { - os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+Node1ServerPublicKey)) + os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+test.Node1ServerPublicKey)) cancel() natsServer.Shutdown() }) @@ -323,7 +231,7 @@ func TestDirectDeployAndListWorkloads(t *testing.T) { shandler.WithStdErr(stderr), )) - startNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) + test.StartNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) time.Sleep(1000 * time.Millisecond) nc, err := nats.Connect(natsServer.ClientURL()) @@ -342,12 +250,12 @@ func TestDirectDeployAndListWorkloads(t *testing.T) { tAPub, err := tAKey.PublicKey() be.NilErr(t, err) - encEnv, err := tAKey.Seal(envB, Node1XkeyPublicKey) + encEnv, err := tAKey.Seal(envB, test.Node1XkeyPublicKey) be.NilErr(t, err) - binPath := buildTestBinary(t, "../../test/testdata/forever/main.go", workingDir) + binPath := test.BuildTestBinary(t, "../../test/testdata/forever/main.go", workingDir) - resp, err := control.DeployWorkload(models.NodeSystemNamespace, Node1ServerPublicKey, gen.StartWorkloadRequestJson{ + resp, err := control.DeployWorkload(models.NodeSystemNamespace, test.Node1ServerPublicKey, gen.StartWorkloadRequestJson{ Description: "Test Workload", Namespace: models.NodeSystemNamespace, RetryCount: 3, @@ -377,12 +285,12 @@ func TestDirectDeployAndListWorkloads(t *testing.T) { func TestUndeployWorkload(t *testing.T) { workingDir := t.TempDir() - natsServer := startNatsServer(t, workingDir) + natsServer := test.StartNatsServer(t, workingDir) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) t.Cleanup(func() { - os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+Node1ServerPublicKey)) + os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+test.Node1ServerPublicKey)) cancel() natsServer.Shutdown() }) @@ -396,7 +304,7 @@ func TestUndeployWorkload(t *testing.T) { shandler.WithStdErr(stderr), )) - startNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) + test.StartNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) time.Sleep(1000 * time.Millisecond) nc, err := nats.Connect(natsServer.ClientURL()) @@ -415,12 +323,12 @@ func TestUndeployWorkload(t *testing.T) { tAPub, err := tAKey.PublicKey() be.NilErr(t, err) - encEnv, err := tAKey.Seal(envB, Node1XkeyPublicKey) + encEnv, err := tAKey.Seal(envB, test.Node1XkeyPublicKey) be.NilErr(t, err) - binPath := buildTestBinary(t, "../../test/testdata/forever/main.go", workingDir) + binPath := test.BuildTestBinary(t, "../../test/testdata/forever/main.go", workingDir) - resp, err := control.DeployWorkload(models.NodeSystemNamespace, Node1ServerPublicKey, gen.StartWorkloadRequestJson{ + resp, err := control.DeployWorkload(models.NodeSystemNamespace, test.Node1ServerPublicKey, gen.StartWorkloadRequestJson{ Description: "Test Workload", Namespace: models.NodeSystemNamespace, RetryCount: 3, @@ -450,12 +358,12 @@ func TestUndeployWorkload(t *testing.T) { func TestGetNodeInfo(t *testing.T) { workingDir := t.TempDir() - natsServer := startNatsServer(t, workingDir) + natsServer := test.StartNatsServer(t, workingDir) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) t.Cleanup(func() { - os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+Node1ServerPublicKey)) + os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+test.Node1ServerPublicKey)) cancel() natsServer.Shutdown() }) @@ -469,7 +377,7 @@ func TestGetNodeInfo(t *testing.T) { shandler.WithStdErr(stderr), )) - startNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) + test.StartNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) time.Sleep(1000 * time.Millisecond) nc, err := nats.Connect(natsServer.ClientURL()) @@ -480,12 +388,12 @@ func TestGetNodeInfo(t *testing.T) { t.Fatal(err) } - resp, err := control.GetInfo(Node1ServerPublicKey, gen.NodeInfoRequestJson{ + resp, err := control.GetInfo(test.Node1ServerPublicKey, gen.NodeInfoRequestJson{ Namespace: models.NodeSystemNamespace, }) be.NilErr(t, err) - be.Equal(t, Node1ServerPublicKey, resp.NodeId) + be.Equal(t, test.Node1ServerPublicKey, resp.NodeId) be.Equal(t, "node-1", resp.Tags.Tags[models.TagNodeName]) be.Equal(t, "testnexus", resp.Tags.Tags[models.TagNexus]) be.Equal(t, 0, len(resp.WorkloadSummaries)) @@ -493,12 +401,12 @@ func TestGetNodeInfo(t *testing.T) { func TestSetLameduck(t *testing.T) { workingDir := t.TempDir() - natsServer := startNatsServer(t, workingDir) + natsServer := test.StartNatsServer(t, workingDir) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) t.Cleanup(func() { - os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+Node1ServerPublicKey)) + os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+test.Node1ServerPublicKey)) cancel() natsServer.Shutdown() }) @@ -512,7 +420,7 @@ func TestSetLameduck(t *testing.T) { shandler.WithStdErr(stderr), )) - startNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) + test.StartNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) time.Sleep(1000 * time.Millisecond) nc, err := nats.Connect(natsServer.ClientURL()) @@ -521,10 +429,10 @@ func TestSetLameduck(t *testing.T) { control, err := NewControlApiClient(nc, logger) be.NilErr(t, err) - _, err = control.SetLameDuck(Node1ServerPublicKey, time.Second*3) + _, err = control.SetLameDuck(test.Node1ServerPublicKey, time.Second*3) be.NilErr(t, err) - resp, err := control.GetInfo(Node1ServerPublicKey, gen.NodeInfoRequestJson{ + resp, err := control.GetInfo(test.Node1ServerPublicKey, gen.NodeInfoRequestJson{ Namespace: models.NodeSystemNamespace, }) be.NilErr(t, err) @@ -536,12 +444,12 @@ func TestSetLameduck(t *testing.T) { func TestCopyWorkload(t *testing.T) { workingDir := t.TempDir() - natsServer := startNatsServer(t, workingDir) + natsServer := test.StartNatsServer(t, workingDir) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) t.Cleanup(func() { - os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+Node1ServerPublicKey)) + os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+test.Node1ServerPublicKey)) cancel() natsServer.Shutdown() }) @@ -555,7 +463,7 @@ func TestCopyWorkload(t *testing.T) { shandler.WithStdErr(stderr), )) - startNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 5) + test.StartNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 5) time.Sleep(1000 * time.Millisecond) nc, err := nats.Connect(natsServer.ClientURL()) @@ -574,12 +482,12 @@ func TestCopyWorkload(t *testing.T) { tAPub, err := tAKey.PublicKey() be.NilErr(t, err) - encEnv, err := tAKey.Seal(envB, Node1XkeyPublicKey) + encEnv, err := tAKey.Seal(envB, test.Node1XkeyPublicKey) be.NilErr(t, err) - binPath := buildTestBinary(t, "../../test/testdata/forever/main.go", workingDir) + binPath := test.BuildTestBinary(t, "../../test/testdata/forever/main.go", workingDir) - resp, err := control.DeployWorkload(models.NodeSystemNamespace, Node1ServerPublicKey, gen.StartWorkloadRequestJson{ + resp, err := control.DeployWorkload(models.NodeSystemNamespace, test.Node1ServerPublicKey, gen.StartWorkloadRequestJson{ Description: "Test Workload", Argv: []string{"--arg1", "value1"}, Namespace: models.NodeSystemNamespace, @@ -607,17 +515,17 @@ func TestCopyWorkload(t *testing.T) { be.Equal(t, "testworkload", cResp.WorkloadName) be.AllEqual(t, []string{"--arg1", "value1"}, cResp.Argv) - be.Equal(t, Node1XkeyPublicKey, cResp.EncEnvironment.EncryptedBy) + be.Equal(t, test.Node1XkeyPublicKey, cResp.EncEnvironment.EncryptedBy) } func TestMonitorEndpoints(t *testing.T) { workingDir := t.TempDir() - natsServer := startNatsServer(t, workingDir) + natsServer := test.StartNatsServer(t, workingDir) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) t.Cleanup(func() { - os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+Node1ServerPublicKey)) + os.RemoveAll(filepath.Join(os.TempDir(), "inex-"+test.Node1ServerPublicKey)) cancel() natsServer.Shutdown() }) @@ -631,7 +539,7 @@ func TestMonitorEndpoints(t *testing.T) { shandler.WithStdErr(stderr), )) - startNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) + test.StartNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) time.Sleep(1000 * time.Millisecond) nc, err := nats.Connect(natsServer.ClientURL()) diff --git a/cmd/nex/go.sum b/cmd/nex/go.sum index cdcd1612..56e0127b 100644 --- a/cmd/nex/go.sum +++ b/cmd/nex/go.sum @@ -45,6 +45,8 @@ github.com/buraksezer/consistent v0.10.0 h1:hqBgz1PvNLC5rkWcEBVAL9dFMBWz6I0VgUCW github.com/buraksezer/consistent v0.10.0/go.mod h1:6BrVajWq7wbKZlTOUPs/XVfR8c0maujuPowduSpZqmw= github.com/buraksezer/olric v0.5.6-0.20240925183822-6ca0e20256e0 h1:0oLGcEeK3ew44mabFJ3nrAkg8G83RGM0FaXAIDrKSEQ= github.com/buraksezer/olric v0.5.6-0.20240925183822-6ca0e20256e0/go.mod h1:g9fss7jy1l9W7pPezICBxRAPWDwCiImcPi0ABF1o1R0= +github.com/carlmjohnson/be v0.23.2 h1:1QjPnPJhwGUjsD9+7h98EQlKsxnG5TV+nnEvk0wnkls= +github.com/carlmjohnson/be v0.23.2/go.mod h1:KAgPUh0HpzWYZZI+IABdo80wTgY43YhbdsiLYAaSI/Q= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= diff --git a/test/auction_workload_start_test.go b/test/auction_workload_start_test.go index 9ba51a21..987bbfc2 100644 --- a/test/auction_workload_start_test.go +++ b/test/auction_workload_start_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/carlmjohnson/be" "github.com/synadia-io/nex/api/nodecontrol/gen" "github.com/synadia-io/nex/models" ) @@ -16,50 +17,27 @@ import ( func TestAuctionDeploy(t *testing.T) { workingDir := t.TempDir() - binPath, err := buildTestBinary(t, "./testdata/direct_start/main.go", workingDir) - if err != nil { - t.Fatal(err) - } + binPath := BuildTestBinary(t, "./testdata/direct_start/main.go", workingDir) - nexCli, err := buildNexCli(t, workingDir) - if err != nil { - t.Fatal(err) - } + nexCli := buildNexCli(t, workingDir) - s, err := startNatsServer(t, workingDir) - if err != nil { - t.Fatal(err) - } + s := StartNatsServer(t, workingDir) defer s.Shutdown() stdout := new(bytes.Buffer) stderr := new(bytes.Buffer) - nex1, err := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node1", "nexus") - if err != nil { - t.Fatal(err) - } + nex1 := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node1", "nexus") nex1.Stdout = stdout nex1.Stderr = stderr - // nex1.Stdout = os.Stdout - // nex1.Stderr = os.Stderr nex1.SysProcAttr = sysProcAttr() - nex2, err := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node2", "nexus") - if err != nil { - t.Fatal(err) - } + nex2 := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node2", "nexus") nex2.SysProcAttr = sysProcAttr() - // nex2.Stdout = os.Stdout - // nex2.Stderr = os.Stderr - err = nex1.Start() - if err != nil { - t.Fatal(err) - } + err := nex1.Start() + be.NilErr(t, err) err = nex2.Start() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, err) go func() { time.Sleep(500 * time.Millisecond) @@ -69,10 +47,8 @@ func TestAuctionDeploy(t *testing.T) { // find unused port listener, err := net.Listen("tcp", ":0") - if err != nil { - t.Error(err) - return - } + be.NilErr(t, err) + port := listener.Addr().(*net.TCPAddr).Port listener.Close() @@ -80,9 +56,7 @@ func TestAuctionDeploy(t *testing.T) { auctionDeploy.Stdout = stdout auctionDeploy.Stderr = stderr err = auctionDeploy.Run() - if err != nil { - t.Error(err) - } + be.NilErr(t, err) stdout = new(bytes.Buffer) stderr = new(bytes.Buffer) @@ -90,28 +64,18 @@ func TestAuctionDeploy(t *testing.T) { nodels.Stdout = stdout nodels.Stderr = stderr err = nodels.Run() - if err != nil { - t.Error(err) - } + be.NilErr(t, err) lsout := []*gen.NodePingResponseJson{} err = json.Unmarshal(stdout.Bytes(), &lsout) - if err != nil { - t.Log(stdout.String()) - t.Log(stderr.String()) - t.Error(err) - } + be.NilErr(t, err) for _, n := range lsout { switch n.Tags.Tags[models.TagNodeName] { case "node1": - if n.RunningAgents.Status["direct-start"] != 1 { - t.Error("node1 does not have expected workload running") - } + be.Equal(t, 1, n.RunningAgents.Status["direct-start"]) case "node2": - if n.RunningAgents.Status["direct-start"] != 0 { - t.Error("node2 has unexpected workloads running") - } + be.Equal(t, 0, n.RunningAgents.Status["direct-start"]) default: t.Log(stdout.String()) t.Error("this should never happen") @@ -119,25 +83,15 @@ func TestAuctionDeploy(t *testing.T) { } err = stopProcess(nex1.Process) - if err != nil { - t.Error(err) - } + be.NilErr(t, err) err = stopProcess(nex2.Process) - if err != nil { - t.Error(err) - } + be.NilErr(t, err) }() err = nex1.Wait() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, err) err = nex2.Wait() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, err) - if !bytes.Contains(stdout.Bytes(), []byte("ENV: nexenvset")) { - t.Error("Expected ENV Data missing | stdout:", stdout.String()) - } + be.In(t, "ENV: nexenvset", stdout.Bytes()) } diff --git a/test/copy_workload_test.go b/test/copy_workload_test.go index 2f878ab7..d06c5612 100644 --- a/test/copy_workload_test.go +++ b/test/copy_workload_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/carlmjohnson/be" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" "github.com/synadia-io/nex/api/nodecontrol/gen" @@ -20,62 +21,33 @@ import ( func TestCopyWorkload(t *testing.T) { workingDir := t.TempDir() - binPath, err := buildTestBinary(t, "./testdata/nats_micro/main.go", workingDir) - if err != nil { - t.Fatal(err) - } + binPath := BuildTestBinary(t, "./testdata/nats_micro/main.go", workingDir) - nexCli, err := buildNexCli(t, workingDir) - if err != nil { - t.Fatal(err) - } + nexCli := buildNexCli(t, workingDir) - s, err := startNatsServer(t, workingDir) - if err != nil { - t.Fatal(err) - } + s := StartNatsServer(t, workingDir) defer s.Shutdown() - nex1, err := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node1", "nexus") - if err != nil { - t.Fatal(err) - } + nex1 := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node1", "nexus") nex1.SysProcAttr = sysProcAttr() n2KP, err := nkeys.CreateServer() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, err) n2KPSeed, err := n2KP.Seed() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, err) n2KPPub, err := n2KP.PublicKey() - if err != nil { - t.Fatal(err) - } - - nex2, err := startNexNodeCmd(t, workingDir, string(n2KPSeed), "", s.ClientURL(), "node2", "nexus") - if err != nil { - t.Fatal(err) - } + be.NilErr(t, err) + + nex2 := startNexNodeCmd(t, workingDir, string(n2KPSeed), "", s.ClientURL(), "node2", "nexus") nex2.SysProcAttr = sysProcAttr() - err = nex1.Start() - if err != nil { - t.Fatal(err) - } - err = nex2.Start() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, nex1.Start()) + be.NilErr(t, nex2.Start()) nc, err := nats.Connect(s.ClientURL()) - if err != nil { - t.Fatal(err) - } + be.NilErr(t, err) go func() { time.Sleep(500 * time.Millisecond) @@ -84,38 +56,23 @@ func TestCopyWorkload(t *testing.T) { origDeploy := exec.Command(nexCli, "workload", "run", "-s", s.ClientURL(), "--name", "tester", fmt.Sprintf("--node-tags=%s=%s", models.TagNodeName, "node1"), "file://"+binPath, fmt.Sprintf("--env=NATS_URL=%s", s.ClientURL())) origDeploy.Stdout = origStdOut err = origDeploy.Run() - if err != nil { - t.Error(err) - return - } + be.NilErr(t, err) time.Sleep(500 * time.Millisecond) msg, err := nc.Request("health", nil, time.Second) - if err != nil { - t.Error(err) - return - } + be.NilErr(t, err) - if string(msg.Data) != "ok" { - t.Errorf("expected ok, got %s", string(msg.Data)) - } + be.Equal(t, "ok", string(msg.Data)) re := regexp.MustCompile(`^Workload tester \[(?P[A-Za-z0-9]+)\] started$`) match := re.FindStringSubmatch(strings.TrimSpace(origStdOut.String())) - if len(match) != 2 { - t.Error("tester workload failed to start: ", origStdOut.String()) - return - } + be.Equal(t, 2, len(match)) origWorkloadId := match[1] copyStdOut := new(bytes.Buffer) copyDeploy := exec.Command(nexCli, "workload", "copy", "-s", s.ClientURL(), origWorkloadId, fmt.Sprintf("--node-tags=%s=%s", models.TagNodeName, "node2")) copyDeploy.Stdout = copyStdOut - err = copyDeploy.Run() - if err != nil { - t.Error(err) - return - } + be.NilErr(t, copyDeploy.Run()) time.Sleep(500 * time.Millisecond) node2InfoStdOut := new(bytes.Buffer) @@ -123,102 +80,49 @@ func TestCopyWorkload(t *testing.T) { node2Info := exec.Command(nexCli, "node", "info", "-s", s.ClientURL(), "--json", n2KPPub) node2Info.Stdout = node2InfoStdOut node2Info.Stderr = node2InfoStdErr - err = node2Info.Run() - if err != nil { - t.Log("stdout: ", node2InfoStdOut.String()) - t.Log("stderr: ", node2InfoStdErr.String()) - t.Error(err) - return - } + be.NilErr(t, node2Info.Run()) resp := new(gen.NodeInfoResponseJson) - err = json.Unmarshal(node2InfoStdOut.Bytes(), resp) - if err != nil { - t.Error(err) - return - } - - if len(resp.WorkloadSummaries) != 1 { - t.Error("expected 1 workload, got ", len(resp.WorkloadSummaries)) - return - } - - err = stopProcess(nex1.Process) - if err != nil { - t.Error(err) - } - err = stopProcess(nex2.Process) - if err != nil { - t.Error(err) - } - }() + be.NilErr(t, json.Unmarshal(node2InfoStdOut.Bytes(), resp)) - err = nex1.Wait() - if err != nil { - t.Fatal(err) - } - err = nex2.Wait() - if err != nil { - t.Fatal(err) - } + be.Equal(t, 1, len(resp.WorkloadSummaries)) + be.NilErr(t, stopProcess(nex1.Process)) + be.NilErr(t, stopProcess(nex2.Process)) + }() + + be.NilErr(t, nex1.Wait()) + be.NilErr(t, nex2.Wait()) } func TestMultipleCopyWorkload(t *testing.T) { workingDir := t.TempDir() - binPath, err := buildTestBinary(t, "./testdata/forever/main.go", workingDir) - if err != nil { - t.Fatal(err) - } + binPath := BuildTestBinary(t, "./testdata/forever/main.go", workingDir) - nexCli, err := buildNexCli(t, workingDir) - if err != nil { - t.Fatal(err) - } + nexCli := buildNexCli(t, workingDir) - s, err := startNatsServer(t, workingDir) - if err != nil { - t.Fatal(err) - } + s := StartNatsServer(t, workingDir) defer s.Shutdown() n1KP, err := nkeys.CreateServer() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, err) n1KPSeed, err := n1KP.Seed() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, err) n1KPPub, err := n1KP.PublicKey() - if err != nil { - t.Fatal(err) - } - - nex1, err := startNexNodeCmd(t, workingDir, string(n1KPSeed), "", s.ClientURL(), "node1", "nexus") - if err != nil { - t.Fatal(err) - } - nex1.SysProcAttr = sysProcAttr() - // nex1.Stdout = os.Stdout - // nex1.Stderr = os.Stderr + be.NilErr(t, err) - err = nex1.Start() - if err != nil { - t.Fatal(err) - } + nex1 := startNexNodeCmd(t, workingDir, string(n1KPSeed), "", s.ClientURL(), "node1", "nexus") + nex1.SysProcAttr = sysProcAttr() + be.NilErr(t, nex1.Start()) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) go func() { <-ctx.Done() - err = stopProcess(nex1.Process) - if err != nil { - t.Error(err) - } + be.NilErr(t, stopProcess(nex1.Process)) }() passed := false @@ -228,76 +132,40 @@ func TestMultipleCopyWorkload(t *testing.T) { origStdOut := new(bytes.Buffer) origDeploy := exec.Command(nexCli, "workload", "run", "-s", s.ClientURL(), "--name", "tester", "file://"+binPath) origDeploy.Stdout = origStdOut - err = origDeploy.Run() - if err != nil { - t.Error(err) - return - } + be.NilErr(t, origDeploy.Run()) re := regexp.MustCompile(`^Workload tester \[(?P[A-Za-z0-9]+)\] started$`) match := re.FindStringSubmatch(strings.TrimSpace(origStdOut.String())) - if len(match) != 2 { - t.Error("tester workload failed to start: ", origStdOut.String()) - return - } + be.Equal(t, 2, len(match)) origWorkloadId := match[1] copyStdOut := new(bytes.Buffer) copyDeploy := exec.Command(nexCli, "workload", "copy", "-s", s.ClientURL(), origWorkloadId) copyDeploy.Stdout = copyStdOut - err = copyDeploy.Run() - if err != nil { - t.Error(err) - return - } + be.NilErr(t, copyDeploy.Run()) copyStdOut = new(bytes.Buffer) copyDeploy = exec.Command(nexCli, "workload", "copy", "-s", s.ClientURL(), origWorkloadId) copyDeploy.Stdout = copyStdOut - // copyDeploy.Stdout = os.Stdout - // copyDeploy.Stderr = os.Stderr - err = copyDeploy.Run() - if err != nil { - t.Error(err) - return - } - + be.NilErr(t, copyDeploy.Run()) time.Sleep(500 * time.Millisecond) + node2InfoStdOut := new(bytes.Buffer) node2InfoStdErr := new(bytes.Buffer) node2Info := exec.Command(nexCli, "node", "info", "-s", s.ClientURL(), "--json", n1KPPub) node2Info.Stdout = node2InfoStdOut node2Info.Stderr = node2InfoStdErr - err = node2Info.Run() - if err != nil { - t.Log("stdout: ", node2InfoStdOut.String()) - t.Log("stderr: ", node2InfoStdErr.String()) - t.Error(err) - return - } + be.NilErr(t, node2Info.Run()) resp := new(gen.NodeInfoResponseJson) - err = json.Unmarshal(node2InfoStdOut.Bytes(), resp) - if err != nil { - t.Error(err) - return - } + be.NilErr(t, json.Unmarshal(node2InfoStdOut.Bytes(), resp)) - if len(resp.WorkloadSummaries) != 3 { - t.Error("expected 3 workload, got ", len(resp.WorkloadSummaries)) - return - } + be.Equal(t, 3, len(resp.WorkloadSummaries)) passed = true cancel() }() - err = nex1.Wait() - if err != nil { - t.Fatal(err) - } - - if !passed { - t.Fatal("failed to multi clone workload") - } + be.NilErr(t, nex1.Wait()) + be.True(t, passed) } diff --git a/test/helpers.go b/test/helpers.go new file mode 100644 index 00000000..4a510929 --- /dev/null +++ b/test/helpers.go @@ -0,0 +1,167 @@ +package test + +import ( + "context" + "fmt" + "log/slog" + "os" + "os/exec" + "path/filepath" + "runtime" + "testing" + + "github.com/carlmjohnson/be" + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" + "github.com/nats-io/nkeys" + "github.com/synadia-io/nex/models" + "github.com/synadia-io/nex/node" +) + +const ( + Node1ServerSeed string = "SNAB2T3VG2363NDA2JK7NT5O3FN5VCXI2MYJHOPFO2NIDXQU6DIWQTBQC4" + Node1ServerPublicKey string = "NCUU2YIYXEPGTCDXDKQR7LL5PXDHIDG7SDFLWKE3WY63ZGCZL2HKIAJT" + Node1XKeySeed string = "SXAOUP7RZFW5QPE2GDWTPABUDM5UIAK6BPULJPWZQAFFL2RZ5K3UYWHYY4" + Node1XkeyPublicKey string = "XAL54S5FE6SRPONXRNVE4ZDAOHOT44GFIY2ZW33DHLR2U3H2HJSXXRKY" +) + +func BuildTestBinary(t testing.TB, binMain string, workingDir string) string { + t.Helper() + binPath := func() string { + binName := "test" + if runtime.GOOS == "windows" { + binName = "test.exe" + } + return filepath.Join(workingDir, binName) + }() + + if _, err := os.Stat(binPath); err == nil { + return binPath + } + + cmd := exec.Command("go", "build", "-o", binPath, binMain) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + be.NilErr(t, cmd.Run()) + + _, err := os.Stat(binPath) + be.NilErr(t, err) + + return binPath +} + +func StartNatsServer(t testing.TB, workingDir string) *server.Server { + t.Helper() + + s := server.New(&server.Options{ + Port: -1, + JetStream: true, + StoreDir: workingDir, + }) + + s.Start() + + go s.WaitForShutdown() + + return s +} + +func StartNexus(t testing.TB, ctx context.Context, logger *slog.Logger, workingDir, natsUrl string, numNodes int) { + t.Helper() + + nc, err := nats.Connect(natsUrl) + be.NilErr(t, err) + + for i := 0; i < numNodes; i++ { + var kp, xkp nkeys.KeyPair + if i == 0 { + kp, err = nkeys.FromSeed([]byte(Node1ServerSeed)) + be.NilErr(t, err) + xkp, err = nkeys.FromSeed([]byte(Node1XKeySeed)) + be.NilErr(t, err) + } else { + kp, err = nkeys.CreateServer() + be.NilErr(t, err) + xkp, err = nkeys.CreateCurveKeys() + be.NilErr(t, err) + } + nn, err := node.NewNexNode(kp, nc, + models.WithContext(ctx), + models.WithLogger(logger), + models.WithXKeyKeyPair(xkp), + models.WithNodeName(fmt.Sprintf("node-%d", i+1)), + models.WithNexus("testnexus"), + models.WithResourceDirectory(workingDir), + ) + be.NilErr(t, err) + + err = nn.Validate() + be.NilErr(t, err) + + go func() { + err = nn.Start() + be.NilErr(t, err) + }() + } +} + +// following helpers are for e2e tests in the test package only + +func buildNexCli(t testing.TB, workingDir string) string { + t.Helper() + + if _, err := os.Stat(filepath.Join(workingDir, "nex")); err == nil { + return filepath.Join(workingDir, "nex") + } + + cwd, err := os.Getwd() + be.NilErr(t, err) + + err = os.Chdir("../cmd/nex") + be.NilErr(t, err) + + cmd := exec.Command("go", "build", "-o", workingDir) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + err = cmd.Start() + be.NilErr(t, err) + + err = cmd.Wait() + be.NilErr(t, err) + + err = os.Chdir(cwd) + be.NilErr(t, err) + + return filepath.Join(workingDir, "nex") +} + +func startNexNodeCmd(t testing.TB, workingDir, nodeSeed, xkeySeed, natsServer, name, nexus string) *exec.Cmd { + t.Helper() + + cli := buildNexCli(t, workingDir) + + if nodeSeed == "" { + kp, err := nkeys.CreateServer() + be.NilErr(t, err) + + s, err := kp.Seed() + be.NilErr(t, err) + + nodeSeed = string(s) + } + + if xkeySeed == "" { + xkp, err := nkeys.CreateCurveKeys() + be.NilErr(t, err) + + xSeed, err := xkp.Seed() + be.NilErr(t, err) + + xkeySeed = string(xSeed) + } + + cmd := exec.Command(cli, "node", "up", "--logger.level", "debug", "--logger.short", "-s", natsServer, "--resource-directory", workingDir, "--node-name", name, "--nexus", nexus, "--node-seed", nodeSeed, "--node-xkey-seed", xkeySeed) + return cmd +} diff --git a/test/lameduck_test.go b/test/lameduck_test.go index d0b7c866..c65091c8 100644 --- a/test/lameduck_test.go +++ b/test/lameduck_test.go @@ -6,82 +6,49 @@ import ( "testing" "time" + "github.com/carlmjohnson/be" "github.com/nats-io/nkeys" ) func TestLameDuckMode(t *testing.T) { workingDir := t.TempDir() - nexCli, err := buildNexCli(t, workingDir) - if err != nil { - t.Fatal(err) - } + nexCli := buildNexCli(t, workingDir) - s, err := startNatsServer(t, workingDir) - if err != nil { - t.Fatal(err) - } + s := StartNatsServer(t, workingDir) defer s.Shutdown() kp, err := nkeys.CreateServer() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, err) seed, err := kp.Seed() - if err != nil { - t.Fatal(err) - } - - cmd, err := startNexNodeCmd(t, workingDir, string(seed), "", s.ClientURL(), "node", "nexus") - if err != nil { - t.Fatal(err) - } - cmd.SysProcAttr = sysProcAttr() + be.NilErr(t, err) stdout := new(bytes.Buffer) stderr := new(bytes.Buffer) + cmd := startNexNodeCmd(t, workingDir, string(seed), "", s.ClientURL(), "node", "nexus") + cmd.SysProcAttr = sysProcAttr() cmd.Stdout = stdout cmd.Stderr = stderr - err = cmd.Start() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, cmd.Start()) time.Sleep(500 * time.Millisecond) ldStdout := new(bytes.Buffer) go func() { pub, err := kp.PublicKey() - if err != nil { - t.Error(err) - return - } + be.NilErr(t, err) cmd := exec.Command(nexCli, "node", "lameduck", "-s", s.ClientURL(), "--node-id", pub) cmd.Stdout = ldStdout - err = cmd.Run() - if err != nil { - t.Error(err) - return - } + be.NilErr(t, cmd.Run()) }() - err = cmd.Wait() - if err != nil { - t.Fatal(err) - } - - if len(stderr.Bytes()) > 0 { - t.Errorf("expected no output from stderr: %s", stderr.String()) - } + be.NilErr(t, cmd.Wait()) - if !bytes.Contains(ldStdout.Bytes(), []byte("is now in lameduck mode. Workloads will begin stopping gracefully.")) { - t.Errorf("expected lameduck message, got: %s", ldStdout.String()) - } + be.Equal(t, 0, len(stderr.Bytes())) - if !bytes.Contains(stdout.Bytes(), []byte("Received lame duck request")) || - !bytes.Contains(stdout.Bytes(), []byte("Shutting down nexnode")) { - t.Errorf("expected node logs not present, got: %s", stdout.String()) - } + be.In(t, "is now in lameduck mode. Workloads will begin stopping gracefully.", ldStdout.String()) + be.In(t, "Received lame duck request", stdout.String()) + be.In(t, "Shutting down nexnode", stdout.String()) } diff --git a/test/node_test.go b/test/node_test.go index 5357883a..890df356 100644 --- a/test/node_test.go +++ b/test/node_test.go @@ -4,131 +4,28 @@ import ( "bytes" "context" "encoding/json" - "os" "os/exec" - "path/filepath" "testing" "time" - "github.com/nats-io/nats-server/v2/server" - "github.com/nats-io/nkeys" + "github.com/carlmjohnson/be" "github.com/synadia-io/nex/api/nodecontrol/gen" ) -func buildNexCli(t testing.TB, workingDir string) (string, error) { - t.Helper() - - if _, err := os.Stat(filepath.Join(workingDir, "nex")); err == nil { - return filepath.Join(workingDir, "nex"), nil - } - - cwd, err := os.Getwd() - if err != nil { - return "", err - } - - err = os.Chdir("../cmd/nex") - if err != nil { - return "", err - } - - cmd := exec.Command("go", "build", "-o", workingDir) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - err = cmd.Start() - if err != nil { - return "", err - } - - err = cmd.Wait() - if err != nil { - return "", err - } - - err = os.Chdir(cwd) - if err != nil { - return "", err - } - - return filepath.Join(workingDir, "nex"), nil -} - -func startNatsServer(t testing.TB, workingDir string) (*server.Server, error) { - t.Helper() - - s := server.New(&server.Options{ - Port: -1, - JetStream: true, - StoreDir: workingDir, - }) - - s.Start() - - go s.WaitForShutdown() - - return s, nil -} - -func startNexNodeCmd(t testing.TB, workingDir, nodeSeed, xkeySeed, natsServer, name, nexus string) (*exec.Cmd, error) { - t.Helper() - - cli, err := buildNexCli(t, workingDir) - if err != nil { - return nil, err - } - - if nodeSeed == "" { - kp, err := nkeys.CreateServer() - if err != nil { - return nil, err - } - s, err := kp.Seed() - if err != nil { - return nil, err - } - nodeSeed = string(s) - } - - if xkeySeed == "" { - xkp, err := nkeys.CreateCurveKeys() - if err != nil { - return nil, err - } - xSeed, err := xkp.Seed() - if err != nil { - return nil, err - } - xkeySeed = string(xSeed) - } - - cmd := exec.Command(cli, "node", "up", "--logger.level", "debug", "--logger.short", "-s", natsServer, "--resource-directory", workingDir, "--node-name", name, "--nexus", nexus, "--node-seed", nodeSeed, "--node-xkey-seed", xkeySeed) - return cmd, nil -} - func TestStartNode(t *testing.T) { workingDir := t.TempDir() - s, err := startNatsServer(t, workingDir) - if err != nil { - t.Fatal(err) - } - - cmd, err := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node", "nexus") - if err != nil { - t.Fatal(err) - } + s := StartNatsServer(t, workingDir) defer s.Shutdown() + cmd := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node", "nexus") + stdout := new(bytes.Buffer) stderr := new(bytes.Buffer) cmd.Stdout = stdout cmd.Stderr = stderr cmd.SysProcAttr = sysProcAttr() - err = cmd.Start() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, cmd.Start()) passed := false go func() { @@ -138,10 +35,7 @@ func TestStartNode(t *testing.T) { for { select { case <-ctx.Done(): - err = stopProcess(cmd.Process) - if err != nil { - t.Error(err) - } + be.NilErr(t, stopProcess(cmd.Process)) return case <-ticker.C: if bytes.Contains(stdout.Bytes(), []byte("NATS execution engine awaiting commands")) { @@ -153,68 +47,38 @@ func TestStartNode(t *testing.T) { } }() - err = cmd.Wait() - if err != nil { - t.Log(stdout.String()) - t.Log(stderr.String()) - t.Fatal(err) - } - - if !passed { - t.Fatal("Nex Node did not start") - } + be.NilErr(t, cmd.Wait()) + be.True(t, passed) } func TestStartNexus(t *testing.T) { workingDir := t.TempDir() - s, err := startNatsServer(t, workingDir) - if err != nil { - t.Fatal(err) - } + s := StartNatsServer(t, workingDir) defer s.Shutdown() - nex1, err := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node1", "nexus3node") - if err != nil { - t.Fatal(err) - } + nex1 := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node1", "nexus3node") nex1.SysProcAttr = sysProcAttr() - nex2, err := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node2", "nexus3node") - if err != nil { - t.Fatal(err) - } + nex2 := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node2", "nexus3node") nex2.SysProcAttr = sysProcAttr() - nex3, err := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node3", "nexus3node") - if err != nil { - t.Fatal(err) - } + nex3 := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node3", "nexus3node") nex3.SysProcAttr = sysProcAttr() - err = nex1.Start() - if err != nil { - t.Fatal(err) - } - err = nex2.Start() - if err != nil { - t.Fatal(err) - } - err = nex3.Start() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, nex1.Start()) + be.NilErr(t, nex2.Start()) + be.NilErr(t, nex3.Start()) passed := false go func() { time.Sleep(time.Millisecond * 500) - nexPath, err := buildNexCli(t, workingDir) - if err != nil { - t.Error(err) - } + nexPath := buildNexCli(t, workingDir) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() ticker := time.NewTicker(time.Second * 1) + defer ticker.Stop() + for { stdout := new(bytes.Buffer) stderr := new(bytes.Buffer) @@ -224,42 +88,20 @@ func TestStartNexus(t *testing.T) { select { case <-ctx.Done(): - err = stopProcess(nex1.Process) - if err != nil { - t.Error(err) - } - err = stopProcess(nex2.Process) - if err != nil { - t.Error(err) - } - err = stopProcess(nex3.Process) - if err != nil { - t.Error(err) - } + be.NilErr(t, stopProcess(nex1.Process)) + be.NilErr(t, stopProcess(nex2.Process)) + be.NilErr(t, stopProcess(nex3.Process)) return case <-ticker.C: - err := nodels.Run() - if err != nil { - t.Error(err) - ticker.Stop() - cancel() - } + be.NilErr(t, nodels.Run()) + be.Equal(t, 0, len(stderr.Bytes())) - if len(stderr.Bytes()) != 0 { - t.Log("stderr:", stderr.String()) - cancel() - } if len(stdout.Bytes()) == 0 { continue } resp := []*gen.NodePingResponseJson{} - err = json.Unmarshal(stdout.Bytes(), &resp) - if err != nil { - t.Error() - ticker.Stop() - cancel() - } + be.NilErr(t, json.Unmarshal(stdout.Bytes(), &resp)) if len(resp) == 3 { passed = true @@ -270,20 +112,8 @@ func TestStartNexus(t *testing.T) { } }() - err = nex1.Wait() - if err != nil { - t.Fatal(err) - } - err = nex2.Wait() - if err != nil { - t.Fatal(err) - } - err = nex3.Wait() - if err != nil { - t.Fatal(err) - } - - if !passed { - t.Fatal("Three Nex Nodes did not start") - } + be.NilErr(t, nex1.Wait()) + be.NilErr(t, nex2.Wait()) + be.NilErr(t, nex3.Wait()) + be.True(t, passed) } diff --git a/test/workload_test.go b/test/workload_test.go index b1bb2aef..ca438a88 100644 --- a/test/workload_test.go +++ b/test/workload_test.go @@ -8,107 +8,49 @@ import ( "io" "net" "net/http" - "os" "os/exec" - "path/filepath" "regexp" - "runtime" "strings" "testing" "time" + "github.com/carlmjohnson/be" "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" "github.com/synadia-io/nex/api/nodecontrol/gen" "github.com/synadia-io/nex/models" ) -func buildTestBinary(t testing.TB, binMain string, workingDir string) (string, error) { - t.Helper() - binName := func() string { - if runtime.GOOS == "windows" { - return "test.exe" - } - return "test" - } - - if _, err := os.Stat(filepath.Join(workingDir, binName())); err == nil { - return filepath.Join(workingDir, binName()), nil - } - - cmd := exec.Command("go", "build", "-o", filepath.Join(workingDir, binName()), binMain) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - err := cmd.Run() - if err != nil { - return "", err - } - - if _, err := os.Stat(filepath.Join(workingDir, binName())); err != nil { - return "", err - } - - return filepath.Join(workingDir, binName()), nil -} - func TestDirectStartService(t *testing.T) { workingDir := t.TempDir() - binPath, err := buildTestBinary(t, "./testdata/direct_start/main.go", workingDir) - if err != nil { - t.Fatal(err) - } + binPath := BuildTestBinary(t, "./testdata/direct_start/main.go", workingDir) - nexCli, err := buildNexCli(t, workingDir) - if err != nil { - t.Fatal(err) - } + nexCli := buildNexCli(t, workingDir) - s, err := startNatsServer(t, workingDir) - if err != nil { - t.Fatal(err) - } + s := StartNatsServer(t, workingDir) defer s.Shutdown() kp, err := nkeys.CreateServer() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, err) seed, err := kp.Seed() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, err) xkey, err := nkeys.CreateCurveKeys() - if err != nil { - t.Error(err) - return - } + be.NilErr(t, err) xSeed, err := xkey.Seed() - if err != nil { - t.Error(err) - return - } - - cmd, err := startNexNodeCmd(t, workingDir, string(seed), string(xSeed), s.ClientURL(), "node", "nexus") - if err != nil { - t.Fatal(err) - } - cmd.SysProcAttr = sysProcAttr() + be.NilErr(t, err) + stdout := new(bytes.Buffer) stderr := new(bytes.Buffer) + cmd := startNexNodeCmd(t, workingDir, string(seed), string(xSeed), s.ClientURL(), "node", "nexus") + cmd.SysProcAttr = sysProcAttr() cmd.Stdout = stdout cmd.Stderr = stderr - // cmd.Stdout = os.Stdout - // cmd.Stderr = os.Stderr - err = cmd.Start() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, cmd.Start()) time.Sleep(500 * time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -116,125 +58,68 @@ func TestDirectStartService(t *testing.T) { go func() { <-ctx.Done() - err = stopProcess(cmd.Process) - if err != nil { - t.Error(err) - } + be.NilErr(t, stopProcess(cmd.Process)) }() go func() { pub, err := kp.PublicKey() - if err != nil { - t.Error(err) - return - } + be.NilErr(t, err) xkey_pub, err := xkey.PublicKey() - if err != nil { - t.Error(err) - return - } + be.NilErr(t, err) // find unused port listener, err := net.Listen("tcp", ":0") - if err != nil { - t.Error(err) - return - } + be.NilErr(t, err) port := listener.Addr().(*net.TCPAddr).Port listener.Close() - cmd := exec.Command(nexCli, "workload", "run", "-s", s.ClientURL(), "--name", "tester", "file://"+binPath, "--node-id", pub, "--node-xkey-pub", xkey_pub, fmt.Sprintf("--argv=-port=%d", port), fmt.Sprintf("--env=ENV_TEST=%s", "derp")) cmdstdout := new(bytes.Buffer) cmdstderr := new(bytes.Buffer) + cmd := exec.Command(nexCli, "workload", "run", "-s", s.ClientURL(), "--name", "tester", "file://"+binPath, "--node-id", pub, "--node-xkey-pub", xkey_pub, fmt.Sprintf("--argv=-port=%d", port), fmt.Sprintf("--env=ENV_TEST=%s", "derp")) cmd.Stdout = cmdstdout cmd.Stderr = cmdstderr - err = cmd.Run() - if err != nil { - t.Log(cmdstderr.String()) - t.Error(err) - return - } + be.NilErr(t, cmd.Run()) - if len(cmdstderr.Bytes()) > 0 { - t.Error("stderr:", cmdstderr.String()) - return - } + be.Equal(t, 0, len(cmdstderr.Bytes())) re := regexp.MustCompile(`^Workload tester \[(?P[A-Za-z0-9]+)\] started on node (?P[A-Z0-9]+)$`) match := re.FindStringSubmatch(strings.TrimSpace(cmdstdout.String())) - if len(match) != 3 { - t.Error("tester workload failed to start: ", cmdstdout.String()) - return - } + be.Equal(t, 3, len(match)) + be.Equal(t, pub, match[2]) - if pub != match[2] { - t.Error("expected node id", pub, "got", match[2]) - } - - time.Sleep(5000 * time.Millisecond) + time.Sleep(500 * time.Millisecond) resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d", port)) - if err != nil { - t.Error(err) - return - } + be.NilErr(t, err) defer resp.Body.Close() body, err := io.ReadAll(resp.Body) - if err != nil { - t.Error(err) - return - } + be.NilErr(t, err) - if string(body) != "passing" { - t.Error("expected passing, got", string(body)) - } + be.True(t, strings.Contains(string(body), "passing")) - // TODO: stop workload cancel() }() - err = cmd.Wait() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, cmd.Wait()) // This ensures that the environment was decrypted and injected into workload correctly - if !bytes.Contains(stdout.Bytes(), []byte("ENV: derp")) { - t.Error("Expected ENV Data missing | stdout:", stdout.String()) - } + be.In(t, "ENV: derp", stdout.String()) } func TestDirectStartFunction(t *testing.T) { workingDir := t.TempDir() - binPath, err := buildTestBinary(t, "./testdata/function/main.go", workingDir) - if err != nil { - t.Fatal(err) - } + binPath := BuildTestBinary(t, "./testdata/function/main.go", workingDir) - nexCli, err := buildNexCli(t, workingDir) - if err != nil { - t.Fatal(err) - } + nexCli := buildNexCli(t, workingDir) - s, err := startNatsServer(t, workingDir) - if err != nil { - t.Fatal(err) - } + s := StartNatsServer(t, workingDir) defer s.Shutdown() - cmd, err := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node", "nexus") - if err != nil { - t.Fatal(err) - } + cmd := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node", "nexus") cmd.SysProcAttr = sysProcAttr() - // cmd.Stdout = os.Stdout - // cmd.Stderr = os.Stderr - err = cmd.Start() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, cmd.Start()) time.Sleep(500 * time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -242,10 +127,7 @@ func TestDirectStartFunction(t *testing.T) { go func() { <-ctx.Done() - err = stopProcess(cmd.Process) - if err != nil { - t.Error(err) - } + be.NilErr(t, stopProcess(cmd.Process)) }() passed := false @@ -256,78 +138,44 @@ func TestDirectStartFunction(t *testing.T) { cmdstderr := new(bytes.Buffer) cmd.Stdout = cmdstdout cmd.Stderr = cmdstderr - err = cmd.Run() - if err != nil { - t.Log(cmdstderr.String()) - t.Error(err) - return - } - + be.NilErr(t, cmd.Run()) time.Sleep(500 * time.Millisecond) - if len(cmdstderr.Bytes()) > 0 { - t.Error("stderr:", cmdstderr.String()) - return - } + be.Equal(t, 0, len(cmdstderr.Bytes())) re := regexp.MustCompile(`^Workload tester \[(?P[A-Za-z0-9]+)\] started$`) match := re.FindStringSubmatch(strings.TrimSpace(cmdstdout.String())) - if len(match) != 2 { - t.Error("tester workload failed to start: ", cmdstdout.String()) - return - } - + be.Equal(t, 2, len(match)) time.Sleep(500 * time.Millisecond) + cmd = exec.Command(nexCli, "workload", "info", "-s", s.ClientURL(), match[1], "--json") cmdstdout = new(bytes.Buffer) cmdstderr = new(bytes.Buffer) cmd.Stdout = cmdstdout cmd.Stderr = cmdstderr - err = cmd.Run() - if err != nil { - t.Log(cmdstderr.String()) - t.Error(err) - return - } + be.NilErr(t, cmd.Run()) time.Sleep(500 * time.Millisecond) var resp gen.WorkloadPingResponseJson - err = json.Unmarshal(cmdstdout.Bytes(), &resp) - if err != nil { - t.Error(err) - return - } + be.NilErr(t, json.Unmarshal(cmdstdout.Bytes(), &resp)) - if resp.WorkloadSummary.WorkloadState != models.WorkloadStateWarm { - t.Logf("ERROR: workload state: %s; expected %s", resp.WorkloadSummary.WorkloadState, models.WorkloadStateWarm) - t.Error(err) - return - } + be.Equal(t, models.WorkloadStateWarm, resp.WorkloadSummary.WorkloadState) nc, err := nats.Connect(s.ClientURL()) - if err != nil { - t.Error(err) - return - } + be.NilErr(t, err) defer nc.Close() workloadId := match[1] sub, err := nc.Subscribe("$NEX.logs.system."+workloadId+".stdout", func(msg *nats.Msg) { triggerLogs.Write(msg.Data) }) - if err != nil { - t.Error(err) - return - } + be.NilErr(t, err) defer func() { _ = sub.Drain() }() err = nc.Publish("test", []byte("test data 123")) - if err != nil { - t.Error(err) - return - } + be.NilErr(t, err) time.Sleep(time.Second) cmd = exec.Command(nexCli, "workload", "info", "-s", s.ClientURL(), match[1], "--json") @@ -335,25 +183,13 @@ func TestDirectStartFunction(t *testing.T) { cmdstderr = new(bytes.Buffer) cmd.Stdout = cmdstdout cmd.Stderr = cmdstderr - err = cmd.Run() - if err != nil { - t.Log(cmdstderr.String()) - t.Error(err) - return - } + be.NilErr(t, cmd.Run()) time.Sleep(500 * time.Millisecond) - err = json.Unmarshal(cmdstdout.Bytes(), &resp) - if err != nil { - t.Error(err) - return - } + be.NilErr(t, json.Unmarshal(cmdstdout.Bytes(), &resp)) dur, err := time.ParseDuration(resp.WorkloadSummary.Runtime) - if err != nil { - t.Error(err) - return - } + be.NilErr(t, err) if dur > 500*time.Millisecond && dur < 1*time.Second { passed = true @@ -361,67 +197,32 @@ func TestDirectStartFunction(t *testing.T) { t.Log("Job runtime: ", resp.WorkloadSummary.Runtime) } - // TODO: stop workload cancel() }() - err = cmd.Wait() - if err != nil { - t.Fatal(err) - } - - if !passed { - t.Fatal("expected workload to run for 500ms - 1s") - } - - if !bytes.Contains(triggerLogs.Bytes(), []byte("test data 123")) { - t.Error("expected 'test data 123' to be consumed by workload") - } + be.NilErr(t, cmd.Wait()) + be.True(t, passed) + be.In(t, "test data 123", triggerLogs.String()) } func TestDirectStop(t *testing.T) { workingDir := t.TempDir() - binPath, err := buildTestBinary(t, "./testdata/forever/main.go", workingDir) - if err != nil { - t.Fatal(err) - } + binPath := BuildTestBinary(t, "./testdata/forever/main.go", workingDir) - nexCli, err := buildNexCli(t, workingDir) - if err != nil { - t.Fatal(err) - } + nexCli := buildNexCli(t, workingDir) - s, err := startNatsServer(t, workingDir) - if err != nil { - t.Fatal(err) - } + s := StartNatsServer(t, workingDir) defer s.Shutdown() - nex1, err := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node1", "nexus") - if err != nil { - t.Fatal(err) - } + nex1 := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node1", "nexus") nex1.SysProcAttr = sysProcAttr() - // nex1.Stdout = os.Stdout - // nex1.Stderr = os.Stderr - nex2, err := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node2", "nexus") - if err != nil { - t.Fatal(err) - } + nex2 := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node2", "nexus") nex2.SysProcAttr = sysProcAttr() - // nex2.Stdout = os.Stdout - // nex2.Stderr = os.Stderr - - err = nex1.Start() - if err != nil { - t.Fatal(err) - } - err = nex2.Start() - if err != nil { - t.Fatal(err) - } + + be.NilErr(t, nex1.Start()) + be.NilErr(t, nex2.Start()) time.Sleep(500 * time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -429,41 +230,23 @@ func TestDirectStop(t *testing.T) { go func() { <-ctx.Done() - err = stopProcess(nex1.Process) - if err != nil { - t.Error(err) - } - err = stopProcess(nex2.Process) - if err != nil { - t.Error(err) - } + be.NilErr(t, stopProcess(nex1.Process)) + be.NilErr(t, stopProcess(nex2.Process)) }() - passed := false go func() { cmd := exec.Command(nexCli, "workload", "run", "-s", s.ClientURL(), "--name", "tester", "file://"+binPath) cmdstdout := new(bytes.Buffer) cmdstderr := new(bytes.Buffer) cmd.Stdout = cmdstdout cmd.Stderr = cmdstderr - err = cmd.Run() - if err != nil { - t.Log(cmdstderr.String()) - t.Error(err) - return - } + be.NilErr(t, cmd.Run()) - if len(cmdstderr.Bytes()) > 0 { - t.Error("stderr:", cmdstderr.String()) - return - } + be.Equal(t, 0, len(cmdstderr.Bytes())) re := regexp.MustCompile(`^Workload tester \[(?P[A-Za-z0-9]+)\] started$`) match := re.FindStringSubmatch(strings.TrimSpace(cmdstdout.String())) - if len(match) != 2 { - t.Error("tester workload failed to start: ", cmdstdout.String()) - return - } + be.Equal(t, 2, len(match)) time.Sleep(500 * time.Millisecond) workloadID := match[1] @@ -473,65 +256,28 @@ func TestDirectStop(t *testing.T) { cmdstderr = new(bytes.Buffer) cmd.Stdout = cmdstdout cmd.Stderr = cmdstderr - err = cmd.Run() - if err != nil { - t.Log(cmdstderr.String()) - t.Error(err) - return - } - - if bytes.Contains(cmdstdout.Bytes(), []byte("Workload "+workloadID+" stopped")) { - passed = true - } else { - t.Log("cmdstdout:", cmdstdout.String()) - if len(cmdstderr.Bytes()) > 0 { - t.Error("stderr:", cmdstderr.String()) - } - } + be.NilErr(t, cmd.Run()) + be.In(t, "Workload "+workloadID+" stopped", cmdstdout.String()) cancel() }() - err = nex1.Wait() - if err != nil { - t.Fatal(err) - } - err = nex2.Wait() - if err != nil { - t.Fatal(err) - } - - if !passed { - t.Fatal("expected workload to stop") - } + be.NilErr(t, nex1.Wait()) + be.NilErr(t, nex2.Wait()) } func TestDirectStopNoWorkload(t *testing.T) { workingDir := t.TempDir() - nexCli, err := buildNexCli(t, workingDir) - if err != nil { - t.Fatal(err) - } + nexCli := buildNexCli(t, workingDir) - s, err := startNatsServer(t, workingDir) - if err != nil { - t.Fatal(err) - } + s := StartNatsServer(t, workingDir) defer s.Shutdown() - nex1, err := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node1", "nexus") - if err != nil { - t.Fatal(err) - } + nex1 := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node1", "nexus") nex1.SysProcAttr = sysProcAttr() - // nex1.Stdout = os.Stdout - // nex1.Stderr = os.Stderr - err = nex1.Start() - if err != nil { - t.Fatal(err) - } + be.NilErr(t, nex1.Start()) time.Sleep(500 * time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) @@ -539,46 +285,20 @@ func TestDirectStopNoWorkload(t *testing.T) { go func() { <-ctx.Done() - err = stopProcess(nex1.Process) - if err != nil { - t.Error(err) - } + be.NilErr(t, stopProcess(nex1.Process)) }() - passed := false go func() { cmd := exec.Command(nexCli, "workload", "stop", "-s", s.ClientURL(), "fakeid") cmdstdout := new(bytes.Buffer) cmdstderr := new(bytes.Buffer) cmd.Stdout = cmdstdout cmd.Stderr = cmdstderr - // cmd.Stdout = os.Stdout - // cmd.Stderr = os.Stderr - err = cmd.Run() - if err != nil { - t.Log(cmdstderr.String()) - t.Error(err) - return - } - - if bytes.Contains(cmdstdout.Bytes(), []byte("Workload not found")) { - passed = true - } else { - t.Log("cmdstdout:", cmdstdout.String()) - if len(cmdstderr.Bytes()) > 0 { - t.Error("stderr:", cmdstderr.String()) - } - } + be.NilErr(t, cmd.Run()) + be.In(t, "Workload not found", cmdstdout.String()) cancel() }() - err = nex1.Wait() - if err != nil { - t.Fatal(err) - } - - if !passed { - t.Fatal("failed to properly detect no workloads") - } + be.NilErr(t, nex1.Wait()) }