Skip to content

Commit

Permalink
Add eventchannel networkservice chain element (#105)
Browse files Browse the repository at this point in the history
This provides machinery to allow the creation of various MonitorConnection clients/servers backed
by event channels.  This aids in construction of both adapters and the building of tests.

Signed-off-by: Ed Warnicke <hagbard@gmail.com>
  • Loading branch information
edwarnicke authored Feb 16, 2020
1 parent 6a49ecf commit b0081ad
Show file tree
Hide file tree
Showing 8 changed files with 729 additions and 0 deletions.
85 changes: 85 additions & 0 deletions pkg/networkservice/core/eventchannel/monitor_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) 2020 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package eventchannel provides implementations based on event channels of:
// networkservice.MonitorConnectionClient
// networkservice.MonitorConnectionServer
// networkservice.MonitorConnection_MonitorConnectionsClient
// networkservice.MonitorConnection_MonitorConnectionsServer
package eventchannel

import (
"context"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"google.golang.org/grpc"

"github.com/networkservicemesh/sdk/pkg/tools/serialize"
)

type monitorConnectionClient struct {
eventCh <-chan *networkservice.ConnectionEvent
fanoutEventChs []chan *networkservice.ConnectionEvent
updateExecutor serialize.Executor
}

// NewMonitorConnectionClient - returns networkservice.MonitorConnectionClient
// eventCh - channel that provides events to feed the Recv function
// when an event is sent on the eventCh, all networkservice.MonitorConnection_MonitorConnectionsClient
// returned from calling MonitorConnections receive the event.
// Note: Does not perform filtering basedon MonitorScopeSelector
func NewMonitorConnectionClient(eventCh <-chan *networkservice.ConnectionEvent) networkservice.MonitorConnectionClient {
rv := &monitorConnectionClient{
eventCh: eventCh,
updateExecutor: serialize.NewExecutor(),
}
rv.eventLoop()
return rv
}

func (m *monitorConnectionClient) MonitorConnections(ctx context.Context, in *networkservice.MonitorScopeSelector, opts ...grpc.CallOption) (networkservice.MonitorConnection_MonitorConnectionsClient, error) {
fanoutEventCh := make(chan *networkservice.ConnectionEvent, 100)
m.updateExecutor.AsyncExec(func() {
m.fanoutEventChs = append(m.fanoutEventChs, fanoutEventCh)
go func() {
<-ctx.Done()
m.updateExecutor.AsyncExec(func() {
var newFanoutEventChs []chan *networkservice.ConnectionEvent
for _, ch := range m.fanoutEventChs {
if ch != fanoutEventCh {
newFanoutEventChs = append(newFanoutEventChs, ch)
}
}
m.fanoutEventChs = newFanoutEventChs
close(fanoutEventCh)
})
}()
})
return NewMonitorConnectionMonitorConnectionsClient(fanoutEventCh), nil
}

func (m *monitorConnectionClient) eventLoop() {
go func() {
for event := range m.eventCh {
e := event
m.updateExecutor.AsyncExec(func() {
for _, fanoutEventCh := range m.fanoutEventChs {
fanoutEventCh <- e
}
})
}
}()
}
69 changes: 69 additions & 0 deletions pkg/networkservice/core/eventchannel/monitor_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) 2020 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package eventchannel_test

import (
"context"
"testing"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/stretchr/testify/assert"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/eventchannel"
)

func TestNewMonitorConnectionClient_MonitorConnections(t *testing.T) {
maxReceivers := 10
receivers := make([]networkservice.MonitorConnection_MonitorConnectionsClient, maxReceivers)
cancelFuncs := make([]context.CancelFunc, maxReceivers)

numEvents := 10
eventCh := make(chan *networkservice.ConnectionEvent, numEvents)
client := eventchannel.NewMonitorConnectionClient(eventCh)
for i := 0; i < maxReceivers; i++ {
var ctx context.Context
ctx, cancelFuncs[i] = context.WithCancel(context.Background())
receiver, err := client.MonitorConnections(ctx, nil)
receivers[i] = receiver
assert.Nil(t, err)
assert.NotNil(t, receivers[i])
}
eventsIn := make([]*networkservice.ConnectionEvent, numEvents)
for i := 0; i < numEvents; i++ {
eventsIn[i] = &networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_UPDATE,
Connections: map[string]*networkservice.Connection{
string(i): {
Id: (string(i)),
},
},
}
eventCh <- eventsIn[i]
}
for i := 0; i < maxReceivers; i++ {
for j := 0; j < numEvents; j++ {
eventOut, err := receivers[i].Recv()
assert.Nil(t, err)
assert.Equal(t, eventsIn[j], eventOut)
}
}

cancelFuncs[0]()
_, err := receivers[0].Recv()
assert.NotNil(t, err)
close(eventCh)
}
84 changes: 84 additions & 0 deletions pkg/networkservice/core/eventchannel/monitor_connection_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) 2020 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package eventchannel

import (
"context"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/pkg/errors"
"google.golang.org/grpc/metadata"
)

type monitorConnectionMonitorConnectionsClient struct {
eventCh <-chan *networkservice.ConnectionEvent
ctx context.Context
cancelFunc context.CancelFunc
}

// NewMonitorConnectionMonitorConnectionsClient - returns a networkservice.MonitorConnection_MonitorConnectionsClient
// eventCh - when an event is sent on eventCh, it is returned by the
// call to Recv on the networkservice.MonitorConnection_MonitorConnectionsClient
func NewMonitorConnectionMonitorConnectionsClient(eventCh <-chan *networkservice.ConnectionEvent) networkservice.MonitorConnection_MonitorConnectionsClient {
ctx, cancelFunc := context.WithCancel(context.Background())
return &monitorConnectionMonitorConnectionsClient{
eventCh: eventCh,
ctx: ctx,
cancelFunc: cancelFunc,
}
}

func (m *monitorConnectionMonitorConnectionsClient) Recv() (*networkservice.ConnectionEvent, error) {
event, ok := <-m.eventCh
if !ok {
m.cancelFunc()
return nil, errors.New("No more events, chan closed by sender")
}
return event, nil
}

func (m *monitorConnectionMonitorConnectionsClient) Header() (metadata.MD, error) {
return make(metadata.MD), nil
}

func (m *monitorConnectionMonitorConnectionsClient) Trailer() metadata.MD {
return make(metadata.MD)
}

func (m *monitorConnectionMonitorConnectionsClient) CloseSend() error {
return nil
}

func (m *monitorConnectionMonitorConnectionsClient) Context() context.Context {
return m.ctx
}

func (m *monitorConnectionMonitorConnectionsClient) SendMsg(msg interface{}) error {
return nil
}

func (m *monitorConnectionMonitorConnectionsClient) RecvMsg(msg interface{}) error {
if event, ok := msg.(*networkservice.ConnectionEvent); ok {
e, err := m.Recv()
if err != nil {
return err
}
*event = *e
return nil
}
return errors.Errorf("Not type networkservice.ConnectionEvent - msg (%+v)", msg)
}
115 changes: 115 additions & 0 deletions pkg/networkservice/core/eventchannel/monitor_connection_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (c) 2020 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package eventchannel_test

import (
"testing"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/stretchr/testify/assert"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/eventchannel"
)

func TestMonitorConnection_MonitorConnectionsClient_Recv(t *testing.T) {
numEvents := 50
eventCh := make(chan *networkservice.ConnectionEvent, numEvents)
mcc := eventchannel.NewMonitorConnectionMonitorConnectionsClient(eventCh)
eventsIn := make([]*networkservice.ConnectionEvent, numEvents)
for i := 0; i < numEvents; i++ {
eventsIn[i] = &networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_UPDATE,
Connections: map[string]*networkservice.Connection{
string(i): {
Id: (string(i)),
},
},
}
eventCh <- eventsIn[i]
}
for i := 0; i < 50; i++ {
eventOut, err := mcc.Recv()
assert.Nil(t, err)
assert.Equal(t, eventsIn[i], eventOut)
}
close(eventCh)
_, err := mcc.Recv()
assert.NotNil(t, err)
select {
case <-mcc.Context().Done():
default:
assert.Fail(t, "Context should be Done")
}
}

func TestMonitorConnection_MonitorConnectionsClient_RecvMsg(t *testing.T) {
eventCh := make(chan *networkservice.ConnectionEvent, 100)
mcc := eventchannel.NewMonitorConnectionMonitorConnectionsClient(eventCh)

var wrongTypeEvent struct{}
err := mcc.RecvMsg(wrongTypeEvent)
assert.NotNil(t, err)

eventIn := &networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_UPDATE,
Connections: map[string]*networkservice.Connection{
"foo": {
Id: "foo",
},
},
}
eventCh <- eventIn
eventOut := &networkservice.ConnectionEvent{}
err = mcc.RecvMsg(eventOut)
assert.Nil(t, err)
assert.Equal(t, eventIn, eventOut)

close(eventCh)
err = mcc.RecvMsg(eventOut)
assert.NotNil(t, err)
select {
case <-mcc.Context().Done():
default:
assert.Fail(t, "Context should be Done")
}
}

func TestMonitorConnection_MonitorConnectionsClient_CloseSend(t *testing.T) {
eventCh := make(chan *networkservice.ConnectionEvent)
mcc := eventchannel.NewMonitorConnectionMonitorConnectionsClient(eventCh)
assert.Nil(t, mcc.CloseSend())
}

func TestMonitorConnection_MonitorConnectionsClient_Header(t *testing.T) {
eventCh := make(chan *networkservice.ConnectionEvent)
mcc := eventchannel.NewMonitorConnectionMonitorConnectionsClient(eventCh)
header, err := mcc.Header()
assert.Nil(t, err)
assert.NotNil(t, header)
}

func TestMonitorConnection_MonitorConnectionsClient_SendMsg(t *testing.T) {
eventCh := make(chan *networkservice.ConnectionEvent)
mcc := eventchannel.NewMonitorConnectionMonitorConnectionsClient(eventCh)
assert.Nil(t, mcc.SendMsg(nil))
}

func TestMonitorConnection_MonitorConnectionsClient_Trailer(t *testing.T) {
eventCh := make(chan *networkservice.ConnectionEvent)
mcc := eventchannel.NewMonitorConnectionMonitorConnectionsClient(eventCh)
assert.NotNil(t, mcc.Trailer())
}
Loading

0 comments on commit b0081ad

Please sign in to comment.