Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the dormant container during intercept with --replace. #3760

Closed
wants to merge 9 commits into from
15 changes: 15 additions & 0 deletions CHANGELOG.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ items:
- version: 2.22.0
date: (TBD)
notes:
- type: feature
title: One single invocation of the Telepresence intercept command can now intercept multiple ports.
body: >-
It is now possible to intercept multiple ports with one single invocation of `telepresence intercept` by just
repeating the `--port` flag.
- type: feature
title: Unify how Traffic Manager selects namespaces
body: |-
Expand All @@ -44,6 +49,16 @@ items:
values: <namespaces>`.
```
docs: install/manager#static-versus-dynamic-namespace-selection
- type: feature
title: Removal of the dormant container during intercept with --replace.
body: |-
During a `telepresence intercept --replace operation`, the previously injected dormant container has been
removed. The Traffic Agent now directly serves as the replacement container, eliminating the need to forward
traffic to the original application container. This simplification offers several advantages when using the
`--replace` flag:

- **Removal of the init-container:** The need for a separate init-container is no longer necessary.
- **Elimination of port renames:** Port renames within the intercepted pod are no longer required.
- type: change
title: Drop deprecated current-cluster-id command.
body: >-
Expand Down
41 changes: 24 additions & 17 deletions cmd/traffic/cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ func sftpServer(ctx context.Context, sftpPortCh chan<- uint16) error {
_ = l.Close()
}()

_, sftpPort, err := iputil.SplitToIPPort(l.Addr())
ap, err := iputil.SplitToIPPort(l.Addr())
if err != nil {
return err
}
sftpPortCh <- sftpPort
sftpPortCh <- ap.Port()

dlog.Infof(ctx, "Listening at: %s", l.Addr())
for {
Expand Down Expand Up @@ -163,31 +163,38 @@ func sidecar(ctx context.Context, s State, info *rpc.AgentInfo) error {
// Group the container's intercepts by agent port
icStates := make(map[agentconfig.PortAndProto][]*agentconfig.Intercept, len(cn.Intercepts))
for _, ic := range cn.Intercepts {
k := agentconfig.PortAndProto{Port: ic.AgentPort, Proto: ic.Protocol}
ap := ic.AgentPort
if cn.Replace {
// Listen to replaced container's original port.
ap = ic.ContainerPort
}
k := agentconfig.PortAndProto{Port: ap, Proto: ic.Protocol}
icStates[k] = append(icStates[k], ic)
}

for pp, ics := range icStates {
ic := ics[0] // They all have the same protocol container port, so the first one will do.
var fwd forwarder.Interceptor
var cp uint16
if ic.TargetPortNumeric {
// We must differentiate between connections originating from the agent's forwarder to the container
// port and those from other sources. The former should not be routed back, while the latter should
// always be routed to the agent. We do this by using a proxy port that will be recognized by the
// iptables filtering in our init-container.
cp = ac.ProxyPort(ic)
if !cn.Replace {
if ic.TargetPortNumeric {
// We must differentiate between connections originating from the agent's forwarder to the container
// port and those from other sources. The former should not be routed back, while the latter should
// always be routed to the agent. We do this by using a proxy port that will be recognized by the
// iptables filtering in our init-container.
cp = ac.ProxyPort(ic)
} else {
cp = ic.ContainerPort
}
// Redirect non-intercepted traffic to the pod, so that injected sidecars that hijack the ports for
// incoming connections will continue to work.
targetHost := s.PodIP()
fwd = forwarder.NewInterceptor(pp, targetHost, cp)
} else {
fwd = forwarder.NewInterceptor(pp, "", 0)
cp = ic.ContainerPort
}
lisAddr, err := pp.Addr()
if err != nil {
return err
}
// Redirect non-intercepted traffic to the pod, so that injected sidecars that hijack the ports for
// incoming connections will continue to work.
targetHost := s.PodIP()

fwd := forwarder.NewInterceptor(lisAddr, targetHost, cp)
dgroup.ParentGroup(ctx).Go(fmt.Sprintf("forward-%s", iputil.JoinHostPort(cn.Name, cp)), func(ctx context.Context) error {
return fwd.Serve(tunnel.WithPool(ctx, tunnel.NewPool()), nil)
})
Expand Down
2 changes: 1 addition & 1 deletion cmd/traffic/cmd/agent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (s *state) ReportMetrics(ctx context.Context, metrics *rpc.TunnelMetrics) {
mCtx, mCancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second)
defer mCancel()
_, err := s.manager.ReportMetrics(mCtx, metrics)
if err != nil {
if err != nil && status.Code(err) != codes.Canceled {
dlog.Errorf(ctx, "ReportMetrics failed: %v", err)
}
}()
Expand Down
7 changes: 2 additions & 5 deletions cmd/traffic/cmd/agent/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package agent_test

import (
"context"
"net"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
core "k8s.io/api/core/v1"

"github.com/datawire/dlib/dlog"
rpc "github.com/telepresenceio/telepresence/rpc/v2/manager"
Expand All @@ -23,10 +23,7 @@ const (
)

func makeFS(t *testing.T, ctx context.Context) (forwarder.Interceptor, agent.State) {
lAddr, err := net.ResolveTCPAddr("tcp", ":1111")
assert.NoError(t, err)

f := forwarder.NewInterceptor(lAddr, appHost, appPort)
f := forwarder.NewInterceptor(agentconfig.PortAndProto{Proto: core.ProtocolTCP, Port: 1111}, appHost, appPort)
go func() {
if err := f.Serve(context.Background(), nil); err != nil {
dlog.Error(ctx, err)
Expand Down
22 changes: 13 additions & 9 deletions cmd/traffic/cmd/agentinit/agent_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"net"
"net/netip"
"os"
"path/filepath"
"strconv"
Expand All @@ -19,7 +20,6 @@ import (
"github.com/datawire/dlib/dlog"
"github.com/telepresenceio/telepresence/v2/pkg/agentconfig"
"github.com/telepresenceio/telepresence/v2/pkg/dos"
"github.com/telepresenceio/telepresence/v2/pkg/iputil"
"github.com/telepresenceio/telepresence/v2/pkg/version"
)

Expand All @@ -45,7 +45,7 @@ func loadConfig(ctx context.Context) (*config, error) {
return &c, nil
}

func (c *config) configureIptables(ctx context.Context, iptables *iptables.IPTables, loopback, localHostCIDR, podIP string) error {
func (c *config) configureIptables(ctx context.Context, iptables *iptables.IPTables, loopback string, localHostCIDR netip.Prefix, podIP netip.Addr) error {
// These iptables rules implement routing such that a packet directed to the appPort will hit the agentPort instead.
// If there's no mesh this is simply request -> agent -> app (or intercept)
// However, if there's a service mesh we want to make sure we don't bypass the mesh, so the traffic
Expand Down Expand Up @@ -114,8 +114,8 @@ func (c *config) configureIptables(ctx context.Context, iptables *iptables.IPTab
// loop it back into the agent.
dlog.Debugf(ctx, "output DNAT %s:%d -> %s:%d", podIP, ac.ProxyPort(ic), podIP, ic.ContainerPort)
err = iptables.AppendUnique(nat, outputChain,
"-p", lcProto, "-d", podIP, "--dport", strconv.Itoa(int(ac.ProxyPort(ic))),
"-j", "DNAT", "--to-destination", net.JoinHostPort(podIP, strconv.Itoa(int(ic.ContainerPort))))
"-p", lcProto, "-d", podIP.String(), "--dport", strconv.Itoa(int(ac.ProxyPort(ic))),
"-j", "DNAT", "--to-destination", netip.AddrPortFrom(podIP, ic.ContainerPort).String())
if err != nil {
return fmt.Errorf("failed to append rule to %s: %w", outputChain, err)
}
Expand Down Expand Up @@ -155,7 +155,7 @@ func (c *config) configureIptables(ctx context.Context, iptables *iptables.IPTab
err = iptables.Insert(nat, "OUTPUT", 1,
"-o", loopback,
"-p", lcProto,
"!", "-d", localHostCIDR,
"!", "-d", localHostCIDR.String(),
"-m", "owner", "--uid-owner", agentUID,
"-j", outputChain)
if err != nil {
Expand Down Expand Up @@ -213,11 +213,15 @@ func Main(ctx context.Context, args ...string) error {
return err
}
proto := iptables.ProtocolIPv4
localhostCIDR := "127.0.0.1/32"
podIP := os.Getenv("POD_IP")
if len(iputil.Parse(podIP)) == 16 {
localhostCIDR := netip.PrefixFrom(netip.AddrFrom4([4]byte{127, 0, 0, 1}), 32)
podIP, err := netip.ParseAddr(os.Getenv("POD_IP"))
if err != nil {
dlog.Error(ctx, err)
return err
}
if podIP.Is6() {
proto = iptables.ProtocolIPv6
localhostCIDR = "::1/128"
localhostCIDR = netip.PrefixFrom(netip.IPv6Loopback(), 128)
}
it, err := iptables.NewWithProtocol(proto)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion cmd/traffic/cmd/manager/cluster/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,11 @@ func getInjectorSvcIP(ctx context.Context, env *managerutil.Env, client v1.CoreV
break
}
}
return iputil.Parse(sc.Spec.ClusterIP), p, nil
ip, err := netip.ParseAddr(sc.Spec.ClusterIP)
if err != nil {
return nil, 0, err
}
return ip.AsSlice(), p, nil
}

func (oi *info) watchPodSubnets(ctx context.Context) {
Expand Down
Loading
Loading