From b0081adc094b64765c0a398c1773104bf5b17986 Mon Sep 17 00:00:00 2001 From: Ed Warnicke Date: Sun, 16 Feb 2020 16:21:36 -0600 Subject: [PATCH] Add eventchannel networkservice chain element (#105) This provides machinery to allow the creation of various MonitorConnection clients/servers backed by event channels. This aids in construction of both adapters and the building of tests. Signed-off-by: Ed Warnicke --- .../core/eventchannel/monitor_client.go | 85 +++++++++++++ .../core/eventchannel/monitor_client_test.go | 69 +++++++++++ .../eventchannel/monitor_connection_client.go | 84 +++++++++++++ .../monitor_connection_client_test.go | 115 ++++++++++++++++++ .../eventchannel/monitor_connection_server.go | 80 ++++++++++++ .../monitor_connection_server_test.go | 112 +++++++++++++++++ .../core/eventchannel/monitor_server.go | 98 +++++++++++++++ .../core/eventchannel/monitor_server_test.go | 86 +++++++++++++ 8 files changed, 729 insertions(+) create mode 100644 pkg/networkservice/core/eventchannel/monitor_client.go create mode 100644 pkg/networkservice/core/eventchannel/monitor_client_test.go create mode 100644 pkg/networkservice/core/eventchannel/monitor_connection_client.go create mode 100644 pkg/networkservice/core/eventchannel/monitor_connection_client_test.go create mode 100644 pkg/networkservice/core/eventchannel/monitor_connection_server.go create mode 100644 pkg/networkservice/core/eventchannel/monitor_connection_server_test.go create mode 100644 pkg/networkservice/core/eventchannel/monitor_server.go create mode 100644 pkg/networkservice/core/eventchannel/monitor_server_test.go diff --git a/pkg/networkservice/core/eventchannel/monitor_client.go b/pkg/networkservice/core/eventchannel/monitor_client.go new file mode 100644 index 000000000..6ed86c82f --- /dev/null +++ b/pkg/networkservice/core/eventchannel/monitor_client.go @@ -0,0 +1,85 @@ +// Copyright (c) 2020 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package eventchannel provides implementations based on event channels of: +// networkservice.MonitorConnectionClient +// networkservice.MonitorConnectionServer +// networkservice.MonitorConnection_MonitorConnectionsClient +// networkservice.MonitorConnection_MonitorConnectionsServer +package eventchannel + +import ( + "context" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "google.golang.org/grpc" + + "github.com/networkservicemesh/sdk/pkg/tools/serialize" +) + +type monitorConnectionClient struct { + eventCh <-chan *networkservice.ConnectionEvent + fanoutEventChs []chan *networkservice.ConnectionEvent + updateExecutor serialize.Executor +} + +// NewMonitorConnectionClient - returns networkservice.MonitorConnectionClient +// eventCh - channel that provides events to feed the Recv function +// when an event is sent on the eventCh, all networkservice.MonitorConnection_MonitorConnectionsClient +// returned from calling MonitorConnections receive the event. +// Note: Does not perform filtering basedon MonitorScopeSelector +func NewMonitorConnectionClient(eventCh <-chan *networkservice.ConnectionEvent) networkservice.MonitorConnectionClient { + rv := &monitorConnectionClient{ + eventCh: eventCh, + updateExecutor: serialize.NewExecutor(), + } + rv.eventLoop() + return rv +} + +func (m *monitorConnectionClient) MonitorConnections(ctx context.Context, in *networkservice.MonitorScopeSelector, opts ...grpc.CallOption) (networkservice.MonitorConnection_MonitorConnectionsClient, error) { + fanoutEventCh := make(chan *networkservice.ConnectionEvent, 100) + m.updateExecutor.AsyncExec(func() { + m.fanoutEventChs = append(m.fanoutEventChs, fanoutEventCh) + go func() { + <-ctx.Done() + m.updateExecutor.AsyncExec(func() { + var newFanoutEventChs []chan *networkservice.ConnectionEvent + for _, ch := range m.fanoutEventChs { + if ch != fanoutEventCh { + newFanoutEventChs = append(newFanoutEventChs, ch) + } + } + m.fanoutEventChs = newFanoutEventChs + close(fanoutEventCh) + }) + }() + }) + return NewMonitorConnectionMonitorConnectionsClient(fanoutEventCh), nil +} + +func (m *monitorConnectionClient) eventLoop() { + go func() { + for event := range m.eventCh { + e := event + m.updateExecutor.AsyncExec(func() { + for _, fanoutEventCh := range m.fanoutEventChs { + fanoutEventCh <- e + } + }) + } + }() +} diff --git a/pkg/networkservice/core/eventchannel/monitor_client_test.go b/pkg/networkservice/core/eventchannel/monitor_client_test.go new file mode 100644 index 000000000..36f9c89bd --- /dev/null +++ b/pkg/networkservice/core/eventchannel/monitor_client_test.go @@ -0,0 +1,69 @@ +// Copyright (c) 2020 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventchannel_test + +import ( + "context" + "testing" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/stretchr/testify/assert" + + "github.com/networkservicemesh/sdk/pkg/networkservice/core/eventchannel" +) + +func TestNewMonitorConnectionClient_MonitorConnections(t *testing.T) { + maxReceivers := 10 + receivers := make([]networkservice.MonitorConnection_MonitorConnectionsClient, maxReceivers) + cancelFuncs := make([]context.CancelFunc, maxReceivers) + + numEvents := 10 + eventCh := make(chan *networkservice.ConnectionEvent, numEvents) + client := eventchannel.NewMonitorConnectionClient(eventCh) + for i := 0; i < maxReceivers; i++ { + var ctx context.Context + ctx, cancelFuncs[i] = context.WithCancel(context.Background()) + receiver, err := client.MonitorConnections(ctx, nil) + receivers[i] = receiver + assert.Nil(t, err) + assert.NotNil(t, receivers[i]) + } + eventsIn := make([]*networkservice.ConnectionEvent, numEvents) + for i := 0; i < numEvents; i++ { + eventsIn[i] = &networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_UPDATE, + Connections: map[string]*networkservice.Connection{ + string(i): { + Id: (string(i)), + }, + }, + } + eventCh <- eventsIn[i] + } + for i := 0; i < maxReceivers; i++ { + for j := 0; j < numEvents; j++ { + eventOut, err := receivers[i].Recv() + assert.Nil(t, err) + assert.Equal(t, eventsIn[j], eventOut) + } + } + + cancelFuncs[0]() + _, err := receivers[0].Recv() + assert.NotNil(t, err) + close(eventCh) +} diff --git a/pkg/networkservice/core/eventchannel/monitor_connection_client.go b/pkg/networkservice/core/eventchannel/monitor_connection_client.go new file mode 100644 index 000000000..c256aa12c --- /dev/null +++ b/pkg/networkservice/core/eventchannel/monitor_connection_client.go @@ -0,0 +1,84 @@ +// Copyright (c) 2020 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventchannel + +import ( + "context" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/pkg/errors" + "google.golang.org/grpc/metadata" +) + +type monitorConnectionMonitorConnectionsClient struct { + eventCh <-chan *networkservice.ConnectionEvent + ctx context.Context + cancelFunc context.CancelFunc +} + +// NewMonitorConnectionMonitorConnectionsClient - returns a networkservice.MonitorConnection_MonitorConnectionsClient +// eventCh - when an event is sent on eventCh, it is returned by the +// call to Recv on the networkservice.MonitorConnection_MonitorConnectionsClient +func NewMonitorConnectionMonitorConnectionsClient(eventCh <-chan *networkservice.ConnectionEvent) networkservice.MonitorConnection_MonitorConnectionsClient { + ctx, cancelFunc := context.WithCancel(context.Background()) + return &monitorConnectionMonitorConnectionsClient{ + eventCh: eventCh, + ctx: ctx, + cancelFunc: cancelFunc, + } +} + +func (m *monitorConnectionMonitorConnectionsClient) Recv() (*networkservice.ConnectionEvent, error) { + event, ok := <-m.eventCh + if !ok { + m.cancelFunc() + return nil, errors.New("No more events, chan closed by sender") + } + return event, nil +} + +func (m *monitorConnectionMonitorConnectionsClient) Header() (metadata.MD, error) { + return make(metadata.MD), nil +} + +func (m *monitorConnectionMonitorConnectionsClient) Trailer() metadata.MD { + return make(metadata.MD) +} + +func (m *monitorConnectionMonitorConnectionsClient) CloseSend() error { + return nil +} + +func (m *monitorConnectionMonitorConnectionsClient) Context() context.Context { + return m.ctx +} + +func (m *monitorConnectionMonitorConnectionsClient) SendMsg(msg interface{}) error { + return nil +} + +func (m *monitorConnectionMonitorConnectionsClient) RecvMsg(msg interface{}) error { + if event, ok := msg.(*networkservice.ConnectionEvent); ok { + e, err := m.Recv() + if err != nil { + return err + } + *event = *e + return nil + } + return errors.Errorf("Not type networkservice.ConnectionEvent - msg (%+v)", msg) +} diff --git a/pkg/networkservice/core/eventchannel/monitor_connection_client_test.go b/pkg/networkservice/core/eventchannel/monitor_connection_client_test.go new file mode 100644 index 000000000..e47f6a95a --- /dev/null +++ b/pkg/networkservice/core/eventchannel/monitor_connection_client_test.go @@ -0,0 +1,115 @@ +// Copyright (c) 2020 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventchannel_test + +import ( + "testing" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/stretchr/testify/assert" + + "github.com/networkservicemesh/sdk/pkg/networkservice/core/eventchannel" +) + +func TestMonitorConnection_MonitorConnectionsClient_Recv(t *testing.T) { + numEvents := 50 + eventCh := make(chan *networkservice.ConnectionEvent, numEvents) + mcc := eventchannel.NewMonitorConnectionMonitorConnectionsClient(eventCh) + eventsIn := make([]*networkservice.ConnectionEvent, numEvents) + for i := 0; i < numEvents; i++ { + eventsIn[i] = &networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_UPDATE, + Connections: map[string]*networkservice.Connection{ + string(i): { + Id: (string(i)), + }, + }, + } + eventCh <- eventsIn[i] + } + for i := 0; i < 50; i++ { + eventOut, err := mcc.Recv() + assert.Nil(t, err) + assert.Equal(t, eventsIn[i], eventOut) + } + close(eventCh) + _, err := mcc.Recv() + assert.NotNil(t, err) + select { + case <-mcc.Context().Done(): + default: + assert.Fail(t, "Context should be Done") + } +} + +func TestMonitorConnection_MonitorConnectionsClient_RecvMsg(t *testing.T) { + eventCh := make(chan *networkservice.ConnectionEvent, 100) + mcc := eventchannel.NewMonitorConnectionMonitorConnectionsClient(eventCh) + + var wrongTypeEvent struct{} + err := mcc.RecvMsg(wrongTypeEvent) + assert.NotNil(t, err) + + eventIn := &networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_UPDATE, + Connections: map[string]*networkservice.Connection{ + "foo": { + Id: "foo", + }, + }, + } + eventCh <- eventIn + eventOut := &networkservice.ConnectionEvent{} + err = mcc.RecvMsg(eventOut) + assert.Nil(t, err) + assert.Equal(t, eventIn, eventOut) + + close(eventCh) + err = mcc.RecvMsg(eventOut) + assert.NotNil(t, err) + select { + case <-mcc.Context().Done(): + default: + assert.Fail(t, "Context should be Done") + } +} + +func TestMonitorConnection_MonitorConnectionsClient_CloseSend(t *testing.T) { + eventCh := make(chan *networkservice.ConnectionEvent) + mcc := eventchannel.NewMonitorConnectionMonitorConnectionsClient(eventCh) + assert.Nil(t, mcc.CloseSend()) +} + +func TestMonitorConnection_MonitorConnectionsClient_Header(t *testing.T) { + eventCh := make(chan *networkservice.ConnectionEvent) + mcc := eventchannel.NewMonitorConnectionMonitorConnectionsClient(eventCh) + header, err := mcc.Header() + assert.Nil(t, err) + assert.NotNil(t, header) +} + +func TestMonitorConnection_MonitorConnectionsClient_SendMsg(t *testing.T) { + eventCh := make(chan *networkservice.ConnectionEvent) + mcc := eventchannel.NewMonitorConnectionMonitorConnectionsClient(eventCh) + assert.Nil(t, mcc.SendMsg(nil)) +} + +func TestMonitorConnection_MonitorConnectionsClient_Trailer(t *testing.T) { + eventCh := make(chan *networkservice.ConnectionEvent) + mcc := eventchannel.NewMonitorConnectionMonitorConnectionsClient(eventCh) + assert.NotNil(t, mcc.Trailer()) +} diff --git a/pkg/networkservice/core/eventchannel/monitor_connection_server.go b/pkg/networkservice/core/eventchannel/monitor_connection_server.go new file mode 100644 index 000000000..61ebfa9d4 --- /dev/null +++ b/pkg/networkservice/core/eventchannel/monitor_connection_server.go @@ -0,0 +1,80 @@ +// Copyright (c) 2020 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventchannel + +import ( + "context" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/pkg/errors" + "google.golang.org/grpc/metadata" +) + +type monitorConnectionMonitorConnectionsServer struct { + ctx context.Context + eventCh chan<- *networkservice.ConnectionEvent +} + +// NewMonitorConnectionMonitorConnectionsServer - returns a networkservice.MonitorConnection_MonitorConnectionsServer +// eventCh - when an event is passed to the Send() method, it is inserted +// into eventCh +func NewMonitorConnectionMonitorConnectionsServer(ctx context.Context, eventCh chan<- *networkservice.ConnectionEvent) networkservice.MonitorConnection_MonitorConnectionsServer { + rv := &monitorConnectionMonitorConnectionsServer{ + ctx: ctx, + eventCh: eventCh, + } + go func() { + <-ctx.Done() + close(eventCh) + }() + return rv +} + +func (m *monitorConnectionMonitorConnectionsServer) Send(event *networkservice.ConnectionEvent) error { + select { + case <-m.ctx.Done(): + return errors.New("Can no longer Send") + default: + m.eventCh <- event + return nil + } +} + +func (m *monitorConnectionMonitorConnectionsServer) Context() context.Context { + return m.ctx +} + +func (m *monitorConnectionMonitorConnectionsServer) SetHeader(metadata.MD) error { + return nil +} + +func (m *monitorConnectionMonitorConnectionsServer) SendHeader(metadata.MD) error { + return nil +} + +func (m *monitorConnectionMonitorConnectionsServer) SetTrailer(metadata.MD) {} + +func (m *monitorConnectionMonitorConnectionsServer) SendMsg(msg interface{}) error { + if event, ok := msg.(*networkservice.ConnectionEvent); ok { + return m.Send(event) + } + return errors.Errorf("Not type networkservice.ConnectionEvent - msg (%+v)", msg) +} + +func (m *monitorConnectionMonitorConnectionsServer) RecvMsg(msg interface{}) error { + return nil +} diff --git a/pkg/networkservice/core/eventchannel/monitor_connection_server_test.go b/pkg/networkservice/core/eventchannel/monitor_connection_server_test.go new file mode 100644 index 000000000..265651849 --- /dev/null +++ b/pkg/networkservice/core/eventchannel/monitor_connection_server_test.go @@ -0,0 +1,112 @@ +// Copyright (c) 2020 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventchannel_test + +import ( + "context" + "testing" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/metadata" + + "github.com/networkservicemesh/sdk/pkg/networkservice/core/eventchannel" +) + +func TestMonitorConnection_MonitorConnectionsServer_Send(t *testing.T) { + eventCh := make(chan *networkservice.ConnectionEvent, 100) + ctx, cancelFunc := context.WithCancel(context.Background()) + mcs := eventchannel.NewMonitorConnectionMonitorConnectionsServer(ctx, eventCh) + eventIn := &networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_UPDATE, + Connections: map[string]*networkservice.Connection{ + "foo": { + Id: "foo", + }, + }, + } + assert.Nil(t, mcs.Send(eventIn)) + eventOut := <-eventCh + assert.Equal(t, eventIn, eventOut) + cancelFunc() +} + +func TestMonitorConnection_MonitorConnectionsServer_SendMsg(t *testing.T) { + numEvents := 50 + eventCh := make(chan *networkservice.ConnectionEvent, numEvents) + ctx := context.Background() + mcs := eventchannel.NewMonitorConnectionMonitorConnectionsServer(ctx, eventCh) + + var wrongTypeEvent struct{} + assert.NotNil(t, mcs.SendMsg(wrongTypeEvent)) + eventsIn := make([]*networkservice.ConnectionEvent, numEvents) + for i := 0; i < numEvents; i++ { + eventsIn[i] = &networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_UPDATE, + Connections: map[string]*networkservice.Connection{ + "foo": { + Id: "foo", + }, + }, + } + assert.Nil(t, mcs.SendMsg(eventsIn[i])) + eventOut := <-eventCh + assert.Equal(t, eventsIn[i], eventOut) + } +} + +func TestMonitorConnection_MonitorConnectionsServer_RecvMsg(t *testing.T) { + eventCh := make(chan *networkservice.ConnectionEvent, 100) + ctx := context.Background() + mcs := eventchannel.NewMonitorConnectionMonitorConnectionsServer(ctx, eventCh) + eventIn := &networkservice.ConnectionEvent{} + assert.Nil(t, mcs.RecvMsg(eventIn)) +} + +func TestMonitorConnection_MonitorConnectionsServer_Context(t *testing.T) { + eventCh := make(chan *networkservice.ConnectionEvent, 100) + ctxIn, cancelFunc := context.WithCancel(context.Background()) + mcs := eventchannel.NewMonitorConnectionMonitorConnectionsServer(ctxIn, eventCh) + ctxOut := mcs.Context() + cancelFunc() + select { + case <-ctxOut.Done(): + default: + assert.Fail(t, "Mismatched contexts") + } +} + +func TestMonitorConnection_MonitorConnectionsServer_SendHeader(t *testing.T) { + eventCh := make(chan *networkservice.ConnectionEvent, 100) + ctx := context.Background() + mcs := eventchannel.NewMonitorConnectionMonitorConnectionsServer(ctx, eventCh) + assert.Nil(t, mcs.SendHeader(make(metadata.MD))) +} + +func TestMonitorConnection_MonitorConnectionsServer_SetHeader(t *testing.T) { + eventCh := make(chan *networkservice.ConnectionEvent, 100) + ctx := context.Background() + mcs := eventchannel.NewMonitorConnectionMonitorConnectionsServer(ctx, eventCh) + assert.Nil(t, mcs.SetHeader(make(metadata.MD))) +} + +func TestMonitorConnection_MonitorConnectionsServer_SetTrailer(t *testing.T) { + eventCh := make(chan *networkservice.ConnectionEvent, 100) + ctx := context.Background() + mcs := eventchannel.NewMonitorConnectionMonitorConnectionsServer(ctx, eventCh) + mcs.SetTrailer(make(metadata.MD)) +} diff --git a/pkg/networkservice/core/eventchannel/monitor_server.go b/pkg/networkservice/core/eventchannel/monitor_server.go new file mode 100644 index 000000000..b6f7f63ff --- /dev/null +++ b/pkg/networkservice/core/eventchannel/monitor_server.go @@ -0,0 +1,98 @@ +// Copyright (c) 2020 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventchannel + +import ( + "errors" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + + "github.com/networkservicemesh/sdk/pkg/tools/serialize" +) + +type monitorConnectionServer struct { + eventCh <-chan *networkservice.ConnectionEvent + closeCh chan struct{} + servers []networkservice.MonitorConnection_MonitorConnectionsServer + selectors []*networkservice.MonitorScopeSelector + executor serialize.Executor +} + +// NewMonitorServer - returns a networkservice.MonitorConnectionServer +// eventCh - when Send() is called on any of the NewMonitorConnection_MonitorConnectionsServers +// returned by a call to MonitorConnections, it is inserted into eventCh +func NewMonitorServer(eventCh <-chan *networkservice.ConnectionEvent) networkservice.MonitorConnectionServer { + rv := &monitorConnectionServer{ + eventCh: eventCh, + closeCh: make(chan struct{}), + executor: serialize.NewExecutor(), + } + rv.eventLoop() + return rv +} + +func (m *monitorConnectionServer) MonitorConnections(selector *networkservice.MonitorScopeSelector, srv networkservice.MonitorConnection_MonitorConnectionsServer) error { + select { + case <-m.closeCh: + return errors.New("sending is no longer possible") + default: + m.executor.AsyncExec(func() { + m.servers = append(m.servers, srv) + m.selectors = append(m.selectors, selector) + }) + select { + case <-srv.Context().Done(): + case <-m.closeCh: + } + m.executor.AsyncExec(func() { + var newServers []networkservice.MonitorConnection_MonitorConnectionsServer + var newSelectors []*networkservice.MonitorScopeSelector + for i := range m.servers { + if m.servers[i] != srv { + newServers = append(newServers, m.servers[i]) + newSelectors = append(newSelectors, m.selectors[i]) + } + } + m.servers = newServers + m.selectors = newSelectors + }) + return nil + } +} + +func (m *monitorConnectionServer) eventLoop() { + go func() { + for event := range m.eventCh { + e := event + m.executor.AsyncExec(func() { + for i, srv := range m.servers { + filteredEvent := &networkservice.ConnectionEvent{ + Type: e.Type, + Connections: networkservice.FilterMapOnManagerScopeSelector(e.GetConnections(), m.selectors[i]), + } + if filteredEvent.Type == networkservice.ConnectionEventType_INITIAL_STATE_TRANSFER || len(filteredEvent.GetConnections()) > 0 { + // TODO - figure out what if any error handling to do here + _ = srv.Send(filteredEvent) + } + } + }) + } + m.executor.AsyncExec(func() { + close(m.closeCh) + }) + }() +} diff --git a/pkg/networkservice/core/eventchannel/monitor_server_test.go b/pkg/networkservice/core/eventchannel/monitor_server_test.go new file mode 100644 index 000000000..422880192 --- /dev/null +++ b/pkg/networkservice/core/eventchannel/monitor_server_test.go @@ -0,0 +1,86 @@ +// Copyright (c) 2020 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventchannel_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/stretchr/testify/assert" + + "github.com/networkservicemesh/sdk/pkg/networkservice/core/eventchannel" +) + +func TestMonitorConnectionServer_MonitorConnections(t *testing.T) { + numSenders := 10 + senders := make([]networkservice.MonitorConnection_MonitorConnectionsServer, numSenders) + senderEventChs := make([]chan *networkservice.ConnectionEvent, numSenders) + senderCancelFunc := make([]context.CancelFunc, numSenders) + + numEvents := 10 + eventCh := make(chan *networkservice.ConnectionEvent, numEvents) + selector := &networkservice.MonitorScopeSelector{} // TODO + + server := eventchannel.NewMonitorServer(eventCh) + + for i := 0; i < numSenders; i++ { + var senderCtx context.Context + senderCtx, senderCancelFunc[i] = context.WithCancel(context.Background()) + senderEventChs[i] = make(chan *networkservice.ConnectionEvent, numEvents) + sender := eventchannel.NewMonitorConnectionMonitorConnectionsServer(senderCtx, senderEventChs[i]) + senders[i] = sender + go func() { + err := server.MonitorConnections(selector, sender) + assert.Nil(t, err) + }() + } + // Give the go functions calling server.MonitorConnections(selector,sender) a chance to run + <-time.After(time.Millisecond) + senderCancelFunc[numSenders-1]() + eventsIn := make([]*networkservice.ConnectionEvent, numEvents) + for i := 0; i < numEvents; i++ { + eventsIn[i] = &networkservice.ConnectionEvent{ + Type: networkservice.ConnectionEventType_UPDATE, + Connections: map[string]*networkservice.Connection{ + fmt.Sprintf("%d", i): { + Id: fmt.Sprintf("%d", i), + }, + }, + } + eventCh <- eventsIn[i] + } + senderCancelFunc[numSenders-2]() + for i := 0; i < numSenders-2; i++ { + for j := 0; j < numEvents; j++ { + event, ok := <-senderEventChs[i] + assert.True(t, ok) + assert.Equal(t, eventsIn[j], event) + } + } + close(eventCh) + senderEventCh := make(chan *networkservice.ConnectionEvent, numEvents) + srv := eventchannel.NewMonitorConnectionMonitorConnectionsServer(context.Background(), senderEventCh) + for { + err := server.MonitorConnections(&networkservice.MonitorScopeSelector{}, srv) + if err != nil { + break + } + } +}