Skip to content

Commit

Permalink
Add support for Nomad transparent proxy
Browse files Browse the repository at this point in the history
Nomad will implement support for Connect transparent proxy. Unlike in K8s, the
CNI plugin can't contact the Nomad API to read allocation metadata (pod labels)
to get the iptables configuration, and doesn't use the rest of the Consul-K8s
control plane to inject that metadata. Instead, Nomad will pass the iptables
configuration JSON-serialized in the CNI arguments.

This changeset implements the behavior switch by detecting the `IPTABLES_CONFIG`
argument in the CNI arguments. This hypothetically allows for non-Nomad
workflows to use the same code path, if desired.

Ref: hashicorp/nomad#10628
  • Loading branch information
tgross committed Mar 26, 2024
1 parent 6464f6b commit 47df9ff
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 43 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ pkg/
.idea/
.vscode
.bob/
control-plane/cni/cni
120 changes: 80 additions & 40 deletions control-plane/cni/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type CNIArgs struct {
K8S_POD_NAMESPACE types.UnmarshallableString
// K8S_POD_INFRA_CONTAINER_ID is the runtime container ID that the pod runs under.
K8S_POD_INFRA_CONTAINER_ID types.UnmarshallableString

// IPTABLES_CONFIG is the runtime iptables configuration passed by
// orchestrator (ex. the Nomad client agent)
IPTABLES_CONFIG types.UnmarshallableString
}

// PluginConf is is the configuration used by the plugin.
Expand All @@ -95,9 +99,8 @@ type PluginConf struct {
Multus bool `json:"multus"`
// Kubeconfig file name. Can be set as a cli flag.
Kubeconfig string `json:"kubeconfig"`
// LogLevl is the logging level. Can be set as a cli flag.
// LogLevel is the logging level. Can be set as a cli flag.
LogLevel string `json:"log_level"`
//
}

// parseConfig parses the supplied CNI configuration (and prevResult) from stdin.
Expand Down Expand Up @@ -132,9 +135,11 @@ func (c *Command) cmdAdd(args *skel.CmdArgs) error {

podNamespace := string(cniArgs.K8S_POD_NAMESPACE)
podName := string(cniArgs.K8S_POD_NAME)
cniArgsIPTablesCfg := string(cniArgs.IPTABLES_CONFIG)

// We should never encounter this unless there has been an error in the kubelet. A good safeguard.
if podNamespace == "" || podName == "" {
// We should never encounter this unless there has been an error in the
// kubelet. A good safeguard.
if (podNamespace == "" || podName == "") && cniArgsIPTablesCfg == "" {
return fmt.Errorf("not running in a pod, namespace and pod should have values")
}

Expand Down Expand Up @@ -167,49 +172,54 @@ func (c *Command) cmdAdd(args *skel.CmdArgs) error {
result = prevResult
}

ctx := context.Background()
if c.client == nil {
var iptablesCfg iptables.Config

// Connect to kubernetes.
restConfig, err := clientcmd.BuildConfigFromFlags("", filepath.Join(cfg.CNINetDir, cfg.Kubeconfig))
if cniArgsIPTablesCfg != "" {
var err error
iptablesCfg, err = parseIPTablesFromCNIArgs(cniArgsIPTablesCfg)
if err != nil {
return fmt.Errorf("could not get rest config from kubernetes api: %s", err)
return err
}
} else {
if c.client == nil {
if err := c.createK8sClient(cfg); err != nil {
return err
}
}

c.client, err = kubernetes.NewForConfig(restConfig)
ctx := context.Background()
pod, err := c.client.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error initializing Kubernetes client: %s", err)
return fmt.Errorf("error retrieving pod: %s", err)
}
}

pod, err := c.client.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error retrieving pod: %s", err)
}

// Skip traffic redirection if the correct annotations are not on the pod.
if skipTrafficRedirection(*pod) {
logger.Debug("skipping traffic redirection because the pod is either not injected or transparent proxy is disabled: %s", pod.Name)
return types.PrintResult(result, cfg.CNIVersion)
}
// Skip traffic redirection if the correct annotations are not on the pod.
if skipTrafficRedirection(*pod) {
logger.Debug("skipping traffic redirection because the pod is either not injected or transparent proxy is disabled: %s", pod.Name)
return types.PrintResult(result, cfg.CNIVersion)
}

// We do not throw an error here because kubernetes will often throw a benign error where the pod has been
// updated in between the get and update of the annotation. Eventually kubernetes will update the annotation
ok := c.updateTransparentProxyStatusAnnotation(podName, podNamespace, waiting)
if !ok {
logger.Info("unable to update %s pod annotation to waiting", keyTransparentProxyStatus)
}
// We do not throw an error here because kubernetes will often throw a
// benign error where the pod has been updated in between the get and
// update of the annotation. Eventually kubernetes will update the
// annotation
ok := c.updateTransparentProxyStatusAnnotation(podName, podNamespace, waiting)
if !ok {
logger.Info("unable to update %s pod annotation to waiting", keyTransparentProxyStatus)
}

// Parse the cni-proxy-config annotation into an iptables.Config object.
iptablesCfg, err := parseAnnotation(*pod, annotationRedirectTraffic)
if err != nil {
return err
// Parse the cni-proxy-config annotation into an iptables.Config object.
iptablesCfg, err = parseAnnotation(*pod, annotationRedirectTraffic)
if err != nil {
return err
}
}

// Set NetNS passed through the CNI.
iptablesCfg.NetNS = args.Netns

// Set the provider to a fake provider in testing, otherwise use the default iptables.Provider
// Set the provider to a fake provider in testing, otherwise use the default
// iptables.Provider
if c.iptablesProvider != nil {
iptablesCfg.IptablesProvider = c.iptablesProvider
}
Expand All @@ -220,15 +230,21 @@ func (c *Command) cmdAdd(args *skel.CmdArgs) error {
return fmt.Errorf("could not apply iptables setup: %v", err)
}

// We do not throw an error here because kubernetes will often throw a benign error where the pod has been
// updated in between the get and update of the annotation. Eventually kubernetes will update the annotation
ok = c.updateTransparentProxyStatusAnnotation(podName, podNamespace, complete)
if !ok {
logger.Info("unable to update %s pod annotation to complete", keyTransparentProxyStatus)
if cniArgsIPTablesCfg == "" {

// We do not throw an error here because kubernetes will often throw a
// benign error where the pod has been updated in between the get and update
// of the annotation. Eventually kubernetes will update the annotation
ok := c.updateTransparentProxyStatusAnnotation(podName, podNamespace, complete)
if !ok {
logger.Info("unable to update %s pod annotation to complete", keyTransparentProxyStatus)
}
}

logger.Debug("traffic redirect rules applied to pod: %s", pod.Name)
// Pass through the result for the next plugin even though we are the final plugin in the chain.
logger.Debug("traffic redirect rules applied to pod: %s", podName)

// Pass through the result for the next plugin even if we are the final
// plugin in the chain.
return types.PrintResult(result, cfg.CNIVersion)
}

Expand All @@ -249,6 +265,21 @@ func main() {
skel.PluginMain(c.cmdAdd, cmdCheck, cmdDel, version.All, bv.BuildString("consul-cni"))
}

// createK8sClient configures the command's Kubernetes API client if it doesn't
// already exist
func (c *Command) createK8sClient(cfg *PluginConf) error {
restConfig, err := clientcmd.BuildConfigFromFlags("", filepath.Join(cfg.CNINetDir, cfg.Kubeconfig))
if err != nil {
return fmt.Errorf("could not get rest config from kubernetes api: %s", err)
}

c.client, err = kubernetes.NewForConfig(restConfig)
if err != nil {
return fmt.Errorf("error initializing Kubernetes client: %s", err)
}
return nil
}

// skipTrafficRedirection looks for annotations on the pod and determines if it should skip traffic redirection.
// The absence of the annotations is the equivalent of "disabled" because it means that the connect-inject mutating
// webhook did not run against the pod.
Expand All @@ -267,6 +298,15 @@ func skipTrafficRedirection(pod corev1.Pod) bool {
return false
}

func parseIPTablesFromCNIArgs(args string) (iptables.Config, error) {
cfg := iptables.Config{}
err := json.Unmarshal([]byte(args), &cfg)
if err != nil {
return cfg, fmt.Errorf("could not unmarshal CNI args: %w", err)
}
return cfg, nil
}

// parseAnnotation parses the cni-proxy-config annotation into an iptables.Config object.
func parseAnnotation(pod corev1.Pod, annotation string) (iptables.Config, error) {
anno, ok := pod.Annotations[annotation]
Expand Down
90 changes: 87 additions & 3 deletions control-plane/cni/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func Test_cmdAdd(t *testing.T) {
cmd *Command
podName string
stdInData string
cmdArgs *skel.CmdArgs
configuredPod func(*corev1.Pod, *Command) *corev1.Pod
expectedRules bool
expectedErr error
Expand Down Expand Up @@ -127,12 +128,33 @@ func Test_cmdAdd(t *testing.T) {
expectedErr: nil,
expectedRules: true, // Rules will be applied
},
{
name: "Parsing iptables from CNI_ARGs as in Nomad",
cmd: &Command{
client: fake.NewSimpleClientset(),
iptablesProvider: &fakeIptablesProvider{},
},
cmdArgs: &skel.CmdArgs{ContainerID: "some-container-id",
IfName: "eth0",
Args: fmt.Sprintf("IPTABLES_CONFIG=%s", minimalIPTablesJSON(t)),
Path: "/some/bin/path",
},
stdInData: nomadStdinData,
expectedErr: nil,
expectedRules: true,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
_ = c.configuredPod(minimalPod(c.podName), c.cmd)
err := c.cmd.cmdAdd(minimalSkelArgs(c.podName, defaultNamespace, c.stdInData))
require.Equal(t, c.expectedErr, err)
if c.cmdArgs != nil {
c.cmdArgs.StdinData = []byte(c.stdInData)
err := c.cmd.cmdAdd(c.cmdArgs)
require.Equal(t, c.expectedErr, err)
} else {
_ = c.configuredPod(minimalPod(c.podName), c.cmd)
err := c.cmd.cmdAdd(minimalSkelArgs(c.podName, defaultNamespace, c.stdInData))
require.Equal(t, c.expectedErr, err)
}

// Check to see that rules have been generated
if c.expectedErr == nil && c.expectedRules {
Expand Down Expand Up @@ -355,3 +377,65 @@ const missingIPsStdinData = `{
"name": "consul-cni",
"type": "consul-cni"
}`

// TODO: need to make this match what Nomad emits
const nomadStdinData = `{
"cniVersion": "0.3.1",
"name": "kindnet",
"type": "kindnet",
"capabilities": {
"testCapability": false
},
"ipam": {
"type": "host-local"
},
"dns": {
"nameservers": ["nameserver"],
"domain": "domain",
"search": ["search"],
"options": ["option"]
},
"prevResult": {
"cniversion": "0.3.1",
"interfaces": [
{
"name": "eth0",
"sandbox": "/tmp"
}
],
"ips": [
{
"version": "4",
"address": "10.0.0.2/24",
"gateway": "10.0.0.1",
"interface": 0
}
],
"routes": []
},
"cni_bin_dir": "/opt/cni/bin",
"cni_net_dir": "/etc/cni/net.d",
"log_level": "info",
"name": "consul-cni",
"type": "consul-cni"
}
`

func minimalIPTablesJSON(t *testing.T) string {
cfg := iptables.Config{
ConsulDNSIP: "127.0.0.1",
ConsulDNSPort: 8600,
ProxyUserID: "101",
ProxyInboundPort: 20000,
ProxyOutboundPort: 15001,
ExcludeInboundPorts: []string{"9000"},
ExcludeOutboundPorts: []string{"15002"},
ExcludeOutboundCIDRs: []string{"10.0.0.0/24"},
ExcludeUIDs: []string{"1", "42"},
NetNS: "/some/netns/path",
}
buf, err := json.Marshal(cfg)
require.NoError(t, err)
return string(buf)
}

0 comments on commit 47df9ff

Please sign in to comment.