Skip to content

Commit

Permalink
run only fifo test for begin with channels on CI
Browse files Browse the repository at this point in the history
Signed-off-by: Nikita Skrynnik <nikita.skrynnik@xored.com>
  • Loading branch information
NikitaSkrynnik committed Nov 3, 2023
1 parent 4cd8e35 commit 06ddf5c
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 72 deletions.
17 changes: 10 additions & 7 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ on:
push:
branches:
- "release/**"
jobs:
yamllint:
uses: networkservicemesh/.github/.github/workflows/yamllint.yaml@main

build-and-test:
uses: networkservicemesh/.github/.github/workflows/build-and-test.yaml@main
with:
os: '["ubuntu-latest", "macos-latest", "windows-latest"]'
runs-on: ubuntu-latest
steps:
- name: Check out code
uses: actions/checkout@v2
- name: Setup Go
uses: actions/setup-go@v1
with:
go-version: 1.20.5
- run: |
go test ./pkg/registry/common/beginloop -run TestFIFOSequence -race -v
golangci-lint:
uses: networkservicemesh/.github/.github/workflows/golangci-lint.yaml@main
Expand Down
173 changes: 108 additions & 65 deletions pkg/registry/common/beginloop/fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,81 +18,29 @@ package beginloop_test

import (
"context"
"crypto/rand"
"fmt"
"math/rand"
"math/big"
"net"
"sync"
"testing"
"time"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/registry"
"github.com/networkservicemesh/sdk/pkg/registry/common/begin"
"github.com/networkservicemesh/sdk/pkg/registry/common/beginloop"
"github.com/networkservicemesh/sdk/pkg/registry/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/log"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type waitGroupServer struct {
wg *sync.WaitGroup
}

func (s *waitGroupServer) Register(ctx context.Context, in *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
s.wg.Done()
log.FromContext(ctx).Infof("Thread [%v] made Done", in.Url)
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, in)
}

func (s *waitGroupServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error {
return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, server)
}

func (s *waitGroupServer) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint) (*empty.Empty, error) {
return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in)
}

type collectorServer struct {
mu sync.Mutex
registrations []*registry.NetworkServiceEndpoint
}

func (s *collectorServer) Register(ctx context.Context, in *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
s.mu.Lock()
s.registrations = append(s.registrations, in)
s.mu.Unlock()
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, in)
}

func (s *collectorServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error {
return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, server)
}

func (s *collectorServer) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint) (*empty.Empty, error) {
return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in)
}

type delayServer struct {
}

func (s *delayServer) Register(ctx context.Context, in *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
milliseconds := rand.Intn(90) + 10
time.Sleep(time.Millisecond * time.Duration(milliseconds))
log.FromContext(ctx).Infof("Thread [%v] finished waiting", in.Url)
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, in)
}

func (s *delayServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error {
return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, server)
}

func (s *delayServer) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint) (*empty.Empty, error) {
return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in)
}
"github.com/networkservicemesh/sdk/pkg/registry/common/beginloop"
"github.com/networkservicemesh/sdk/pkg/registry/core/next"
)

func TestFIFO(t *testing.T) {
func TestFIFOSequence(t *testing.T) {
t.Cleanup(func() {
goleak.VerifyNone(t)
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -111,6 +59,7 @@ func TestFIFO(t *testing.T) {

grpcServer := grpc.NewServer()
registry.RegisterNetworkServiceEndpointRegistryServer(grpcServer, server)
defer grpcServer.Stop()

go func() {
serveErr := grpcServer.Serve(serverLis)
Expand All @@ -120,19 +69,32 @@ func TestFIFO(t *testing.T) {
clientConn, err := grpc.Dial(serverLis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
client := registry.NewNetworkServiceEndpointRegistryClient(clientConn)
defer func() {
closeErr := clientConn.Close()
require.NoError(t, closeErr)
}()

count := 3
count := 100
nses := []*registry.NetworkServiceEndpoint{}
for i := 0; i < count; i++ {
nses = append(nses, &registry.NetworkServiceEndpoint{Name: "nse", Url: fmt.Sprint(i)})
}

expected := make([]request, 0)

clientWg.Add(count)
for i := 0; i < count; i++ {
local := i
serverWg.Add(1)
go func() {
_, err := client.Register(begin.WithID(ctx, local), nses[local])
var err error
if local%2 == 0 {
expected = append(expected, request{requestType: register, requestData: nses[local]})
_, err = client.Register(ctx, nses[local])
} else {
expected = append(expected, request{requestType: unregister, requestData: nses[local]})
_, err = client.Unregister(ctx, nses[local])
}
require.NoError(t, err)
clientWg.Done()
}()
Expand All @@ -146,6 +108,87 @@ func TestFIFO(t *testing.T) {
registrations := collector.registrations

for i, registration := range registrations {
require.Equal(t, registration.Url, fmt.Sprint(i))
require.Equal(t, registration.requestData.Url, expected[i].requestData.Url)
require.Equal(t, registration.requestType, expected[i].requestType)
}
}

type eventType int

const (
register eventType = 0
unregister eventType = 1
)

type request struct {
requestType eventType
requestData *registry.NetworkServiceEndpoint
}

type waitGroupServer struct {
wg *sync.WaitGroup
}

func (s *waitGroupServer) Register(ctx context.Context, in *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
s.wg.Done()
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, in)
}

func (s *waitGroupServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error {
return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, server)
}

func (s *waitGroupServer) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint) (*empty.Empty, error) {
s.wg.Done()
return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in)
}

type collectorServer struct {
mu sync.Mutex
registrations []request
}

func (s *collectorServer) Register(ctx context.Context, in *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
s.mu.Lock()
s.registrations = append(s.registrations, request{
requestType: register,
requestData: in,
})
s.mu.Unlock()
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, in)
}

func (s *collectorServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error {
return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, server)
}

func (s *collectorServer) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint) (*empty.Empty, error) {
s.mu.Lock()
s.registrations = append(s.registrations, request{
requestType: unregister,
requestData: in,
})
s.mu.Unlock()
return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in)
}

type delayServer struct {
}

func (s *delayServer) Register(ctx context.Context, in *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
n, _ := rand.Int(rand.Reader, big.NewInt(90))
milliseconds := n.Int64() + 10
time.Sleep(time.Millisecond * time.Duration(milliseconds))
return next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, in)
}

func (s *delayServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error {
return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, server)
}

func (s *delayServer) Unregister(ctx context.Context, in *registry.NetworkServiceEndpoint) (*empty.Empty, error) {
n, _ := rand.Int(rand.Reader, big.NewInt(90))
milliseconds := n.Int64() + 10
time.Sleep(time.Millisecond * time.Duration(milliseconds))
return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, in)
}

0 comments on commit 06ddf5c

Please sign in to comment.