Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

custom vpc pod support tcp http probe with tproxy method #3024

Merged
merged 22 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions charts/templates/ovncni-ds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ spec:
- --log_file_max_size=0
- --enable-metrics={{- .Values.networking.ENABLE_METRICS }}
- --kubelet-dir={{ .Values.kubelet_conf.KUBELET_DIR }}
- --enable-tproxy={{ .Values.func.ENABLE_TPROXY }}
securityContext:
runAsUser: 0
privileged: true
Expand Down
1 change: 1 addition & 0 deletions charts/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func:
LOGICAL_GATEWAY: false
ENABLE_BIND_LOCAL_IP: true
U2O_INTERCONNECTION: false
ENABLE_TPROXY: false

ipv4:
POD_CIDR: "10.16.0.0/16"
Expand Down
15 changes: 1 addition & 14 deletions cmd/daemon/cniserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"k8s.io/klog/v2"
"k8s.io/sample-controller/pkg/signals"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
"github.com/kubeovn/kube-ovn/pkg/daemon"
"github.com/kubeovn/kube-ovn/pkg/util"
Expand Down Expand Up @@ -96,19 +95,7 @@ func CmdMain() {
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}

addr := "0.0.0.0"
if os.Getenv("ENABLE_BIND_LOCAL_IP") == "true" {
podIpsEnv := os.Getenv("POD_IPS")
podIps := strings.Split(podIpsEnv, ",")
// when pod in dual mode, golang can't support bind v4 and v6 address in the same time,
// so not support bind local ip when in dual mode
if len(podIps) == 1 {
addr = podIps[0]
if util.CheckProtocol(podIps[0]) == kubeovnv1.ProtocolIPv6 {
addr = fmt.Sprintf("[%s]", podIps[0])
}
}
}
addr := util.GetDefaultListenAddr()

if config.EnableVerboseConnCheck {
go func() {
Expand Down
2 changes: 2 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ IFACE=${IFACE:-}
# Note that the DPDK tunnel iface and tunnel IP CIDR should be different from the Kubernetes API CIDR, otherwise routing will be a problem.
DPDK_TUNNEL_IFACE=${DPDK_TUNNEL_IFACE:-br-phy}
ENABLE_BIND_LOCAL_IP=${ENABLE_BIND_LOCAL_IP:-true}
ENABLE_TPROXY=${ENABLE_TPROXY:-false}

# debug
DEBUG_WRAPPER=${DEBUG_WRAPPER:-}
Expand Down Expand Up @@ -4037,6 +4038,7 @@ spec:
- --log_file=/var/log/kube-ovn/kube-ovn-cni.log
- --log_file_max_size=0
- --kubelet-dir=$KUBELET_DIR
- --enable-tproxy=$ENABLE_TPROXY
securityContext:
runAsUser: 0
privileged: true
Expand Down
12 changes: 12 additions & 0 deletions dist/images/uninstall.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ iptables -t filter -D FORWARD -m set --match-set ovn40subnets src -j ACCEPT
iptables -t filter -D FORWARD -m set --match-set ovn40services dst -j ACCEPT
iptables -t filter -D FORWARD -m set --match-set ovn40services src -j ACCEPT
iptables -t filter -D OUTPUT -p udp -m udp --dport 6081 -j MARK --set-xmark 0x0
iptables -t mangle -D PREROUTING -m comment --comment "kube-ovn prerouting rules" -j OVN-PREROUTING
iptables -t mangle -D OUTPUT -m comment --comment "kube-ovn output rules" -j OVN-OUTPUT
iptables -t mangle -F OVN-PREROUTING
iptables -t mangle -X OVN-PREROUTING
iptables -t mangle -F OVN-OUTPUT
iptables -t mangle -X OVN-OUTPUT
zhangzujian marked this conversation as resolved.
Show resolved Hide resolved

sleep 1

Expand Down Expand Up @@ -51,6 +57,12 @@ ip6tables -t filter -D FORWARD -m set --match-set ovn60subnets src -j ACCEPT
ip6tables -t filter -D FORWARD -m set --match-set ovn60services dst -j ACCEPT
ip6tables -t filter -D FORWARD -m set --match-set ovn60services src -j ACCEPT
ip6tables -t filter -D OUTPUT -p udp -m udp --dport 6081 -j MARK --set-xmark 0x0
ip6tables -t mangle -D PREROUTING -m comment --comment "kube-ovn prerouting rules" -j OVN-PREROUTING
ip6tables -t mangle -D OUTPUT -m comment --comment "kube-ovn output rules" -j OVN-OUTPUT
ip6tables -t mangle -F OVN-PREROUTING
ip6tables -t mangle -X OVN-PREROUTING
ip6tables -t mangle -F OVN-OUTPUT
ip6tables -t mangle -X OVN-OUTPUT

sleep 1

Expand Down
3 changes: 3 additions & 0 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Configuration struct {
EnableVerboseConnCheck bool
TCPConnCheckPort int
UDPConnCheckPort int
EnableTProxy bool
}

// ParseFlags will parse cmd args then init kubeClient and configuration
Expand Down Expand Up @@ -100,6 +101,7 @@ func ParseFlags() *Configuration {
argEnableVerboseConnCheck = pflag.Bool("enable-verbose-conn-check", false, "enable TCP/UDP connectivity check listen port")
argTCPConnectivityCheckPort = pflag.Int("tcp-conn-check-port", 8100, "TCP connectivity Check Port")
argUDPConnectivityCheckPort = pflag.Int("udp-conn-check-port", 8101, "UDP connectivity Check Port")
argEnableTProxy = pflag.Bool("enable-tproxy", false, "enable tproxy for vpc pod liveness or readiness probe")
)

// mute info log for ipset lib
Expand Down Expand Up @@ -154,6 +156,7 @@ func ParseFlags() *Configuration {
EnableVerboseConnCheck: *argEnableVerboseConnCheck,
TCPConnCheckPort: *argTCPConnectivityCheckPort,
UDPConnCheckPort: *argUDPConnectivityCheckPort,
EnableTProxy: *argEnableTProxy,
}
return config
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/daemon/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,17 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
}
}, 5*time.Minute, stopCh)

if c.config.EnableTProxy {
go c.StartTProxyForwarding()
go wait.Until(c.runTProxyConfigWorker, 3*time.Second, stopCh)
// With the tproxy method, kubelet's TCP probe packets cannot reach the network namespace of a custom VPC pod,
// so tproxy itself probes the custom VPC pod: if the probe fails, the iptables rules that redirect
// kubelet traffic to tproxy are removed; if the probe succeeds, those iptables rules are restored.
go wait.Until(c.StartTProxyTCPPortProbe, 1*time.Second, stopCh)
} else {
c.cleanTProxyConfig()
}

<-stopCh
klog.Info("Shutting down workers")
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/daemon/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package daemon
import (
"fmt"
"os/exec"
"sort"
"strings"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -246,3 +247,44 @@ func (c *Controller) getEgressNatIpByNode(nodeName string) (map[string]string, e
}
return subnetsNatIp, nil
}

// getTProxyConditionPod returns the pods that need tproxy handling on this node.
//
// A pod is selected when all of the following hold:
//   - it is scheduled on c.config.NodeName;
//   - it carries the logical switch annotation for util.OvnProvider;
//   - the subnet named by that annotation belongs to a VPC other than
//     c.config.ClusterRouter.
//
// When needSort is true the result is ordered by "<namespace>/<name>" so that
// callers building iptables rules from it see a stable order across calls.
//
// An error from listing pods, or from looking up any referenced subnet, aborts
// the whole call and a nil slice is returned. The subnet lookup error is
// wrapped with %w so callers can inspect it with errors.Is / errors.As.
func (c *Controller) getTProxyConditionPod(needSort bool) ([]*v1.Pod, error) {
	pods, err := c.podsLister.List(labels.Everything())
	if err != nil {
		klog.Errorf("list pods failed, %v", err)
		return nil, err
	}

	// The annotation key does not depend on the pod, so build it once.
	subnetAnnotation := fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, util.OvnProvider)

	var filteredPods []*v1.Pod
	for _, pod := range pods {
		if pod.Spec.NodeName != c.config.NodeName {
			continue
		}

		subnetName, ok := pod.Annotations[subnetAnnotation]
		if !ok {
			continue
		}

		subnet, err := c.subnetsLister.Get(subnetName)
		if err != nil {
			return nil, fmt.Errorf("failed to get subnet '%s', err: %w", subnetName, err)
		}

		// Pods in the cluster router's VPC are not tproxy candidates.
		if subnet.Spec.Vpc == c.config.ClusterRouter {
			continue
		}

		filteredPods = append(filteredPods, pod)
	}

	if needSort {
		sort.Slice(filteredPods, func(i, j int) bool {
			return filteredPods[i].Namespace+"/"+filteredPods[i].Name < filteredPods[j].Namespace+"/"+filteredPods[j].Name
		})
	}

	return filteredPods, nil
}
132 changes: 132 additions & 0 deletions pkg/daemon/gateway_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ const (

const (
NAT = "nat"
MANGLE = "mangle"
Prerouting = "PREROUTING"
Postrouting = "POSTROUTING"
Output = "OUTPUT"
OvnPrerouting = "OVN-PREROUTING"
OvnPostrouting = "OVN-POSTROUTING"
OvnOutput = "OVN-OUTPUT"
OvnMasquerade = "OVN-MASQUERADE"
OvnNatOutGoingPolicy = "OVN-NAT-POLICY"
OvnNatOutGoingPolicySubnet = "OVN-NAT-PSUBNET-"
Expand All @@ -52,6 +55,10 @@ const (
// Packet marks used by the iptables rules in this file. The two OnOutGoing*
// values are pre-formatted "mark/mask" strings; the TProxy* values are numeric
// and are formatted as "mark/mask" by reconcileTProxyIPTableRules.
const (
OnOutGoingNatMark = "0x90001/0x90001"
OnOutGoingForwardMark = "0x90002/0x90002"
// Mark and mask applied with "-j MARK --set-xmark" by the mangle OVN-OUTPUT
// rules that reconcileTProxyIPTableRules generates.
// NOTE(review): 0x90003 has both the 0x90001 and 0x90002 bits set, so a packet
// carrying this mark would presumably also satisfy a mark/mask test against
// OnOutGoingNatMark or OnOutGoingForwardMark — confirm the overlap is intended.
TProxyOutputMark = 0x90003
TProxyOutputMask = 0x90003
// Mark and mask passed as "--tproxy-mark" by the mangle OVN-PREROUTING TPROXY
// rules that reconcileTProxyIPTableRules generates.
TProxyPreroutingMark = 0x90004
TProxyPreroutingMask = 0x90004
)

type policyRouteMeta struct {
Expand Down Expand Up @@ -584,9 +591,11 @@ func (c *Controller) setIptables() error {
}
)
protocols := make([]string, 2)
isDual := false
if c.protocol == kubeovnv1.ProtocolDual {
protocols[0] = kubeovnv1.ProtocolIPv4
protocols[1] = kubeovnv1.ProtocolIPv6
isDual = true
} else {
protocols[0] = c.protocol
}
Expand Down Expand Up @@ -733,6 +742,10 @@ func (c *Controller) setIptables() error {
return err
}

if err = c.reconcileTProxyIPTableRules(protocol, isDual); err != nil {
return err
}

if err = c.updateIptablesChain(ipt, NAT, OvnPrerouting, Prerouting, natPreroutingRules); err != nil {
klog.Errorf("failed to update chain %s/%s: %v", NAT, OvnPrerouting)
return err
Expand All @@ -754,6 +767,125 @@ func (c *Controller) setIptables() error {
return nil
}

// reconcileTProxyIPTableRules rebuilds the mangle-table OVN-PREROUTING and
// OVN-OUTPUT chains for the given protocol from the probes declared by the
// pods returned by getTProxyConditionPod.
//
// For every selected pod that has an IP of the given protocol, and for every
// probe port of that pod, two rules are generated:
//   - OVN-OUTPUT: mark TCP traffic to <podIP>:<port> with TProxyOutputMark;
//   - OVN-PREROUTING: TPROXY TCP traffic to <podIP>:<port> to
//     util.TProxyListenPort on the host IP, tagging it with
//     TProxyPreroutingMark.
//
// HTTP probe ports are always included. TCP probe ports are included only
// while customVPCPodTCPProbeIPPort records the <podIP, port> pair as reachable.
//
// Parameters:
//   - protocol: the IP family whose iptables handle and pod IPs are used.
//   - isDual: whether the controller runs dual-stack; in that case (or when
//     ENABLE_BIND_LOCAL_IP is "false") the TPROXY --on-ip is the wildcard
//     address of the family instead of the pod's host IP.
//
// It is a no-op when tproxy is disabled in the configuration. Errors from
// listing pods or updating either chain are returned to the caller.
func (c *Controller) reconcileTProxyIPTableRules(protocol string, isDual bool) error {
	if !c.config.EnableTProxy {
		return nil
	}

	ipt := c.iptables[protocol]
	tproxyPreRoutingRules := make([]util.IPTableRule, 0)
	tproxyOutputRules := make([]util.IPTableRule, 0)

	pods, err := c.getTProxyConditionPod(true)
	if err != nil {
		return err
	}

	// Everything below depends only on the protocol or the environment, not on
	// the pod or port, so compute it once.
	tProxyOutputMarkMask := fmt.Sprintf("%#x/%#x", TProxyOutputMark, TProxyOutputMask)
	tProxyPreRoutingMarkMask := fmt.Sprintf("%#x/%#x", TProxyPreroutingMark, TProxyPreroutingMask)

	prefixLen := 32
	if protocol == kubeovnv1.ProtocolIPv6 {
		prefixLen = 128
	}

	// NOTE(review): presumably the tproxy listener binds the wildcard address
	// in these two cases, so the redirect must target it — confirm against the
	// listener setup.
	wildcardIP := ""
	if isDual || os.Getenv("ENABLE_BIND_LOCAL_IP") == "false" {
		switch protocol {
		case kubeovnv1.ProtocolIPv4:
			wildcardIP = "0.0.0.0"
		case kubeovnv1.ProtocolIPv6:
			wildcardIP = "::"
		}
	}

	for _, pod := range pods {
		var podIP string
		for _, ip := range pod.Status.PodIPs {
			if util.CheckProtocol(ip.IP) == protocol {
				podIP = ip.IP
				break
			}
		}

		if podIP == "" {
			continue
		}

		// The port set must be per pod: sharing one set across pods would
		// install rules on this pod for ports that only other pods probe.
		probePorts := strset.New()

		// addHTTPPort records an HTTP probe port unconditionally.
		addHTTPPort := func(port string) {
			if port != "" {
				probePorts.Add(port)
			}
		}
		// addTCPPort records a TCP probe port only while the tproxy TCP prober
		// has marked <podIP, port> as reachable.
		addTCPPort := func(port string) {
			if port == "" {
				return
			}
			if isTCPProbePortReachable, ok := customVPCPodTCPProbeIPPort.Load(getIPPortString(podIP, port)); ok {
				if isTCPProbePortReachable.(bool) {
					probePorts.Add(port)
				}
			}
		}

		for _, container := range pod.Spec.Containers {
			if probe := container.ReadinessProbe; probe != nil {
				if httpGet := probe.HTTPGet; httpGet != nil {
					addHTTPPort(httpGet.Port.String())
				}
				if tcpSocket := probe.TCPSocket; tcpSocket != nil {
					addTCPPort(tcpSocket.Port.String())
				}
			}

			if probe := container.LivenessProbe; probe != nil {
				if httpGet := probe.HTTPGet; httpGet != nil {
					addHTTPPort(httpGet.Port.String())
				}
				if tcpSocket := probe.TCPSocket; tcpSocket != nil {
					addTCPPort(tcpSocket.Port.String())
				}
			}
		}

		if probePorts.IsEmpty() {
			continue
		}

		hostIP := pod.Status.HostIP
		if wildcardIP != "" {
			hostIP = wildcardIP
		}

		// Sort the ports so the generated rule list is stable between runs.
		probePortList := probePorts.List()
		sort.Strings(probePortList)
		for _, probePort := range probePortList {
			tproxyOutputRules = append(tproxyOutputRules, util.IPTableRule{Table: MANGLE, Chain: OvnOutput, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %s -j MARK --set-xmark %s`, podIP, prefixLen, probePort, tProxyOutputMarkMask))})
			tproxyPreRoutingRules = append(tproxyPreRoutingRules, util.IPTableRule{Table: MANGLE, Chain: OvnPrerouting, Rule: strings.Fields(fmt.Sprintf(`-d %s/%d -p tcp -m tcp --dport %s -j TPROXY --on-port %d --on-ip %s --tproxy-mark %s`, podIP, prefixLen, probePort, util.TProxyListenPort, hostIP, tProxyPreRoutingMarkMask))})
		}
	}

	if err := c.updateIptablesChain(ipt, MANGLE, OvnPrerouting, Prerouting, tproxyPreRoutingRules); err != nil {
		klog.Errorf("failed to update chain %s with rules %v: %v", OvnPrerouting, tproxyPreRoutingRules, err)
		return err
	}

	if err := c.updateIptablesChain(ipt, MANGLE, OvnOutput, Output, tproxyOutputRules); err != nil {
		klog.Errorf("failed to update chain %s with rules %v: %v", OvnOutput, tproxyOutputRules, err)
		return err
	}
	return nil
}

// cleanTProxyIPTableRules calls ClearChain on the mangle-table OVN-PREROUTING
// and OVN-OUTPUT chains for the given protocol, in that order.
//
// It does nothing when c.iptables holds no handle for the protocol. The first
// chain that fails to clear is logged and ends the cleanup; no error is
// returned to the caller.
func (c *Controller) cleanTProxyIPTableRules(protocol string) {
	handle := c.iptables[protocol]
	if handle == nil {
		return
	}

	tproxyChains := [2]string{OvnPrerouting, OvnOutput}
	for idx := 0; idx < len(tproxyChains); idx++ {
		name := tproxyChains[idx]
		err := handle.ClearChain(MANGLE, name)
		if err == nil {
			continue
		}
		klog.Errorf("failed to clear iptables chain %v in table %v, %+v", name, MANGLE, err)
		return
	}
}

func (c *Controller) reconcileNatOutgoingPolicyIptablesChain(protocol string) error {
ipt := c.iptables[protocol]

Expand Down
Loading
Loading