Skip to content

Commit

Permalink
Add connection tracking metrics in flow exporter
Browse files Browse the repository at this point in the history
Resolves issue: #1033
Add three metrics, TotalConnectionsInConnTrackTable,
TotalAntreaConnectionsInConnTrackTable, and MaxConnectionsInConnTrackTable,
to show the capacity and current number of connections in the connection
tracking table.
These metrics will be useful for configuring connection tracking table
size and also help generate alerts.
  • Loading branch information
Yongming Ding committed Sep 11, 2020
1 parent fe66434 commit 00e3cd0
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 4 deletions.
4 changes: 3 additions & 1 deletion pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
"github.com/vmware-tanzu/antrea/pkg/agent/metrics"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
"github.com/vmware-tanzu/antrea/pkg/agent/proxy"
)
Expand Down Expand Up @@ -147,6 +148,7 @@ func (cs *ConnectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
}
}
}
metrics.TotalAntreaConnectionsInConnTrackTable.Inc()
klog.V(4).Infof("New Antrea flow added: %v", conn)
// Add new antrea connection to connection store
cs.connections[connKey] = *conn
Expand Down Expand Up @@ -215,7 +217,7 @@ func (cs *ConnectionStore) DeleteConnectionByKey(connKey flowexporter.Connection
cs.mutex.Lock()
defer cs.mutex.Unlock()
delete(cs.connections, connKey)

metrics.TotalAntreaConnectionsInConnTrackTable.Dec()
return nil
}

Expand Down
22 changes: 22 additions & 0 deletions pkg/agent/flowexporter/connections/connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@ package connections
import (
"fmt"
"net"
"strings"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"

"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
connectionstest "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections/testing"
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
interfacestoretest "github.com/vmware-tanzu/antrea/pkg/agent/interfacestore/testing"
"github.com/vmware-tanzu/antrea/pkg/agent/metrics"
proxytest "github.com/vmware-tanzu/antrea/pkg/agent/proxy/testing"
k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy"
)
Expand Down Expand Up @@ -56,6 +60,7 @@ func makeTuple(srcIP *net.IP, dstIP *net.IP, protoID uint8, srcPort uint16, dstP
func TestConnectionStore_addAndUpdateConn(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
metrics.InitializeConnectionMetrics()
// Create two flows; one is already in ConnectionStore and other one is new
refTime := time.Now()
// Flow-1, which is already in ConnectionStore
Expand Down Expand Up @@ -138,6 +143,8 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
// Add flow1conn to the Connection map
testFlow1Tuple := flowexporter.NewConnectionKey(&testFlow1)
connStore.connections[testFlow1Tuple] = oldTestFlow1
// For testing purposes, increment the metric
metrics.TotalAntreaConnectionsInConnTrackTable.Inc()

addOrUpdateConnTests := []struct {
flow flowexporter.Connection
Expand Down Expand Up @@ -170,6 +177,7 @@ func TestConnectionStore_addAndUpdateConn(t *testing.T) {
actualConn, ok := connStore.GetConnByKey(flowTuple)
assert.Equal(t, ok, true, "connection should be there in connection store")
assert.Equal(t, expConn, *actualConn, "Connections should be equal")
checkAntreaConnectionMetrics(t, len(connStore.connections))
}
}

Expand Down Expand Up @@ -238,6 +246,7 @@ func TestConnectionStore_ForAllConnectionsDo(t *testing.T) {
func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
metrics.InitializeConnectionMetrics()
// Create two flows; one is already in ConnectionStore and other one is new
testFlows := make([]*flowexporter.Connection, 2)
testFlowKeys := make([]*flowexporter.ConnectionKey, 2)
Expand Down Expand Up @@ -272,6 +281,8 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
connKey := flowexporter.NewConnectionKey(flow)
testFlowKeys[i] = &connKey
}
// For testing purposes, set the metric
metrics.TotalAntreaConnectionsInConnTrackTable.Set(float64(len(testFlows)))
// Create ConnectionStore
mockIfaceStore := interfacestoretest.NewMockInterfaceStore(ctrl)
mockConnDumper := connectionstest.NewMockConnTrackDumper(ctrl)
Expand All @@ -286,5 +297,16 @@ func TestConnectionStore_DeleteConnectionByKey(t *testing.T) {
assert.Nil(t, err, "DeleteConnectionByKey should return nil")
_, exists := connStore.GetConnByKey(*testFlowKeys[i])
assert.Equal(t, exists, false, "connection should be deleted in connection store")
checkAntreaConnectionMetrics(t, len(connStore.connections))
}
}

func checkAntreaConnectionMetrics(t *testing.T, numConns int) {
expectedAntreaConnectionCount := `
# HELP antrea_agent_conntrack_antrea_connection_count [ALPHA] Number of connections in the Antrea ZoneID of the conntrack table.
# TYPE antrea_agent_conntrack_antrea_connection_count gauge
`
expectedAntreaConnectionCount = expectedAntreaConnectionCount + fmt.Sprintf("antrea_agent_conntrack_antrea_connection_count %d\n", numConns)

assert.Equal(t, nil, testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expectedAntreaConnectionCount), "antrea_agent_conntrack_antrea_connection_count"))
}
25 changes: 24 additions & 1 deletion pkg/agent/flowexporter/connections/conntrack_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@ package connections

import (
"net"
"strconv"
"strings"

"github.com/ti-mo/conntrack"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
"github.com/vmware-tanzu/antrea/pkg/agent/metrics"
"github.com/vmware-tanzu/antrea/pkg/agent/util/sysctl"

"os/exec"
)

// connTrackSystem implements ConnTrackDumper. This is for linux kernel datapath.
Expand All @@ -44,11 +49,26 @@ func NewConnTrackSystem(nodeConfig *config.NodeConfig, serviceCIDR *net.IPNet) *
// Do not fail, but continue after logging error as we can still dump flows with no timestamps.
sysctl.EnsureSysctlNetValue("netfilter/nf_conntrack_timestamp", 1)

return &connTrackSystem{
ct := &connTrackSystem{
nodeConfig,
serviceCIDR,
&netFilterConnTrack{},
}

// Set the antrea_agent_conntrack_max_connection_count prometheus metric
cmdOutput, err := exec.Command("cat", "/proc/sys/net/nf_conntrack_max").Output()
if err != nil {
klog.Errorf("error when executing cat /proc/sys/net/nf_conntrack_max command: %v", err)
return nil
}
maxConns, err := strconv.Atoi(strings.TrimSpace(string(cmdOutput)))
if err != nil {
klog.Errorf("error when convert nf_conntrack_max cmdoutput to int: %v", err)
return nil
}
metrics.MaxConnectionsInConnTrackTable.Set(float64(maxConns))

return ct
}

// DumpFlows opens netlink connection and dumps all the flows in Antrea ZoneID of conntrack table.
Expand All @@ -68,6 +88,9 @@ func (ct *connTrackSystem) DumpFlows(zoneFilter uint16) ([]*flowexporter.Connect
klog.Errorf("Error when dumping flows from conntrack: %v", err)
return nil, err
}

metrics.TotalConnectionsInConnTrackTable.Set(float64(len(conns)))

filteredConns := filterAntreaConns(conns, ct.nodeConfig, ct.serviceCIDR, zoneFilter)
klog.V(2).Infof("No. of flow exporter considered flows in Antrea zoneID: %d", len(filteredConns))

Expand Down
19 changes: 18 additions & 1 deletion pkg/agent/flowexporter/connections/conntrack_ovs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/vmware-tanzu/antrea/pkg/agent/config"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
"github.com/vmware-tanzu/antrea/pkg/agent/metrics"
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsctl"
)

Expand All @@ -49,11 +50,25 @@ func NewConnTrackOvsAppCtl(nodeConfig *config.NodeConfig, serviceCIDR *net.IPNet
if ovsctlClient == nil {
return nil
}
return &connTrackOvsCtl{
ct := &connTrackOvsCtl{
nodeConfig,
serviceCIDR,
ovsctlClient,
}

cmdOutput, execErr := ct.ovsctlClient.RunAppctlCmd("dpctl/ct-get-maxconns", false)
if execErr != nil {
klog.Errorf("error when executing dpctl/ct-get-maxconns command: %v", execErr)
return nil
}
maxConns, err := strconv.Atoi(strings.TrimSpace(string(cmdOutput)))
if err != nil {
klog.Errorf("error when convert dpctl/ct-get-maxconns cmdoutput to int: %v", err)
return nil
}
metrics.MaxConnectionsInConnTrackTable.Set(float64(maxConns))

return ct
}

// DumpFlows uses "ovs-appctl dpctl/dump-conntrack" to dump conntrack flows in the Antrea ZoneID.
Expand Down Expand Up @@ -90,6 +105,8 @@ func (ct *connTrackOvsCtl) ovsAppctlDumpConnections(zoneFilter uint16) ([]*flowe
antreaConns = append(antreaConns, conn)
}
}

metrics.TotalConnectionsInConnTrackTable.Set(float64(len(outputFlow)))
klog.V(2).Infof("FlowExporter considered flows in conntrack: %d", len(antreaConns))
return antreaConns, nil
}
Expand Down
36 changes: 35 additions & 1 deletion pkg/agent/flowexporter/connections/conntrack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,31 @@
package connections

import (
"fmt"
"net"
"strconv"
"strings"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/ti-mo/conntrack"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
connectionstest "github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections/testing"
"github.com/vmware-tanzu/antrea/pkg/agent/metrics"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
ovsctltest "github.com/vmware-tanzu/antrea/pkg/ovs/ovsctl/testing"
)

func TestConnTrackSystem_DumpFlows(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
metrics.InitializeConnectionMetrics()
// Create flows for test
tuple, revTuple := makeTuple(&net.IP{1, 2, 3, 4}, &net.IP{4, 3, 2, 1}, 6, 65280, 255)
antreaFlow := &flowexporter.Connection{
Expand Down Expand Up @@ -90,14 +97,19 @@ func TestConnTrackSystem_DumpFlows(t *testing.T) {
t.Errorf("Dump flows function returned error: %v", err)
}
assert.Equal(t, 1, len(conns), "number of filtered connections should be equal")
checkConnectionMetrics(t, len(testFlows))
}

func TestConnTackOvsAppCtl_DumpFlows(t *testing.T) {
func TestConnTrackOvsAppCtl_DumpFlows(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
metrics.InitializeConnectionMetrics()

// Create mock interface
mockOVSCtlClient := ovsctltest.NewMockOVSCtlClient(ctrl)
// Set expect call of dpctl/ct-get-maxconns for mock ovsCtlClient
maxConns := 300000
mockOVSCtlClient.EXPECT().RunAppctlCmd("dpctl/ct-get-maxconns", false).Return([]byte(strconv.Itoa(maxConns)), nil)
// Create nodeConfig and gateWayConfig
// Set antreaGWFlow.TupleOrig.IP.DestinationAddress as gateway IP
gwConfig := &config.GatewayConfig{
Expand All @@ -117,6 +129,7 @@ func TestConnTackOvsAppCtl_DumpFlows(t *testing.T) {
ovsctlCmdOutput := []byte("tcp,orig=(src=127.0.0.1,dst=127.0.0.1,sport=45218,dport=2379,packets=320108,bytes=24615344),reply=(src=127.0.0.1,dst=127.0.0.1,sport=2379,dport=45218,packets=239595,bytes=24347883),start=2020-07-24T05:07:03.998,id=3750535678,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86399,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)\n" +
"tcp,orig=(src=127.0.0.1,dst=8.7.6.5,sport=45170,dport=2379,packets=80743,bytes=5416239),reply=(src=8.7.6.5,dst=127.0.0.1,sport=2379,dport=45170,packets=63361,bytes=4811261),start=2020-07-24T05:07:01.591,id=462801621,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86397,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)\n" +
"tcp,orig=(src=100.10.0.105,dst=10.96.0.1,sport=41284,dport=443,packets=343260,bytes=19340621),reply=(src=192.168.86.82,dst=100.10.0.105,sport=6443,dport=41284,packets=381035,bytes=181176472),start=2020-07-25T08:40:08.959,id=982464968,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|DST_NAT|DST_NAT_DONE,timeout=86399,mark=33,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)")
outputFlow := strings.Split(string(ovsctlCmdOutput), "\n")
expConn := &flowexporter.Connection{
ID: 982464968,
Timeout: 86399,
Expand Down Expand Up @@ -157,5 +170,26 @@ func TestConnTackOvsAppCtl_DumpFlows(t *testing.T) {
}
assert.Equal(t, len(conns), 1)
assert.Equal(t, conns[0], expConn, "filtered connection and expected connection should be same")
checkConnectionMetrics(t, len(outputFlow))
checkMaxConnectionsMetrics(t, maxConns)
}

func checkConnectionMetrics(t *testing.T, numConns int) {
expectedConnectionCount := `
# HELP antrea_agent_conntrack_total_connection_count [ALPHA] Number of connections in the conntrack table.
# TYPE antrea_agent_conntrack_total_connection_count gauge
`
expectedConnectionCount = expectedConnectionCount + fmt.Sprintf("antrea_agent_conntrack_total_connection_count %d\n", numConns)

assert.Equal(t, nil, testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expectedConnectionCount), "antrea_agent_conntrack_total_connection_count"))
}

func checkMaxConnectionsMetrics(t *testing.T, maxConns int) {
expectedkMaxConnectionsCount := `
# HELP antrea_agent_conntrack_max_connection_count [ALPHA] Size of the conntrack table.
# TYPE antrea_agent_conntrack_max_connection_count gauge
`
expectedkMaxConnectionsCount = expectedkMaxConnectionsCount + fmt.Sprintf("antrea_agent_conntrack_max_connection_count %d\n", maxConns)

assert.Equal(t, nil, testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(expectedkMaxConnectionsCount), "antrea_agent_conntrack_max_connection_count"))
}
37 changes: 37 additions & 0 deletions pkg/agent/metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,30 @@ var (
},
[]string{"operation"},
)

// TotalConnectionsInConnTrackTable is the gauge exported as
// antrea_agent_conntrack_total_connection_count. The conntrack dumpers set it
// after each dump of the connection tracking table.
TotalConnectionsInConnTrackTable = metrics.NewGauge(
&metrics.GaugeOpts{
Name: "antrea_agent_conntrack_total_connection_count",
Help: "Number of connections in the conntrack table.",
StabilityLevel: metrics.ALPHA,
},
)

// TotalAntreaConnectionsInConnTrackTable is the gauge exported as
// antrea_agent_conntrack_antrea_connection_count. The flow exporter's
// ConnectionStore increments it when a new Antrea connection is added and
// decrements it in DeleteConnectionByKey.
TotalAntreaConnectionsInConnTrackTable = metrics.NewGauge(
&metrics.GaugeOpts{
Name: "antrea_agent_conntrack_antrea_connection_count",
Help: "Number of connections in the Antrea ZoneID of the conntrack table.",
StabilityLevel: metrics.ALPHA,
},
)

// MaxConnectionsInConnTrackTable is the gauge exported as
// antrea_agent_conntrack_max_connection_count. It is set once when a conntrack
// dumper is constructed: from /proc/sys/net/nf_conntrack_max for the Linux
// kernel datapath, or from "dpctl/ct-get-maxconns" for the OVS appctl dumper.
MaxConnectionsInConnTrackTable = metrics.NewGauge(
&metrics.GaugeOpts{
Name: "antrea_agent_conntrack_max_connection_count",
Help: "Size of the conntrack table.",
StabilityLevel: metrics.ALPHA,
},
)
)

func InitializePrometheusMetrics() {
Expand Down Expand Up @@ -121,6 +145,7 @@ func InitializePrometheusMetrics() {
InitializePodMetrics()
InitializeNetworkPolicyMetrics()
InitializeOVSMetrics()
InitializeConnectionMetrics()
}

func InitializePodMetrics() {
Expand Down Expand Up @@ -169,3 +194,15 @@ func InitializeOVSMetrics() {
OVSFlowOpsLatency.WithLabelValues(ops)
}
}

// InitializeConnectionMetrics registers the connection tracking gauges
// (total connections, Antrea-zone connections, and conntrack table size) with
// the legacy Prometheus registry.
//
// Registration errors are logged and otherwise ignored, so this function never
// fails; each log message names the metric whose registration failed.
func InitializeConnectionMetrics() {
	if err := legacyregistry.Register(TotalConnectionsInConnTrackTable); err != nil {
		klog.Errorf("Failed to register antrea_agent_conntrack_total_connection_count with error: %v", err)
	}
	if err := legacyregistry.Register(TotalAntreaConnectionsInConnTrackTable); err != nil {
		klog.Errorf("Failed to register antrea_agent_conntrack_antrea_connection_count with error: %v", err)
	}
	if err := legacyregistry.Register(MaxConnectionsInConnTrackTable); err != nil {
		// Name the max-connection metric here; the message previously
		// repeated the Antrea connection count metric name by mistake.
		klog.Errorf("Failed to register antrea_agent_conntrack_max_connection_count with error: %v", err)
	}
}

0 comments on commit 00e3cd0

Please sign in to comment.