Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Better cleanup of scheduled tasks (#3150)" #3183

Merged
merged 1 commit into from
Jul 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions custom_components/hacs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@
from awesomeversion import AwesomeVersion
from homeassistant.components.lovelace.system_health import system_health_info
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import (
EVENT_HOMEASSISTANT_STOP,
Platform,
__version__ as HAVERSION,
)
from homeassistant.const import Platform, __version__ as HAVERSION
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.discovery import async_load_platform
Expand Down Expand Up @@ -92,7 +88,7 @@ async def async_initialize_integration(
hacs.version = integration.version
hacs.configuration.dev = integration.version == "0.0.0"
hacs.hass = hass
hacs.queue = QueueManager()
hacs.queue = QueueManager(hass=hass)
hacs.data = HacsData(hacs=hacs)
hacs.data_client = HacsDataClient(
session=clientsession,
Expand Down Expand Up @@ -215,15 +211,6 @@ async def async_try_startup(_=None):
return
hacs.enable_hacs()

if config_entry is None:
hacs.recuring_tasks.append(
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, hacs.async_cleanup_tasks)
)
else:
config_entry.async_on_unload(
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, hacs.async_cleanup_tasks)
)

await async_try_startup()

# Mischief managed!
Expand Down Expand Up @@ -265,7 +252,15 @@ async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) ->
"""Handle removal of an entry."""
hacs: HacsBase = hass.data[DOMAIN]

await hacs.async_cleanup_tasks()
# Clear out pending queue
hacs.queue.clear()

for task in hacs.recuring_tasks:
# Cancel all pending tasks
task()

# Store data
await hacs.data.async_write(force=True)

try:
if hass.data.get("frontend_panels", {}).get("hacs"):
Expand Down
54 changes: 24 additions & 30 deletions custom_components/hacs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
from aiohttp.client import ClientSession, ClientTimeout
from awesomeversion import AwesomeVersion
from homeassistant.config_entries import ConfigEntry, ConfigEntryState
from homeassistant.const import Platform
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.const import EVENT_HOMEASSISTANT_FINAL_WRITE, Platform
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
from homeassistant.loader import Integration
from homeassistant.util import dt
Expand Down Expand Up @@ -361,7 +360,7 @@ class HacsBase:
integration: Integration | None = None
log: logging.Logger = LOGGER
queue: QueueManager | None = None
recuring_tasks: list[CALLBACK_TYPE] = []
recuring_tasks = []
repositories: HacsRepositories = HacsRepositories()
repository: AIOGitHubAPIRepository | None = None
session: ClientSession | None = None
Expand Down Expand Up @@ -621,51 +620,57 @@ async def startup_tasks(self, _=None) -> None:

if not self.configuration.experimental:
self.recuring_tasks.append(
async_track_time_interval(
self.hass, self.async_update_downloaded_repositories, timedelta(hours=48)
self.hass.helpers.event.async_track_time_interval(
self.async_update_downloaded_repositories, timedelta(hours=48)
)
)
self.recuring_tasks.append(
async_track_time_interval(
self.hass,
self.hass.helpers.event.async_track_time_interval(
self.async_update_all_repositories,
timedelta(hours=96),
)
)
else:
self.recuring_tasks.append(
async_track_time_interval(
self.hass,
self.hass.helpers.event.async_track_time_interval(
self.async_load_hacs_from_github,
timedelta(hours=48),
)
)

self.recuring_tasks.append(
async_track_time_interval(
self.hass, self.async_update_downloaded_custom_repositories, timedelta(hours=48)
self.hass.helpers.event.async_track_time_interval(
self.async_update_downloaded_custom_repositories, timedelta(hours=48)
)
)

self.recuring_tasks.append(
async_track_time_interval(
self.hass, self.async_get_all_category_repositories, timedelta(hours=6)
self.hass.helpers.event.async_track_time_interval(
self.async_get_all_category_repositories, timedelta(hours=6)
)
)

self.recuring_tasks.append(
async_track_time_interval(self.hass, self.async_check_rate_limit, timedelta(minutes=5))
self.hass.helpers.event.async_track_time_interval(
self.async_check_rate_limit, timedelta(minutes=5)
)
)
self.recuring_tasks.append(
async_track_time_interval(self.hass, self.async_prosess_queue, timedelta(minutes=10))
self.hass.helpers.event.async_track_time_interval(
self.async_prosess_queue, timedelta(minutes=10)
)
)

self.recuring_tasks.append(
async_track_time_interval(
self.hass, self.async_handle_critical_repositories, timedelta(hours=6)
self.hass.helpers.event.async_track_time_interval(
self.async_handle_critical_repositories, timedelta(hours=6)
)
)

self.hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_FINAL_WRITE, self.data.async_force_write
)

self.log.debug("There are %s scheduled recurring tasks", len(self.recuring_tasks))

self.status.startup = False
Expand All @@ -684,17 +689,6 @@ async def startup_tasks(self, _=None) -> None:

self.async_dispatch(HacsDispatchEvent.STATUS, {})

async def async_cleanup_tasks(self, _=None) -> None:
"""HACS cleanup tasks."""
self.log.debug("Running cleanup tasks")
for task in self.recuring_tasks:
task()
self.recuring_tasks = []

self.queue.clear()
await self.data.async_write(force=True)
self.log.debug("Completed cleanup tasks")

async def async_download_file(self, url: str, *, headers: dict | None = None) -> bytes | None:
"""Download files, and return the content."""
if url is None:
Expand Down Expand Up @@ -1077,7 +1071,7 @@ async def async_update_downloaded_custom_repositories(self, _=None) -> None:

async def async_handle_critical_repositories(self, _=None) -> None:
"""Handle critical repositories."""
critical_queue = QueueManager()
critical_queue = QueueManager(hass=self.hass)
instored = []
critical = []
was_installed = False
Expand Down
4 changes: 2 additions & 2 deletions custom_components/hacs/repositories/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ async def download_zip_files(self, validate) -> None:
validate.errors.append(f"No assets found for release '{self.ref}'")
return

download_queue = QueueManager()
download_queue = QueueManager(hass=self.hacs.hass)

for content in contents or []:
download_queue.add(self.async_download_zip_file(content, validate))
Expand Down Expand Up @@ -637,7 +637,7 @@ async def download_content(self) -> None:
if not contents:
raise HacsException("No content to download")

download_queue = QueueManager()
download_queue = QueueManager(hass=self.hacs.hass)

for content in contents:
if self.repository_manifest.content_in_root and self.repository_manifest.filename:
Expand Down
76 changes: 33 additions & 43 deletions custom_components/hacs/utils/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,26 @@
import time
from typing import Coroutine

from homeassistant.core import HomeAssistant

from ..exceptions import HacsExecutionStillInProgress
from .logger import LOGGER

_LOGGER = LOGGER


class QueueManager:
"""The QueueManager class."""

def __init__(self) -> None:
self._queue: list[asyncio.Task] = []
self._execution_group: asyncio.Future | None = None
self._stopping = False

@property
def running(self) -> bool:
"""Return a bool indicating if we are already running."""
return self._execution_group is not None
def __init__(self, hass: HomeAssistant) -> None:
self.hass = hass
self.queue: list[Coroutine] = []
self.running = False

@property
def pending_tasks(self) -> int:
"""Return a count of pending tasks in the queue."""
return len(self._queue)
return len(self.queue)

@property
def has_pending_tasks(self) -> bool:
Expand All @@ -34,58 +33,49 @@ def has_pending_tasks(self) -> bool:

def clear(self) -> None:
"""Clear the queue."""
self._stopping = True
for task in self._queue:
task.cancel()
if self._execution_group is not None:
self._execution_group.cancel()
self._execution_group = None
self._queue = []
self._stopping = False
self.queue = []

def add(self, task: Coroutine) -> None:
"""Add a task to the queue."""
_task = asyncio.create_task(task)
if self._stopping:
_task.cancel()
return
self._queue.append(_task)
self.queue.append(task)

async def execute(self, number_of_tasks: int | None = None) -> None:
"""Execute the tasks in the queue."""
if self.running:
LOGGER.debug("<QueueManager> Execution is already running")
_LOGGER.debug("<QueueManager> Execution is already running")
raise HacsExecutionStillInProgress
if self.pending_tasks == 0:
LOGGER.debug("<QueueManager> The queue is empty")
return
if self._stopping:
LOGGER.debug("<QueueManager> The queue is stopping")
if len(self.queue) == 0:
_LOGGER.debug("<QueueManager> The queue is empty")
return

LOGGER.debug("<QueueManager> Checking out tasks to execute")
local_queue: list[asyncio.Task] = []
self.running = True

_LOGGER.debug("<QueueManager> Checking out tasks to execute")
local_queue = []

for task in self._queue[:number_of_tasks]:
local_queue.append(task)
self._queue.remove(task)
if number_of_tasks:
for task in self.queue[:number_of_tasks]:
local_queue.append(task)
else:
for task in self.queue:
local_queue.append(task)

local_queue_count = len(local_queue)
for task in local_queue:
self.queue.remove(task)

LOGGER.debug("<QueueManager> Starting queue execution for %s tasks", local_queue_count)
_LOGGER.debug("<QueueManager> Starting queue execution for %s tasks", len(local_queue))
start = time.time()
self._execution_group = asyncio.gather(*local_queue, return_exceptions=True)
result = await self._execution_group
result = await asyncio.gather(*local_queue, return_exceptions=True)
for entry in result:
if isinstance(entry, Exception):
LOGGER.error("<QueueManager> %s", entry)
_LOGGER.error("<QueueManager> %s", entry)
end = time.time() - start

LOGGER.debug(
_LOGGER.debug(
"<QueueManager> Queue execution finished for %s tasks finished in %.2f seconds",
local_queue_count,
len(local_queue),
end,
)
if self.has_pending_tasks:
LOGGER.debug("<QueueManager> %s tasks remaining in the queue", self.pending_tasks)
self._execution_group = None
_LOGGER.debug("<QueueManager> %s tasks remaining in the queue", len(self.queue))
self.running = False
2 changes: 1 addition & 1 deletion scripts/data/generate_category_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def __init__(self, session: ClientSession, *, token: str | None = None):
"""Initialize."""
super().__init__()
self.hass = HomeAssistant()
self.queue = QueueManager()
self.queue = QueueManager(self.hass)
self.repositories = HacsRepositories()
self.system.generator = True
self.session = session
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async def hacs(hass: HomeAssistant):
)
hacs_obj.common = HacsCommon()
hacs_obj.data = AsyncMock()
hacs_obj.queue = QueueManager()
hacs_obj.queue = QueueManager(hass=hass)
hacs_obj.core = HacsCore()
hacs_obj.system = HacsSystem()

Expand Down
Loading