Skip to content

Commit

Permalink
fix issue sdk#1015 (#1016)
Browse files Browse the repository at this point in the history
Signed-off-by: denis-tingajkin <denis.tingajkin@xored.com>
  • Loading branch information
denis-tingaikin authored Jul 13, 2021
1 parent d11653e commit 07ca121
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 17 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ require (
gonum.org/v1/gonum v0.6.2
google.golang.org/grpc v1.35.0
google.golang.org/protobuf v1.25.0
gopkg.in/yaml.v2 v2.2.2
)
30 changes: 28 additions & 2 deletions pkg/networkservice/chains/nsmgrproxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/google/uuid"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"google.golang.org/grpc"
"gopkg.in/yaml.v2"

registryapi "github.com/networkservicemesh/api/pkg/api/registry"

Expand All @@ -42,10 +43,13 @@ import (
registryconnect "github.com/networkservicemesh/sdk/pkg/registry/common/connect"
"github.com/networkservicemesh/sdk/pkg/registry/common/proxy"
"github.com/networkservicemesh/sdk/pkg/registry/common/seturl"
registryswapip "github.com/networkservicemesh/sdk/pkg/registry/common/swapip"
registryadapter "github.com/networkservicemesh/sdk/pkg/registry/core/adapters"
"github.com/networkservicemesh/sdk/pkg/registry/core/chain"
"github.com/networkservicemesh/sdk/pkg/tools/addressof"
"github.com/networkservicemesh/sdk/pkg/tools/fs"
"github.com/networkservicemesh/sdk/pkg/tools/grpcutils"
"github.com/networkservicemesh/sdk/pkg/tools/log"
"github.com/networkservicemesh/sdk/pkg/tools/token"
)

Expand All @@ -71,6 +75,27 @@ type serverOptions struct {
registryConnectOptions []registryconnect.Option
}

func (s *serverOptions) openMapIPChannel(ctx context.Context) <-chan map[string]string {
var r = make(chan map[string]string)
var fCh = fs.WatchFile(ctx, s.mapipFilePath)
go func() {
defer close(r)
for data := range fCh {
var m map[string]string
if err := yaml.Unmarshal(data, &m); err != nil {
log.FromContext(ctx).Errorf("An error during umarshal ipmap: %v", err.Error())
continue
}
select {
case <-ctx.Done():
return
case r <- m:
}
}
}()
return r
}

// Option modifies option value
type Option func(o *serverOptions)

Expand All @@ -84,7 +109,7 @@ func WithName(name string) Option {
// WithAuthorizeServer sets authorize server for the server
func WithAuthorizeServer(authorizeServer networkservice.NetworkServiceServer) Option {
if authorizeServer == nil {
panic("Authorize server cannot be nil")
panic("authorizeServer cannot be nil")
}

return func(o *serverOptions) {
Expand Down Expand Up @@ -156,7 +181,7 @@ func NewServer(ctx context.Context, regURL, proxyURL *url.URL, tokenGenerator to
endpoint.WithAdditionalFunctionality(
interdomainurl.NewServer(&nseStockServer),
discover.NewServer(nsClient, nseClient),
swapip.NewServer(ctx, opts.mapipFilePath),
swapip.NewServer(opts.openMapIPChannel(ctx)),
heal.NewServer(ctx,
heal.WithOnHeal(addressof.NetworkServiceClient(adapters.NewServerToClient(rv)))),
connect.NewServer(ctx,
Expand All @@ -175,6 +200,7 @@ func NewServer(ctx context.Context, regURL, proxyURL *url.URL, tokenGenerator to
var nseServerChain = chain.NewNetworkServiceEndpointRegistryServer(
proxy.NewNetworkServiceEndpointRegistryServer(proxyURL),
seturl.NewNetworkServiceEndpointRegistryServer(opts.listenOn),
registryswapip.NewNetworkServiceEndpointRegistryServer(opts.openMapIPChannel(ctx)),
nseStockServer,
registryconnect.NewNetworkServiceEndpointRegistryServer(ctx, opts.registryConnectOptions...),
)
Expand Down
16 changes: 3 additions & 13 deletions pkg/networkservice/common/swapip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,11 @@ import (
"context"
"sync/atomic"

"github.com/ghodss/yaml"
"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/common"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/fs"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

type swapIPServer struct {
Expand Down Expand Up @@ -83,19 +80,12 @@ func (i *swapIPServer) Close(ctx context.Context, conn *networkservice.Connectio
}

// NewServer creates new swap chain element. Expects public IP address of node
func NewServer(ctx context.Context, pathToDir string) networkservice.NetworkServiceServer {
func NewServer(updateIPMapCh <-chan map[string]string) networkservice.NetworkServiceServer {
var v = new(atomic.Value)
v.Store(map[string]string{})
go func() {
logger := log.FromContext(ctx).WithField("swapIPServer", "monitor map ip")
for data := range fs.WatchFile(ctx, pathToDir) {
var m map[string]string
err := yaml.Unmarshal(data, &m)
if err != nil {
logger.Error(err.Error())
continue
}
v.Store(m)
for data := range updateIPMapCh {
v.Store(data)
}
}()
return &swapIPServer{internalToExternalMap: v}
Expand Down
23 changes: 21 additions & 2 deletions pkg/networkservice/common/swapip/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"
"time"

"github.com/ghodss/yaml"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/common"
"github.com/stretchr/testify/require"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkrequest"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkresponse"
"github.com/networkservicemesh/sdk/pkg/tools/fs"
)

func TestSwapIPServer_Request(t *testing.T) {
Expand All @@ -50,14 +52,17 @@ func TestSwapIPServer_Request(t *testing.T) {
err = ioutil.WriteFile(p2, []byte(`172.16.2.100: 172.16.1.100`), os.ModePerm)
require.NoError(t, err)

ch1 := convertBytesChToMapCh(fs.WatchFile(ctx, p1))
ch2 := convertBytesChToMapCh(fs.WatchFile(ctx, p2))

var testChain = next.NewNetworkServiceServer(
checkresponse.NewServer(t, func(t *testing.T, c *networkservice.Connection) {
require.Equal(t, "172.16.2.10", c.Mechanism.Parameters[common.SrcIP])
require.Equal(t, "", c.Mechanism.Parameters[common.SrcOriginalIP])
require.Equal(t, "172.16.1.100", c.Mechanism.Parameters[common.DstIP])
require.Equal(t, "172.16.2.100", c.Mechanism.Parameters[common.DstOriginalIP])
}),
swapip.NewServer(ctx, p1),
swapip.NewServer(ch1),
checkrequest.NewServer(t, func(t *testing.T, r *networkservice.NetworkServiceRequest) {
require.Equal(t, "172.16.1.10", r.Connection.Mechanism.Parameters[common.SrcIP])
require.Equal(t, "172.16.2.10", r.Connection.Mechanism.Parameters[common.SrcOriginalIP])
Expand All @@ -66,7 +71,7 @@ func TestSwapIPServer_Request(t *testing.T) {
require.Equal(t, "172.16.1.100", c.Mechanism.Parameters[common.DstIP])
require.Equal(t, "172.16.2.100", c.Mechanism.Parameters[common.DstOriginalIP])
}),
swapip.NewServer(ctx, p2),
swapip.NewServer(ch2),
checkrequest.NewServer(t, func(t *testing.T, r *networkservice.NetworkServiceRequest) {
require.Equal(t, "", r.Connection.Mechanism.Parameters[common.DstOriginalIP])
require.Equal(t, "", r.Connection.Mechanism.Parameters[common.DstIP])
Expand All @@ -93,3 +98,17 @@ func TestSwapIPServer_Request(t *testing.T) {
_, err = testChain.Request(ctx, &networkservice.NetworkServiceRequest{Connection: resp})
require.NoError(t, err)
}

func convertBytesChToMapCh(in <-chan []byte) <-chan map[string]string {
var out = make(chan map[string]string)
go func() {
for data := range in {
var r map[string]string
_ = yaml.Unmarshal(data, &r)
out <- r
}
close(out)
}()

return out
}
111 changes: 111 additions & 0 deletions pkg/registry/common/swapip/nse_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright (c) 2021 Doc.ai and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package swapip allows to replace internal NSE address to external for register/unregister/find queries.
package swapip

import (
"context"
"net"
"net/url"
"sync/atomic"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/registry"

"github.com/networkservicemesh/sdk/pkg/registry/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

type swapIPFindNSEServer struct {
registry.NetworkServiceEndpointRegistry_FindServer
m map[string]string
ctx context.Context
}

func (s *swapIPFindNSEServer) Send(nse *registry.NetworkServiceEndpoint) error {
trySwapIP(s.ctx, nse, s.m)
return s.NetworkServiceEndpointRegistry_FindServer.Send(nse)
}

type swapIPNSEServer struct {
swapIPMap *atomic.Value
}

func (n *swapIPNSEServer) Register(ctx context.Context, nse *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) {
m := n.swapIPMap.Load().(map[string]string)
trySwapIP(ctx, nse, m)
resp, err := next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, nse)
if err == nil {
trySwapIP(ctx, resp, m)
}
return resp, err
}

func (n *swapIPNSEServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error {
m := n.swapIPMap.Load().(map[string]string)
trySwapIP(server.Context(), query.NetworkServiceEndpoint, m)
return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, &swapIPFindNSEServer{NetworkServiceEndpointRegistry_FindServer: server, m: m, ctx: server.Context()})
}

func (n *swapIPNSEServer) Unregister(ctx context.Context, nse *registry.NetworkServiceEndpoint) (*empty.Empty, error) {
m := n.swapIPMap.Load().(map[string]string)
trySwapIP(ctx, nse, m)
return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, nse)
}

func trySwapIP(ctx context.Context, nse *registry.NetworkServiceEndpoint, ipMap map[string]string) {
logger := log.FromContext(ctx)

u, err := url.Parse(nse.Url)
defer func() {
if err != nil {
logger.Debugf("can not parse incomming url: %v, err: %v", nse.Url, err)
}
}()

if err != nil {
return
}

h, p, err := net.SplitHostPort(u.Host)

if err != nil {
return
}

if v, ok := ipMap[h]; ok {
logger.Debugf("swapping %v to %v", h, v)
u.Host = net.JoinHostPort(v, p)
nse.Url = u.String()
}
}

// NewNetworkServiceEndpointRegistryServer creates a new seturl registry.NetworkServiceEndpointRegistryServer
func NewNetworkServiceEndpointRegistryServer(updateMapCh <-chan map[string]string) registry.NetworkServiceEndpointRegistryServer {
var v = new(atomic.Value)
v.Store(map[string]string{})

go func() {
for m := range updateMapCh {
v.Store(m)
}
}()

return &swapIPNSEServer{
swapIPMap: v,
}
}
Loading

0 comments on commit 07ca121

Please sign in to comment.