Skip to content

Commit

Permalink
Fix monitor blocking sending events on non-reading client
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Popov <vladimir.popov@xored.com>
  • Loading branch information
Vladimir Popov committed Aug 16, 2021
1 parent 1fce3fe commit 9611749
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 68 deletions.
5 changes: 5 additions & 0 deletions pkg/networkservice/common/monitor/filter.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2020 Cisco Systems, Inc.
//
// Copyright (c) 2021 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -17,11 +19,14 @@
package monitor

import (
"github.com/edwarnicke/serialize"
"github.com/networkservicemesh/api/pkg/api/networkservice"
)

type monitorFilter struct {
selector *networkservice.MonitorScopeSelector
executor serialize.Executor

networkservice.MonitorConnection_MonitorConnectionsServer
}

Expand Down
83 changes: 46 additions & 37 deletions pkg/networkservice/common/monitor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"

"github.com/golang/protobuf/ptypes/empty"
"github.com/google/uuid"

"github.com/edwarnicke/serialize"

Expand All @@ -34,10 +35,10 @@ import (
)

type monitorServer struct {
ctx context.Context
connections map[string]*networkservice.Connection
monitors []networkservice.MonitorConnection_MonitorConnectionsServer
filters map[string]*monitorFilter
executor serialize.Executor
ctx context.Context
}

// NewServer - creates a NetworkServiceServer chain element that will properly update a MonitorConnectionServer
Expand All @@ -53,27 +54,35 @@ func NewServer(ctx context.Context, monitorServerPtr *networkservice.MonitorConn
rv := &monitorServer{
ctx: ctx,
connections: make(map[string]*networkservice.Connection),
monitors: nil, // Intentionally nil
filters: make(map[string]*monitorFilter),
}

*monitorServerPtr = rv

return rv
}

func (m *monitorServer) MonitorConnections(selector *networkservice.MonitorScopeSelector, srv networkservice.MonitorConnection_MonitorConnectionsServer) error {
m.executor.AsyncExec(func() {
monitor := newMonitorFilter(selector, srv)
m.monitors = append(m.monitors, monitor)
filter := newMonitorFilter(selector, srv)
m.filters[uuid.New().String()] = filter

connections := networkservice.FilterMapOnManagerScopeSelector(m.connections, selector)

// Send initial transfer of all data available
_ = monitor.Send(&networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER,
Connections: connections,
filter.executor.AsyncExec(func() {
_ = filter.Send(&networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER,
Connections: connections,
})
})
})

select {
case <-srv.Context().Done():
case <-m.ctx.Done():
}

return nil
}

Expand All @@ -83,53 +92,53 @@ func (m *monitorServer) Request(ctx context.Context, request *networkservice.Net
eventConn := conn.Clone()
m.executor.AsyncExec(func() {
m.connections[eventConn.GetId()] = eventConn

// Send update event
event := &networkservice.ConnectionEvent{
// Send UPDATE
m.send(ctx, &networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_UPDATE,
Connections: map[string]*networkservice.Connection{eventConn.GetId(): eventConn},
}
if sendErr := m.send(ctx, event); sendErr != nil {
log.FromContext(ctx).Errorf("Error during sending event: %v", sendErr)
}
})
})
}
return conn, err
}

func (m *monitorServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) {
_, closeErr := next.Server(ctx).Close(ctx, conn)
rv, err := next.Server(ctx).Close(ctx, conn)

// Remove connection object we have and send DELETE
eventConn := conn.Clone()
m.executor.AsyncExec(func() {
delete(m.connections, eventConn.GetId())

event := &networkservice.ConnectionEvent{
m.send(ctx, &networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_DELETE,
Connections: map[string]*networkservice.Connection{eventConn.GetId(): eventConn},
}
if err := m.send(ctx, event); err != nil {
log.FromContext(ctx).Errorf("Error during sending event: %v", err)
}
})
})
return &empty.Empty{}, closeErr

return rv, err
}

// send - perform a send to clients.
func (m *monitorServer) send(ctx context.Context, event *networkservice.ConnectionEvent) (err error) {
newMonitors := []networkservice.MonitorConnection_MonitorConnectionsServer{}
for _, filter := range m.monitors {
select {
case <-filter.Context().Done():
default:
if err = filter.Send(event.Clone()); err != nil {
log.FromContext(ctx).Errorf("Error sending event: %+v: %+v", event, err)
func (m *monitorServer) send(ctx context.Context, event *networkservice.ConnectionEvent) {
logger := log.FromContext(ctx).WithField("monitorServer", "send")
for id, filter := range m.filters {
id, filter := id, filter
event = event.Clone()
filter.executor.AsyncExec(func() {
var err error
select {
case <-filter.Context().Done():
err = filter.Context().Err()
default:
err = filter.Send(event)
}
if err == nil {
return
}
newMonitors = append(newMonitors, filter)
}
}

m.monitors = newMonitors
return err
logger.Errorf("error sending event: %+v %s", event, err.Error())
m.executor.AsyncExec(func() {
delete(m.filters, id)
})
})
}
}
62 changes: 37 additions & 25 deletions pkg/networkservice/common/monitor/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,26 @@ package monitor_test
import (
"context"
"testing"
"time"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/monitor"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters"
)

func TestMonitor(t *testing.T) {
func TestMonitorServer(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// Specify pathSegments to test
segmentNames := []string{"local-nsm", "remote-nsm"}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create monitorServer, monitorClient, and server.
var monitorServer networkservice.MonitorConnectionServer
server := monitor.NewServer(ctx, &monitorServer)
Expand All @@ -44,26 +48,32 @@ func TestMonitor(t *testing.T) {
connections := make(map[string]*networkservice.Connection)
receivers := make(map[string]networkservice.MonitorConnection_MonitorConnectionsClient)

// Create non-reading monitor client for all connections
_, monitorErr := monitorClient.MonitorConnections(ctx, new(networkservice.MonitorScopeSelector))
require.NoError(t, monitorErr)

// Get Empty initial state transfers
for _, segmentName := range segmentNames {
var err error
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
receivers[segmentName], err = monitorClient.MonitorConnections(ctx, &networkservice.MonitorScopeSelector{
monitorCtx, cancelMonitor := context.WithCancel(ctx)
defer cancelMonitor()

receivers[segmentName], monitorErr = monitorClient.MonitorConnections(monitorCtx, &networkservice.MonitorScopeSelector{
PathSegments: []*networkservice.PathSegment{{Name: segmentName}},
})
assert.Nil(t, err)
require.NoError(t, monitorErr)

event, err := receivers[segmentName].Recv()
assert.Nil(t, err)
require.NoError(t, err)

require.NotNil(t, event)
assert.Equal(t, networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, event.GetType())
require.Equal(t, len(event.GetConnections()[segmentName].GetPath().GetPathSegments()), 0)
require.Equal(t, networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER, event.GetType())
require.Empty(t, event.GetConnections()[segmentName].GetPath().GetPathSegments())
}

// Send requests
for _, segmentName := range segmentNames {
var err error
connections[segmentName], err = server.Request(context.Background(), &networkservice.NetworkServiceRequest{
connections[segmentName], err = server.Request(ctx, &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{
Id: segmentName,
Path: &networkservice.Path{
Expand All @@ -76,32 +86,34 @@ func TestMonitor(t *testing.T) {
},
},
})
assert.Nil(t, err)
require.NoError(t, err)
}

// Get Updates and insure we've properly filtered by segmentName
for _, segmentName := range segmentNames {
event, err := receivers[segmentName].Recv()
assert.Nil(t, err)
require.NoError(t, err)

require.NotNil(t, event)
assert.Equal(t, networkservice.ConnectionEventType_UPDATE, event.GetType())
require.Equal(t, len(event.GetConnections()[segmentName].GetPath().GetPathSegments()), 1)
assert.Equal(t, segmentName, event.GetConnections()[segmentName].GetPath().GetPathSegments()[0].GetName())
require.Equal(t, networkservice.ConnectionEventType_UPDATE, event.GetType())
require.Len(t, event.GetConnections()[segmentName].GetPath().GetPathSegments(), 1)
require.Equal(t, segmentName, event.GetConnections()[segmentName].GetPath().GetPathSegments()[0].GetName())
}

// Close Connections
for _, conn := range connections {
_, err := server.Close(context.Background(), conn)
assert.Nil(t, err)
_, err := server.Close(ctx, conn)
require.NoError(t, err)
}

// Get Delete Events and insure we've properly filtered by segmentName
for _, segmentName := range segmentNames {
event, err := receivers[segmentName].Recv()
assert.Nil(t, err)
require.NoError(t, err)

require.NotNil(t, event)
assert.Equal(t, networkservice.ConnectionEventType_DELETE, event.GetType())
require.Equal(t, len(event.GetConnections()[segmentName].GetPath().GetPathSegments()), 1)
assert.Equal(t, segmentName, event.GetConnections()[segmentName].GetPath().GetPathSegments()[0].GetName())
require.Equal(t, networkservice.ConnectionEventType_DELETE, event.GetType())
require.Len(t, event.GetConnections()[segmentName].GetPath().GetPathSegments(), 1)
require.Equal(t, segmentName, event.GetConnections()[segmentName].GetPath().GetPathSegments()[0].GetName())
}
}
4 changes: 2 additions & 2 deletions pkg/networkservice/core/adapters/monitor_server_to_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func NewMonitorServerToClient(server networkservice.MonitorConnectionServer) net
return &monitorServerToClient{server: server}
}

func (m *monitorServerToClient) MonitorConnections(ctx context.Context, selector *networkservice.MonitorScopeSelector, opts ...grpc.CallOption) (networkservice.MonitorConnection_MonitorConnectionsClient, error) {
eventCh := make(chan *networkservice.ConnectionEvent, 100)
func (m *monitorServerToClient) MonitorConnections(ctx context.Context, selector *networkservice.MonitorScopeSelector, _ ...grpc.CallOption) (networkservice.MonitorConnection_MonitorConnectionsClient, error) {
eventCh := make(chan *networkservice.ConnectionEvent, 1)
srv := eventchannel.NewMonitorConnectionMonitorConnectionsServer(ctx, eventCh)
go func() {
_ = m.server.MonitorConnections(selector, srv)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2020 Cisco and/or its affiliates.
//
// Copyright (c) 2021 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -41,11 +43,12 @@ func NewMonitorConnectionMonitorConnectionsServer(ctx context.Context, eventCh c
}

func (m *monitorConnectionMonitorConnectionsServer) Send(event *networkservice.ConnectionEvent) error {
if err := m.ctx.Err(); err != nil {
return errors.Wrap(err, "Can no longer Send")
select {
case <-m.ctx.Done():
return m.ctx.Err()
case m.eventCh <- event:
return nil
}
m.eventCh <- event
return nil
}

func (m *monitorConnectionMonitorConnectionsServer) Context() context.Context {
Expand Down

0 comments on commit 9611749

Please sign in to comment.