From b55a2a042f23930a5c20bf4699fe88f5238a64cd Mon Sep 17 00:00:00 2001 From: denis-tingajkin Date: Mon, 12 Jul 2021 23:38:50 +0700 Subject: [PATCH] fix issue sdk#1015 Signed-off-by: denis-tingajkin --- go.mod | 1 + .../chains/nsmgrproxy/server.go | 30 +++- pkg/networkservice/common/swapip/server.go | 16 +-- .../common/swapip/server_test.go | 23 +++- pkg/registry/common/swapip/nse_registry.go | 111 +++++++++++++++ .../common/swapip/nse_registry_test.go | 130 ++++++++++++++++++ .../utils/checks/checknse/nse_server.go | 1 + 7 files changed, 295 insertions(+), 17 deletions(-) create mode 100644 pkg/registry/common/swapip/nse_registry.go create mode 100644 pkg/registry/common/swapip/nse_registry_test.go diff --git a/go.mod b/go.mod index 41983d3f7..f891668ec 100644 --- a/go.mod +++ b/go.mod @@ -35,4 +35,5 @@ require ( gonum.org/v1/gonum v0.6.2 google.golang.org/grpc v1.35.0 google.golang.org/protobuf v1.25.0 + gopkg.in/yaml.v2 v2.2.2 ) diff --git a/pkg/networkservice/chains/nsmgrproxy/server.go b/pkg/networkservice/chains/nsmgrproxy/server.go index c8f14e116..18401efbf 100644 --- a/pkg/networkservice/chains/nsmgrproxy/server.go +++ b/pkg/networkservice/chains/nsmgrproxy/server.go @@ -24,6 +24,7 @@ import ( "github.com/google/uuid" "github.com/networkservicemesh/api/pkg/api/networkservice" "google.golang.org/grpc" + "gopkg.in/yaml.v2" registryapi "github.com/networkservicemesh/api/pkg/api/registry" @@ -42,10 +43,13 @@ import ( registryconnect "github.com/networkservicemesh/sdk/pkg/registry/common/connect" "github.com/networkservicemesh/sdk/pkg/registry/common/proxy" "github.com/networkservicemesh/sdk/pkg/registry/common/seturl" + registryswapip "github.com/networkservicemesh/sdk/pkg/registry/common/swapip" registryadapter "github.com/networkservicemesh/sdk/pkg/registry/core/adapters" "github.com/networkservicemesh/sdk/pkg/registry/core/chain" "github.com/networkservicemesh/sdk/pkg/tools/addressof" + "github.com/networkservicemesh/sdk/pkg/tools/fs" "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" + "github.com/networkservicemesh/sdk/pkg/tools/log" "github.com/networkservicemesh/sdk/pkg/tools/token" ) @@ -71,6 +75,27 @@ type serverOptions struct { registryConnectOptions []registryconnect.Option } +func (s *serverOptions) openMapIPChannel(ctx context.Context) <-chan map[string]string { + var r = make(chan map[string]string) + var fCh = fs.WatchFile(ctx, s.mapipFilePath) + go func() { + defer close(r) + for data := range fCh { + var m map[string]string + if err := yaml.Unmarshal(data, &m); err != nil { + log.FromContext(ctx).Errorf("An error during umarshal ipmap: %v", err.Error()) + continue + } + select { + case <-ctx.Done(): + return + case r <- m: + } + } + }() + return r +} + // Option modifies option value type Option func(o *serverOptions) @@ -84,7 +109,7 @@ func WithName(name string) Option { // WithAuthorizeServer sets authorize server for the server func WithAuthorizeServer(authorizeServer networkservice.NetworkServiceServer) Option { if authorizeServer == nil { - panic("Authorize server cannot be nil") + panic("authorizeServer cannot be nil") } return func(o *serverOptions) { @@ -156,7 +181,7 @@ func NewServer(ctx context.Context, regURL, proxyURL *url.URL, tokenGenerator to endpoint.WithAdditionalFunctionality( interdomainurl.NewServer(&nseStockServer), discover.NewServer(nsClient, nseClient), - swapip.NewServer(ctx, opts.mapipFilePath), + swapip.NewServer(opts.openMapIPChannel(ctx)), heal.NewServer(ctx, heal.WithOnHeal(addressof.NetworkServiceClient(adapters.NewServerToClient(rv)))), connect.NewServer(ctx, @@ -175,6 +200,7 @@ func NewServer(ctx context.Context, regURL, proxyURL *url.URL, tokenGenerator to var nseServerChain = chain.NewNetworkServiceEndpointRegistryServer( proxy.NewNetworkServiceEndpointRegistryServer(proxyURL), seturl.NewNetworkServiceEndpointRegistryServer(opts.listenOn), + registryswapip.NewNetworkServiceEndpointRegistryServer(opts.openMapIPChannel(ctx)), nseStockServer, registryconnect.NewNetworkServiceEndpointRegistryServer(ctx, opts.registryConnectOptions...), ) diff --git a/pkg/networkservice/common/swapip/server.go b/pkg/networkservice/common/swapip/server.go index 6bc9e40a1..d82f1b9a2 100644 --- a/pkg/networkservice/common/swapip/server.go +++ b/pkg/networkservice/common/swapip/server.go @@ -22,14 +22,11 @@ import ( "context" "sync/atomic" - "github.com/ghodss/yaml" "github.com/golang/protobuf/ptypes/empty" "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/common" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" - "github.com/networkservicemesh/sdk/pkg/tools/fs" - "github.com/networkservicemesh/sdk/pkg/tools/log" ) type swapIPServer struct { @@ -83,19 +80,12 @@ func (i *swapIPServer) Close(ctx context.Context, conn *networkservice.Connectio } // NewServer creates new swap chain element. Expects public IP address of node -func NewServer(ctx context.Context, pathToDir string) networkservice.NetworkServiceServer { +func NewServer(updateIPMapCh <-chan map[string]string) networkservice.NetworkServiceServer { var v = new(atomic.Value) v.Store(map[string]string{}) go func() { - logger := log.FromContext(ctx).WithField("swapIPServer", "monitor map ip") - for data := range fs.WatchFile(ctx, pathToDir) { - var m map[string]string - err := yaml.Unmarshal(data, &m) - if err != nil { - logger.Error(err.Error()) - continue - } - v.Store(m) + for data := range updateIPMapCh { + v.Store(data) } }() return &swapIPServer{internalToExternalMap: v} diff --git a/pkg/networkservice/common/swapip/server_test.go b/pkg/networkservice/common/swapip/server_test.go index bf7eeb4df..b6f49827d 100644 --- a/pkg/networkservice/common/swapip/server_test.go +++ b/pkg/networkservice/common/swapip/server_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/ghodss/yaml" "github.com/networkservicemesh/api/pkg/api/networkservice" "github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/common" "github.com/stretchr/testify/require" @@ -33,6 +34,7 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkrequest" "github.com/networkservicemesh/sdk/pkg/networkservice/utils/checks/checkresponse" + "github.com/networkservicemesh/sdk/pkg/tools/fs" ) func TestSwapIPServer_Request(t *testing.T) { @@ -50,6 +52,9 @@ func TestSwapIPServer_Request(t *testing.T) { err = ioutil.WriteFile(p2, []byte(`172.16.2.100: 172.16.1.100`), os.ModePerm) require.NoError(t, err) + ch1 := convertBytesChToMapCh(fs.WatchFile(ctx, p1)) + ch2 := convertBytesChToMapCh(fs.WatchFile(ctx, p2)) + var testChain = next.NewNetworkServiceServer( checkresponse.NewServer(t, func(t *testing.T, c *networkservice.Connection) { require.Equal(t, "172.16.2.10", c.Mechanism.Parameters[common.SrcIP]) @@ -57,7 +62,7 @@ func TestSwapIPServer_Request(t *testing.T) { require.Equal(t, "172.16.1.100", c.Mechanism.Parameters[common.DstIP]) require.Equal(t, "172.16.2.100", c.Mechanism.Parameters[common.DstOriginalIP]) }), - swapip.NewServer(ctx, p1), + swapip.NewServer(ch1), checkrequest.NewServer(t, func(t *testing.T, r *networkservice.NetworkServiceRequest) { require.Equal(t, "172.16.1.10", r.Connection.Mechanism.Parameters[common.SrcIP]) require.Equal(t, "172.16.2.10", r.Connection.Mechanism.Parameters[common.SrcOriginalIP]) @@ -66,7 +71,7 @@ func TestSwapIPServer_Request(t *testing.T) { require.Equal(t, "172.16.1.100", c.Mechanism.Parameters[common.DstIP]) require.Equal(t, "172.16.2.100", c.Mechanism.Parameters[common.DstOriginalIP]) }), - swapip.NewServer(ctx, p2), + swapip.NewServer(ch2), checkrequest.NewServer(t, func(t *testing.T, r *networkservice.NetworkServiceRequest) { require.Equal(t, "", r.Connection.Mechanism.Parameters[common.DstOriginalIP]) require.Equal(t, "", r.Connection.Mechanism.Parameters[common.DstIP]) @@ -93,3 +98,17 @@ func TestSwapIPServer_Request(t *testing.T) { _, err = testChain.Request(ctx, &networkservice.NetworkServiceRequest{Connection: resp}) require.NoError(t, err) } + +func convertBytesChToMapCh(in <-chan []byte) <-chan map[string]string { + var out = make(chan map[string]string) + go func() { + for data := range in { + var r map[string]string + _ = yaml.Unmarshal(data, &r) + out <- r + } + close(out) + }() + + return out +} diff --git a/pkg/registry/common/swapip/nse_registry.go b/pkg/registry/common/swapip/nse_registry.go new file mode 100644 index 000000000..c10364982 --- /dev/null +++ b/pkg/registry/common/swapip/nse_registry.go @@ -0,0 +1,111 @@ +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package swapip allows to replace internal NSE address to external for register/unregister/find queries. +package swapip + +import ( + "context" + "net" + "net/url" + "sync/atomic" + + "github.com/golang/protobuf/ptypes/empty" + "github.com/networkservicemesh/api/pkg/api/registry" + + "github.com/networkservicemesh/sdk/pkg/registry/core/next" + "github.com/networkservicemesh/sdk/pkg/tools/log" +) + +type swapIPFindNSEServer struct { + registry.NetworkServiceEndpointRegistry_FindServer + m map[string]string + ctx context.Context +} + +func (s *swapIPFindNSEServer) Send(nse *registry.NetworkServiceEndpoint) error { + trySwapIP(s.ctx, nse, s.m) + return s.NetworkServiceEndpointRegistry_FindServer.Send(nse) +} + +type swapIPNSEServer struct { + swapIPMap *atomic.Value +} + +func (n *swapIPNSEServer) Register(ctx context.Context, nse *registry.NetworkServiceEndpoint) (*registry.NetworkServiceEndpoint, error) { + m := n.swapIPMap.Load().(map[string]string) + trySwapIP(ctx, nse, m) + resp, err := next.NetworkServiceEndpointRegistryServer(ctx).Register(ctx, nse) + if err == nil { + trySwapIP(ctx, resp, m) + } + return resp, err +} + +func (n *swapIPNSEServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error { + m := n.swapIPMap.Load().(map[string]string) + trySwapIP(server.Context(), query.NetworkServiceEndpoint, m) + return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, &swapIPFindNSEServer{NetworkServiceEndpointRegistry_FindServer: server, m: m, ctx: server.Context()}) +} + +func (n *swapIPNSEServer) Unregister(ctx context.Context, nse *registry.NetworkServiceEndpoint) (*empty.Empty, error) { + m := n.swapIPMap.Load().(map[string]string) + trySwapIP(ctx, nse, m) + return next.NetworkServiceEndpointRegistryServer(ctx).Unregister(ctx, nse) +} + +func trySwapIP(ctx context.Context, nse *registry.NetworkServiceEndpoint, ipMap map[string]string) { + logger := log.FromContext(ctx) + + u, err := url.Parse(nse.Url) + defer func() { + if err != nil { + logger.Debugf("can not parse incomming url: %v, err: %v", nse.Url, err) + } + }() + + if err != nil { + return + } + + h, p, err := net.SplitHostPort(u.Host) + + if err != nil { + return + } + + if v, ok := ipMap[h]; ok { + logger.Debugf("swapping %v to %v", h, v) + u.Host = net.JoinHostPort(v, p) + nse.Url = u.String() + } +} + +// NewNetworkServiceEndpointRegistryServer creates a new seturl registry.NetworkServiceEndpointRegistryServer +func NewNetworkServiceEndpointRegistryServer(updateMapCh <-chan map[string]string) registry.NetworkServiceEndpointRegistryServer { + var v = new(atomic.Value) + v.Store(map[string]string{}) + + go func() { + for m := range updateMapCh { + v.Store(m) + } + }() + + return &swapIPNSEServer{ + swapIPMap: v, + } +} diff --git a/pkg/registry/common/swapip/nse_registry_test.go b/pkg/registry/common/swapip/nse_registry_test.go new file mode 100644 index 000000000..80a63abe4 --- /dev/null +++ b/pkg/registry/common/swapip/nse_registry_test.go @@ -0,0 +1,130 @@ +// Copyright (c) 2021 Doc.ai and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package swapip_test + +import ( + "context" + "testing" + "time" + + "github.com/networkservicemesh/api/pkg/api/registry" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + + "github.com/networkservicemesh/sdk/pkg/registry/common/memory" + "github.com/networkservicemesh/sdk/pkg/registry/common/swapip" + "github.com/networkservicemesh/sdk/pkg/registry/core/adapters" + "github.com/networkservicemesh/sdk/pkg/registry/core/chain" + "github.com/networkservicemesh/sdk/pkg/registry/utils/checks/checknse" +) + +func TestSwapIPNSERegistryServer_Register(t *testing.T) { + defer goleak.VerifyNone(t) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + var ipMapCh = make(chan map[string]string) + s := chain.NewNetworkServiceEndpointRegistryServer( + swapip.NewNetworkServiceEndpointRegistryServer(ipMapCh), + checknse.NewServer(t, func(t *testing.T, nse *registry.NetworkServiceEndpoint) { + require.Equal(t, "tcp://8.8.8.8:5001", nse.Url) + }), + ) + + ipMapCh <- map[string]string{ + "127.0.0.1": "8.8.8.8", + "8.8.8.8": "127.0.0.1", + } + + defer close(ipMapCh) + + resp, err := s.Register(ctx, ®istry.NetworkServiceEndpoint{ + Url: "tcp://127.0.0.1:5001", + }) + require.NoError(t, err) + require.Equal(t, "tcp://127.0.0.1:5001", resp.Url) +} + +func TestSwapIPNSERegistryServer_Unregister(t *testing.T) { + defer goleak.VerifyNone(t) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + var ipMapCh = make(chan map[string]string) + + s := chain.NewNetworkServiceEndpointRegistryServer( + swapip.NewNetworkServiceEndpointRegistryServer(ipMapCh), + checknse.NewServer(t, func(t *testing.T, nse *registry.NetworkServiceEndpoint) { + require.Equal(t, "tcp://8.8.8.8:5001", nse.Url) + }), + ) + defer close(ipMapCh) + + ipMapCh <- map[string]string{ + "127.0.0.1": "8.8.8.8", + "8.8.8.8": "127.0.0.1", + } + + _, err := s.Unregister(ctx, ®istry.NetworkServiceEndpoint{ + Url: "tcp://127.0.0.1:5001", + }) + require.NoError(t, err) +} + +func TestSwapIPNSERegistryServer_Find(t *testing.T) { + defer goleak.VerifyNone(t) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + var ipMapCh = make(chan map[string]string) + + m := memory.NewNetworkServiceEndpointRegistryServer() + + _, err := m.Register(ctx, ®istry.NetworkServiceEndpoint{ + Url: "tcp://8.8.8.8:5001", + }) + require.NoError(t, err) + + s := chain.NewNetworkServiceEndpointRegistryServer( + swapip.NewNetworkServiceEndpointRegistryServer(ipMapCh), + checknse.NewServer(t, func(t *testing.T, nse *registry.NetworkServiceEndpoint) { + require.Equal(t, "tcp://8.8.8.8:5001", nse.Url) + }), + m, + ) + defer close(ipMapCh) + + ipMapCh <- map[string]string{ + "127.0.0.1": "8.8.8.8", + "8.8.8.8": "127.0.0.1", + } + + stream, err := adapters.NetworkServiceEndpointServerToClient(s).Find(ctx, ®istry.NetworkServiceEndpointQuery{ + NetworkServiceEndpoint: ®istry.NetworkServiceEndpoint{ + Url: "tcp://127.0.0.1:5001", + }, + }) + require.NoError(t, err) + + list := registry.ReadNetworkServiceEndpointList(stream) + require.Len(t, list, 1) + + require.Equal(t, "tcp://127.0.0.1:5001", list[0].Url) +} diff --git a/pkg/registry/utils/checks/checknse/nse_server.go b/pkg/registry/utils/checks/checknse/nse_server.go index 4eee181b5..0b3847341 100644 --- a/pkg/registry/utils/checks/checknse/nse_server.go +++ b/pkg/registry/utils/checks/checknse/nse_server.go @@ -48,6 +48,7 @@ func (s *checkNSEServer) Register(ctx context.Context, nse *registry.NetworkServ } func (s *checkNSEServer) Find(query *registry.NetworkServiceEndpointQuery, server registry.NetworkServiceEndpointRegistry_FindServer) error { + s.check(s.T, query.NetworkServiceEndpoint) return next.NetworkServiceEndpointRegistryServer(server.Context()).Find(query, server) }