Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Convert tests to go test
Browse files Browse the repository at this point in the history
sjberman committed Jan 22, 2025
1 parent bbcf56f commit 40cc29b
Showing 5 changed files with 277 additions and 303 deletions.
166 changes: 87 additions & 79 deletions internal/mode/static/nginx/agent/broadcast/broadcast_test.go
Original file line number Diff line number Diff line change
@@ -4,97 +4,105 @@ import (
"context"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/broadcast"
)

func TestBroadcaster(t *testing.T) {
func TestSubscribe(t *testing.T) {
t.Parallel()
RegisterFailHandler(Fail)
RunSpecs(t, "Broadcaster Suite")
g := NewWithT(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

broadcaster := broadcast.NewDeploymentBroadcaster(ctx)

subscriber := broadcaster.Subscribe()
g.Expect(subscriber.ID).NotTo(BeEmpty())

message := broadcast.NginxAgentMessage{
ConfigVersion: "v1",
Type: broadcast.ConfigApplyRequest,
}

go func() {
result := broadcaster.Send(message)
g.Expect(result).To(BeTrue())
}()

g.Eventually(subscriber.ListenCh).Should(Receive(Equal(message)))
}

var _ = Describe("Broadcaster", func() {
var (
ctx context.Context
cancel context.CancelFunc
broadcaster broadcast.Broadcaster
)

BeforeEach(func() {
ctx, cancel = context.WithCancel(context.Background()) //nolint:fatcontext // ignore for test
broadcaster = broadcast.NewDeploymentBroadcaster(ctx)
})

AfterEach(func() {
cancel()
})

It("should allow a listener to subscribe and receive messages", func() {
subscriber := broadcaster.Subscribe()
Expect(subscriber.ID).NotTo(BeEmpty())

message := broadcast.NginxAgentMessage{
ConfigVersion: "v1",
Type: broadcast.ConfigApplyRequest,
}

go func() {
result := broadcaster.Send(message)
Expect(result).To(BeTrue())
}()

Eventually(subscriber.ListenCh).Should(Receive(Equal(message)))
})

It("should send messages to all subscribers and wait for responses", func() {
subscriber1 := broadcaster.Subscribe()
subscriber2 := broadcaster.Subscribe()

message := broadcast.NginxAgentMessage{
ConfigVersion: "v1",
Type: broadcast.ConfigApplyRequest,
}

go func() {
result := broadcaster.Send(message)
Expect(result).To(BeTrue())
}()

Eventually(subscriber1.ListenCh).Should(Receive(Equal(message)))
Eventually(subscriber2.ListenCh).Should(Receive(Equal(message)))

subscriber1.ResponseCh <- struct{}{}
subscriber2.ResponseCh <- struct{}{}
})

It("should return false if there are no listeners", func() {
message := broadcast.NginxAgentMessage{
ConfigVersion: "v1",
Type: broadcast.ConfigApplyRequest,
}
func TestSubscribe_MultipleListeners(t *testing.T) {
t.Parallel()
g := NewWithT(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

broadcaster := broadcast.NewDeploymentBroadcaster(ctx)

subscriber1 := broadcaster.Subscribe()
subscriber2 := broadcaster.Subscribe()

message := broadcast.NginxAgentMessage{
ConfigVersion: "v1",
Type: broadcast.ConfigApplyRequest,
}

go func() {
result := broadcaster.Send(message)
Expect(result).To(BeFalse())
})
g.Expect(result).To(BeTrue())
}()

g.Eventually(subscriber1.ListenCh).Should(Receive(Equal(message)))
g.Eventually(subscriber2.ListenCh).Should(Receive(Equal(message)))

subscriber1.ResponseCh <- struct{}{}
subscriber2.ResponseCh <- struct{}{}
}

func TestSubscribe_NoListeners(t *testing.T) {
t.Parallel()
g := NewWithT(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

broadcaster := broadcast.NewDeploymentBroadcaster(ctx)

message := broadcast.NginxAgentMessage{
ConfigVersion: "v1",
Type: broadcast.ConfigApplyRequest,
}

result := broadcaster.Send(message)
g.Expect(result).To(BeFalse())
}

func TestCancelSubscription(t *testing.T) {
t.Parallel()
g := NewWithT(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

It("should remove a subscriber from the list", func() {
subscriber := broadcaster.Subscribe()
broadcaster := broadcast.NewDeploymentBroadcaster(ctx)

broadcaster.CancelSubscription(subscriber.ID)
subscriber := broadcaster.Subscribe()

message := broadcast.NginxAgentMessage{
ConfigVersion: "v1",
Type: broadcast.ConfigApplyRequest,
}
broadcaster.CancelSubscription(subscriber.ID)

go func() {
result := broadcaster.Send(message)
Expect(result).To(BeFalse())
}()
message := broadcast.NginxAgentMessage{
ConfigVersion: "v1",
Type: broadcast.ConfigApplyRequest,
}

Consistently(subscriber.ListenCh).ShouldNot(Receive())
})
})
go func() {
result := broadcaster.Send(message)
g.Expect(result).To(BeFalse())
}()

g.Consistently(subscriber.ListenCh).ShouldNot(Receive())
}
183 changes: 99 additions & 84 deletions internal/mode/static/nginx/agent/grpc/connections_test.go
Original file line number Diff line number Diff line change
@@ -1,93 +1,108 @@
package grpc_test

import (
. "github.com/onsi/ginkgo/v2"
"testing"

. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/types"

agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc"
)

var _ = Describe("ConnectionsTracker", func() {
var tracker agentgrpc.ConnectionsTracker

BeforeEach(func() {
tracker = agentgrpc.NewConnectionsTracker()
})

It("should add a connection to the tracker", func() {
conn := agentgrpc.Connection{
PodName: "pod1",
InstanceID: "instance1",
Parent: types.NamespacedName{Namespace: "default", Name: "parent1"},
}
tracker.Track("key1", conn)

trackedConn := tracker.GetConnection("key1")
Expect(trackedConn).To(Equal(conn))
})

It("should return an empty connection if the key does not exist", func() {
trackedConn := tracker.GetConnection("nonexistent")
Expect(trackedConn).To(Equal(agentgrpc.Connection{}))
})

It("should return true if the connection is ready", func() {
conn := agentgrpc.Connection{
PodName: "pod1",
InstanceID: "instance1",
Parent: types.NamespacedName{Namespace: "default", Name: "parent1"},
}
tracker.Track("key1", conn)

trackedConn, ready := tracker.ConnectionIsReady("key1")
Expect(ready).To(BeTrue())
Expect(trackedConn).To(Equal(conn))
})

It("should return false if the connection is not ready when instanceID is not specified", func() {
conn := agentgrpc.Connection{
PodName: "pod1",
Parent: types.NamespacedName{Namespace: "default", Name: "parent1"},
}
tracker.Track("key1", conn)

_, ready := tracker.ConnectionIsReady("key1")
Expect(ready).To(BeFalse())
})

It("should set the instance ID for a connection", func() {
conn := agentgrpc.Connection{
PodName: "pod1",
Parent: types.NamespacedName{Namespace: "default", Name: "parent1"},
}
tracker.Track("key1", conn)

_, ready := tracker.ConnectionIsReady("key1")
Expect(ready).To(BeFalse())

tracker.SetInstanceID("key1", "instance1")

trackedConn, ready := tracker.ConnectionIsReady("key1")
Expect(ready).To(BeTrue())
Expect(trackedConn.InstanceID).To(Equal("instance1"))
})

It("should remove all connections for a given parent", func() {
parent := types.NamespacedName{Namespace: "default", Name: "parent1"}
conn1 := agentgrpc.Connection{PodName: "pod1", InstanceID: "instance1", Parent: parent}
conn2 := agentgrpc.Connection{PodName: "pod2", InstanceID: "instance2", Parent: parent}

parent2 := types.NamespacedName{Namespace: "default", Name: "parent2"}
conn3 := agentgrpc.Connection{PodName: "pod3", InstanceID: "instance3", Parent: parent2}

tracker.Track("key1", conn1)
tracker.Track("key2", conn2)
tracker.Track("key3", conn3)

tracker.UntrackConnectionsForParent(parent)
Expect(tracker.GetConnection("key1")).To(Equal(agentgrpc.Connection{}))
Expect(tracker.GetConnection("key2")).To(Equal(agentgrpc.Connection{}))
Expect(tracker.GetConnection("key3")).To(Equal(conn3))
})
})
func TestGetConnection(t *testing.T) {
t.Parallel()
g := NewWithT(t)

tracker := agentgrpc.NewConnectionsTracker()

conn := agentgrpc.Connection{
PodName: "pod1",
InstanceID: "instance1",
Parent: types.NamespacedName{Namespace: "default", Name: "parent1"},
}
tracker.Track("key1", conn)

trackedConn := tracker.GetConnection("key1")
g.Expect(trackedConn).To(Equal(conn))

nonExistent := tracker.GetConnection("nonexistent")
g.Expect(nonExistent).To(Equal(agentgrpc.Connection{}))
}

func TestConnectionIsReady(t *testing.T) {
t.Parallel()
g := NewWithT(t)

tracker := agentgrpc.NewConnectionsTracker()

conn := agentgrpc.Connection{
PodName: "pod1",
InstanceID: "instance1",
Parent: types.NamespacedName{Namespace: "default", Name: "parent1"},
}
tracker.Track("key1", conn)

trackedConn, ready := tracker.ConnectionIsReady("key1")
g.Expect(ready).To(BeTrue())
g.Expect(trackedConn).To(Equal(conn))
}

func TestConnectionIsNotReady(t *testing.T) {
t.Parallel()
g := NewWithT(t)

tracker := agentgrpc.NewConnectionsTracker()

conn := agentgrpc.Connection{
PodName: "pod1",
Parent: types.NamespacedName{Namespace: "default", Name: "parent1"},
}
tracker.Track("key1", conn)

_, ready := tracker.ConnectionIsReady("key1")
g.Expect(ready).To(BeFalse())
}

func TestSetInstanceID(t *testing.T) {
t.Parallel()
g := NewWithT(t)

tracker := agentgrpc.NewConnectionsTracker()
conn := agentgrpc.Connection{
PodName: "pod1",
Parent: types.NamespacedName{Namespace: "default", Name: "parent1"},
}
tracker.Track("key1", conn)

_, ready := tracker.ConnectionIsReady("key1")
g.Expect(ready).To(BeFalse())

tracker.SetInstanceID("key1", "instance1")

trackedConn, ready := tracker.ConnectionIsReady("key1")
g.Expect(ready).To(BeTrue())
g.Expect(trackedConn.InstanceID).To(Equal("instance1"))
}

func TestUntrackConnectionsForParent(t *testing.T) {
t.Parallel()
g := NewWithT(t)

tracker := agentgrpc.NewConnectionsTracker()

parent := types.NamespacedName{Namespace: "default", Name: "parent1"}
conn1 := agentgrpc.Connection{PodName: "pod1", InstanceID: "instance1", Parent: parent}
conn2 := agentgrpc.Connection{PodName: "pod2", InstanceID: "instance2", Parent: parent}

parent2 := types.NamespacedName{Namespace: "default", Name: "parent2"}
conn3 := agentgrpc.Connection{PodName: "pod3", InstanceID: "instance3", Parent: parent2}

tracker.Track("key1", conn1)
tracker.Track("key2", conn2)
tracker.Track("key3", conn3)

tracker.UntrackConnectionsForParent(parent)
g.Expect(tracker.GetConnection("key1")).To(Equal(agentgrpc.Connection{}))
g.Expect(tracker.GetConnection("key2")).To(Equal(agentgrpc.Connection{}))
g.Expect(tracker.GetConnection("key3")).To(Equal(conn3))
}
42 changes: 17 additions & 25 deletions internal/mode/static/nginx/agent/grpc/context/context_test.go
Original file line number Diff line number Diff line change
@@ -4,36 +4,28 @@ import (
"context"
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

grpcContext "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc/context"
)

func TestContext(t *testing.T) {
func TestGrpcInfoInContext(t *testing.T) {
t.Parallel()
RegisterFailHandler(Fail)
RunSpecs(t, "Context Suite")
g := NewWithT(t)

grpcInfo := grpcContext.GrpcInfo{IPAddress: "192.168.1.1"}

newCtx := grpcContext.NewGrpcContext(context.Background(), grpcInfo)
info, ok := grpcContext.GrpcInfoFromContext(newCtx)
g.Expect(ok).To(BeTrue())
g.Expect(info).To(Equal(grpcInfo))
}

var _ = Describe("Context", func() {
Context("when GrpcInfo is present in the context", func() {
It("should return the GrpcInfo", func() {
ctx := context.Background()
grpcInfo := grpcContext.GrpcInfo{IPAddress: "192.168.1.1"}

newCtx := grpcContext.NewGrpcContext(ctx, grpcInfo)
info, ok := grpcContext.GrpcInfoFromContext(newCtx)
Expect(ok).To(BeTrue())
Expect(info).To(Equal(grpcInfo))
})
})

Context("when GrpcInfo is not present in the context", func() {
It("should return false", func() {
info, ok := grpcContext.GrpcInfoFromContext(context.Background())
Expect(ok).To(BeFalse())
Expect(info).To(Equal(grpcContext.GrpcInfo{}))
})
})
})
func TestGrpcInfoNotInContext(t *testing.T) {
t.Parallel()
g := NewWithT(t)

info, ok := grpcContext.GrpcInfoFromContext(context.Background())
g.Expect(ok).To(BeFalse())
g.Expect(info).To(Equal(grpcContext.GrpcInfo{}))
}
14 changes: 0 additions & 14 deletions internal/mode/static/nginx/agent/grpc/grpc_suite_test.go

This file was deleted.

175 changes: 74 additions & 101 deletions internal/mode/static/nginx/agent/grpc/messenger/messenger_test.go
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@ import (
"testing"

v1 "github.com/nginx/agent/v3/api/grpc/mpi/v1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"google.golang.org/grpc"

@@ -47,106 +46,80 @@ func (m *mockErrorServer) Recv() (*v1.DataPlaneResponse, error) {
return nil, errors.New("error received from server")
}

func TestMessenger(t *testing.T) {
func createServer() *mockServer {
return &mockServer{
sendChan: make(chan *v1.ManagementPlaneRequest, 1),
recvChan: make(chan *v1.DataPlaneResponse, 1),
}
}

func createErrorServer() *mockErrorServer {
return &mockErrorServer{
sendChan: make(chan *v1.ManagementPlaneRequest, 1),
recvChan: make(chan *v1.DataPlaneResponse, 1),
}
}

func TestSend(t *testing.T) {
t.Parallel()
g := NewWithT(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

server := createServer()
msgr := messenger.New(server)

go msgr.Run(ctx)

msg := &v1.ManagementPlaneRequest{
MessageMeta: &v1.MessageMeta{
MessageId: "test",
},
}
g.Expect(msgr.Send(ctx, msg)).To(Succeed())

g.Eventually(server.sendChan).Should(Receive(Equal(msg)))

cancel()

g.Expect(msgr.Send(ctx, &v1.ManagementPlaneRequest{})).ToNot(Succeed())
}

func TestMessages(t *testing.T) {
t.Parallel()
RegisterFailHandler(Fail)
RunSpecs(t, "Messenger Suite")
g := NewWithT(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

server := createServer()
msgr := messenger.New(server)

go msgr.Run(ctx)

msg := &v1.DataPlaneResponse{InstanceId: "test"}
server.recvChan <- msg

g.Eventually(msgr.Messages()).Should(Receive(Equal(msg)))
}

var _ = Describe("Messenger", func() {
var (
ctx context.Context
cancel context.CancelFunc
)

Context("valid server", func() {
var (
server *mockServer
msgr messenger.Messenger
)

BeforeEach(func() {
ctx, cancel = context.WithCancel(context.Background()) //nolint:fatcontext // ignore for test
server = &mockServer{
sendChan: make(chan *v1.ManagementPlaneRequest, 1),
recvChan: make(chan *v1.DataPlaneResponse, 1),
}
msgr = messenger.New(server)
Expect(msgr).ToNot(BeNil())
})

AfterEach(func() {
cancel()
close(server.sendChan)
close(server.recvChan)
})

It("should send a message successfully", func() {
go msgr.Run(ctx)

msg := &v1.ManagementPlaneRequest{
MessageMeta: &v1.MessageMeta{
MessageId: "test",
},
}
Expect(msgr.Send(ctx, msg)).To(Succeed())

Eventually(server.sendChan).Should(Receive(Equal(msg)))
})

It("should receive a message successfully", func() {
go msgr.Run(ctx)

msg := &v1.DataPlaneResponse{InstanceId: "test"}
server.recvChan <- msg

Eventually(msgr.Messages()).Should(Receive(Equal(msg)))
})

It("should handle send context cancellation", func() {
go msgr.Run(ctx)

cancel()

Expect(msgr.Send(ctx, &v1.ManagementPlaneRequest{})).ToNot(Succeed())
})
})

Context("server returns error", func() {
var (
server *mockErrorServer
msgr messenger.Messenger
)

BeforeEach(func() {
ctx, cancel = context.WithCancel(context.Background()) //nolint:fatcontext // ignore for test
server = &mockErrorServer{
sendChan: make(chan *v1.ManagementPlaneRequest, 1),
recvChan: make(chan *v1.DataPlaneResponse, 1),
}
msgr = messenger.New(server)
Expect(msgr).ToNot(BeNil())
})

AfterEach(func() {
cancel()
close(server.sendChan)
close(server.recvChan)
})

It("should handle an error on the server side when sending", func() {
go msgr.Run(ctx)

Expect(msgr.Send(ctx, &v1.ManagementPlaneRequest{})).To(Succeed())
Eventually(msgr.Errors()).Should(Receive(MatchError("error sending to server")))
})

It("should handle an error response sent by the server", func() {
go msgr.Run(ctx)

server.recvChan <- &v1.DataPlaneResponse{}

Eventually(msgr.Errors()).Should(Receive(MatchError("error received from server")))
})
})
})
func TestErrors(t *testing.T) {
t.Parallel()
g := NewWithT(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

server := createErrorServer()
msgr := messenger.New(server)

go msgr.Run(ctx)

g.Expect(msgr.Send(ctx, &v1.ManagementPlaneRequest{})).To(Succeed())
g.Eventually(msgr.Errors()).Should(Receive(MatchError("error sending to server")))

server.recvChan <- &v1.DataPlaneResponse{}

g.Eventually(msgr.Errors()).Should(Receive(MatchError("error received from server")))
}

0 comments on commit 40cc29b

Please sign in to comment.