Skip to content

Commit

Permalink
use source address for ping to check if the connection is alive
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Glazychev <artem.glazychev@xored.com>
  • Loading branch information
glazychev-art committed Mar 21, 2023
1 parent 40f70dd commit 9527e5a
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 42 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29 // indirect
github.com/edwarnicke/serialize v1.0.7 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ github.com/edwarnicke/exechelper v1.0.2/go.mod h1:/T271jtNX/ND4De6pa2aRy2+8sNtyC
github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29 h1:4/2wgileNvQB4HfJbq7u4FFLKIfc38a6P0S/51ZGgX8=
github.com/edwarnicke/genericsync v0.0.0-20220910010113-61a344f9bc29/go.mod h1:3m+ZfVq+z0pTLW798jmqnifMsalrVLIKmfXaMFvqSuc=
github.com/edwarnicke/serialize v1.0.7 h1:geX8vmyu8Ij2S5fFIXjy9gBDkKxXnrMIzMoDvV0Ddac=
github.com/edwarnicke/serialize v1.0.7/go.mod h1:y79KgU2P7ALH/4j37uTSIdNavHFNttqN7pzO6Y8B2aw=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down Expand Up @@ -213,6 +214,7 @@ go.opentelemetry.io/otel/trace v1.9.0/go.mod h1:2737Q0MuG8q1uILYm2YYVkAyLtOofiTN
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down Expand Up @@ -357,6 +359,7 @@ golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgw
golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191113191852-77e3bb0ad9e7/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
Expand Down
168 changes: 126 additions & 42 deletions pkg/kernel/tools/heal/liveness_check.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
// Copyright (c) 2022-2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -19,12 +19,14 @@ package heal

import (
"context"
"net"
"time"

"github.com/pkg/errors"

"github.com/go-ping/ping"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/heal"
"github.com/networkservicemesh/sdk/pkg/tools/log"
)

Expand All @@ -33,58 +35,140 @@ const (
packetCount = 4
)

// KernelLivenessCheck is an implementation of heal.LivenessCheck. It sends ICMP
// ping and checks reply. Returns false if didn't get reply.
func KernelLivenessCheck(deadlineCtx context.Context, conn *networkservice.Connection) bool {
if mechanism := conn.GetMechanism().GetType(); mechanism != kernel.MECHANISM {
log.FromContext(deadlineCtx).Warnf("ping is not supported for mechanism %v", mechanism)
return true
}
type options struct {
pingerFactory PingerFactory
}

// Option is an option pattern for LivelinessChecker
type Option func(o *options)

deadline, ok := deadlineCtx.Deadline()
if !ok {
deadline = time.Now().Add(defaultTimeout)
// WithPingerFactory - sets any custom pinger factory
func WithPingerFactory(pf PingerFactory) Option {
return func(o *options) {
o.pingerFactory = pf
}
}

addrCount := len(conn.GetContext().GetIpContext().GetDstIpAddrs())
if addrCount == 0 {
log.FromContext(deadlineCtx).Debug("No dst IP address")
return true
// KernelLivenessCheck returns an implementation of heal.LivenessCheck. It sends ICMP
// ping and checks reply. Returns false if didn't get reply.
func KernelLivenessCheck(opts ...Option) heal.LivenessCheck {
// Apply options
o := &options{
pingerFactory: &defaultPingerFactory{},
}
timeout := time.Until(deadline) / time.Duration(addrCount)
for _, opt := range opts {
opt(o)
}
var pingerFactory = o.pingerFactory

var pinger *ping.Pinger
return func(deadlineCtx context.Context, conn *networkservice.Connection) bool {
if mechanism := conn.GetMechanism().GetType(); mechanism != kernel.MECHANISM {
log.FromContext(deadlineCtx).Warnf("ping is not supported for mechanism %v", mechanism)
return true
}
ipContext := conn.GetContext().GetIpContext()
addrCount := len(ipContext.GetDstIpAddrs()) * len(ipContext.GetSrcIpAddrs())
if addrCount == 0 {
log.FromContext(deadlineCtx).Debug("No IP address")
return true
}

for _, cidr := range conn.GetContext().GetIpContext().GetDstIpAddrs() {
addr, _, err := net.ParseCIDR(cidr)
if err != nil {
log.FromContext(deadlineCtx).Errorf("ParseCIDR failed: %s", err.Error())
return false
deadline, ok := deadlineCtx.Deadline()
if !ok {
deadline = time.Now().Add(defaultTimeout)
}
timeout := time.Until(deadline)

// Start ping for all Src/DstIPs combination
responseCh := make(chan error, addrCount)
defer close(responseCh)
for _, srcIPNet := range ipContext.GetSrcIPNets() {
for _, dstIPNet := range ipContext.GetDstIPNets() {
// Skip if IPs don't belong to the same family
if (srcIPNet.IP.To4() != nil) != (dstIPNet.IP.To4() != nil) {
responseCh <- nil
continue
}

go func(srcIP, dstIP string) {
logger := log.FromContext(deadlineCtx).WithField("srcIP", srcIP).WithField("dstIP", dstIP)
pinger := pingerFactory.CreatePinger(srcIP, dstIP, timeout, packetCount)

err := pinger.Run()
if err != nil {
logger.Errorf("Ping failed: %s", err.Error())
responseCh <- err
return
}

ipAddr := &net.IPAddr{IP: addr}
if pinger == nil {
pinger, err = ping.NewPinger(addr.String())
if err != nil {
log.FromContext(deadlineCtx).Errorf("Failed to create pinger: %s", err.Error())
return false
if pinger.GetReceivedPackets() == 0 {
err = errors.New("No packets received")
logger.Errorf(err.Error())
responseCh <- err
return
}
responseCh <- nil
}(srcIPNet.IP.String(), dstIPNet.IP.String())
}
pinger.SetPrivileged(true)
pinger.Interval = timeout / packetCount
pinger.Timeout = timeout
pinger.Count = packetCount
} else {
pinger.SetIPAddr(ipAddr)
}
err = pinger.Run()
if err != nil {
log.FromContext(deadlineCtx).Errorf("Ping failed: %s", err.Error())
return false
}

if pinger.Statistics().PacketsRecv == 0 {
// Waiting for all ping results. If at least one fails - return false
return waitForResponses(responseCh)
}
}

func waitForResponses(responseCh <-chan error) bool {
respCount := cap(responseCh)
success := true
for {
resp, ok := <-responseCh
if !ok {
return false
}
if resp != nil {
success = false
}
respCount--
if respCount == 0 {
return success
}
}
}

// PingerFactory - factory interface for creating pingers
type PingerFactory interface {
CreatePinger(srcIP, dstIP string, timeout time.Duration, count int) Pinger
}

// Pinger - pinger interface
type Pinger interface {
Run() error
GetReceivedPackets() int
}

type defaultPingerFactory struct{}

func (p *defaultPingerFactory) CreatePinger(srcIP, dstIP string, timeout time.Duration, count int) Pinger {
pi := ping.New(dstIP)
pi.SetPrivileged(true)
pi.Source = srcIP
pi.Timeout = timeout
pi.Count = count
if count != 0 {
pi.Interval = timeout / time.Duration(count)
}
return true

return &defaultPinger{pinger: pi}
}

type defaultPinger struct {
pinger *ping.Pinger
}

func (p *defaultPinger) Run() error {
return p.pinger.Run()
}

func (p *defaultPinger) GetReceivedPackets() int {
return p.pinger.Statistics().PacketsRecv
}
160 changes: 160 additions & 0 deletions pkg/kernel/tools/heal/liveness_check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright (c) 2023 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package heal_test

import (
"context"
"sync/atomic"
"testing"
"time"

"go.uber.org/goleak"

"github.com/networkservicemesh/api/pkg/api/networkservice"
"github.com/networkservicemesh/api/pkg/api/networkservice/mechanisms/kernel"
"github.com/stretchr/testify/require"

"github.com/networkservicemesh/sdk-kernel/pkg/kernel/tools/heal"
)

const unPingableIPv4 = "172.168.1.1"
const unPingableIPv6 = "2005::1"

func createConnection(srcIPs, dstIPs []string) *networkservice.Connection {
return &networkservice.Connection{
Mechanism: &networkservice.Mechanism{
Type: kernel.MECHANISM,
},
Context: &networkservice.ConnectionContext{IpContext: &networkservice.IPContext{
SrcIpAddrs: srcIPs,
DstIpAddrs: dstIPs,
}},
}
}
func Test_LivenessChecker(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

samples := []struct {
Name string
Connection *networkservice.Connection
PingersCount int32
ExpectedResult bool
}{
{
Name: "Pingable IPv4 one pair",
Connection: createConnection(
[]string{"172.168.0.1/32"},
[]string{"172.168.0.2/32"},
),
PingersCount: 1,
ExpectedResult: true,
},
{
Name: "Pingable IPv4 two pairs",
Connection: createConnection(
[]string{"172.168.0.1/32", "172.168.0.3/32"},
[]string{"172.168.0.2/32", "172.168.0.4/32"},
),
PingersCount: 4,
ExpectedResult: true,
},
{
Name: "Unpingable IPv4 two pairs",
Connection: createConnection(
[]string{"172.168.0.1/32", "172.168.0.3/32"},
[]string{"172.168.0.2/32", unPingableIPv4 + "/32"},
),
PingersCount: 4,
ExpectedResult: false,
},
{
Name: "Pingable IPv4 and IPv6",
Connection: createConnection(
[]string{"172.168.0.1/32", "2004::1/128"},
[]string{"172.168.0.2/32", "2004::2/128"},
),
PingersCount: 2,
ExpectedResult: true,
},
{
Name: "Unpingable IPv4 and IPv6",
Connection: createConnection(
[]string{"172.168.0.1/32", "2004::1/128"},
[]string{"172.168.0.2/32", unPingableIPv6 + "/128"},
),
PingersCount: 2,
ExpectedResult: false,
},
{
Name: "SrcIPs is empty",
Connection: createConnection(
[]string{},
[]string{"172.168.0.2/32"},
),
PingersCount: 0,
ExpectedResult: true,
},
{
Name: "DstIPs is empty",
Connection: createConnection(
[]string{"172.168.0.1/32"},
[]string{},
),
PingersCount: 0,
ExpectedResult: true,
},
}
for _, s := range samples {
sample := s
t.Run(sample.Name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
pingerFactory := &testPingerFactory{}
ok := heal.KernelLivenessCheck(heal.WithPingerFactory(pingerFactory))(ctx, sample.Connection)
require.Equal(t, sample.ExpectedResult, ok)
require.Equal(t, pingerFactory.pingersCount, sample.PingersCount)
})
}
}

type testPingerFactory struct {
pingersCount int32
}

func (p *testPingerFactory) CreatePinger(srcIP, dstIP string, timeout time.Duration, count int) heal.Pinger {
atomic.AddInt32(&p.pingersCount, 1)
return &testPinger{
dstIP: dstIP,
count: count,
}
}

type testPinger struct {
dstIP string
count int
}

func (p *testPinger) Run() error {
return nil
}

func (p *testPinger) GetReceivedPackets() int {
if p.dstIP == unPingableIPv4 || p.dstIP == unPingableIPv6 {
return 0
}
return p.count
}

0 comments on commit 9527e5a

Please sign in to comment.