Skip to content

Commit

Permalink
Asynchronously wait for results while profiling the pipeline
Browse files Browse the repository at this point in the history
closes pulp#2445
  • Loading branch information
lubosmj authored and bmbouter committed Aug 23, 2022
1 parent 8ae1955 commit bc001ea
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGES/2445.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a bug in the profiling engine.
7 changes: 3 additions & 4 deletions docs/plugins/api-reference/profiling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ Profiling the Stages API Performance
====================================

Pulp has a performance data collection feature that collects statistics about a Stages API pipeline
as it runs. The data is recorded to a sqlite3 database in the `/var/lib/pulp/debug` folder.
as it runs. The data is recorded to a sqlite3 database in the ``/var/lib/pulp/debug`` directory.

This can be enabled with the `PROFILE_STAGES_API = True` setting in the Pulp settings file. Once
enabled it will write a sqlite3 with the uuid of the task name it runs in to the
`/var/lib/pulp/debug/` folder.
The feature can be activated by declaring the setting ``PROFILE_STAGES_API=True`` in the settings
file. Once enabled, Pulp will record the statistics with the UUID of the task name it runs.

Summarizing Performance Data
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand Down
2 changes: 1 addition & 1 deletion pulpcore/plugin/stages/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ async def run(self):
history.add(stage)
if i < len(stages) - 1:
if settings.PROFILE_STAGES_API:
out_q = ProfilingQueue.make_and_record_queue(stages[i + 1], i + 1, maxsize)
out_q = await ProfilingQueue.make_and_record_queue(stages[i + 1], i + 1, maxsize)
else:
out_q = asyncio.Queue(maxsize=maxsize)
else:
Expand Down
34 changes: 21 additions & 13 deletions pulpcore/plugin/stages/profiler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from asgiref.sync import sync_to_async
from asyncio import Queue
import pathlib
import time
Expand Down Expand Up @@ -66,15 +67,22 @@ def put_nowait(self, item):
except KeyError:
pass
else:
service_time = now - item.extra_data["last_get_time"]
sql = (
"INSERT INTO traffic (uuid, waiting_time, service_time) VALUES ("
"'{uuid}','{waiting_time}','{service_time}')"
)
formatted_sql = sql.format(
uuid=self.stage_uuid, waiting_time=last_waiting_time, service_time=service_time
)
CONN.cursor().execute(formatted_sql)
last_get_time = item.extra_data["last_get_time"]
# the extra_data dictionary might be initialized from within the plugin as
# 'defaultdict' returning empty lists by default; with this if statement,
# we prevent errors like "(unsupported operand type(s) for -: 'float' and 'list')"
if last_get_time or last_get_time == 0:
service_time = now - last_get_time
sql = (
"INSERT INTO traffic (uuid, waiting_time, service_time) VALUES ("
"'{uuid}','{waiting_time}','{service_time}')"
)
formatted_sql = sql.format(
uuid=self.stage_uuid,
waiting_time=last_waiting_time,
service_time=service_time,
)
CONN.cursor().execute(formatted_sql)

interarrival_time = now - self.last_arrival_time
sql = (
Expand All @@ -92,7 +100,7 @@ def put_nowait(self, item):
return super().put_nowait(item)

@staticmethod
def make_and_record_queue(stage, num, maxsize):
async def make_and_record_queue(stage, num, maxsize):
"""
Create a ProfileQueue that is associated with the stage it feeds and record it in sqlite3.
Expand All @@ -105,7 +113,7 @@ def make_and_record_queue(stage, num, maxsize):
ProfilingQueue: The configured ProfilingQueue that was also recorded in the db.
"""
if CONN is None:
create_profile_db_and_connection()
await create_profile_db_and_connection()
stage_id = uuid.uuid4()
stage_name = ".".join([stage.__class__.__module__, stage.__class__.__name__])
sql = "INSERT INTO stages (uuid, name, num) VALUES ('{uuid}','{stage}','{num}')"
Expand All @@ -116,7 +124,7 @@ def make_and_record_queue(stage, num, maxsize):
return in_q


def create_profile_db_and_connection():
async def create_profile_db_and_connection():
"""
Create a profile db from this tasks UUID and a sqlite3 connection to that databases.
Expand All @@ -139,7 +147,7 @@ def create_profile_db_and_connection():
"""
debug_data_dir = "/var/lib/pulp/debug/"
pathlib.Path(debug_data_dir).mkdir(parents=True, exist_ok=True)
current_task = Task.current()
current_task = await sync_to_async(Task.current)()
if current_task:
db_path = debug_data_dir + str(current_task.pk)
else:
Expand Down

0 comments on commit bc001ea

Please sign in to comment.