Skip to content

Commit

Permalink
Remove while loop from check_service
Browse files Browse the repository at this point in the history
Looking at the code it seems that the possible reason for getting the
check_service stuck could be the while loop in the check_service.

I rewrite to something more reliable, that will process chunks of projects at
once. This should remove the infinite loop.

Signed-off-by: Michal Konecny <mkonecny@redhat.com>
  • Loading branch information
Zlopez committed Aug 21, 2024
1 parent c2e6a04 commit 46d2863
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 27 deletions.
42 changes: 17 additions & 25 deletions anitya/check_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"""

import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ThreadPoolExecutor, wait
from datetime import datetime
from threading import Lock
from time import sleep
Expand Down Expand Up @@ -189,8 +189,6 @@ def run(self):
self.clear_counters()
queue = self.construct_queue(time)
total_count = len(queue)
projects_left = len(queue)
projects_iter = iter(queue)

if not queue:
return
Expand All @@ -201,32 +199,26 @@ def run(self):
futures = {}
pool_size = config.get("CRON_POOL")
timeout = config.get("CHECK_TIMEOUT")
with ThreadPoolExecutor(pool_size) as pool:
# Wait till every project in queue is checked
while projects_left:
for project in projects_iter:
for i in range(0, len(queue), pool_size):
with ThreadPoolExecutor(pool_size) as pool:
# Wait till every project in chunk is checked
for project in queue[i : i + pool_size]:
future = pool.submit(self.update_project, project)
futures[future] = project
if len(futures) > pool_size:
break # limit job submissions

# Wait for jobs that aren't completed yet
try:
for future in as_completed(futures, timeout=timeout):
projects_left -= 1 # one project down

# log any exception
if future.exception():
try:
future.result()
except Exception as e:
_log.exception(e)

del futures[future]

break # give a chance to add more jobs
except TimeoutError:
projects_left -= 1
(done, not_done) = wait(futures, timeout=timeout)
for future in done:
# log any exception
if future.exception():
try:
future.result()
except Exception as e:
_log.exception(e)
for future in not_done:
future.cancel()
# Don't actually kill the threads, they will be killed by finishing
# the with statement
_log.info("Thread was killed because the execution took too long.")
with self.error_counter_lock:
self.error_counter += 1
Expand Down
5 changes: 3 additions & 2 deletions anitya/tests/test_check_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"""

import unittest
from concurrent.futures import Future
from datetime import timedelta
from unittest import mock

Expand Down Expand Up @@ -382,8 +383,8 @@ def test_run_small_pool_size(self, mock_check_project_release):
self.assertEqual(len(run_objects), 1)
self.assertEqual(run_objects[0].total_count, 2)

@mock.patch("anitya.check_service.as_completed", side_effect=TimeoutError())
def test_run_timeout(self, mock_as_completed):
@mock.patch("anitya.check_service.wait", return_value=([], [Future(), Future()]))
def test_run_timeout(self, mock_wait):
"""
Assert that TimeoutError is thrown when TIMEOUT is reached.
"""
Expand Down
1 change: 1 addition & 0 deletions news/1806.dev
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Rewrite parallelization part of check service

0 comments on commit 46d2863

Please sign in to comment.