Skip to content

Commit

Permalink
Add upstreamrefresh chain element (networkservicemesh#1324)
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>
Signed-off-by: anastasia.malysheva <anastasia.malysheva@xored.com>
  • Loading branch information
glazychev-art authored and anastasia-malysheva committed Jul 27, 2022
1 parent c891357 commit 9eb8e4f
Show file tree
Hide file tree
Showing 9 changed files with 594 additions and 0 deletions.
91 changes: 91 additions & 0 deletions pkg/networkservice/common/upstreamrefresh/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package upstreamrefresh

import (
"context"

"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/extend"
"github.com/networkservicemesh/sdk/pkg/tools/postpone"
)

type upstreamRefreshClient struct {
chainCtx context.Context
localNotifier *notifier
}

// NewClient - returns a new upstreamrefresh chain element
func NewClient(chainCtx context.Context, opts ...Option) networkservice.NetworkServiceClient {
o := &options{}
for _, opt := range opts {
opt(o)
}

return &upstreamRefreshClient{
chainCtx: chainCtx,
localNotifier: o.localNotifier,
}
}

func (u *upstreamRefreshClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
closeCtxFunc := postpone.ContextWithValues(ctx)
// Cancel any existing eventLoop
if cancelEventLoop, loaded := loadAndDelete(ctx); loaded {
cancelEventLoop()
}

conn, err := next.Client(ctx).Request(ctx, request, opts...)
if err != nil {
return nil, err
}

u.localNotifier.subscribe(conn.GetId())

cc, ccLoaded := clientconn.Load(ctx)
if ccLoaded {
cancelEventLoop, eventLoopErr := newEventLoop(
extend.WithValuesFromContext(u.chainCtx, ctx), cc, conn, u.localNotifier)
if eventLoopErr != nil {
closeCtx, closeCancel := closeCtxFunc()
defer closeCancel()
_, _ = next.Client(closeCtx).Close(closeCtx, conn)
return nil, errors.Wrap(eventLoopErr, "unable to monitor")
}
store(ctx, cancelEventLoop)
}
return conn, nil
}

func (u *upstreamRefreshClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) {
// Unsubscribe from local notifications
u.localNotifier.unsubscribe(conn.GetId())

// Cancel any existing eventLoop
if cancelEventLoop, loaded := loadAndDelete(ctx); loaded {
cancelEventLoop()
}

return next.Client(ctx).Close(ctx, conn, opts...)
}
97 changes: 97 additions & 0 deletions pkg/networkservice/common/upstreamrefresh/client_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package upstreamrefresh

import (
"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/tools/log"
)

type clientFilter struct {
conn *networkservice.Connection
networkservice.MonitorConnection_MonitorConnectionsClient

logger log.Logger
}

func newClientFilter(client networkservice.MonitorConnection_MonitorConnectionsClient, conn *networkservice.Connection, logger log.Logger) networkservice.MonitorConnection_MonitorConnectionsClient {
return &clientFilter{
MonitorConnection_MonitorConnectionsClient: client,
conn: conn,

logger: logger,
}
}

func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) {
for {
eventIn, err := c.MonitorConnection_MonitorConnectionsClient.Recv()
if err != nil {
return nil, err
}
eventOut := &networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_UPDATE,
Connections: make(map[string]*networkservice.Connection),
}
for _, connIn := range eventIn.GetConnections() {
if eventIn.GetType() != networkservice.ConnectionEventType_UPDATE {
continue
}

connIn = connIn.Clone()
// If we don't have enough PathSegments connIn doesn't match e.conn
if len(connIn.GetPath().GetPathSegments()) < int(c.conn.GetPath().GetIndex()+1) {
continue
}
// If the e.conn isn't in the expected PathSegment connIn doesn't match e.conn
if connIn.GetPath().GetPathSegments()[int(c.conn.GetPath().GetIndex())].GetId() != c.conn.GetId() {
continue
}
// If the current index isn't the index of e.conn or what comes after it connIn doesn't match e.conn
if !(connIn.GetPath().GetIndex() == c.conn.GetPath().GetIndex() || connIn.GetPath().GetIndex() == c.conn.GetPath().GetIndex()+1) {
continue
}

// Construct the outgoing Connection
connOut := c.conn.Clone()
connOut.Path = connIn.Path
connOut.GetPath().Index = c.conn.GetPath().GetIndex()
connOut.Context = connIn.Context
connOut.State = connIn.State

// If it's deleted, mark the event state down
if eventIn.GetType() == networkservice.ConnectionEventType_DELETE {
connOut.State = networkservice.State_DOWN
}

// If the connection hasn't changed... don't send the event
if connOut.Equals(c.conn) {
continue
}

// Add the Connection to the outgoing event
eventOut.GetConnections()[connOut.GetId()] = connOut

// Update the event we are watching for:
c.conn = connOut
}
if len(eventOut.GetConnections()) > 0 {
return eventOut, nil
}
}
}
19 changes: 19 additions & 0 deletions pkg/networkservice/common/upstreamrefresh/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package upstreamrefresh provides a client chain element that receives monitor connectionEvents
// and processes those that have refresh_requested state
package upstreamrefresh
141 changes: 141 additions & 0 deletions pkg/networkservice/common/upstreamrefresh/eventloop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package upstreamrefresh

import (
"context"

"github.com/pkg/errors"
"google.golang.org/grpc"

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

type eventLoop struct {
eventLoopCtx context.Context
conn *networkservice.Connection
eventFactory begin.EventFactory
client networkservice.MonitorConnection_MonitorConnectionsClient
localNotifier *notifier
logger log.Logger
}

func newEventLoop(ctx context.Context, cc grpc.ClientConnInterface, conn *networkservice.Connection, ln *notifier) (context.CancelFunc, error) {
conn = conn.Clone()

ev := begin.FromContext(ctx)
if ev == nil {
return func() {}, nil
}

// Create new eventLoopCtx and store its eventLoopCancel
eventLoopCtx, eventLoopCancel := context.WithCancel(ctx)

// Create selector to only ask for events related to our Connection
selector := &networkservice.MonitorScopeSelector{
PathSegments: []*networkservice.PathSegment{
{
Id: conn.GetCurrentPathSegment().GetId(),
Name: conn.GetCurrentPathSegment().GetName(),
},
},
}

client, err := networkservice.NewMonitorConnectionClient(cc).MonitorConnections(eventLoopCtx, selector)
if err != nil {
eventLoopCancel()
return nil, errors.WithStack(err)
}

// get the initial state transfer and use it to detect whether we have a real connection or not
_, err = client.Recv()
if err != nil {
eventLoopCancel()
return nil, errors.WithStack(err)
}

logger := log.FromContext(ctx).WithField("upstreamrefresh", "eventLoop")
cev := &eventLoop{
eventLoopCtx: eventLoopCtx,
conn: conn,
eventFactory: ev,
client: newClientFilter(client, conn, logger),
localNotifier: ln,
logger: logger,
}

// Start the eventLoop
go cev.eventLoop()
return eventLoopCancel, nil
}

func (cev *eventLoop) eventLoop() {
upstreamCh := cev.monitorUpstream()
var localCh <-chan struct{}

if cev.localNotifier.get(cev.conn.GetId()) != nil {
localCh = cev.localNotifier.get(cev.conn.GetId())
}

select {
case _, ok := <-upstreamCh:
if !ok {
// Connection closed
return
}
cev.logger.Debug("refresh requested from upstream")

<-cev.eventFactory.Request()
cev.localNotifier.notify(cev.eventLoopCtx, cev.conn.GetId())

case _, ok := <-localCh:
if !ok {
// Unsubscribed
return
}
cev.logger.Debug("refresh requested from other connection")
<-cev.eventFactory.Request()
case <-cev.eventLoopCtx.Done():
return
}
}

func (cev *eventLoop) monitorUpstream() <-chan struct{} {
res := make(chan struct{}, 1)

go func() {
defer close(res)

for {
eventIn, err := cev.client.Recv()
if cev.eventLoopCtx.Err() != nil || err != nil {
return
}

// Handle event
if eventIn.GetConnections()[cev.conn.GetId()].GetState() == networkservice.State_REFRESH_REQUESTED {
res <- struct{}{}
return
}
}
}()

return res
}
28 changes: 28 additions & 0 deletions pkg/networkservice/common/upstreamrefresh/gen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package upstreamrefresh

import (
"sync"
)

type typeCh = chan struct{}

//go:generate go-syncmap -output notifier_map.gen.go -type notifierMap<string,typeCh>

// clientMap - sync.Map with key == string and value == chan struct{}
type notifierMap sync.Map
Loading

0 comments on commit 9eb8e4f

Please sign in to comment.