Skip to content

Commit

Permalink
Remove the dormant container during intercept with --replace.
Browse files Browse the repository at this point in the history
There's no need to have a dormant container because the traffic-agent
can serve as the replacement itself. That way, there's no need to
redirect ports either, which means that there's no need to rename ports
or use an init-container.

An annotation is added to the pod to cater for when multiple injections
happen due to replace-policy, so that injections that follow the one
when an app-container is removed can reproduce the traffic-agent
container, taking its ports, mounts, and environment into account.

Closes #3758

Signed-off-by: Thomas Hallgren <thomas@tada.se>
  • Loading branch information
thallgren committed Jan 2, 2025
1 parent c56d774 commit dd95d15
Show file tree
Hide file tree
Showing 14 changed files with 220 additions and 133 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ items:
values: <namespaces>`.
```
docs: install/manager#static-versus-dynamic-namespace-selection
- type: feature
title: Removal of the dormant container during intercept with --replace.
body: |-
During a `telepresence intercept --replace operation`, the previously injected dormant container has been
removed. The Traffic Agent now directly serves as the replacement container, eliminating the need to forward
traffic to the original application container. This simplification offers several advantages when using the
`--replace` flag:
- **Removal of the init-container:** The need for a separate init-container is no longer necessary.
- **Elimination of port renames:** Port renames within the intercepted pod are no longer required.
- type: change
title: Drop deprecated current-cluster-id command.
body: >-
Expand Down
42 changes: 27 additions & 15 deletions cmd/traffic/cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,31 +163,43 @@ func sidecar(ctx context.Context, s State, info *rpc.AgentInfo) error {
// Group the container's intercepts by agent port
icStates := make(map[agentconfig.PortAndProto][]*agentconfig.Intercept, len(cn.Intercepts))
for _, ic := range cn.Intercepts {
k := agentconfig.PortAndProto{Port: ic.AgentPort, Proto: ic.Protocol}
ap := ic.AgentPort
if cn.Replace {
// Listen to replaced container's original port.
ap = ic.ContainerPort
}
k := agentconfig.PortAndProto{Port: ap, Proto: ic.Protocol}
icStates[k] = append(icStates[k], ic)
}

for pp, ics := range icStates {
ic := ics[0] // They all have the same protocol container port, so the first one will do.
var cp uint16
if ic.TargetPortNumeric {
// We must differentiate between connections originating from the agent's forwarder to the container
// port and those from other sources. The former should not be routed back, while the latter should
// always be routed to the agent. We do this by using a proxy port that will be recognized by the
// iptables filtering in our init-container.
cp = ac.ProxyPort(ic)
} else {
cp = ic.ContainerPort
}
lisAddr, err := pp.Addr()
if err != nil {
return err
}
// Redirect non-intercepted traffic to the pod, so that injected sidecars that hijack the ports for
// incoming connections will continue to work.
targetHost := s.PodIP()

fwd := forwarder.NewInterceptor(lisAddr, targetHost, cp)
var fwd forwarder.Interceptor
var cp uint16
if !cn.Replace {
if ic.TargetPortNumeric {
// We must differentiate between connections originating from the agent's forwarder to the container
// port and those from other sources. The former should not be routed back, while the latter should
// always be routed to the agent. We do this by using a proxy port that will be recognized by the
// iptables filtering in our init-container.
cp = ac.ProxyPort(ic)
} else {
cp = ic.ContainerPort
}
// Redirect non-intercepted traffic to the pod, so that injected sidecars that hijack the ports for
// incoming connections will continue to work.
targetHost := s.PodIP()
fwd = forwarder.NewInterceptor(lisAddr, targetHost, cp)
} else {
fwd = forwarder.NewInterceptor(lisAddr, "", 0)
cp = ic.ContainerPort
}

dgroup.ParentGroup(ctx).Go(fmt.Sprintf("forward-%s", iputil.JoinHostPort(cn.Name, cp)), func(ctx context.Context) error {
return fwd.Serve(tunnel.WithPool(ctx, tunnel.NewPool()), nil)
})
Expand Down
2 changes: 1 addition & 1 deletion cmd/traffic/cmd/agent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (s *state) ReportMetrics(ctx context.Context, metrics *rpc.TunnelMetrics) {
mCtx, mCancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second)
defer mCancel()
_, err := s.manager.ReportMetrics(mCtx, metrics)
if err != nil {
if err != nil && status.Code(err) != codes.Canceled {
dlog.Errorf(ctx, "ReportMetrics failed: %v", err)
}
}()
Expand Down
130 changes: 63 additions & 67 deletions cmd/traffic/cmd/manager/mutator/agent_injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"
"sync/atomic"

"github.com/go-json-experiment/json"
"github.com/go-json-experiment/json/jsontext"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
admission "k8s.io/api/admission/v1"
Expand Down Expand Up @@ -190,15 +192,17 @@ func (a *agentInjector) Inject(ctx context.Context, req *admission.AdmissionRequ
}

var patches PatchOps
var annotations map[string]string
config := scx.AgentConfig()
patches = disableAppContainer(ctx, pod, config, patches)
patches = addInitContainer(pod, config, patches)
patches = addAgentContainer(ctx, pod, config, patches)
patches, annotations = addAgentContainer(ctx, pod, config, patches)
patches = addPullSecrets(pod, config, patches)
patches = addAgentVolumes(pod, config, patches)
patches = hidePorts(pod, config, patches)
patches = addPodAnnotations(ctx, pod, patches)
annotations[agentconfig.InjectAnnotation] = "enabled"
patches = addPodAnnotations(pod, annotations, patches)
patches = addPodLabels(ctx, pod, config, patches)
patches = maybeRemoveAppContainer(pod, config, patches)

if config.APIPort != 0 {
tpEnv := make(map[string]string)
Expand All @@ -209,6 +213,16 @@ func (a *agentInjector) Inject(ctx context.Context, req *admission.AdmissionRequ
// Create patch operations to add the traffic-agent sidecar
if len(patches) > 0 {
dlog.Infof(ctx, "Injecting %d patches into pod %s.%s", len(patches), pod.Name, pod.Namespace)
if dlog.MaxLogLevel(ctx) >= dlog.LogLevelDebug {
cns := strings.Builder{}
for i, cn := range pod.Spec.Containers {
cns.WriteString(fmt.Sprintf("%d %s\n", i, cn.Name))
}
dlog.Debugf(ctx, "Containers \n%s", cns.String())
if pj, err := json.Marshal(patches, jsontext.WithIndent(" ")); err == nil {
dlog.Debugf(ctx, "\n%s", string(pj))
}
}
} else {
dlog.Infof(ctx, "Pod %s.%s was left untouched", pod.Name, pod.Namespace)
}
Expand All @@ -224,61 +238,27 @@ func (a *agentInjector) Uninstall(ctx context.Context) {

func needInitContainer(config *agentconfig.Sidecar) bool {
for _, cc := range config.Containers {
for _, ic := range cc.Intercepts {
if ic.Headless || ic.TargetPortNumeric {
return true
if !cc.Replace {
for _, ic := range cc.Intercepts {
if ic.Headless || ic.TargetPortNumeric {
return true
}
}
}
}
return false
}

const sleeperImage = "alpine:latest"

var sleeperArgs = []string{"sleep", "infinity"} //nolint:gochecknoglobals // constant

func disableAppContainer(ctx context.Context, pod *core.Pod, config *agentconfig.Sidecar, patches PatchOps) PatchOps {
podContainers:
for i, pc := range pod.Spec.Containers {
func maybeRemoveAppContainer(pod *core.Pod, config *agentconfig.Sidecar, patches PatchOps) PatchOps {
// Extremely important to remove in reverse order, or one may affect the index of the next removal.
cns := pod.Spec.Containers
for i := len(cns) - 1; i >= 0; i-- {
for _, cc := range config.Containers {
if cc.Name == pc.Name && cc.Replace {
if pc.Image == sleeperImage && slices.Equal(pc.Args, sleeperArgs) {
continue podContainers
}
patches = append(patches, PatchOperation{
Op: "replace",
Path: fmt.Sprintf("/spec/containers/%d/image", i),
Value: sleeperImage,
})
argsOp := "add"
if len(pc.Args) > 0 {
argsOp = "replace"
}
if cc.Name == cns[i].Name && cc.Replace {
patches = append(patches, PatchOperation{
Op: argsOp,
Path: fmt.Sprintf("/spec/containers/%d/args", i),
Value: sleeperArgs,
Op: "remove",
Path: fmt.Sprintf("/spec/containers/%d", i),
})
if pc.StartupProbe != nil {
patches = append(patches, PatchOperation{
Op: "remove",
Path: fmt.Sprintf("/spec/containers/%d/startupProbe", i),
})
}
if pc.LivenessProbe != nil {
patches = append(patches, PatchOperation{
Op: "remove",
Path: fmt.Sprintf("/spec/containers/%d/livenessProbe", i),
})
}
if pc.ReadinessProbe != nil {
patches = append(patches, PatchOperation{
Op: "remove",
Path: fmt.Sprintf("/spec/containers/%d/readinessProbe", i),
})
}
dlog.Debugf(ctx, "Disabled container %s", pc.Name)
continue podContainers
}
}
}
Expand Down Expand Up @@ -417,8 +397,18 @@ func compareVolumeMounts(a, b []core.VolumeMount) bool {
return eq
}

func containerEqual(a, b *core.Container) bool {
func containerEqual(ctx context.Context, a, b *core.Container) bool {
// skips contain defaults assigned by Kubernetes that are not zero values
if dlog.MaxLogLevel(ctx) >= dlog.LogLevelDebug {
diff := cmp.Diff(a, b,
cmp.Comparer(compareProbes),
cmp.Comparer(compareVolumeMounts),
cmpopts.IgnoreFields(core.Container{}, "ImagePullPolicy", "Resources", "TerminationMessagePath", "TerminationMessagePolicy"))
if diff != "" {
dlog.Debug(ctx, diff)
}
return diff == ""
}
return cmp.Equal(a, b,
cmp.Comparer(compareProbes),
cmp.Comparer(compareVolumeMounts),
Expand All @@ -431,34 +421,34 @@ func addAgentContainer(
pod *core.Pod,
config *agentconfig.Sidecar,
patches PatchOps,
) PatchOps {
acn := agentconfig.AgentContainer(ctx, pod, config)
) (PatchOps, map[string]string) {
acn, replaceAnnotations := agentconfig.AgentContainer(ctx, pod, config)
if acn == nil {
return patches
return patches, replaceAnnotations
}

refPodName := pod.Name + "." + pod.Namespace
for i := range pod.Spec.Containers {
pcn := &pod.Spec.Containers[i]
if pcn.Name == agentconfig.ContainerName {
if containerEqual(pcn, acn) {
if containerEqual(ctx, pcn, acn) {
dlog.Infof(ctx, "Pod %s already has container %s and it isn't modified", refPodName, agentconfig.ContainerName)
return patches
return patches, replaceAnnotations
}
dlog.Debugf(ctx, "Pod %s already has container %s but it is modified", refPodName, agentconfig.ContainerName)
return append(patches, PatchOperation{
Op: "replace",
Path: "/spec/containers/" + strconv.Itoa(i),
Value: acn,
})
}), replaceAnnotations
}
}

return append(patches, PatchOperation{
Op: "add",
Path: "/spec/containers/-",
Value: acn,
})
}), replaceAnnotations
}

// addAgentContainer creates a patch operation to add the traffic-agent container.
Expand Down Expand Up @@ -499,7 +489,9 @@ func addPullSecrets(
// addTPEnv adds telepresence specific environment variables to all interceptable app containers.
func addTPEnv(pod *core.Pod, config *agentconfig.Sidecar, env map[string]string, patches PatchOps) PatchOps {
agentconfig.EachContainer(pod, config, func(app *core.Container, cc *agentconfig.Container) {
patches = addContainerTPEnv(pod, app, env, patches)
if !cc.Replace {
patches = addContainerTPEnv(pod, app, env, patches)
}
})
return patches
}
Expand Down Expand Up @@ -556,12 +548,14 @@ func addContainerTPEnv(pod *core.Pod, cn *core.Container, env map[string]string,
// the same replacement on all references to that port from the probes of the container.
func hidePorts(pod *core.Pod, config *agentconfig.Sidecar, patches PatchOps) PatchOps {
agentconfig.EachContainer(pod, config, func(app *core.Container, cc *agentconfig.Container) {
for _, ic := range agentconfig.PortUniqueIntercepts(cc) {
if ic.Headless || ic.TargetPortNumeric {
// Rely on iptables mapping instead of port renames
continue
if !cc.Replace {
for _, ic := range agentconfig.PortUniqueIntercepts(cc) {
if ic.Headless || ic.TargetPortNumeric {
// Rely on iptables mapping instead of port renames
continue
}
patches = hideContainerPorts(pod, app, bool(cc.Replace), ic.ContainerPortName, patches)
}
patches = hideContainerPorts(pod, app, bool(cc.Replace), ic.ContainerPortName, patches)
}
})
return patches
Expand Down Expand Up @@ -616,7 +610,7 @@ func hideContainerPorts(pod *core.Pod, app *core.Container, isReplace bool, port
return patches
}

func addPodAnnotations(_ context.Context, pod *core.Pod, patches PatchOps) PatchOps {
func addPodAnnotations(pod *core.Pod, anns map[string]string, patches PatchOps) PatchOps {
op := "replace"
changed := false
am := pod.Annotations
Expand All @@ -627,9 +621,11 @@ func addPodAnnotations(_ context.Context, pod *core.Pod, patches PatchOps) Patch
am = maps.Copy(am)
}

if _, ok := pod.Annotations[agentconfig.InjectAnnotation]; !ok {
changed = true
am[agentconfig.InjectAnnotation] = "enabled"
for k, v := range anns {
if _, ok := pod.Annotations[k]; !ok {
changed = true
am[k] = v
}
}

if changed {
Expand Down
23 changes: 8 additions & 15 deletions cmd/traffic/cmd/manager/mutator/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/telepresenceio/telepresence/v2/pkg/agentmap"
"github.com/telepresenceio/telepresence/v2/pkg/informer"
"github.com/telepresenceio/telepresence/v2/pkg/k8sapi"
"github.com/telepresenceio/telepresence/v2/pkg/maps"
"github.com/telepresenceio/telepresence/v2/pkg/workload"
)

Expand Down Expand Up @@ -198,8 +199,8 @@ func isRolloutNeededForPod(ctx context.Context, ac *agentconfig.Sidecar, name, n
return fmt.Sprintf("Rollout of %s.%s is necessary. An agent is desired but the pod %s doesn't have one",
name, namespace, pod.GetName())
}
desiredAc := agentconfig.AgentContainer(ctx, pod, ac)
if !containerEqual(podAc, desiredAc) {
desiredAc, anns := agentconfig.AgentContainer(ctx, pod, ac)
if !(containerEqual(ctx, podAc, desiredAc) && maps.Equal(anns, pod.ObjectMeta.Annotations)) {
return fmt.Sprintf("Rollout of %s.%s is necessary. The desired agent is not equal to the existing agent in pod %s",
name, namespace, pod.GetName())
}
Expand All @@ -224,22 +225,14 @@ func isRolloutNeededForPod(ctx context.Context, ac *agentconfig.Sidecar, name, n
break
}
}
if found == nil {
return fmt.Sprintf("Rollout of %s.%s is necessary. The desired pod should contain container %s",
name, namespace, cn.Name)
}
if cn.Replace {
// Ensure that the replaced container is disabled
if !(found.Image == sleeperImage && slices.Equal(found.Args, sleeperArgs)) {
return fmt.Sprintf("Rollout of %s.%s is necessary. The desired pod's container %s should be disabled",
name, namespace, cn.Name)
}
} else {
// Ensure that the replaced container is not disabled
if found.Image == sleeperImage && slices.Equal(found.Args, sleeperArgs) {
return fmt.Sprintf("Rollout of %s.%s is necessary. The desired pod's container %s should not be disabled",
if found != nil {
return fmt.Sprintf("Rollout of %s.%s is necessary. The %s container must be replaced",
name, namespace, cn.Name)
}
} else if found == nil {
return fmt.Sprintf("Rollout of %s.%s is necessary. The %s container should not be replaced",
name, namespace, cn.Name)
}
}
return ""
Expand Down
12 changes: 12 additions & 0 deletions docs/release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@ namespaceSelector:
```
</div>
## <div style="display:flex;"><img src="images/feature.png" alt="feature" style="width:30px;height:fit-content;"/><div style="display:flex;margin-left:7px;">Removal of the dormant container during intercept with --replace.</div></div>
<div style="margin-left: 15px">
During a `telepresence intercept --replace operation`, the previously injected dormant container has been
removed. The Traffic Agent now directly serves as the replacement container, eliminating the need to forward
traffic to the original application container. This simplification offers several advantages when using the
`--replace` flag:

- **Removal of the init-container:** The need for a separate init-container is no longer necessary.
- **Elimination of port renames:** Port renames within the intercepted pod are no longer required.
</div>

## <div style="display:flex;"><img src="images/change.png" alt="change" style="width:30px;height:fit-content;"/><div style="display:flex;margin-left:7px;">Drop deprecated current-cluster-id command.</div></div>
<div style="margin-left: 15px">

Expand Down
Loading

0 comments on commit dd95d15

Please sign in to comment.