Skip to content

Commit

Permalink
Use NetworkServiceResponses instead of NetworkServices in memory regi…
Browse files Browse the repository at this point in the history
…stry (#1677)

* use NetworkServiceResponses instead of NetworkServices in memory registry

Signed-off-by: NikitaSkrynnik <nikita.skrynnik@xored.com>

* add a test for checking  field

Signed-off-by: NikitaSkrynnik <nikita.skrynnik@xored.com>

* fix go lintre issues

Signed-off-by: NikitaSkrynnik <nikita.skrynnik@xored.com>

* fix concurrency problems in the test

Signed-off-by: NikitaSkrynnik <nikita.skrynnik@xored.com>

---------

Signed-off-by: NikitaSkrynnik <nikita.skrynnik@xored.com>
  • Loading branch information
NikitaSkrynnik authored Oct 7, 2024
1 parent 95c7ff7 commit 76b397f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 19 deletions.
34 changes: 16 additions & 18 deletions pkg/registry/common/memory/ns_server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2020-2021 Doc.ai and/or its affiliates.
//
// Copyright (c) 2023 Cisco and/or its affiliates.
// Copyright (c) 2023-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -37,15 +37,15 @@ import (
type memoryNSServer struct {
networkServices genericsync.Map[string, *registry.NetworkService]
executor serialize.Executor
eventChannels map[string]chan *registry.NetworkService
eventChannels map[string]chan *registry.NetworkServiceResponse
eventChannelSize int
}

// NewNetworkServiceRegistryServer creates new memory based NetworkServiceRegistryServer
func NewNetworkServiceRegistryServer(options ...Option) registry.NetworkServiceRegistryServer {
s := &memoryNSServer{
eventChannelSize: defaultEventChannelSize,
eventChannels: make(map[string]chan *registry.NetworkService),
eventChannels: make(map[string]chan *registry.NetworkServiceResponse),
}
for _, o := range options {
o.apply(s)
Expand All @@ -65,12 +65,12 @@ func (s *memoryNSServer) Register(ctx context.Context, ns *registry.NetworkServi

s.networkServices.Store(r.Name, r.Clone())

s.sendEvent(r)
s.sendEvent(&registry.NetworkServiceResponse{NetworkService: r})

return r, nil
}

func (s *memoryNSServer) sendEvent(event *registry.NetworkService) {
func (s *memoryNSServer) sendEvent(event *registry.NetworkServiceResponse) {
event = event.Clone()
s.executor.AsyncExec(func() {
for _, ch := range s.eventChannels {
Expand All @@ -93,13 +93,13 @@ func (s *memoryNSServer) Find(query *registry.NetworkServiceQuery, server regist
return next.NetworkServiceRegistryServer(server.Context()).Find(query, server)
}

eventCh := make(chan *registry.NetworkService, s.eventChannelSize)
eventCh := make(chan *registry.NetworkServiceResponse, s.eventChannelSize)
id := uuid.New().String()

s.executor.AsyncExec(func() {
s.eventChannels[id] = eventCh
for _, entity := range s.allMatches(query) {
eventCh <- entity
eventCh <- &registry.NetworkServiceResponse{NetworkService: entity}
}
})
defer s.closeEventChannel(id, eventCh)
Expand All @@ -123,7 +123,7 @@ func (s *memoryNSServer) allMatches(query *registry.NetworkServiceQuery) (matche
return matches
}

func (s *memoryNSServer) closeEventChannel(id string, eventCh <-chan *registry.NetworkService) {
func (s *memoryNSServer) closeEventChannel(id string, eventCh <-chan *registry.NetworkServiceResponse) {
ctx, cancel := context.WithCancel(context.Background())

s.executor.AsyncExec(func() {
Expand All @@ -143,30 +143,28 @@ func (s *memoryNSServer) closeEventChannel(id string, eventCh <-chan *registry.N
func (s *memoryNSServer) receiveEvent(
query *registry.NetworkServiceQuery,
server registry.NetworkServiceRegistry_FindServer,
eventCh <-chan *registry.NetworkService,
eventCh <-chan *registry.NetworkServiceResponse,
) error {
select {
case <-server.Context().Done():
return errors.WithStack(io.EOF)
case event := <-eventCh:
if matchutils.MatchNetworkServices(query.NetworkService, event) {
nse := &registry.NetworkServiceResponse{
NetworkService: event,
}

if err := server.Send(nse); err != nil {
if matchutils.MatchNetworkServices(query.NetworkService, event.NetworkService) {
if err := server.Send(event); err != nil {
if server.Context().Err() != nil {
return errors.WithStack(io.EOF)
}
return errors.Wrapf(err, "NetworkServiceRegistry find server failed to send a response %s", nse.String())
return errors.Wrapf(err, "NetworkServiceRegistry find server failed to send a response %s", event.String())
}
}
return nil
}
}

func (s *memoryNSServer) Unregister(ctx context.Context, ns *registry.NetworkService) (*empty.Empty, error) {
s.networkServices.Delete(ns.Name)

if unregisterNS, ok := s.networkServices.LoadAndDelete(ns.GetName()); ok {
unregisterNS = unregisterNS.Clone()
s.sendEvent(&registry.NetworkServiceResponse{NetworkService: unregisterNS, Deleted: true})
}
return next.NetworkServiceRegistryServer(ctx).Unregister(ctx, ns)
}
38 changes: 37 additions & 1 deletion pkg/registry/common/memory/ns_server_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2020-2022 Doc.ai and/or its affiliates.
//
// Copyright (c) 2023 Cisco Systems, Inc.
// Copyright (c) 2023-2024 Cisco Systems, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -251,6 +251,42 @@ func TestNetworkServiceRegistryServer_ShouldReceiveAllRegisters(t *testing.T) {
wgWait(ctx, t, &wg)
}

func TestNetworkServiceRegistryServer_DeleteEvent(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

s := memory.NewNetworkServiceRegistryServer()

ns, err := s.Register(ctx, &registry.NetworkService{Name: "ns"})
require.NoError(t, err)

findCtx, findCancel := context.WithCancel(ctx)
defer findCancel()

ch := make(chan *registry.NetworkServiceResponse, 2)
go func() {
defer close(ch)
findErr := s.Find(&registry.NetworkServiceQuery{
NetworkService: &registry.NetworkService{Name: "ns"},
Watch: true,
}, streamchannel.NewNetworkServiceFindServer(findCtx, ch))
require.NoError(t, findErr)
}()

nsResp := <-ch
require.False(t, nsResp.Deleted)

_, err = s.Unregister(ctx, ns)
require.NoError(t, err)

// Read unregister event
nsResp, err = readNSResponse(findCtx, ch)
require.NoError(t, err)
require.True(t, nsResp.Deleted)
}

func readNSResponse(ctx context.Context, ch <-chan *registry.NetworkServiceResponse) (*registry.NetworkServiceResponse, error) {
select {
case <-ctx.Done():
Expand Down

0 comments on commit 76b397f

Please sign in to comment.