Skip to content

Commit

Permalink
Very very rough first pass at heal.
Browse files Browse the repository at this point in the history
Signed-off-by: Ed Warnicke <hagbard@gmail.com>
  • Loading branch information
edwarnicke committed Oct 18, 2021
1 parent 929eceb commit cb1c868
Show file tree
Hide file tree
Showing 5 changed files with 341 additions and 0 deletions.
81 changes: 81 additions & 0 deletions pkg/networkservice/common/heal/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) 2021 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package heal

import (
"context"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/clientconn"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/postpone"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)

type healClient struct {
chainCtx context.Context
livelinessCheck LivelinessCheck
}

func NewClient(chainCtx context.Context, opts ...Option) networkservice.NetworkServiceClient {
o := &option{
livelinessCheck: func(_ *networkservice.Connection) bool { return false },
}
for _, opt := range opts {
opt(o)
}
return &healClient{
chainCtx: chainCtx,
livelinessCheck: o.livelinessCheck,
}
}

func (h *healClient) Request(ctx context.Context, request *networkservice.NetworkServiceRequest, opts ...grpc.CallOption) (*networkservice.Connection, error) {
closeCtxFunc := postpone.ContextWithValues(ctx)
// Cancel any existing eventLoop
if cancelEventLoop, loaded := loadAndDelete(ctx); loaded {
cancelEventLoop()
}

conn, err := next.Client(ctx).Request(ctx, request, opts...)
if err != nil {
return nil, err
}
cc, ccLoaded := clientconn.Load(ctx)
if ccLoaded {
cancelEventLoop, eventLoopErr := newEventLoop(h.chainCtx, begin.FromContext(ctx), cc, conn, h.livelinessCheck)
if eventLoopErr != nil {
closeCtx, closeCancel := closeCtxFunc()
defer closeCancel()
_, _ = next.Client(closeCtx).Close(closeCtx, conn)
return nil, errors.Wrap(eventLoopErr, "unable to monitor")
}
store(ctx, cancelEventLoop)
}
return conn, nil
}

func (h *healClient) Close(ctx context.Context, conn *networkservice.Connection, opts ...grpc.CallOption) (*emptypb.Empty, error) {
// Cancel any existing eventLoop
if cancelEventLoop, loaded := loadAndDelete(ctx); loaded {
cancelEventLoop()
}
return next.Client(ctx).Close(ctx, conn)
}
90 changes: 90 additions & 0 deletions pkg/networkservice/common/heal/client_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) 2021 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package heal

import (
"github.com/networkservicemesh/api/pkg/api/networkservice"
)

type clientFilter struct {
conn *networkservice.Connection
networkservice.MonitorConnection_MonitorConnectionsClient
}

func newClientFilter(client networkservice.MonitorConnection_MonitorConnectionsClient, conn *networkservice.Connection) networkservice.MonitorConnection_MonitorConnectionsClient {
return &clientFilter{
MonitorConnection_MonitorConnectionsClient: client,
conn: conn,
}
}

func (c *clientFilter) Recv() (*networkservice.ConnectionEvent, error) {
for {
eventIn, err := c.MonitorConnection_MonitorConnectionsClient.Recv()
if err != nil {
return nil, err
}
eventOut := &networkservice.ConnectionEvent{
Type: networkservice.ConnectionEventType_UPDATE,
Connections: make(map[string]*networkservice.Connection),
}
for _, connIn := range eventIn.GetConnections() {
if eventIn.GetType() == networkservice.ConnectionEventType_DELETE {
connIn = connIn.Clone()
connIn.State = networkservice.State_DOWN
}
// If we don't have enough PathSegments connIn doesn't match e.conn
if len(connIn.GetPath().GetPathSegments()) < int(c.conn.GetPath().GetIndex()+1) {
continue
}
// If the e.conn isn't in the expected PathSegment connIn doesn't match e.conn
if connIn.GetPath().GetPathSegments()[int(c.conn.GetPath().GetIndex())].GetId() != c.conn.GetId() {
continue
}
// If the current index isn't the index of e.conn or what comes after it connIn doesn't match e.conn
if !(connIn.GetPath().GetIndex() == c.conn.GetPath().GetIndex() || connIn.GetPath().GetIndex() == c.conn.GetPath().GetIndex()+1) {
continue
}

// Construct the outgoing Connection
connOut := c.conn.Clone()
connOut.Path = connIn.Path
connOut.GetPath().Index = c.conn.GetPath().GetIndex()
connOut.Context = connIn.Context
connOut.State = connIn.State

// If it's deleted, mark the event state down
if eventIn.GetType() == networkservice.ConnectionEventType_DELETE {
connOut.State = networkservice.State_DOWN
}

// If the connection hasn't changed... don't send the event
if connOut.Equals(c.conn) {
continue
}

// Add the Connection to the outgoing event
eventOut.GetConnections()[connOut.GetId()] = connOut

// Update the event we are watching for:
c.conn = connOut
}
if len(eventOut.GetConnections()) > 0 {
return eventOut, nil
}
}
}
94 changes: 94 additions & 0 deletions pkg/networkservice/common/heal/eventloop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (c) 2021 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package heal

import (
"context"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/pkg/errors"
"google.golang.org/grpc"
)

type eventLoop struct {
eventLoopCtx context.Context
conn *networkservice.Connection
eventFactory begin.EventFactory
client networkservice.MonitorConnection_MonitorConnectionsClient
livelinessCheck LivelinessCheck
}

func newEventLoop(ctx context.Context, ev begin.EventFactory, cc grpc.ClientConnInterface, conn *networkservice.Connection, livelinessCheck LivelinessCheck) (context.CancelFunc, error) {
conn = conn.Clone()
// Is another chain element asking for events? If not, no need to monitor
if ev == nil {
return func() {}, nil
}

// Create new eventLoopCtx and store its eventLoopCancel
eventLoopCtx, eventLoopCancel := context.WithCancel(ctx)

// Create selector to only ask for events related to our Connection
selector := &networkservice.MonitorScopeSelector{
PathSegments: []*networkservice.PathSegment{
{
Id: conn.GetCurrentPathSegment().GetId(),
Name: conn.GetCurrentPathSegment().GetName(),
},
},
}

client, err := networkservice.NewMonitorConnectionClient(cc).MonitorConnections(eventLoopCtx, selector)
if err != nil {
eventLoopCancel()
return nil, errors.WithStack(err)
}

// get the initial state transfer and use it to detect whether we have a real connection or not
_, err = client.Recv()
if err != nil {
eventLoopCancel()
return nil, errors.WithStack(err)
}

cev := &eventLoop{
eventLoopCtx: eventLoopCtx,
conn: conn,
eventFactory: ev,
client: newClientFilter(client, conn),
livelinessCheck: livelinessCheck,
}

// Start the eventLoop
go cev.eventLoop()
return eventLoopCancel, nil
}

func (cev *eventLoop) eventLoop() {
for {
eventIn, err := cev.client.Recv()
if cev.eventLoopCtx.Err() != nil {
return
}
if (err != nil || eventIn.GetConnections()[cev.conn.GetId()].GetState() == networkservice.State_DOWN) &&
!cev.livelinessCheck(cev.conn) {
_ = cev.eventFactory.Request(begin.WithReselect())
return
}
}
}
41 changes: 41 additions & 0 deletions pkg/networkservice/common/heal/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) 2021 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package heal

import (
"context"

"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
)

type key struct{}

// store sets the context.CancelFunc stored in per Connection.Id metadata.
func store(ctx context.Context, cancel context.CancelFunc) {
metadata.Map(ctx, true).Store(key{}, cancel)
}

// loadAndDelete deletes the context.CancelFunc stored in per Connection.Id metadata,
// returning the previous value if any. The loaded result reports whether the key was present.
func loadAndDelete(ctx context.Context) (value context.CancelFunc, ok bool) {
rawValue, ok := metadata.Map(ctx, true).LoadAndDelete(key{})
if !ok {
return
}
value, ok = rawValue.(context.CancelFunc)
return value, ok
}
35 changes: 35 additions & 0 deletions pkg/networkservice/common/heal/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2021 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package heal

import (
"github.com/networkservicemesh/api/pkg/api/networkservice"
)

type LivelinessCheck func(conn *networkservice.Connection) bool

type option struct {
livelinessCheck LivelinessCheck
}

type Option func(o *option)

func WithLivelinessCheck(livelinessCheck LivelinessCheck) Option {
return func(o *option) {
o.livelinessCheck = livelinessCheck
}
}

0 comments on commit cb1c868

Please sign in to comment.