From 1955d6165f5f53f4cd6a5d149dd762e3ab3a8e67 Mon Sep 17 00:00:00 2001 From: Denis Tingaikin <49399980+denis-tingaikin@users.noreply.github.com> Date: Tue, 17 May 2022 05:08:00 +0300 Subject: [PATCH] feat: add externaldnscontext networkservice chain element (#1295) * add externaldnscontext pkg Signed-off-by: denis-tingaikin * fix typos Signed-off-by: denis-tingaikin --- .../externaldnscontext/server.go | 155 ++++++++++++ .../externaldnscontext/server_test.go | 237 ++++++++++++++++++ 2 files changed, 392 insertions(+) create mode 100644 pkg/networkservice/connectioncontext/externaldnscontext/server.go create mode 100644 pkg/networkservice/connectioncontext/externaldnscontext/server_test.go diff --git a/pkg/networkservice/connectioncontext/externaldnscontext/server.go b/pkg/networkservice/connectioncontext/externaldnscontext/server.go new file mode 100644 index 000000000..edc0c0d83 --- /dev/null +++ b/pkg/networkservice/connectioncontext/externaldnscontext/server.go @@ -0,0 +1,155 @@ +// Copyright (c) 2022 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package externaldnscontext gets dnscontext from the remote side. +package externaldnscontext + +import ( + "context" + "errors" + "net" + "net/url" + "sync/atomic" + "time" + + "github.com/golang/protobuf/ptypes/empty" + "github.com/networkservicemesh/api/pkg/api/dns" + "github.com/networkservicemesh/api/pkg/api/networkservice" + + "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" + + "google.golang.org/grpc" + + "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" + "github.com/networkservicemesh/sdk/pkg/tools/log" +) + +type externaldnscontextServer struct { + serverLabels map[string]string + clientQueue chan *dns.DNSRequest + configs atomic.Value +} + +// NewServer cretes new externaldnscontext networkservice server instance +func NewServer(ctx context.Context, serverLabels map[string]string, remoteURL *url.URL, opts ...grpc.DialOption) networkservice.NetworkServiceServer { + var r = &externaldnscontextServer{ + serverLabels: serverLabels, + clientQueue: make(chan *dns.DNSRequest, 100), + } + var logger = log.FromContext(ctx).WithField("externaldnscontextServer", "managePrefixes go-routine") + + r.configs.Store((*dns.Configs)(nil)) + + go func() { + defer close(r.clientQueue) + <-ctx.Done() + }() + + go func() { + for ; ctx.Err() == nil; time.Sleep(time.Millisecond * 100) { + cc, err := grpc.DialContext(ctx, grpcutils.URLToTarget(remoteURL), opts...) + if err != nil { + logger.Errorf("cant dial: %v", err.Error()) + continue + } + defer func() { _ = cc.Close() }() + + c := dns.NewDNSClient(cc) + resp, err := c.FetchConfigs(ctx, new(empty.Empty)) + + if err != nil { + logger.Errorf("cant fetch configs: %v") + } + + r.configs.Store(resp) + + stream, err := c.ManageNames(ctx) + if err != nil { + logger.Errorf("cant open stream: %v", err.Error()) + continue + } + + for req := range r.clientQueue { + err = stream.Send(req) + if err != nil { + logger.Errorf("cant send msg: %v", err.Error()) + break + } + _, err = stream.Recv() + if err != nil { + logger.Errorf("cant recv msg: %v", err.Error()) + break + } + } + r.configs.Store((*dns.Configs)(nil)) + _ = cc.Close() + } + }() + + return r +} + +func (e *externaldnscontextServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { + var configs []*networkservice.DNSConfig + + if v, ok := e.configs.Load().(*dns.Configs); ok && v != nil { + configs = v.Configs + } else { + return nil, errors.New("dns service is not ready yet") + } + + if request.GetConnection() == nil { + request.Connection = new(networkservice.Connection) + } + if request.GetConnection().GetContext() == nil { + request.GetConnection().Context = new(networkservice.ConnectionContext) + } + if request.GetConnection().GetContext().GetDnsContext() == nil { + request.GetConnection().GetContext().DnsContext = new(networkservice.DNSContext) + } + + e.enqueueDNSRequest(dns.Type_ASSIGN, e.serverLabels, request.GetConnection().GetContext().GetIpContext().GetDstIPNets()) + e.enqueueDNSRequest(dns.Type_ASSIGN, request.GetConnection().GetLabels(), request.GetConnection().GetContext().GetIpContext().GetSrcIPNets()) + + request.GetConnection().GetContext().GetDnsContext().Configs = append(request.GetConnection().GetContext().GetDnsContext().Configs, configs...) + + resp, err := next.Server(ctx).Request(ctx, request) + + if err != nil { + return nil, err + } + + return resp, err +} + +func (e *externaldnscontextServer) enqueueDNSRequest(t dns.Type, labels map[string]string, ipNets []*net.IPNet) { + var ips []string + for _, ipNet := range ipNets { + ips = append(ips, ipNet.IP.String()) + } + + select { + case e.clientQueue <- &dns.DNSRequest{Type: t, Ips: ips, Labels: labels}: + default: + } +} + +func (e *externaldnscontextServer) Close(ctx context.Context, conn *networkservice.Connection) (*empty.Empty, error) { + e.enqueueDNSRequest(dns.Type_UNASSIGN, e.serverLabels, conn.GetContext().GetIpContext().GetDstIPNets()) + e.enqueueDNSRequest(dns.Type_UNASSIGN, conn.GetLabels(), conn.GetContext().GetIpContext().GetSrcIPNets()) + + return next.Server(ctx).Close(ctx, conn) +} diff --git a/pkg/networkservice/connectioncontext/externaldnscontext/server_test.go b/pkg/networkservice/connectioncontext/externaldnscontext/server_test.go new file mode 100644 index 000000000..6573fc019 --- /dev/null +++ b/pkg/networkservice/connectioncontext/externaldnscontext/server_test.go @@ -0,0 +1,237 @@ +// Copyright (c) 2022 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package externaldnscontext_test + +import ( + "context" + "net/url" + "sort" + "strings" + "sync" + "testing" + "time" + + "github.com/golang/protobuf/ptypes/empty" + "github.com/networkservicemesh/api/pkg/api/dns" + "github.com/networkservicemesh/api/pkg/api/networkservice" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "google.golang.org/grpc" + + "github.com/networkservicemesh/sdk/pkg/networkservice/common/retry" + "github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/dnscontext" + "github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/externaldnscontext" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/adapters" + "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" + "github.com/networkservicemesh/sdk/pkg/tools/grpcutils" +) + +func newTestingDNSRegisterServce(ctx context.Context, t *testing.T) *testingDNSRegisterService { + var s = grpc.NewServer() + + var r = new(testingDNSRegisterService) + dns.RegisterDNSServer(s, r) + + var serverAddr url.URL + + require.Len(t, grpcutils.ListenAndServe(ctx, &serverAddr, s), 0) + + r.Addr = serverAddr + return r +} + +type testingDNSRegisterService struct { + entries sync.Map + Addr url.URL +} + +func (e *testingDNSRegisterService) lenEntries() int { + var r int + + e.entries.Range(func(key, value interface{}) bool { + r++ + return true + }) + + return r +} + +func (e *testingDNSRegisterService) FetchConfigs(ctx context.Context, _ *empty.Empty) (*dns.Configs, error) { + return &dns.Configs{Configs: []*networkservice.DNSConfig{{DnsServerIps: []string{"8.8.8.8"}}}}, nil +} + +func (e *testingDNSRegisterService) ManageNames(s dns.DNS_ManageNamesServer) error { + for s.Context().Err() == nil { + r, err := s.Recv() + if err != nil { + return err + } + var values []string + + for _, v := range r.Labels { + values = append(values, v) + } + + sort.Strings(values) + + var name = strings.Join(values, ".") + + if r.Type == dns.Type_ASSIGN { + e.entries.Store(name, r.Ips[0]) + } else { + e.entries.Delete(name) + } + + err = s.Send(&dns.DNSResponse{ + Names: []string{name}, + }) + + if err != nil { + return err + } + } + + return nil +} + +func Test_ExternalDNSContext_EmptyRequest(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + + var ctx, cancel = context.WithTimeout(context.Background(), time.Second) + defer cancel() + + var s = newTestingDNSRegisterServce(ctx, t) + + var c = retry.NewClient( + adapters.NewServerToClient( + externaldnscontext.NewServer( + ctx, + map[string]string{"app": "vl3", "podName": "nse1"}, + &s.Addr, grpc.WithInsecure(), + ), + ), + retry.WithTryTimeout(time.Second/10), + retry.WithInterval(time.Millisecond*100), + ) + + _, err := c.Request(ctx, &networkservice.NetworkServiceRequest{}) + + require.NoError(t, err) +} + +func Test_ExternalDNSContext_RequestRefreshClose(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + + var ctx, cancel = context.WithTimeout(context.Background(), time.Second) + defer cancel() + + var s = newTestingDNSRegisterServce(ctx, t) + + var c = retry.NewClient( + adapters.NewServerToClient( + externaldnscontext.NewServer( + ctx, + map[string]string{"app": "vl3", "podName": "nse1"}, + &s.Addr, grpc.WithInsecure(), + ), + ), + retry.WithTryTimeout(time.Second/10), + retry.WithInterval(time.Millisecond*100), + ) + + req := &networkservice.NetworkServiceRequest{ + Connection: &networkservice.Connection{ + Id: t.Name(), + Context: &networkservice.ConnectionContext{ + IpContext: &networkservice.IPContext{ + SrcIpAddrs: []string{"1.1.1.1/32"}, + DstIpAddrs: []string{"1.1.1.2/32"}, + }, + }, + }, + } + + _, err := c.Request(ctx, req) + + require.NoError(t, err) + + require.Eventually(t, func() bool { + return s.lenEntries() == 2 + }, time.Second/2, time.Second/10) + + resp, err := c.Request(ctx, req) + + require.NoError(t, err) + + require.Eventually(t, func() bool { + return s.lenEntries() == 2 + }, time.Second/2, time.Second/10) + + _, err = c.Close(ctx, resp) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return s.lenEntries() == 0 + }, time.Second/2, time.Second/10) +} +func Test_ExternalDNSContext_WorksCorrectly_WithDNSContext(t *testing.T) { + t.Cleanup(func() { goleak.VerifyNone(t) }) + + var ctx, cancel = context.WithTimeout(context.Background(), time.Second) + defer cancel() + + var s = newTestingDNSRegisterServce(ctx, t) + + var c = retry.NewClient( + adapters.NewServerToClient( + chain.NewNetworkServiceServer( + externaldnscontext.NewServer( + ctx, + map[string]string{"app": "vl3", "podName": "nse1"}, + &s.Addr, grpc.WithInsecure(), + ), + dnscontext.NewServer(&networkservice.DNSConfig{DnsServerIps: []string{"8.8.4.4"}}), + ), + ), + retry.WithTryTimeout(time.Second/10), + retry.WithInterval(time.Millisecond*100), + ) + + req := &networkservice.NetworkServiceRequest{ + Connection: &networkservice.Connection{ + Id: t.Name(), + Context: &networkservice.ConnectionContext{ + IpContext: &networkservice.IPContext{ + SrcIpAddrs: []string{"1.1.1.1/32"}, + DstIpAddrs: []string{"1.1.1.2/32"}, + }, + }, + }, + } + + resp, err := c.Request(ctx, req) + + require.NoError(t, err) + + var ips []string + + for _, cfg := range resp.GetContext().GetDnsContext().GetConfigs() { + ips = append(ips, cfg.DnsServerIps...) + } + + require.Equal(t, []string{"8.8.8.8", "8.8.4.4"}, ips) +}