Skip to content

Commit

Permalink
Better cleanup of scheduled tasks (#3150)
Browse files Browse the repository at this point in the history
* Better cleanup of scheduled tasks

* Move final write

* Remove hass from QueueManager
  • Loading branch information
ludeeus authored Jun 16, 2023
1 parent 6ac1100 commit 8219c81
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 95 deletions.
27 changes: 16 additions & 11 deletions custom_components/hacs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
from awesomeversion import AwesomeVersion
from homeassistant.components.lovelace.system_health import system_health_info
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import Platform, __version__ as HAVERSION
from homeassistant.const import (
EVENT_HOMEASSISTANT_STOP,
Platform,
__version__ as HAVERSION,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.discovery import async_load_platform
Expand Down Expand Up @@ -86,7 +90,7 @@ async def async_initialize_integration(
hacs.version = integration.version
hacs.configuration.dev = integration.version == "0.0.0"
hacs.hass = hass
hacs.queue = QueueManager(hass=hass)
hacs.queue = QueueManager()
hacs.data = HacsData(hacs=hacs)
hacs.data_client = HacsDataClient(
session=clientsession,
Expand Down Expand Up @@ -209,6 +213,15 @@ async def async_try_startup(_=None):
return
hacs.enable_hacs()

if config_entry is None:
hacs.recuring_tasks.append(
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, hacs.async_cleanup_tasks)
)
else:
config_entry.async_on_unload(
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, hacs.async_cleanup_tasks)
)

await async_try_startup()

# Mischief managed!
Expand All @@ -232,15 +245,7 @@ async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) ->
"""Handle removal of an entry."""
hacs: HacsBase = hass.data[DOMAIN]

# Clear out pending queue
hacs.queue.clear()

for task in hacs.recuring_tasks:
# Cancel all pending tasks
task()

# Store data
await hacs.data.async_write(force=True)
await hacs.async_cleanup_tasks()

try:
if hass.data.get("frontend_panels", {}).get("hacs"):
Expand Down
54 changes: 30 additions & 24 deletions custom_components/hacs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
from aiohttp.client import ClientSession, ClientTimeout
from awesomeversion import AwesomeVersion
from homeassistant.config_entries import ConfigEntry, ConfigEntryState
from homeassistant.const import EVENT_HOMEASSISTANT_FINAL_WRITE, Platform
from homeassistant.core import HomeAssistant, callback
from homeassistant.const import Platform
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
from homeassistant.loader import Integration
from homeassistant.util import dt
Expand Down Expand Up @@ -360,7 +361,7 @@ class HacsBase:
integration: Integration | None = None
log: logging.Logger = LOGGER
queue: QueueManager | None = None
recuring_tasks = []
recuring_tasks: list[CALLBACK_TYPE] = []
repositories: HacsRepositories = HacsRepositories()
repository: AIOGitHubAPIRepository | None = None
session: ClientSession | None = None
Expand Down Expand Up @@ -620,57 +621,51 @@ async def startup_tasks(self, _=None) -> None:

if not self.configuration.experimental:
self.recuring_tasks.append(
self.hass.helpers.event.async_track_time_interval(
self.async_update_downloaded_repositories, timedelta(hours=48)
async_track_time_interval(
self.hass, self.async_update_downloaded_repositories, timedelta(hours=48)
)
)
self.recuring_tasks.append(
self.hass.helpers.event.async_track_time_interval(
async_track_time_interval(
self.hass,
self.async_update_all_repositories,
timedelta(hours=96),
)
)
else:
self.recuring_tasks.append(
self.hass.helpers.event.async_track_time_interval(
async_track_time_interval(
self.hass,
self.async_load_hacs_from_github,
timedelta(hours=48),
)
)

self.recuring_tasks.append(
self.hass.helpers.event.async_track_time_interval(
self.async_update_downloaded_custom_repositories, timedelta(hours=48)
async_track_time_interval(
self.hass, self.async_update_downloaded_custom_repositories, timedelta(hours=48)
)
)

self.recuring_tasks.append(
self.hass.helpers.event.async_track_time_interval(
self.async_get_all_category_repositories, timedelta(hours=6)
async_track_time_interval(
self.hass, self.async_get_all_category_repositories, timedelta(hours=6)
)
)

self.recuring_tasks.append(
self.hass.helpers.event.async_track_time_interval(
self.async_check_rate_limit, timedelta(minutes=5)
)
async_track_time_interval(self.hass, self.async_check_rate_limit, timedelta(minutes=5))
)
self.recuring_tasks.append(
self.hass.helpers.event.async_track_time_interval(
self.async_prosess_queue, timedelta(minutes=10)
)
async_track_time_interval(self.hass, self.async_prosess_queue, timedelta(minutes=10))
)

self.recuring_tasks.append(
self.hass.helpers.event.async_track_time_interval(
self.async_handle_critical_repositories, timedelta(hours=6)
async_track_time_interval(
self.hass, self.async_handle_critical_repositories, timedelta(hours=6)
)
)

self.hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_FINAL_WRITE, self.data.async_force_write
)

self.log.debug("There are %s scheduled recurring tasks", len(self.recuring_tasks))

self.status.startup = False
Expand All @@ -689,6 +684,17 @@ async def startup_tasks(self, _=None) -> None:

self.async_dispatch(HacsDispatchEvent.STATUS, {})

async def async_cleanup_tasks(self, _=None) -> None:
"""HACS cleanup tasks."""
self.log.debug("Running cleanup tasks")
for task in self.recuring_tasks:
task()
self.recuring_tasks = []

self.queue.clear()
await self.data.async_write(force=True)
self.log.debug("Completed cleanup tasks")

async def async_download_file(self, url: str, *, headers: dict | None = None) -> bytes | None:
"""Download files, and return the content."""
if url is None:
Expand Down Expand Up @@ -1071,7 +1077,7 @@ async def async_update_downloaded_custom_repositories(self, _=None) -> None:

async def async_handle_critical_repositories(self, _=None) -> None:
"""Handle critical repositories."""
critical_queue = QueueManager(hass=self.hass)
critical_queue = QueueManager()
instored = []
critical = []
was_installed = False
Expand Down
4 changes: 2 additions & 2 deletions custom_components/hacs/repositories/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ async def download_zip_files(self, validate) -> None:
validate.errors.append(f"No assets found for release '{self.ref}'")
return

download_queue = QueueManager(hass=self.hacs.hass)
download_queue = QueueManager()

for content in contents or []:
download_queue.add(self.async_download_zip_file(content, validate))
Expand Down Expand Up @@ -637,7 +637,7 @@ async def download_content(self) -> None:
if not contents:
raise HacsException("No content to download")

download_queue = QueueManager(hass=self.hacs.hass)
download_queue = QueueManager()

for content in contents:
if self.repository_manifest.content_in_root and self.repository_manifest.filename:
Expand Down
76 changes: 43 additions & 33 deletions custom_components/hacs/utils/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,27 @@
import time
from typing import Coroutine

from homeassistant.core import HomeAssistant

from ..exceptions import HacsExecutionStillInProgress
from .logger import LOGGER

_LOGGER = LOGGER


class QueueManager:
"""The QueueManager class."""

def __init__(self, hass: HomeAssistant) -> None:
self.hass = hass
self.queue: list[Coroutine] = []
self.running = False
def __init__(self) -> None:
self._queue: list[asyncio.Task] = []
self._execution_group: asyncio.Future | None = None
self._stopping = False

@property
def running(self) -> bool:
"""Return a bool indicating if we are already running."""
return self._execution_group is not None

@property
def pending_tasks(self) -> int:
"""Return a count of pending tasks in the queue."""
return len(self.queue)
return len(self._queue)

@property
def has_pending_tasks(self) -> bool:
Expand All @@ -33,49 +34,58 @@ def has_pending_tasks(self) -> bool:

def clear(self) -> None:
"""Clear the queue."""
self.queue = []
self._stopping = True
for task in self._queue:
task.cancel()
if self._execution_group is not None:
self._execution_group.cancel()
self._execution_group = None
self._queue = []
self._stopping = False

def add(self, task: Coroutine) -> None:
"""Add a task to the queue."""
self.queue.append(task)
_task = asyncio.create_task(task)
if self._stopping:
_task.cancel()
return
self._queue.append(_task)

async def execute(self, number_of_tasks: int | None = None) -> None:
"""Execute the tasks in the queue."""
if self.running:
_LOGGER.debug("<QueueManager> Execution is already running")
LOGGER.debug("<QueueManager> Execution is already running")
raise HacsExecutionStillInProgress
if len(self.queue) == 0:
_LOGGER.debug("<QueueManager> The queue is empty")
if self.pending_tasks == 0:
LOGGER.debug("<QueueManager> The queue is empty")
return
if self._stopping:
LOGGER.debug("<QueueManager> The queue is stopping")
return

self.running = True

_LOGGER.debug("<QueueManager> Checking out tasks to execute")
local_queue = []
LOGGER.debug("<QueueManager> Checking out tasks to execute")
local_queue: list[asyncio.Task] = []

if number_of_tasks:
for task in self.queue[:number_of_tasks]:
local_queue.append(task)
else:
for task in self.queue:
local_queue.append(task)
for task in self._queue[:number_of_tasks]:
local_queue.append(task)
self._queue.remove(task)

for task in local_queue:
self.queue.remove(task)
local_queue_count = len(local_queue)

_LOGGER.debug("<QueueManager> Starting queue execution for %s tasks", len(local_queue))
LOGGER.debug("<QueueManager> Starting queue execution for %s tasks", local_queue_count)
start = time.time()
result = await asyncio.gather(*local_queue, return_exceptions=True)
self._execution_group = asyncio.gather(*local_queue, return_exceptions=True)
result = await self._execution_group
for entry in result:
if isinstance(entry, Exception):
_LOGGER.error("<QueueManager> %s", entry)
LOGGER.error("<QueueManager> %s", entry)
end = time.time() - start

_LOGGER.debug(
LOGGER.debug(
"<QueueManager> Queue execution finished for %s tasks finished in %.2f seconds",
len(local_queue),
local_queue_count,
end,
)
if self.has_pending_tasks:
_LOGGER.debug("<QueueManager> %s tasks remaining in the queue", len(self.queue))
self.running = False
LOGGER.debug("<QueueManager> %s tasks remaining in the queue", self.pending_tasks)
self._execution_group = None
2 changes: 1 addition & 1 deletion scripts/data/generate_category_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def __init__(self, session: ClientSession, *, token: str | None = None):
"""Initialize."""
super().__init__()
self.hass = HomeAssistant()
self.queue = QueueManager(self.hass)
self.queue = QueueManager()
self.repositories = HacsRepositories()
self.system.generator = True
self.session = session
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async def hacs(hass: HomeAssistant):
)
hacs_obj.common = HacsCommon()
hacs_obj.data = AsyncMock()
hacs_obj.queue = QueueManager(hass=hass)
hacs_obj.queue = QueueManager()
hacs_obj.core = HacsCore()
hacs_obj.system = HacsSystem()

Expand Down
Loading

0 comments on commit 8219c81

Please sign in to comment.