Skip to content

Commit

Permalink
hotfix: add traffic-manager pod ip to route table (#396)
Browse files Browse the repository at this point in the history
  • Loading branch information
wencaiwulue authored Dec 13, 2024
1 parent b9c1f2a commit 062c69d
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 48 deletions.
88 changes: 48 additions & 40 deletions pkg/handler/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type ConnectOptions struct {
apiServerIPs []net.IP
extraHost []dns.Entry
once sync.Once
tunName string
}

func (c *ConnectOptions) Context() context.Context {
Expand Down Expand Up @@ -214,11 +215,12 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool) (err error)
return err
}
log.Info("Forwarding port...")
if err = c.portForward(c.ctx, []string{
portPair := []string{
fmt.Sprintf("%d:10800", rawTCPForwardPort),
fmt.Sprintf("%d:10801", gvisorTCPForwardPort),
fmt.Sprintf("%d:10802", gvisorUDPForwardPort),
}); err != nil {
}
if err = c.portForward(c.ctx, portPair); err != nil {
return
}
if util.IsWindows() {
Expand Down Expand Up @@ -272,10 +274,13 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err
}
return
}
pod := podList[0]
// add route in case of don't have permission to watch pod, but pod recreated ip changed, so maybe this ip can not visit
_ = c.addRoute(pod.Status.PodIP)
childCtx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
var readyChan = make(chan struct{})
podName := podList[0].GetName()
podName := pod.GetName()
// try to detect pod is delete event, if pod is deleted, needs to redo port-forward
go util.CheckPodStatus(childCtx, cancelFunc, podName, c.clientset.CoreV1().Pods(c.Namespace))
go util.CheckPortStatus(childCtx, cancelFunc, readyChan, strings.Split(portPair[1], ":")[0])
Expand Down Expand Up @@ -427,58 +432,29 @@ func (c *ConnectOptions) startLocalTunServer(ctx context.Context, forwardAddress

// Listen all pod, add route if needed
func (c *ConnectOptions) addRouteDynamic(ctx context.Context) error {
tunName, e := c.GetTunDeviceName()
if e != nil {
return e
var err error
c.tunName, err = c.GetTunDeviceName()
if err != nil {
return err
}

podNs, svcNs, err1 := util.GetNsForListPodAndSvc(ctx, c.clientset, []string{v1.NamespaceAll, c.Namespace})
if err1 != nil {
return err1
}

var addRouteFunc = func(resource, ipStr string) {
ip := net.ParseIP(ipStr)
if ip == nil {
return
}
for _, p := range c.apiServerIPs {
// if pod ip or service ip is equal to apiServer ip, can not add it to route table
if p.Equal(ip) {
return
}
}

var mask net.IPMask
if ip.To4() != nil {
mask = net.CIDRMask(32, 32)
} else {
mask = net.CIDRMask(128, 128)
}
if r, err := netroute.New(); err == nil {
iface, _, _, err := r.Route(ip)
if err == nil && iface.Name == tunName {
return
}
}
errs := tun.AddRoutes(tunName, types.Route{Dst: net.IPNet{IP: ip, Mask: mask}})
if errs != nil {
log.Errorf("Failed to add route, resource: %s, IP: %s, err: %v", resource, ip, errs)
}
}

go func() {
var listDone bool
for ctx.Err() == nil {
err := func() error {
if !listDone {
err := util.ListService(ctx, c.clientset.CoreV1().Services(svcNs), addRouteFunc)
err := util.ListService(ctx, c.clientset.CoreV1().Services(svcNs), c.addRoute)
if err != nil {
return err
}
listDone = true
}
err := util.WatchServiceToAddRoute(ctx, c.clientset.CoreV1().Services(svcNs), addRouteFunc)
err := util.WatchServiceToAddRoute(ctx, c.clientset.CoreV1().Services(svcNs), c.addRoute)
return err
}()
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) || apierrors.IsForbidden(err) {
Expand All @@ -494,13 +470,13 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) error {
for ctx.Err() == nil {
err := func() error {
if !listDone {
err := util.ListPod(ctx, c.clientset.CoreV1().Pods(podNs), addRouteFunc)
err := util.ListPod(ctx, c.clientset.CoreV1().Pods(podNs), c.addRoute)
if err != nil {
return err
}
listDone = true
}
err := util.WatchPodToAddRoute(ctx, c.clientset.CoreV1().Pods(podNs), addRouteFunc)
err := util.WatchPodToAddRoute(ctx, c.clientset.CoreV1().Pods(podNs), c.addRoute)
return err
}()
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) || apierrors.IsForbidden(err) {
Expand All @@ -514,6 +490,38 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) error {
return nil
}

func (c *ConnectOptions) addRoute(ipStr string) error {
if c.tunName == "" {
return nil
}

ip := net.ParseIP(ipStr)
if ip == nil {
return nil
}
for _, p := range c.apiServerIPs {
// if pod ip or service ip is equal to apiServer ip, can not add it to route table
if p.Equal(ip) {
return nil
}
}

var mask net.IPMask
if ip.To4() != nil {
mask = net.CIDRMask(32, 32)
} else {
mask = net.CIDRMask(128, 128)
}
if r, err := netroute.New(); err == nil {
ifi, _, _, err := r.Route(ip)
if err == nil && ifi.Name == c.tunName {
return nil
}
}
err := tun.AddRoutes(c.tunName, types.Route{Dst: net.IPNet{IP: ip, Mask: mask}})
return err
}

func (c *ConnectOptions) deleteFirewallRule(ctx context.Context) {
if !util.IsWindows() {
return
Expand Down
22 changes: 14 additions & 8 deletions pkg/util/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,18 @@ func GetNsForListPodAndSvc(ctx context.Context, clientset *kubernetes.Clientset,
return
}

func ListService(ctx context.Context, lister v12.ServiceInterface, addRouteFunc func(resource string, ipStr string)) error {
func ListService(ctx context.Context, lister v12.ServiceInterface, addRouteFunc func(ipStr string) error) error {
opts := metav1.ListOptions{Limit: 100, Continue: ""}
for {
serviceList, err := lister.List(ctx, opts)
if err != nil {
return err
}
for _, service := range serviceList.Items {
addRouteFunc(service.Name, service.Spec.ClusterIP)
err = addRouteFunc(service.Spec.ClusterIP)
if err != nil {
log.Errorf("Failed to add route, resource: %s, IP: %s, err: %v", service.Name, service.Spec.ClusterIP, err)
}
}
if serviceList.Continue == "" {
return nil
Expand All @@ -67,7 +70,7 @@ func ListService(ctx context.Context, lister v12.ServiceInterface, addRouteFunc
}
}

func WatchServiceToAddRoute(ctx context.Context, watcher v12.ServiceInterface, routeFunc func(resource string, ipStr string)) error {
func WatchServiceToAddRoute(ctx context.Context, watcher v12.ServiceInterface, routeFunc func(ipStr string) error) error {
defer func() {
if er := recover(); er != nil {
log.Error(er)
Expand All @@ -91,12 +94,12 @@ func WatchServiceToAddRoute(ctx context.Context, watcher v12.ServiceInterface, r
if !ok {
continue
}
routeFunc(svc.Name, svc.Spec.ClusterIP)
_ = routeFunc(svc.Spec.ClusterIP)
}
}
}

func ListPod(ctx context.Context, lister v12.PodInterface, addRouteFunc func(resource string, ipStr string)) error {
func ListPod(ctx context.Context, lister v12.PodInterface, addRouteFunc func(ipStr string) error) error {
opts := metav1.ListOptions{Limit: 100, Continue: ""}
for {
podList, err := lister.List(ctx, opts)
Expand All @@ -107,7 +110,10 @@ func ListPod(ctx context.Context, lister v12.PodInterface, addRouteFunc func(res
if pod.Spec.HostNetwork {
continue
}
addRouteFunc(pod.Name, pod.Status.PodIP)
err = addRouteFunc(pod.Status.PodIP)
if err != nil {
log.Errorf("Failed to add route, resource: %s, IP: %s, err: %v", pod.Name, pod.Status.PodIP, err)
}
}
if podList.Continue == "" {
return nil
Expand All @@ -116,7 +122,7 @@ func ListPod(ctx context.Context, lister v12.PodInterface, addRouteFunc func(res
}
}

func WatchPodToAddRoute(ctx context.Context, watcher v12.PodInterface, addRouteFunc func(resource string, ipStr string)) error {
func WatchPodToAddRoute(ctx context.Context, watcher v12.PodInterface, addRouteFunc func(ipStr string) error) error {
defer func() {
if er := recover(); er != nil {
log.Errorln(er)
Expand Down Expand Up @@ -144,7 +150,7 @@ func WatchPodToAddRoute(ctx context.Context, watcher v12.PodInterface, addRouteF
continue
}
ip := pod.Status.PodIP
addRouteFunc(pod.Name, ip)
_ = addRouteFunc(ip)
}
}
}

0 comments on commit 062c69d

Please sign in to comment.