Skip to content

Commit

Permalink
fix(celery): change out.host tags to point to celery broker (#10750)
Browse files Browse the repository at this point in the history
Co-authored-by: wconti27 <william.conti@datadoghq.com>
  • Loading branch information
ZStriker19 and wconti27 authored Nov 25, 2024
1 parent 81f52ca commit b9573be
Show file tree
Hide file tree
Showing 13 changed files with 283 additions and 119 deletions.
37 changes: 0 additions & 37 deletions .riot/requirements/118ee6f.txt

This file was deleted.

35 changes: 0 additions & 35 deletions .riot/requirements/138c2b7.txt

This file was deleted.

35 changes: 35 additions & 0 deletions .riot/requirements/1509aa1.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/1509aa1.in
#
amqp==5.3.1
attrs==24.2.0
billiard==4.2.1
celery[redis]==5.4.0
click==8.1.7
click-didyoumean==0.3.1
click-plugins==1.1.1
click-repl==0.3.0
coverage[toml]==7.6.8
hypothesis==6.45.0
iniconfig==2.0.0
kombu==5.4.2
mock==5.1.0
more-itertools==8.10.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
prompt-toolkit==3.0.48
pytest==8.3.3
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
python-dateutil==2.9.0.post0
redis==5.2.0
six==1.16.0
sortedcontainers==2.4.0
tzdata==2024.2
vine==5.1.0
wcwidth==0.2.13
35 changes: 35 additions & 0 deletions .riot/requirements/1df4aa0.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#
# This file is autogenerated by pip-compile with Python 3.11
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/1df4aa0.in
#
amqp==5.3.1
attrs==24.2.0
billiard==4.2.1
celery[redis]==5.4.0
click==8.1.7
click-didyoumean==0.3.1
click-plugins==1.1.1
click-repl==0.3.0
coverage[toml]==7.6.8
hypothesis==6.45.0
iniconfig==2.0.0
kombu==5.4.2
mock==5.1.0
more-itertools==8.10.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
prompt-toolkit==3.0.48
pytest==8.3.3
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
python-dateutil==2.9.0.post0
redis==5.2.0
six==1.16.0
sortedcontainers==2.4.0
tzdata==2024.2
vine==5.1.0
wcwidth==0.2.13
38 changes: 38 additions & 0 deletions .riot/requirements/654f8c0.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/654f8c0.in
#
amqp==5.3.1
async-timeout==5.0.1
attrs==24.2.0
billiard==4.2.1
celery[redis]==5.4.0
click==8.1.7
click-didyoumean==0.3.1
click-plugins==1.1.1
click-repl==0.3.0
coverage[toml]==7.6.8
exceptiongroup==1.2.2
hypothesis==6.45.0
iniconfig==2.0.0
kombu==5.4.2
mock==5.1.0
more-itertools==8.10.0
opentracing==2.4.0
packaging==24.2
pluggy==1.5.0
prompt-toolkit==3.0.48
pytest==8.3.3
pytest-cov==6.0.0
pytest-mock==3.14.0
pytest-randomly==3.16.0
python-dateutil==2.9.0.post0
redis==5.2.0
six==1.16.0
sortedcontainers==2.4.0
tomli==2.1.0
tzdata==2024.2
vine==5.1.0
wcwidth==0.2.13
35 changes: 0 additions & 35 deletions .riot/requirements/91a1ee4.txt

This file was deleted.

22 changes: 18 additions & 4 deletions ddtrace/contrib/internal/celery/signals.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from urllib.parse import urlparse

from celery import current_app
from celery import registry
from celery.utils import nodenames

from ddtrace import Pin
from ddtrace import config
Expand Down Expand Up @@ -181,9 +183,21 @@ def trace_after_publish(*args, **kwargs):
if span is None:
return
else:
nodename = span.get_tag("celery.hostname")
if nodename is not None:
_, host = nodenames.nodesplit(nodename)
broker_url = current_app.conf.broker_url

if broker_url == "memory://":
host = broker_url
else:
parsed_url = urlparse(broker_url)

host = None
if parsed_url.hostname:
host = parsed_url.hostname

if parsed_url.port:
span.set_metric(net.TARGET_PORT, parsed_url.port)

if host:
span.set_tag_str(net.TARGET_HOST, host)

span.finish()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
celery: Changes celery ``out.host`` span tag to point towards broker host url instead of local celery process hostname. Fixes
inferred service representation issues when using celery.
3 changes: 1 addition & 2 deletions riotfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,10 +720,9 @@ def select_pys(min_version: str = MIN_PYTHON_VERSION, max_version: str = MAX_PYT
"PYTEST_PLUGINS": "celery.contrib.pytest",
},
pkgs={
"celery": [
"celery[redis]": [
latest,
],
"redis": "~=3.5",
},
),
],
Expand Down
6 changes: 6 additions & 0 deletions tests/contrib/celery/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@
from ddtrace.contrib.celery import unpatch
from tests.utils import TracerTestCase

from ..config import RABBITMQ_CONFIG
from ..config import REDIS_CONFIG


REDIS_URL = "redis://127.0.0.1:{port}".format(port=REDIS_CONFIG["port"])
BROKER_URL = "{redis}/{db}".format(redis=REDIS_URL, db=0)
BACKEND_URL = "{redis}/{db}".format(redis=REDIS_URL, db=1)

AMQP_URL = "amqp://{user}:{password}@127.0.0.1:{port}".format(
user=RABBITMQ_CONFIG["user"], password=RABBITMQ_CONFIG["password"], port=RABBITMQ_CONFIG["port"]
)
AMQP_BROKER_URL = "{amqp}//".format(amqp=AMQP_URL)


@pytest.fixture(scope="session")
def celery_config():
Expand Down
7 changes: 3 additions & 4 deletions tests/contrib/celery/test_integration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from collections import Counter
import os
import socket
import subprocess
from time import sleep

Expand Down Expand Up @@ -194,7 +193,7 @@ def fn_task_parameters(user, force_logout=False):
assert async_span.get_tag("celery.routing_key") == "celery"
assert async_span.get_tag("component") == "celery"
assert async_span.get_tag("span.kind") == "producer"
assert async_span.get_tag("out.host") == socket.gethostname()
assert async_span.get_tag("out.host") == "memory://"
else:
assert 1 == len(traces)
assert 1 == len(traces[0])
Expand Down Expand Up @@ -239,7 +238,7 @@ def fn_task_parameters(user, force_logout=False):
assert async_span.get_tag("celery.routing_key") == "celery"
assert async_span.get_tag("component") == "celery"
assert async_span.get_tag("span.kind") == "producer"
assert async_span.get_tag("out.host") == socket.gethostname()
assert async_span.get_tag("out.host") == "memory://"
else:
assert 1 == len(traces)
assert 1 == len(traces[0])
Expand Down Expand Up @@ -635,7 +634,7 @@ def fn_task_parameters(user, force_logout=False):
assert async_span.get_tag("celery.routing_key") == "celery"
assert async_span.get_tag("component") == "celery"
assert async_span.get_tag("span.kind") == "producer"
assert async_span.get_tag("out.host") == socket.gethostname()
assert async_span.get_tag("out.host") == "memory://"

run_span = self.find_span(name="celery.run")
assert run_span.name == "celery.run"
Expand Down
Loading

0 comments on commit b9573be

Please sign in to comment.