Skip to content

Commit

Permalink
Implements antctl query endpoint (Fixes antrea-io#974)
Browse files Browse the repository at this point in the history
Antctl query endpoint gives users the ability to filter
network policies relevant to a certain endpoint. This
commit gives a high level architecture for selective querying
of internal antrea objects for easy resource tracking of
large clusters, along with an implementation of endpoint querying.

Note, antctl query endpoint only works when run in the controller.
Proxy issues will be resolved shortly.
  • Loading branch information
jakesokol1 committed Jul 30, 2020
1 parent 1732a8a commit 401698a
Show file tree
Hide file tree
Showing 20 changed files with 1,252 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ endif
build: build-ubuntu

.PHONY: test
test: golangci
#test: golangci
test: build
test: docker-test-unit
test: docker-test-integration
Expand Down
8 changes: 7 additions & 1 deletion cmd/antrea-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/vmware-tanzu/antrea/pkg/apiserver/openapi"
"github.com/vmware-tanzu/antrea/pkg/apiserver/storage"
crdinformers "github.com/vmware-tanzu/antrea/pkg/client/informers/externalversions"
queryapiserver "github.com/vmware-tanzu/antrea/pkg/controller/apiserver"
"github.com/vmware-tanzu/antrea/pkg/controller/metrics"
"github.com/vmware-tanzu/antrea/pkg/controller/networkpolicy"
"github.com/vmware-tanzu/antrea/pkg/controller/networkpolicy/store"
Expand Down Expand Up @@ -87,7 +88,10 @@ func run(o *Options) error {
appliedToGroupStore,
networkPolicyStore)

controllerQuerier := querier.NewControllerQuerier(networkPolicyController, o.config.APIPort)
endpointQueryReplier := networkpolicy.NewEndpointQueryReplier(networkPolicyController)

controllerQuerier := querier.NewControllerQuerier(networkPolicyController, endpointQueryReplier,
o.config.APIPort)

controllerMonitor := monitor.NewControllerMonitor(crdClient, nodeInformer, controllerQuerier)

Expand All @@ -113,6 +117,8 @@ func run(o *Options) error {
if err != nil {
return fmt.Errorf("error creating API server: %v", err)
}
// install handlers for query functionality onto apiServer
queryapiserver.InstallHandlers(*endpointQueryReplier, apiServer.GenericAPIServer)

// Set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
Expand Down
2 changes: 1 addition & 1 deletion hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ MOCKGEN_TARGETS=(
"pkg/ovs/ovsctl OVSCtlClient"
"pkg/agent/querier AgentQuerier"
"pkg/controller/querier ControllerQuerier"
"pkg/querier AgentNetworkPolicyInfoQuerier"
"pkg/querier AgentNetworkPolicyInfoQuerier,ControllerNetworkPolicyInfoQuerier"
"pkg/agent/flowexporter/connections ConnTrackDumper,ConnTrackInterfacer"
)

Expand Down
32 changes: 32 additions & 0 deletions pkg/antctl/antctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
systemv1beta1 "github.com/vmware-tanzu/antrea/pkg/apis/system/v1beta1"
controllerinforest "github.com/vmware-tanzu/antrea/pkg/apiserver/registry/system/controllerinfo"
"github.com/vmware-tanzu/antrea/pkg/client/clientset/versioned/scheme"
controllernetworkpolicy "github.com/vmware-tanzu/antrea/pkg/controller/networkpolicy"
)

// CommandList defines all commands that could be used in the antctl for both agents
Expand Down Expand Up @@ -317,6 +318,37 @@ var CommandList = &commandList{
commandGroup: flat,
transformedResponse: reflect.TypeOf(ovstracing.Response{}),
},
{
use: "endpoint",
aliases: []string{"endpoints"},
short: "Filter network policies relevant to an endpoint.",
long: "Filter network policies relevant to an endpoint into three categories: network policies which apply to the endpoint and policies which select the endpoint in an ingress and/or egress rule.",
example: ` Query network policies given pod and namespace
$ antctl query endpoint -p pod1 -n ns1
`,
commandGroup: query,
controllerEndpoint: &endpoint{
nonResourceEndpoint: &nonResourceEndpoint{
path: "/endpoint",
params: []flagInfo{
{
name: "namespace",
usage: "Namespace of the entity (required)",
shorthand: "n",
},
{
name: "pod",
usage: "Name of a local Pod (required)",
shorthand: "p",
},
},
outputType: single,
},
addonTransform: nil,
},
agentEndpoint: nil,
transformedResponse: reflect.TypeOf(controllernetworkpolicy.EndpointQueryResponse{}),
},
},
rawCommands: []rawCommand{
{
Expand Down
173 changes: 162 additions & 11 deletions pkg/antctl/command_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/vmware-tanzu/antrea/pkg/apis/networking/v1beta1"
"github.com/vmware-tanzu/antrea/pkg/controller/networkpolicy"
"io"
"os"
"reflect"
"sort"
"strconv"
"strings"
"text/tabwriter"

Expand Down Expand Up @@ -72,6 +75,7 @@ const (
const (
flat commandGroup = iota
get
query
)

var groupCommands = map[commandGroup]*cobra.Command{
Expand All @@ -80,6 +84,11 @@ var groupCommands = map[commandGroup]*cobra.Command{
Short: "Get the status or resource of a topic",
Long: "Get the status or resource of a topic",
},
query: {
Use: "query",
Short: "List relevant resources to an endpoint or policy",
Long: "List relevant resources to an endpoint or policy",
},
}

type endpointResponder interface {
Expand Down Expand Up @@ -242,7 +251,6 @@ func (cd *commandDefinition) applySubCommandToRoot(root *cobra.Command, client *
cmd.RunE = cd.newCommandRunE(client)
}

// validate checks if the commandDefinition is valid.
func (cd *commandDefinition) validate() []error {
var errs []error
if len(cd.use) == 0 {
Expand Down Expand Up @@ -430,19 +438,24 @@ func (cd *commandDefinition) tableOutputForGetCommands(obj interface{}, writer i
return true
})
}
// Construct the table.
numRows, numCol := len(list)+1, len(args)
widths := getColumnWidths(numRows, numCol, rows)
return constructTable(numRows, numCol, widths, rows, writer)
}

numColumns := len(args)
widths := make([]int, numColumns)
if numColumns == 1 {
func getColumnWidths(numRows int, numCol int, rows [][]string) []int {
widths := make([]int, numCol)
if numCol == 1 {
// Do not limit the column length for a single column table.
// This is for the case a single column table can have long rows which cannot
// fit into a single line (one example is the ovsflows outputs).
widths[0] = 0
} else {
// Get the width of every column.
for j := 0; j < numColumns; j++ {
for j := 0; j < numCol; j++ {
width := len(rows[0][j])
for i := 1; i < len(list)+1; i++ {
for i := 1; i < numRows; i++ {
if len(rows[i][j]) == 0 {
rows[i][j] = "<NONE>"
}
Expand All @@ -456,11 +469,13 @@ func (cd *commandDefinition) tableOutputForGetCommands(obj interface{}, writer i
}
}
}
return widths
}

// Construct the table.
func constructTable(numRows int, numCol int, widths []int, rows [][]string, writer io.Writer) error {
var buffer bytes.Buffer
for i := 0; i < len(list)+1; i++ {
for j := 0; j < len(args); j++ {
for i := 0; i < numRows; i++ {
for j := 0; j < numCol; j++ {
val := ""
if j != 0 {
val = " " + val
Expand All @@ -480,6 +495,136 @@ func (cd *commandDefinition) tableOutputForGetCommands(obj interface{}, writer i
return nil
}


// tableOutputForQueryEndpoint implements printing sub tables (list of tables) for each response, utilizing constructTable
// with multiplicity.
func (cd *commandDefinition) tableOutputForQueryEndpoint(obj interface{}, writer io.Writer) error {
// intermittent new line buffer
var buffer bytes.Buffer
newLine := func() error {
buffer.WriteString("\n")
if _, err := io.Copy(writer, &buffer); err != nil {
return fmt.Errorf("error when copy output into writer: %w", err)
}
buffer.Reset()
return nil
}
// sort rows of sub table
sortRows := func(rows [][]string) {
body := rows[1:]
sort.Slice(body, func(i, j int) bool {
for k := range body[i] {
if body[i][k] != body[j][k] {
return body[i][k] < body[j][k]
}
}
return true
})
}
// constructs sub tables for responses
constructSubTable := func(header [][]string, body [][]string) error {
rows := append(header, body...)
sortRows(rows)
numRows, numCol := len(rows), len(rows[0])
widths := getColumnWidths(numRows, numCol, rows)
if err := constructTable(numRows, numCol, widths, rows, writer); err != nil {
return err
}
return nil
}
// construct sections of sub tables for responses (applied, ingress, egress)
constructSection := func(label [][]string, header [][]string, body [][]string, nonZero bool) error {
if err := constructSubTable(label, [][]string{}); err != nil {
return err
}
if nonZero {
if err := constructSubTable(header, body); err != nil {
return err
}
}
if err := newLine(); err != nil {
return err
}
return nil
}
// response is string representation of endpoint response
type response struct {
Namespace string
Name string
Policies [][]string
EgressRules [][]string
IngressRules [][]string
}
// iterate through each endpoint and construct response
endpointQueryResponse := obj.(*networkpolicy.EndpointQueryResponse)
responses := make([]*response, 0)
for _, endpoint := range endpointQueryResponse.Endpoints {
// transform applied policies to string representation
policies := make([][]string, 0)
for _, policy := range endpoint.Policies {
policyStr := []string{policy.Name, policy.Namespace, string(policy.UID)}
policies = append(policies, policyStr)
}
// transform egress and ingress rules to string representation
egress, ingress := make([][]string, 0), make([][]string, 0)
for _, rule := range endpoint.Rules {
ruleStr := []string{rule.Name, rule.Namespace, strconv.Itoa(rule.RuleIndex), string(rule.UID)}
if rule.Direction == v1beta1.DirectionIn {
ingress = append(ingress, ruleStr)
} else if rule.Direction == v1beta1.DirectionOut {
egress = append(egress, ruleStr)
} else {
panic("Unimplemented direction")
}
}
// create full response
response := &response{
Namespace: endpoint.Namespace,
Name: endpoint.Name,
Policies: policies,
EgressRules: egress,
IngressRules: ingress,
}

responses = append(responses, response)
}
// print each response as its own table (each representing a unique endpoint)
for _, r := range responses {
// table label
if err := constructSubTable([][]string{{"Endpoint " + r.Namespace + "/" + r.Name}}, [][]string{}); err != nil {
return err
}
// applied policies
nonZero := len(r.Policies) > 0
policyLabel := []string{"Applied Policies: None"}
if nonZero {
policyLabel = []string{"Applied Policies:"}
}
if err := constructSection([][]string{policyLabel}, [][]string{{"Name", "Namespace", "UID"}}, r.Policies, nonZero); err != nil {
return err
}
// egress rules
nonZero = len(r.EgressRules) > 0
egressLabel := []string{"Egress Rules: None"}
if nonZero {
egressLabel = []string{"Egress Rules:"}
}
if err := constructSection([][]string{egressLabel}, [][]string{{"Name", "Namespace", "Index", "UID"}}, r.EgressRules, nonZero); err != nil {
return err
}
// ingress rules
nonZero = len(r.IngressRules) > 0
ingressLabel := []string{"Ingress Rules: None"}
if nonZero {
ingressLabel = []string{"Ingress Rules:"}
}
if err := constructSection([][]string{ingressLabel}, [][]string{{"Name", "Namespace", "Index", "UID"}}, r.IngressRules, nonZero); err != nil {
return err
}
}
return nil
}

func (cd *commandDefinition) tableOutput(obj interface{}, writer io.Writer) error {
target, err := respTransformer(obj)
if err != nil {
Expand Down Expand Up @@ -574,7 +719,6 @@ func (cd *commandDefinition) output(resp io.Reader, writer io.Writer, ft formatt
}
klog.Infof("After transforming %v", obj)
}

// Output structure data in format
switch ft {
case jsonFormatter:
Expand All @@ -584,6 +728,12 @@ func (cd *commandDefinition) output(resp io.Reader, writer io.Writer, ft formatt
case tableFormatter:
if cd.commandGroup == get {
return cd.tableOutputForGetCommands(obj, writer)
} else if cd.commandGroup == query {
if cd.controllerEndpoint.nonResourceEndpoint.path == "/endpoint" {
return cd.tableOutputForQueryEndpoint(obj, writer)
} else {
panic("unimplemented")
}
} else {
return cd.tableOutput(obj, writer)
}
Expand Down Expand Up @@ -668,6 +818,8 @@ func (cd *commandDefinition) applyFlagsToCommand(cmd *cobra.Command) {
}
if cd.commandGroup == get {
cmd.Flags().StringP("output", "o", "table", "output format: json|table|yaml")
} else if cd.commandGroup == query {
cmd.Flags().StringP("output", "o", "table", "output format: json|table|yaml")
} else {
cmd.Flags().StringP("output", "o", "yaml", "output format: json|table|yaml")
}
Expand All @@ -683,7 +835,6 @@ func (cd *commandDefinition) applyExampleToCommand(cmd *cobra.Command) {
cmd.Example = cd.example
return
}

var commands []string
for iter := cmd; iter != nil; iter = iter.Parent() {
commands = append(commands, iter.Name())
Expand Down
4 changes: 4 additions & 0 deletions pkg/antctl/command_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ func (cl *commandList) GetDebugCommands(mode string) [][]string {
var allCommands [][]string
for i := range cl.definitions {
def := cl.definitions[i]
// TODO: incorporate query commands into e2e testing once proxy access is implemented
if def.commandGroup == query {
continue
}
if mode == runtime.ModeAgent && def.agentEndpoint != nil ||
mode == runtime.ModeController && def.controllerEndpoint != nil {
var currentCommand []string
Expand Down
25 changes: 25 additions & 0 deletions pkg/controller/apiserver/apiserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2020 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiserver

import (
"github.com/vmware-tanzu/antrea/pkg/controller/apiserver/handlers/endpoint"
"github.com/vmware-tanzu/antrea/pkg/controller/networkpolicy"
genericapiserver "k8s.io/apiserver/pkg/server"
)

func InstallHandlers(eq networkpolicy.EndpointQueryReplier, s *genericapiserver.GenericAPIServer) {
s.Handler.NonGoRestfulMux.HandleFunc("/endpoint", endpoint.HandleFunc(eq))
}
Loading

0 comments on commit 401698a

Please sign in to comment.