Skip to content

Commit

Permalink
Configure Kind in Gitlab to allow kubernetes injection tests to run (#…
Browse files Browse the repository at this point in the history
…3057)

* implement logic to allow KinD to work in gitlab

* try getting the container id directly

* wrap with bash?

* escape escape

* Use json in python to get all the info

* python doesn't handle bash piping too well

* correct decoding of the json

* more fixups

* $HOME is not resolved

* better the '/root'

* extract to function

* forgot a couple words

* print out config

* add debug info

* add more debugging

* fix typo

* log running pods

* test test-agent 1.16.0

* restore tag

* pull if not present

* restore pull policy

* debug locally

* deploy app

* fix

* fix local

* debug local

* no stop cluster

* use internal dns to access to dev test agent

* debug traces for gitlab patch

* test

* fix agent port

* test manual inject

* fix ports

* fix

* enable all tests

* destroy cluster after

* keep network

* debug network connection

* disable kind network policies

* restore

* diable tests

* no pull images

* load local image into cluster

* no helm

* revert helm charts

* no destroy cluster

* connect kind containers to bridget network

* revert change

* restore by default

* test only helm

* disable kube proxty

* disable kube proxty

* test

* kubeproxy

* pod subnet

* connect kind cluster

* pull offline

* helm offline

* cluster agent offline

* preload webapp

* pull policy never

* enable all tests

* run one by one

* activate more tets

* run one tests

* test admission controller only

* test uds

* uds pull policy never

* enable two tests

* cluster agent traces

* change interfaces sync

* fix command sync

* fix command sync

* enable all tests

* datadog kubernetes

* fix merge

* enable all

* offline mode

* helm chart offline mode file pattern

* datadog helm offline

* Remove offline-mode, rework setup

* remove some unintended changes

* some debug info. Fix sed

* use formatting instead of a loop to get network info

* strip() to remove whitespace

* remove debug logs

* merge and other fixes

* formatting

* text and variable name changes

---------

Co-authored-by: roberto montero <roberto.montero@datadoghq.com>
  • Loading branch information
randomanderson and robertomonteromiguel authored Sep 18, 2024
1 parent dd32a9b commit 53fe59b
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 51 deletions.
80 changes: 43 additions & 37 deletions tests/k8s_lib_injection/test_k8s_manual_inject.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,50 +13,52 @@
class _TestAdmisionController:
def test_inject_admission_controller(self, test_k8s_instance):
logger.info(
f"Launching test _test_inject_admission_controller: Weblog: [{test_k8s_instance.k8s_kind_cluster.weblog_port}] Agent: [{test_k8s_instance.k8s_kind_cluster.agent_port}]"
f"Launching test _test_inject_admission_controller: Weblog: [{test_k8s_instance.k8s_kind_cluster.get_weblog_port()}] Agent: [{test_k8s_instance.k8s_kind_cluster.get_agent_port()}]"
)
test_k8s_instance.deploy_test_agent()
test_k8s_instance.deploy_datadog_cluster_agent()
test_k8s_instance.deploy_weblog_as_pod()
traces_json = self._get_dev_agent_traces(test_k8s_instance.k8s_kind_cluster.agent_port)
traces_json = self._get_dev_agent_traces(test_k8s_instance.k8s_kind_cluster)
assert len(traces_json) > 0, "No traces found"
logger.info(f"Test _test_inject_admission_controller finished")

def test_inject_uds_admission_controller(self, test_k8s_instance):
logger.info(
f"Launching test test_inject_uds_admission_controller: Weblog: [{test_k8s_instance.k8s_kind_cluster.weblog_port}] Agent: [{test_k8s_instance.k8s_kind_cluster.agent_port}]"
f"Launching test test_inject_uds_admission_controller: Weblog: [{test_k8s_instance.k8s_kind_cluster.get_weblog_port()}] Agent: [{test_k8s_instance.k8s_kind_cluster.get_agent_port()}]"
)
test_k8s_instance.deploy_test_agent()
test_k8s_instance.deploy_datadog_cluster_agent(use_uds=True)
test_k8s_instance.deploy_weblog_as_pod()
traces_json = self._get_dev_agent_traces(test_k8s_instance.k8s_kind_cluster.agent_port)
traces_json = self._get_dev_agent_traces(test_k8s_instance.k8s_kind_cluster)
assert len(traces_json) > 0, "No traces found"
logger.info(f"Test test_inject_uds_admission_controller finished")

def test_inject_without_admission_controller(self, test_k8s_instance):
logger.info(
f"Launching test _test_inject_without_admission_controller: Weblog: [{test_k8s_instance.k8s_kind_cluster.weblog_port}] Agent: [{test_k8s_instance.k8s_kind_cluster.agent_port}]"
f"Launching test _test_inject_without_admission_controller: Weblog: [{test_k8s_instance.k8s_kind_cluster.get_weblog_port()}] Agent: [{test_k8s_instance.k8s_kind_cluster.get_agent_port()}]"
)
test_k8s_instance.deploy_test_agent()
test_k8s_instance.deploy_weblog_as_pod(with_admission_controller=False)
traces_json = self._get_dev_agent_traces(test_k8s_instance.k8s_kind_cluster.agent_port)
traces_json = self._get_dev_agent_traces(test_k8s_instance.k8s_kind_cluster)
assert len(traces_json) > 0, "No traces found"
logger.info(f"Test _test_inject_without_admission_controller finished")

def test_inject_uds_without_admission_controller(self, test_k8s_instance):
logger.info(
f"Launching test test_inject_uds_without_admission_controller: Weblog: [{test_k8s_instance.k8s_kind_cluster.weblog_port}] Agent: [{test_k8s_instance.k8s_kind_cluster.agent_port}]"
f"Launching test test_inject_uds_without_admission_controller: Weblog: [{test_k8s_instance.k8s_kind_cluster.get_weblog_port()}] Agent: [{test_k8s_instance.k8s_kind_cluster.get_agent_port()}]"
)
test_k8s_instance.deploy_test_agent()
test_k8s_instance.deploy_weblog_as_pod(with_admission_controller=False, use_uds=True)
traces_json = self._get_dev_agent_traces(test_k8s_instance.k8s_kind_cluster.agent_port)
traces_json = self._get_dev_agent_traces(test_k8s_instance.k8s_kind_cluster)
assert len(traces_json) > 0, "No traces found"
logger.info(f"Test test_inject_uds_without_admission_controller finished")

def _get_dev_agent_traces(self, agent_port, retry=10):
def _get_dev_agent_traces(self, k8s_kind_cluster, retry=10):
for _ in range(retry):
logger.info(f"[Check traces] Checking traces:")
response = requests.get(f"http://localhost:{agent_port}/test/traces")
response = requests.get(
f"http://{k8s_kind_cluster.cluster_host_name}:{k8s_kind_cluster.get_agent_port()}/test/traces"
)
traces_json = response.json()
if len(traces_json) > 0:
logger.debug(f"Test traces response: {traces_json}")
Expand All @@ -73,7 +75,7 @@ class _TestAdmisionControllerAsm:

def test_inject_asm_admission_controller(self, test_k8s_instance):
logger.info(
f"Launching test test_inject_asm_admission_controller: Weblog: [{test_k8s_instance.k8s_kind_cluster.weblog_port}] Agent: [{test_k8s_instance.k8s_kind_cluster.agent_port}]"
f"Launching test test_inject_asm_admission_controller: Weblog: [{test_k8s_instance.k8s_kind_cluster.get_weblog_port()}] Agent: [{test_k8s_instance.k8s_kind_cluster.get_agent_port()}]"
)

asm_features = {
Expand All @@ -84,15 +86,16 @@ def test_inject_asm_admission_controller(self, test_k8s_instance):
test_k8s_instance.deploy_datadog_cluster_agent(features=asm_features)
test_k8s_instance.deploy_agent()

weblog_port = test_k8s_instance.k8s_kind_cluster.weblog_port
logger.info(f"Waiting for weblog available [localhost:{weblog_port}]")
wait_for_port(weblog_port, "localhost", 80.0)
logger.info(f"[localhost:{weblog_port}]: Weblog app is ready!")
warmup_weblog(f"http://localhost:{weblog_port}/")
logger.info(f"Making a request to weblog [localhost:{weblog_port}]")
request_uuid = make_get_request(f"http://localhost:{weblog_port}/")
weblog_port = test_k8s_instance.k8s_kind_cluster.get_weblog_port()
weblog_host = test_k8s_instance.k8s_kind_cluster.cluster_host_name
logger.info(f"Waiting for weblog available [{weblog_host}:{weblog_port}]")
wait_for_port(weblog_port, weblog_host, 80.0)
logger.info(f"[{weblog_host}:{weblog_port}]: Weblog app is ready!")
warmup_weblog(f"http://{weblog_host}:{weblog_port}/")
logger.info(f"Making a request to weblog [{weblog_host}:{weblog_port}]")
request_uuid = make_get_request(f"http://{weblog_host}:{weblog_port}/")

logger.info(f"Http request done with uuid: [{request_uuid}] for [localhost:{weblog_port}]")
logger.info(f"Http request done with uuid: [{request_uuid}] for [{weblog_host}:{weblog_port}]")
wait_backend_trace_id(request_uuid, 120.0, profile=False, validator=backend_trace_validator)


Expand All @@ -101,13 +104,15 @@ def test_inject_asm_admission_controller(self, test_k8s_instance):
class TestAdmisionControllerProfiling:
"""Test profiling activation with the admission controller."""

def _check_profiling_request_sent(self, agent_port, timeout=90):
def _check_profiling_request_sent(self, k8s_kind_cluster, timeout=90):
""" Use test agent profiling endpoint to check if the profiling data has been sent by the injectect library.
Checks the request made to the profiling endpoint (/profiling/v1/input).
The profiling post data can take between 12 and 90 seconds (12 if the library supports both env vars, 90 if it supports neither. """
mustend = time.time() + timeout
while time.time() < mustend:
response = requests.get(f"http://localhost:{agent_port}/test/session/requests")
response = requests.get(
f"http://{k8s_kind_cluster.cluster_host_name}:{k8s_kind_cluster.get_agent_port()}/test/session/requests"
)
for request in response.json():
if request["url"].endswith("/profiling/v1/input"):
return True
Expand All @@ -117,36 +122,36 @@ def _check_profiling_request_sent(self, agent_port, timeout=90):
def test_profiling_disabled_by_default(self, test_k8s_instance):
logger.info(f"Launching test test_profiling_disabled_by_default")
logger.info(
f": Weblog: [{test_k8s_instance.k8s_kind_cluster.weblog_port}] Agent: [{test_k8s_instance.k8s_kind_cluster.agent_port}]"
f": Weblog: [{test_k8s_instance.k8s_kind_cluster.get_weblog_port()}] Agent: [{test_k8s_instance.k8s_kind_cluster.get_agent_port()}]"
)
test_k8s_instance.deploy_test_agent()
test_k8s_instance.deploy_datadog_cluster_agent()
# if profiling is enabled force some profiling data to be sent
test_k8s_instance.deploy_weblog_as_pod(
env={"DD_PROFILING_UPLOAD_PERIOD": "10", "DD_INTERNAL_PROFILING_LONG_LIVED_THRESHOLD": "1500"}
)
profiling_request_found = self._check_profiling_request_sent(test_k8s_instance.k8s_kind_cluster.agent_port)
profiling_request_found = self._check_profiling_request_sent(test_k8s_instance.k8s_kind_cluster)
assert not profiling_request_found, "Profiling should be disabled by default, but a profiling request was found"

@bug(context.library > "python@2.12.2", reason="APMON-1496")
def test_profiling_admission_controller(self, test_k8s_instance):
logger.info(f"Launching test test_profiling_admission_controller")
logger.info(
f": Weblog: [{test_k8s_instance.k8s_kind_cluster.weblog_port}] Agent: [{test_k8s_instance.k8s_kind_cluster.agent_port}]"
f": Weblog: [{test_k8s_instance.k8s_kind_cluster.get_weblog_port()}] Agent: [{test_k8s_instance.k8s_kind_cluster.get_agent_port()}]"
)
test_k8s_instance.deploy_test_agent()
test_k8s_instance.deploy_datadog_cluster_agent(features={"datadog.profiling.enabled": "auto"})
test_k8s_instance.deploy_weblog_as_pod(
env={"DD_PROFILING_UPLOAD_PERIOD": "10", "DD_INTERNAL_PROFILING_LONG_LIVED_THRESHOLD": "1500"}
)
profiling_request_found = self._check_profiling_request_sent(test_k8s_instance.k8s_kind_cluster.agent_port)
profiling_request_found = self._check_profiling_request_sent(test_k8s_instance.k8s_kind_cluster)
assert profiling_request_found, "No profiling request found"

@bug(context.library > "python@2.12.2", reason="APMON-1496")
def test_profiling_override_cluster_env(self, test_k8s_instance):
logger.info(f"Launching test test_profiling_override_cluster_env")
logger.info(
f": Weblog: [{test_k8s_instance.k8s_kind_cluster.weblog_port}] Agent: [{test_k8s_instance.k8s_kind_cluster.agent_port}]"
f": Weblog: [{test_k8s_instance.k8s_kind_cluster.get_weblog_port()}] Agent: [{test_k8s_instance.k8s_kind_cluster.get_agent_port()}]"
)
cluster_agent_config = {
"clusterAgent.env[0].name": "DD_ADMISSION_CONTROLLER_AUTO_INSTRUMENTATION_PROFILING_ENABLED",
Expand All @@ -157,28 +162,29 @@ def test_profiling_override_cluster_env(self, test_k8s_instance):
test_k8s_instance.deploy_weblog_as_pod(
env={"DD_PROFILING_UPLOAD_PERIOD": "10", "DD_INTERNAL_PROFILING_LONG_LIVED_THRESHOLD": "1500"}
)
profiling_request_found = self._check_profiling_request_sent(test_k8s_instance.k8s_kind_cluster.agent_port)
profiling_request_found = self._check_profiling_request_sent(test_k8s_instance.k8s_kind_cluster)
assert profiling_request_found, "No profiling request found"

def _test_inject_profiling_admission_controller_real(self, test_k8s_instance):
logger.info(
f"Launching test test_inject_profiling_admission_controller: Weblog: [{test_k8s_instance.k8s_kind_cluster.weblog_port}] Agent: [{test_k8s_instance.k8s_kind_cluster.agent_port}]"
f"Launching test test_inject_profiling_admission_controller: Weblog: [{test_k8s_instance.k8s_kind_cluster.get_weblog_port()}] Agent: [{test_k8s_instance.k8s_kind_cluster.get_agent_port()}]"
)

test_k8s_instance.deploy_datadog_cluster_agent(features={"datadog.profiling.enabled": "auto"})
test_k8s_instance.deploy_agent()
test_k8s_instance.deploy_weblog_as_pod(
env={"DD_PROFILING_UPLOAD_PERIOD": "10", "DD_INTERNAL_PROFILING_LONG_LIVED_THRESHOLD": "1500"}
)
weblog_port = test_k8s_instance.k8s_kind_cluster.weblog_port
logger.info(f"Waiting for weblog available [localhost:{weblog_port}]")
wait_for_port(weblog_port, "localhost", 80.0)
logger.info(f"[localhost:{weblog_port}]: Weblog app is ready!")
warmup_weblog(f"http://localhost:{weblog_port}/")
logger.info(f"Making a request to weblog [localhost:{weblog_port}]")
request_uuid = make_get_request(f"http://localhost:{weblog_port}/")

logger.info(f"Http request done with uuid: [{request_uuid}] for [localhost:{weblog_port}]")
weblog_port = test_k8s_instance.k8s_kind_cluster.get_weblog_port()
weblog_host = test_k8s_instance.k8s_kind_cluster.cluster_host_name
logger.info(f"Waiting for weblog available [{weblog_host}:{weblog_port}]")
wait_for_port(weblog_port, weblog_host, 80.0)
logger.info(f"[{weblog_host}:{weblog_port}]: Weblog app is ready!")
warmup_weblog(f"http://{weblog_host}:{weblog_port}/")
logger.info(f"Making a request to weblog [{weblog_host}:{weblog_port}]")
request_uuid = make_get_request(f"http://{weblog_host}:{weblog_port}/")

logger.info(f"Http request done with uuid: [{request_uuid}] for [{weblog_host}:{weblog_port}]")
wait_backend_trace_id(request_uuid, 120.0, profile=True)


Expand Down
12 changes: 9 additions & 3 deletions utils/k8s_lib_injection/k8s_command_utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import subprocess, datetime, os, time, signal
import subprocess, datetime, os, time, signal, shlex
from utils.tools import logger
from utils import context
from utils.k8s_lib_injection.k8s_sync_kubectl import KubectlLock
from retry import retry


def execute_command(command, timeout=None, logfile=None):
def execute_command(command, timeout=None, logfile=None, subprocess_env=None):
"""call shell-command and either return its output or kill it
if it doesn't normally exit within timeout seconds and return None"""
applied_timeout = 90
Expand All @@ -16,10 +16,16 @@ def execute_command(command, timeout=None, logfile=None):
command_out_redirect = subprocess.PIPE
if logfile:
command_out_redirect = open(logfile, "w")

if not subprocess_env:
subprocess_env = os.environ.copy()

output = ""
try:
start = datetime.datetime.now()
process = subprocess.Popen(command.split(), stdout=command_out_redirect, stderr=command_out_redirect)
process = subprocess.Popen(
shlex.split(command), stdout=command_out_redirect, stderr=command_out_redirect, env=subprocess_env
)

while process.poll() is None:
time.sleep(0.1)
Expand Down
76 changes: 65 additions & 11 deletions utils/k8s_lib_injection/k8s_kind_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import tempfile
from uuid import uuid4

from utils.k8s_lib_injection.k8s_command_utils import execute_command
from utils.k8s_lib_injection.k8s_command_utils import execute_command, execute_command_sync
from utils.tools import logger
from utils import context

Expand All @@ -21,7 +21,7 @@ def ensure_cluster():

def _ensure_cluster():
k8s_kind_cluster = K8sKindCluster()
k8s_kind_cluster.confiure_ports()
k8s_kind_cluster.configure_networking(docker_in_docker="GITLAB_CI" in os.environ)

kind_data = ""
with open("utils/k8s_lib_injection/resources/kind-config-template.yaml", "r") as file:
Expand All @@ -35,11 +35,18 @@ def _ensure_cluster():
with open(cluster_config, "w") as fp:
fp.write(kind_data)
fp.seek(0)
execute_command(
f"kind create cluster --image=kindest/node:v1.25.3@sha256:f52781bc0d7a19fb6c405c2af83abfeb311f130707a0e219175677e366cc45d1 --name {k8s_kind_cluster.cluster_name} --config {cluster_config} --wait 1m"
)

# time.sleep(20)
kind_command = f"kind create cluster --image=kindest/node:v1.25.3@sha256:f52781bc0d7a19fb6c405c2af83abfeb311f130707a0e219175677e366cc45d1 --name {k8s_kind_cluster.cluster_name} --config {cluster_config} --wait 1m"

if "GITLAB_CI" in os.environ:
# Kind needs to run in bridge network to communicate with the internet: https://github.com/DataDog/buildenv/blob/master/cookbooks/dd_firewall/templates/rules.erb#L96
new_env = os.environ.copy()
new_env["KIND_EXPERIMENTAL_DOCKER_NETWORK"] = "bridge"
execute_command(kind_command, subprocess_env=new_env)

setup_kind_in_gitlab(k8s_kind_cluster)
else:
execute_command(kind_command)

return k8s_kind_cluster

Expand All @@ -49,6 +56,37 @@ def destroy_cluster(k8s_kind_cluster):
execute_command(f"docker rm -f {k8s_kind_cluster.cluster_name}-control-plane")


def setup_kind_in_gitlab(k8s_kind_cluster):
# The build runs in a docker container:
# - Docker commands are forwarded to the host.
# - The kind container is a sibling to the build container
# Three things need to happen
# 1) The kind container needs to be in the bridge network to communicate with the internet: done in _ensure_cluster()
# 2) Kube config needs to be altered to use the correct IP of the control plane server
# 3) The internal ports needs to be used rather than external ports: handled in get_agent_port() and get_weblog_port()
correct_control_plane_ip = execute_command(
f"docker container inspect {k8s_kind_cluster.cluster_name}-control-plane --format '{{{{.NetworkSettings.Networks.bridge.IPAddress}}}}'"
).strip()
if not correct_control_plane_ip:
raise Exception("Unable to find correct control plane IP")
logger.debug(f"[setup_kind_in_gitlab] correct_control_plane_ip: {correct_control_plane_ip}")

control_plane_address_in_config = execute_command(
f'docker container inspect {k8s_kind_cluster.cluster_name}-control-plane --format \'{{{{index .NetworkSettings.Ports "6443/tcp" 0 "HostIp"}}}}:{{{{index .NetworkSettings.Ports "6443/tcp" 0 "HostPort"}}}}\''
).strip()
if not control_plane_address_in_config:
raise Exception("Unable to find control plane address from config")
logger.debug(f"[setup_kind_in_gitlab] control_plane_address_in_config: {control_plane_address_in_config}")

# Replace server config with dns name + internal port
execute_command_sync(
f"sed -i -e 's/{control_plane_address_in_config}/{correct_control_plane_ip}:6443/g' {os.environ['HOME']}/.kube/config",
k8s_kind_cluster,
)

k8s_kind_cluster.cluster_host_name = correct_control_plane_ip


def get_free_port():
last_allowed_port = 65535
port = random.randint(1100, 65100)
Expand All @@ -67,10 +105,26 @@ class K8sKindCluster:
def __init__(self):
self.cluster_name = f"lib-injection-testing-{str(uuid4())[:8]}"
self.context_name = f"kind-{self.cluster_name}"
self.agent_port = 18126
self.weblog_port = 18080

def confiure_ports(self):
# Get random free ports
self.cluster_host_name = "localhost"
self.agent_port = None
self.weblog_port = None
self.internal_agent_port = None
self.internal_weblog_port = None
self.docker_in_docker = False

def configure_networking(self, docker_in_docker=False):
self.docker_in_docker = docker_in_docker
self.agent_port = get_free_port()
self.weblog_port = get_free_port()
self.internal_agent_port = 8126
self.internal_weblog_port = 18080

def get_agent_port(self):
if self.docker_in_docker:
return self.internal_agent_port
return self.agent_port

def get_weblog_port(self):
if self.docker_in_docker:
return self.internal_weblog_port
return self.weblog_port

0 comments on commit 53fe59b

Please sign in to comment.