From 114e7c0e986477f97fdabad07d1427229b88a3d1 Mon Sep 17 00:00:00 2001 From: Jake Sokol Date: Tue, 23 Jun 2020 13:18:50 -0400 Subject: [PATCH] Implements antctl query endpoint Antctl query endpoint gives users the ability to filter network policies relevant to a certain endpoint. The user is able to query internal controller stores using the antrea controller apiserver through antctl. Queries are answered by iterating over AddressGroups and AppliedToGroups. While we considered adding Indexers to their respective stores to speed-up queries, we decided that it was not sufficient to justify the memory overhead observed in some cases. Note, antctl query endpoint only works when run in the controller. We are working on getting it to work out-of-cluster. Fixes #974 --- pkg/apiserver/handlers/handler_test.go | 233 ++++++++++++++++++ .../controller_querier_replier.go | 187 ++++++++++++++ .../controller_query_perf_test.go | 146 +++++++++++ .../networkpolicy/controller_query_test.go | 228 +++++++++++++++++ .../networkpolicy/networkpolicy_controller.go | 2 +- 5 files changed, 795 insertions(+), 1 deletion(-) create mode 100644 pkg/apiserver/handlers/handler_test.go create mode 100644 pkg/controller/networkpolicy/controller_querier_replier.go create mode 100644 pkg/controller/networkpolicy/controller_query_perf_test.go create mode 100644 pkg/controller/networkpolicy/controller_query_test.go diff --git a/pkg/apiserver/handlers/handler_test.go b/pkg/apiserver/handlers/handler_test.go new file mode 100644 index 00000000000..55b31ef2a7d --- /dev/null +++ b/pkg/apiserver/handlers/handler_test.go @@ -0,0 +1,233 @@ +// Copyright 2020 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handlers + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/admission/v1" + "k8s.io/apimachinery/pkg/api/errors" + + "github.com/vmware-tanzu/antrea/pkg/apiserver/handlers/endpoint" + "github.com/vmware-tanzu/antrea/pkg/controller/networkpolicy" + queriermock "github.com/vmware-tanzu/antrea/pkg/controller/networkpolicy/testing" +) + +type TestCase struct { + // query arguments sent to handler function + handlerRequest string + expectedStatus int + // expected result written by handler function + expectedContent response + + // arguments of call to mock + argsMock []string + // results of call to mock + mockQueryResponse response +} + +type response struct { + response *networkpolicy.EndpointQueryResponse + error error +} + +var responses = []response{ + { + response: &networkpolicy.EndpointQueryResponse{Endpoints: nil}, + error: errors.NewNotFound(v1.Resource("pod"), "pod"), + }, + { + response: &networkpolicy.EndpointQueryResponse{Endpoints: []networkpolicy.Endpoint{ + { + Policies: []networkpolicy.Policy{ + { + PolicyRef: networkpolicy.PolicyRef{Name: "policy1"}, + }, + }, + }, + }, + }, + error: nil, + }, + { + response: &networkpolicy.EndpointQueryResponse{Endpoints: []networkpolicy.Endpoint{ + { + Policies: []networkpolicy.Policy{ + { + PolicyRef: networkpolicy.PolicyRef{Name: "policy1"}, + }, + { + PolicyRef: networkpolicy.PolicyRef{Name: "policy2"}, + }, + }, + }, + }, + }, + error: nil, + }, +} + +// TestIncompleteArguments tests how the handler function responds when the user passes in a query command +// with incomplete arguments (for now, missing pod or namespace) +func TestIncompleteArguments(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + // sample selector arguments (right now, only supports podname and namespace) + namespace := "namespace" + // outline test cases with expected behavior + testCases := map[string]TestCase{ + "Responds with error given no name and no namespace": { + handlerRequest: "", + expectedStatus: http.StatusBadRequest, + argsMock: []string{"", ""}, + }, + "Responds with error given no name": { + handlerRequest: "?namespace=namespace", + expectedStatus: http.StatusBadRequest, + argsMock: []string{namespace, ""}, + }, + } + + evaluateTestCases(testCases, mockCtrl, t) + +} + +// TestInvalidArguments tests how the handler function responds when the user passes in a selector which does not select +// any existing endpoint +func TestInvalidArguments(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + // sample selector arguments (right now, only supports podname and namespace) + pod, namespace := "pod", "namespace" + // outline test cases with expected behavior + testCases := map[string]TestCase{ + "Responds with error given no invalid selection": { + handlerRequest: "?namespace=namespace&pod=pod", + expectedStatus: http.StatusNotFound, + argsMock: []string{namespace, pod}, + mockQueryResponse: response{ + response: nil, + error: nil, + }, + }, + } + + evaluateTestCases(testCases, mockCtrl, t) + +} + +// TestSinglePolicyResponse tests how the handler function responds when the user passes in a endpoint with a +// single policy response +func TestSinglePolicyResponse(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + // sample selector arguments (right now, only supports podName and namespace) + pod, namespace := "pod", "namespace" + // outline test cases with expected behavior + testCases := map[string]TestCase{ + "Responds with list of single element": { + handlerRequest: "?namespace=namespace&pod=pod", + expectedStatus: http.StatusOK, + expectedContent: responses[1], + argsMock: []string{namespace, pod}, + mockQueryResponse: response{ + response: &networkpolicy.EndpointQueryResponse{Endpoints: []networkpolicy.Endpoint{ + { + Policies: []networkpolicy.Policy{ + { + PolicyRef: networkpolicy.PolicyRef{Name: "policy1"}, + }, + }, + }, + }, + }, + error: nil, + }, + }, + } + + evaluateTestCases(testCases, mockCtrl, t) + +} + +// TestMultiPolicyResponse tests how the handler function responds when the user passes in a endpoint with +// multiple policy responses +func TestMultiPolicyResponse(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + // sample selector arguments (right now, only supports podName and namespace) + pod, namespace := "pod", "namespace" + // outline test cases with expected behavior + testCases := map[string]TestCase{ + "Responds with list of single element": { + handlerRequest: "?namespace=namespace&pod=pod", + expectedStatus: http.StatusOK, + expectedContent: responses[2], + argsMock: []string{namespace, pod}, + mockQueryResponse: response{ + response: &networkpolicy.EndpointQueryResponse{Endpoints: []networkpolicy.Endpoint{ + { + Policies: []networkpolicy.Policy{ + { + PolicyRef: networkpolicy.PolicyRef{Name: "policy1"}, + }, + { + PolicyRef: networkpolicy.PolicyRef{Name: "policy2"}, + }, + }, + }, + }, + }, + error: nil, + }, + }, + } + + evaluateTestCases(testCases, mockCtrl, t) + +} + +func evaluateTestCases(testCases map[string]TestCase, mockCtrl *gomock.Controller, t *testing.T) { + for _, tc := range testCases { + // create mock querier with expected behavior outlined in testCase + mockQuerier := queriermock.NewMockEndpointQuerier(mockCtrl) + if tc.expectedStatus != http.StatusBadRequest { + mockQuerier.EXPECT().QueryNetworkPolicies(tc.argsMock[0], tc.argsMock[1]).Return(tc.mockQueryResponse.response, tc.mockQueryResponse.error) + } + // initialize handler with mockQuerier + handler := endpoint.HandleFunc(mockQuerier) + // create http using handlerArgs and serve the http request + req, err := http.NewRequest(http.MethodGet, tc.handlerRequest, nil) + assert.Nil(t, err) + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + assert.Equal(t, tc.expectedStatus, recorder.Code) + if tc.expectedStatus != http.StatusOK { + return + } + // check response is expected + var received networkpolicy.EndpointQueryResponse + err = json.Unmarshal(recorder.Body.Bytes(), &received) + assert.Nil(t, err) + for i, policy := range tc.expectedContent.response.Endpoints[0].Policies { + assert.Equal(t, policy.Name, received.Endpoints[0].Policies[i].Name) + } + } +} diff --git a/pkg/controller/networkpolicy/controller_querier_replier.go b/pkg/controller/networkpolicy/controller_querier_replier.go new file mode 100644 index 00000000000..35c63da8366 --- /dev/null +++ b/pkg/controller/networkpolicy/controller_querier_replier.go @@ -0,0 +1,187 @@ +// Copyright 2020 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package networkpolicy provides NetworkPolicyController implementation to manage +// and synchronize the Pods and Namespaces affected by Network Policies and enforce +// their rules. + +package networkpolicy + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + networkingv1beta1 "github.com/vmware-tanzu/antrea/pkg/apis/networking/v1beta1" + "github.com/vmware-tanzu/antrea/pkg/controller/networkpolicy/store" + antreatypes "github.com/vmware-tanzu/antrea/pkg/controller/types" +) + +type EndpointQuerier interface { + QueryNetworkPolicies(namespace string, podName string) (*EndpointQueryResponse, error) +} + +// EndpointQueryReplier is responsible for handling query requests from antctl query +type EndpointQueryReplier struct { + networkPolicyController *NetworkPolicyController +} + +// EndpointQueryResponse is the reply struct for QueryNetworkPolicies +type EndpointQueryResponse struct { + Endpoints []Endpoint `json:"endpoints,omitempty"` +} + +// Endpoint holds response information for an endpoint following a query +type Endpoint struct { + Namespace string `json:"namespace,omitempty"` + Name string `json:"name,omitempty"` + Policies []Policy `json:"policies,omitempty"` + Rules []Rule `json:"rules,omitempty"` +} + +type PolicyRef struct { + Namespace string `json:"namespace,omitempty"` + Name string `json:"name,omitempty"` + UID types.UID `json:"uid,omitempty"` +} + +// Policy holds network policy information to be relayed to client following query endpoint +type Policy struct { + PolicyRef + selector metav1.LabelSelector `json:"selector,omitempty"` +} + +type Rule struct { + PolicyRef + Direction networkingv1beta1.Direction `json:"direction,omitempty"` + RuleIndex int `json:"ruleindex,omitempty"` +} + +// NewEndpointQueryReplier returns a new *NewEndpointQueryReplier. +func NewEndpointQueryReplier(networkPolicyController *NetworkPolicyController) *EndpointQueryReplier { + n := &EndpointQueryReplier{ + networkPolicyController: networkPolicyController, + } + return n +} + +// QueryNetworkPolicies returns kubernetes network policy references relevant to the selected network endpoint. Relevant +// policies fall into three categories: applied policies (Policies in Endpoint type) are policies which directly apply to +// an endpoint, egress and ingress rules (Rules in Endpoint type) are policies which reference the endpoint in an ingress/ +// egress rule respectively. +func (eq EndpointQueryReplier) QueryNetworkPolicies(namespace string, podName string) (*EndpointQueryResponse, error) { + // check if namespace and podName select an existing pod + pod, err := eq.networkPolicyController.podInformer.Lister().Pods(namespace).Get(podName) + if err != nil { + return nil, nil + } + type ruleTemp struct { + policy *antreatypes.NetworkPolicy + index int + } + // create network policies categories + applied := make([]*antreatypes.NetworkPolicy, 0) + ingress := make([]*ruleTemp, 0) + egress := make([]*ruleTemp, 0) + // get all appliedToGroups using filter, then get applied policies using appliedToGroup + appliedToGroupKeys := eq.networkPolicyController.filterAppliedToGroupsForPod(pod) + if err != nil { + return nil, err + } + for appliedToGroupKey := range appliedToGroupKeys { + policies, err := eq.networkPolicyController.internalNetworkPolicyStore.GetByIndex(store.AppliedToGroupIndex, + appliedToGroupKey) + if err != nil { + return nil, err + } + for _, policy := range policies { + applied = append(applied, policy.(*antreatypes.NetworkPolicy)) + } + } + // get all addressGroups using filter, then get ingress and egress policies using addressGroup + addressGroupKeys := eq.networkPolicyController.filterAddressGroupsForPod(pod) + if err != nil { + return nil, err + } + for addressGroupKey := range addressGroupKeys { + addressGroup, found, err := eq.networkPolicyController.addressGroupStore.Get(addressGroupKey) + if !found { + continue + } + policies, err := eq.networkPolicyController.internalNetworkPolicyStore.GetByIndex(store.AddressGroupIndex, + addressGroupKey) + if err != nil { + return nil, err + } + for _, policy := range policies { + for i, rule := range policy.(*antreatypes.NetworkPolicy).Rules { + for _, addressGroupTrial := range rule.To.AddressGroups { + if addressGroupTrial == string(addressGroup.(*antreatypes.AddressGroup).UID) { + egress = append(egress, &ruleTemp{policy: policy.(*antreatypes.NetworkPolicy), index: i}) + } + } + for _, addressGroupTrial := range rule.From.AddressGroups { + if addressGroupTrial == string(addressGroup.(*antreatypes.AddressGroup).UID) { + ingress = append(ingress, &ruleTemp{policy: policy.(*antreatypes.NetworkPolicy), index: i}) + } + } + } + } + } + // make response policies + responsePolicies := make([]Policy, 0) + for _, internalPolicy := range applied { + responsePolicy := Policy{ + PolicyRef: PolicyRef{ + Namespace: internalPolicy.Namespace, + Name: internalPolicy.Name, + UID: internalPolicy.UID, + }, + } + responsePolicies = append(responsePolicies, responsePolicy) + } + responseRules := make([]Rule, 0) + // create rules based on egress and ingress policies + for _, internalPolicy := range egress { + newRule := Rule{ + PolicyRef: PolicyRef{ + Namespace: internalPolicy.policy.Namespace, + Name: internalPolicy.policy.Name, + UID: internalPolicy.policy.UID, + }, + Direction: networkingv1beta1.DirectionOut, + RuleIndex: internalPolicy.index, + } + responseRules = append(responseRules, newRule) + } + for _, internalPolicy := range ingress { + newRule := Rule{ + PolicyRef: PolicyRef{ + Namespace: internalPolicy.policy.Namespace, + Name: internalPolicy.policy.Name, + UID: internalPolicy.policy.UID, + }, + Direction: networkingv1beta1.DirectionIn, + RuleIndex: internalPolicy.index, + } + responseRules = append(responseRules, newRule) + } + // for now, selector only selects a single endpoint (pod, namespace) + endpoint := Endpoint{ + Namespace: namespace, + Name: podName, + Policies: responsePolicies, + Rules: responseRules, + } + return &EndpointQueryResponse{[]Endpoint{endpoint}}, nil +} diff --git a/pkg/controller/networkpolicy/controller_query_perf_test.go b/pkg/controller/networkpolicy/controller_query_perf_test.go new file mode 100644 index 00000000000..6635d2db379 --- /dev/null +++ b/pkg/controller/networkpolicy/controller_query_perf_test.go @@ -0,0 +1,146 @@ +// +build !race + +// Copyright 2020 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package networkpolicy + +import ( + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/rand" +) + +/* +TestLargeScaleEndpointQueryManyPolicies tests the execution time and the memory usage of computing a scale +of 10k Namespaces, 10k NetworkPolicies, 10k Pods, where query returns every policy (applied + ingress). + +NAMESPACES PODS NETWORK-POLICIES TIME(s) MEMORY(M) +1 10000 10000 13.88 1092 + +The metrics are not accurate under the race detector, and will be skipped when testing with "-race". +*/ +func TestLargeScaleEndpointQueryManyPolicies(t *testing.T) { + namespace := rand.String(8) + getObjects := func() ([]*v1.Namespace, []*networkingv1.NetworkPolicy, []*v1.Pod) { + namespaces := []*v1.Namespace{ + { + ObjectMeta: metav1.ObjectMeta{Name: namespace, Labels: map[string]string{"app": namespace}}, + }, + } + uid := rand.String(8) + networkPolicies := []*networkingv1.NetworkPolicy{ + { + ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: "np-1" + uid, UID: types.UID(uuid.New().String())}, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"app-1": "scale-1"}}, + PolicyTypes: []networkingv1.PolicyType{networkingv1.PolicyTypeIngress, networkingv1.PolicyTypeEgress}, + Ingress: []networkingv1.NetworkPolicyIngressRule{ + { + From: []networkingv1.NetworkPolicyPeer{ + { + PodSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app-1": "scale-1"}, + }, + }, + }, + }, + }, + }, + }, + } + pods := []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: "pod1" + uid, UID: types.UID(uuid.New().String()), Labels: map[string]string{"app-1": "scale-1"}}, + Spec: v1.PodSpec{NodeName: getRandomNodeName()}, + Status: v1.PodStatus{PodIP: getRandomIP()}, + }, + } + return namespaces, networkPolicies, pods + } + namespaces, networkPolicies, pods := getXObjects(10000, getObjects) + testQueryEndpoint(t, 25*time.Second, namespaces[0:1], networkPolicies, pods, 10000) +} + +func testQueryEndpoint(t *testing.T, maxExecutionTime time.Duration, namespaces []*v1.Namespace, networkPolicies []*networkingv1.NetworkPolicy, pods []*v1.Pod, responseLength int) { + // Stat the maximum heap allocation. + var wg sync.WaitGroup + stopCh := make(chan struct{}) + var maxAlloc uint64 + wg.Add(1) + go func() { + statMaxMemAlloc(&maxAlloc, 500*time.Millisecond, stopCh) + wg.Done() + }() + // create controller + objs := toRunTimeObjects(namespaces, networkPolicies, pods) + querier := makeControllerAndEndpointQueryReplier(objs...) + // Everything is ready, now start timing. + start := time.Now() + // track execution time by calling query endpoint 1000 times on random pods + for i := 0; i < 1000; i++ { + pod, namespace := pods[i].Name, pods[i].Namespace + response, err := querier.QueryNetworkPolicies(namespace, pod) + require.Equal(t, err, nil) + require.Equal(t, len(response.Endpoints[0].Policies), responseLength) + } + // Stop tracking go routines + stopCh <- struct{}{} + // Minus the idle time to get the actual execution time. + executionTime := time.Since(start) + if executionTime > maxExecutionTime { + t.Errorf("The actual execution time %v is greater than the maximum value %v", executionTime, maxExecutionTime) + } + + // Block until all statistics are done. + wg.Wait() + + t.Logf(`Summary metrics: +NAMESPACES PODS NETWORK-POLICIES TIME(s) MEMORY(M) +%-12d %-7d %-19d %-10.2f %-12d +`, len(namespaces), len(pods), len(networkPolicies), float64(executionTime)/float64(time.Second), maxAlloc/1024/1024) +} + +/* +Note, this is performance data relevant to a previous implementation of QueryNetworkPolicy + +Without indexing + +query +100k policies and 100k pods with 1 applied policy per pod takes 16++ seconds for 100 queries (2341++M max heap) +10k policies and 10k pods with each policy applied to all pods takes 80++ seconds for 10 queries (1038++M max heap) + +init +100k policies and 100k pods with 1 applied policy per pod takes 12++ seconds (1555++M max heap) +10k policies and 10k pods with each policy applied to all pods takes 7++ seconds (1138++M max heap) + + +With indexing + +query +100k policies and 100k pods with 1 applied policy per pod takes .1 seconds for 1000 queries (2793M max heap) +10k policies and 10k pods with each policy applied to all pods takes 13 seconds for 1000 queries (1092M max heap) + +init +100k policies and 100k pods with 1 applied policy per pod takes 13 seconds (1849M max heap) +10k policies and 10k pods with each policy applied to all pods takes 11 seconds (1185M max heap) +*/ diff --git a/pkg/controller/networkpolicy/controller_query_test.go b/pkg/controller/networkpolicy/controller_query_test.go new file mode 100644 index 00000000000..ecdccfe79d4 --- /dev/null +++ b/pkg/controller/networkpolicy/controller_query_test.go @@ -0,0 +1,228 @@ +// Copyright 2020 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package networkpolicy + +import ( + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/vmware-tanzu/antrea/pkg/apis/networking/v1beta1" + v1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "testing" + "time" +) + +// pods represent kubernetes pods for testing proper query results +var pods = []v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "podA", + Namespace: "testNamespace", + Labels: map[string]string{"foo": "bar"}, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Name: "container-1", + }}, + NodeName: "nodeA", + }, + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + }, + PodIP: "1.2.3.4", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "podB", + Namespace: "testNamespace", + Labels: map[string]string{"foo": "bar"}, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{ + Name: "container-1", + }}, + NodeName: "nodeA", + }, + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + }, + PodIP: "1.2.3.4", + }, + }, +} + +// polices represent kubernetes policies for testing proper query results +// +// policy 0: select all pods and deny default ingress +// policy 1: select all pods and deny default egress + +var policies = []networkingv1.NetworkPolicy{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ingress-egress", + Namespace: "testNamespace", + }, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + Ingress: []networkingv1.NetworkPolicyIngressRule{ + { + From: []networkingv1.NetworkPolicyPeer{ + { + PodSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + MatchExpressions: nil, + }, + }, + }, + }, + }, + Egress: []networkingv1.NetworkPolicyEgressRule{ + { + To: []networkingv1.NetworkPolicyPeer{ + { + PodSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + MatchExpressions: nil, + }, + }, + }, + }, + }, + PolicyTypes: []networkingv1.PolicyType{ + networkingv1.PolicyTypeEgress, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "default-deny-egress", + }, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + PolicyTypes: []networkingv1.PolicyType{ + networkingv1.PolicyTypeEgress, + }, + }, + }, +} + +var namespaces = []v1.Namespace{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "testNamespace", + UID: "testNamespaceUID", + }, + }, +} + +func makeControllerAndEndpointQueryReplier(objects ...runtime.Object) *EndpointQueryReplier { + // create controller + _, c := newController(objects...) + c.heartbeatCh = make(chan heartbeat, 1000) + stopCh := make(chan struct{}) + // create querier with stores inside controller + querier := NewEndpointQueryReplier(c.NetworkPolicyController) + // start informers and run controller + c.informerFactory.Start(stopCh) + go c.Run(stopCh) + // wait until computation is done, we assume it is done when no signal has been received on heartbeat channel for 500ms + idleTimeout := 500 * time.Millisecond + timer := time.NewTimer(idleTimeout) + func() { + for { + timer.Reset(idleTimeout) + select { + case <-c.heartbeatCh: + continue + case <-timer.C: + close(stopCh) + return + } + } + }() + // block until computation complete + <-stopCh + return querier +} + +// TestInvalidSelector tests the result of QueryNetworkPolicy when the selector (right now pod, namespace) does not +// select any pods +func TestInvalidSelector(t *testing.T) { + endpointQuerier := makeControllerAndEndpointQueryReplier() + // test appropriate response to QueryNetworkPolices + namespace, pod := "non-existing-namespace", "non-existing-pod" + response, err := endpointQuerier.QueryNetworkPolicies(namespace, pod) + if response != nil { + assert.Fail(t, "expected nil endpoints") + } + assert.Equal(t, nil, err, "expected not nil error") +} + +// TestNoPolicy tests the result of QueryNetworkPolicy when no policies are relevant to the endpoint +func TestNoPolicy(t *testing.T) { + endpointQuerier := makeControllerAndEndpointQueryReplier(&namespaces[0], &pods[0]) + namespace1, pod1 := "testNamespace", "podA" + response1, err := endpointQuerier.QueryNetworkPolicies(namespace1, pod1) + require.Equal(t, nil, err) + // test applied policy response + assert.Equal(t, 0, len(response1.Endpoints[0].Policies)) + // test egress + ingress policy response + assert.Equal(t, 0, len(response1.Endpoints[0].Rules)) +} + +// TestSingleAppliedPolicy tests the result of QueryNetworkPolicy when the selector (right now pod, namespace) selects a +// pod which has a single network policy object applied to it +func TestSingleAppliedIngressEgressPolicy(t *testing.T) { + endpointQuerier := makeControllerAndEndpointQueryReplier(&namespaces[0], &pods[0], &policies[0]) + namespace1, pod1 := "testNamespace", "podA" + response1, err := endpointQuerier.QueryNetworkPolicies(namespace1, pod1) + require.Equal(t, nil, err) + // test applied policy response + assert.Equal(t, "test-ingress-egress", response1.Endpoints[0].Policies[0].PolicyRef.Name) + // test egress policy response + assert.Equal(t, v1beta1.DirectionOut, response1.Endpoints[0].Rules[0].Direction) + assert.Equal(t, "test-ingress-egress", response1.Endpoints[0].Rules[0].PolicyRef.Name) + // test ingress policy response + assert.Equal(t, v1beta1.DirectionIn, response1.Endpoints[0].Rules[1].Direction) + assert.Equal(t, "test-ingress-egress", response1.Endpoints[0].Rules[1].Name) +} + +// TestMultiplePolicy tests the result of QueryNetworkPolicy when the selector (right now pod, namespace) selects +// a pod which has multiple networkpolicies which define policies on it. +func TestMultiplePolicy(t *testing.T) { + endpointQuerier := makeControllerAndEndpointQueryReplier(&namespaces[0], &pods[0], &policies[0], &policies[1]) + namespace1, pod1 := "testNamespace", "podA" + response, err := endpointQuerier.QueryNetworkPolicies(namespace1, pod1) + require.Equal(t, nil, err) + assert.True(t, response.Endpoints[0].Policies[0].Name == "default-deny-egress" || + response.Endpoints[0].Policies[0].Name == "test-ingress-egress") + assert.True(t, response.Endpoints[0].Policies[1].Name == "default-deny-egress" || + response.Endpoints[0].Policies[1].Name == "test-ingress-egress") +} diff --git a/pkg/controller/networkpolicy/networkpolicy_controller.go b/pkg/controller/networkpolicy/networkpolicy_controller.go index be0f6fad259..56417c99d0e 100644 --- a/pkg/controller/networkpolicy/networkpolicy_controller.go +++ b/pkg/controller/networkpolicy/networkpolicy_controller.go @@ -1339,7 +1339,7 @@ func (n *NetworkPolicyController) syncAddressGroup(key string) error { // No need to insert Pod IPAddress when it is unset. continue } - podSet.Insert(podToMemberPod(pod, true, false)) + podSet.Insert(podToMemberPod(pod, true, true)) } for _, entity := range externalEntities { memberSet.Insert(externalEntityToGroupMember(entity))