diff --git a/CHANGES/2445.misc b/CHANGES/2445.misc new file mode 100644 index 0000000000..81d2c0b91b --- /dev/null +++ b/CHANGES/2445.misc @@ -0,0 +1 @@ +Fixed a bug in the profiling engine. diff --git a/docs/plugins/api-reference/profiling.rst b/docs/plugins/api-reference/profiling.rst index 557362ba6b..2412da0002 100644 --- a/docs/plugins/api-reference/profiling.rst +++ b/docs/plugins/api-reference/profiling.rst @@ -4,11 +4,10 @@ Profiling the Stages API Performance ==================================== Pulp has a performance data collection feature that collects statistics about a Stages API pipeline -as it runs. The data is recorded to a sqlite3 database in the `/var/lib/pulp/debug` folder. +as it runs. The data is recorded to a sqlite3 database in the ``/var/lib/pulp/debug`` directory. -This can be enabled with the `PROFILE_STAGES_API = True` setting in the Pulp settings file. Once -enabled it will write a sqlite3 with the uuid of the task name it runs in to the -`/var/lib/pulp/debug/` folder. +The feature can be activated by declaring the setting ``PROFILE_STAGES_API=True`` in the settings +file. Once enabled, Pulp will record the statistics with the UUID of the task name it runs. Summarizing Performance Data ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/pulpcore/plugin/stages/api.py b/pulpcore/plugin/stages/api.py index 006d7167c9..0281289188 100644 --- a/pulpcore/plugin/stages/api.py +++ b/pulpcore/plugin/stages/api.py @@ -212,7 +212,7 @@ async def run(self): history.add(stage) if i < len(stages) - 1: if settings.PROFILE_STAGES_API: - out_q = ProfilingQueue.make_and_record_queue(stages[i + 1], i + 1, maxsize) + out_q = await ProfilingQueue.make_and_record_queue(stages[i + 1], i + 1, maxsize) else: out_q = asyncio.Queue(maxsize=maxsize) else: diff --git a/pulpcore/plugin/stages/profiler.py b/pulpcore/plugin/stages/profiler.py index de42ba63f3..1e06e4a4a7 100644 --- a/pulpcore/plugin/stages/profiler.py +++ b/pulpcore/plugin/stages/profiler.py @@ -1,3 +1,4 @@ +from asgiref.sync import sync_to_async from asyncio import Queue import pathlib import time @@ -66,15 +67,22 @@ def put_nowait(self, item): except KeyError: pass else: - service_time = now - item.extra_data["last_get_time"] - sql = ( - "INSERT INTO traffic (uuid, waiting_time, service_time) VALUES (" - "'{uuid}','{waiting_time}','{service_time}')" - ) - formatted_sql = sql.format( - uuid=self.stage_uuid, waiting_time=last_waiting_time, service_time=service_time - ) - CONN.cursor().execute(formatted_sql) + last_get_time = item.extra_data["last_get_time"] + # the extra_data dictionary might be initialized from within the plugin as + # 'defaultdict' returning empty lists by default; with this if statement, + # we prevent errors like "(unsupported operand type(s) for -: 'float' and 'list')" + if last_get_time or last_get_time == 0: + service_time = now - last_get_time + sql = ( + "INSERT INTO traffic (uuid, waiting_time, service_time) VALUES (" + "'{uuid}','{waiting_time}','{service_time}')" + ) + formatted_sql = sql.format( + uuid=self.stage_uuid, + waiting_time=last_waiting_time, + service_time=service_time, + ) + CONN.cursor().execute(formatted_sql) interarrival_time = now - self.last_arrival_time sql = ( @@ -92,7 +100,7 @@ def put_nowait(self, item): return super().put_nowait(item) @staticmethod - def make_and_record_queue(stage, num, maxsize): + async def make_and_record_queue(stage, num, maxsize): """ Create a ProfileQueue that is associated with the stage it feeds and record it in sqlite3. @@ -105,7 +113,7 @@ def make_and_record_queue(stage, num, maxsize): ProfilingQueue: The configured ProfilingQueue that was also recorded in the db. """ if CONN is None: - create_profile_db_and_connection() + await create_profile_db_and_connection() stage_id = uuid.uuid4() stage_name = ".".join([stage.__class__.__module__, stage.__class__.__name__]) sql = "INSERT INTO stages (uuid, name, num) VALUES ('{uuid}','{stage}','{num}')" @@ -116,7 +124,7 @@ def make_and_record_queue(stage, num, maxsize): return in_q -def create_profile_db_and_connection(): +async def create_profile_db_and_connection(): """ Create a profile db from this tasks UUID and a sqlite3 connection to that databases. @@ -139,7 +147,7 @@ def create_profile_db_and_connection(): """ debug_data_dir = "/var/lib/pulp/debug/" pathlib.Path(debug_data_dir).mkdir(parents=True, exist_ok=True) - current_task = Task.current() + current_task = await sync_to_async(Task.current)() if current_task: db_path = debug_data_dir + str(current_task.pk) else: