Skip to content

Commit

Permalink
feat(update): implement asynchronous update checking for Windows and …
Browse files Browse the repository at this point in the history
…Winget

This commit introduces the `UpdateWorker` class, which utilizes QThread to perform update checks for both Windows and Winget asynchronously. This change enhances the user interface by preventing blocking during update checks, allowing for a smoother experience. The `start_windows_update_timer` and `start_winget_update_timer` methods have been updated to initiate these workers, and signals are emitted with the results of the update searches. Additionally, the previous synchronous update methods have been removed to streamline the code. Error handling has been improved to log issues encountered during update checks.
  • Loading branch information
amnweb committed Nov 26, 2024
1 parent 8f202ea commit a21276e
Showing 1 changed file with 175 additions and 106 deletions.
281 changes: 175 additions & 106 deletions src/core/widgets/yasb/update_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,88 @@
from core.widgets.base import BaseWidget
from core.validation.widgets.yasb.update_check import VALIDATION_SCHEMA
from PyQt6.QtWidgets import QLabel, QHBoxLayout, QWidget
from PyQt6.QtCore import Qt
from PyQt6.QtCore import Qt, QThread, pyqtSignal
import win32com.client
import subprocess
import logging

class UpdateWorker(QThread):
windows_update_signal = pyqtSignal(dict)
winget_update_signal = pyqtSignal(dict)

def __init__(self, update_type, parent=None):
super().__init__(parent)
self.update_type = update_type
self.running = True

def stop(self):
self.running = False
self.wait()

def run(self):
try:
if self.update_type == 'windows':
update_session = win32com.client.Dispatch("Microsoft.Update.Session")
update_searcher = update_session.CreateUpdateSearcher()
search_result = update_searcher.Search("IsInstalled=0")
count = search_result.Updates.Count
update_names = [update.Title for update in search_result.Updates]
self.windows_update_signal.emit({"count": count, "names": update_names})

elif self.update_type == 'winget':
result = subprocess.run(
['winget', 'upgrade'],
capture_output=True,
text=True,
check=True,
shell=True,
creationflags=subprocess.CREATE_NO_WINDOW
)

lines = result.stdout.strip().split('\n')
fl = 0
while not lines[fl].startswith("Name"):
fl += 1

id_start = lines[fl].index("Id")
version_start = lines[fl].index("Version")
available_start = lines[fl].index("Available")
source_start = lines[fl].index("Source")

upgrade_list = []
for line in lines[fl + 1:]:
if line.startswith("The following packages have an upgrade available"):
break
if len(line) > (available_start + 1) and not line.startswith('-'):
name = line[:id_start].strip()
id = line[id_start:version_start].strip()
version = line[version_start:available_start].strip()
available = line[available_start:source_start].strip()
software = {
"name": name,
"id": id,
"version": version,
"available_version": available
}
upgrade_list.append(software)

update_names = [
f"{software['name']} ({software['id']}): {software['version']} -> {software['available_version']}"
for software in upgrade_list
]

self.winget_update_signal.emit({
"count": len(upgrade_list),
"names": update_names
})

except Exception as e:
logging.error(f"Error in {self.update_type} worker: {e}")
if self.update_type == 'windows':
self.windows_update_signal.emit({"count": 0, "names": []})
else:
self.winget_update_signal.emit({"count": 0, "names": []})

class UpdateCheckWidget(BaseWidget):
validation_schema = VALIDATION_SCHEMA

Expand Down Expand Up @@ -39,6 +116,9 @@ def __init__(self, windows_update: dict[str, str], winget_update: dict[str, str]

self._stop_event = threading.Event()

self.windows_worker = None
self.winget_worker = None

if self._window_update_enabled:
self.start_windows_update_timer()
if self._winget_update_enabled:
Expand All @@ -48,29 +128,19 @@ def __init__(self, windows_update: dict[str, str], winget_update: dict[str, str]


def start_windows_update_timer(self):
thread = threading.Thread(target=self.windows_update_timer_callback)
thread.daemon = True
thread.start()
self.windows_worker = UpdateWorker('windows')
self.windows_worker.windows_update_signal.connect(
lambda x: self.emit_event('windows_update', x)
)
self.windows_worker.start()


def start_winget_update_timer(self):
thread = threading.Thread(target=self.winget_update_timer_callback)
thread.daemon = True
thread.start()


def windows_update_timer_callback(self):
if not self._stop_event.is_set():
update_info = self.get_windows_update()
self.emit_event('windows_update', update_info)
threading.Timer(self._window_update_interval, self.windows_update_timer_callback).start()


def winget_update_timer_callback(self):
if not self._stop_event.is_set():
update_info = self.get_winget_update()
self.emit_event('winget_update', update_info)
threading.Timer(self._winget_update_interval, self.winget_update_timer_callback).start()
self.winget_worker = UpdateWorker('winget')
self.winget_worker.winget_update_signal.connect(
lambda x: self.emit_event('winget_update', x)
)
self.winget_worker.start()


def emit_event(self, event_type, update_info):
Expand Down Expand Up @@ -159,18 +229,14 @@ def _update_label(self, widget_type, data, names):

def reload_widget(self, widget_type, event=None):
self.hide_container(widget_type)
def run_update():
try:
if widget_type == 'windows':
update_info = self.get_windows_update()
self.emit_event('windows_update', update_info)
elif widget_type == 'winget':
update_info = self.get_winget_update()
self.emit_event('winget_update', update_info)
except Exception as e:
logging.error(f"Error updating {widget_type} widget: {e}")
update_thread = threading.Thread(target=run_update)
update_thread.start()
if widget_type == 'windows':
if self.windows_worker:
self.windows_worker.stop()
self.start_windows_update_timer()
elif widget_type == 'winget':
if self.winget_worker:
self.winget_worker.stop()
self.start_winget_update_timer()


def open_console(self, event=None):
Expand Down Expand Up @@ -213,80 +279,83 @@ def update_widget_visibility(self):
else:
self.show()

def get_windows_update(self):
try:
# Create the Windows Update Session
update_session = win32com.client.Dispatch("Microsoft.Update.Session")
update_searcher = update_session.CreateUpdateSearcher()
# Search for updates that are not installed
search_result = update_searcher.Search("IsInstalled=0")
# Check if there are any updates available
if (count := search_result.Updates.Count) > 0:
update_names = [update.Title for update in search_result.Updates if update.Title not in self._windows_update_exclude]
return {"count": count, "names": update_names}
return {"count": 0, "names": []}
except win32com.client.pywintypes.com_error:
logging.error("No internet connection. Unable to check for Windows updates.")
return {"count": 0, "names": []}
except Exception as e:
logging.error(f"Error running windows update: {e}")
return {"count": 0, "names": []}

def get_winget_update(self):
try:
result = subprocess.run(
['winget', 'upgrade'],
capture_output=True,
text=True,
check=True,
shell=True,
creationflags=subprocess.CREATE_NO_WINDOW
)
# Split the output into lines
lines = result.stdout.strip().split('\n')
# Find the line that starts with "Name", it contains the header
fl = 0
while not lines[fl].startswith("Name"):
fl += 1
# Line fl has the header, we can find char positions for Id, Version, Available, and Source
id_start = lines[fl].index("Id")
version_start = lines[fl].index("Version")
available_start = lines[fl].index("Available")
source_start = lines[fl].index("Source")
# Now cycle through the real packages and split accordingly
upgrade_list = []
# def get_windows_update(self):
# try:
# # Create the Windows Update Session
# update_session = win32com.client.Dispatch("Microsoft.Update.Session")
# update_searcher = update_session.CreateUpdateSearcher()
# # Search for updates that are not installed
# search_result = update_searcher.Search("IsInstalled=0")
# # Check if there are any updates available
# if (count := search_result.Updates.Count) > 0:
# update_names = [update.Title for update in search_result.Updates if update.Title not in self._windows_update_exclude]
# return {"count": count, "names": update_names}
# return {"count": 0, "names": []}
# except win32com.client.pywintypes.com_error:
# logging.error("No internet connection. Unable to check for Windows updates.")
# return {"count": 0, "names": []}
# except Exception as e:
# logging.error(f"Error running windows update: {e}")
# return {"count": 0, "names": []}

# def get_winget_update(self):
# try:
# result = subprocess.run(
# ['winget', 'upgrade'],
# capture_output=True,
# text=True,
# check=True,
# shell=True,
# creationflags=subprocess.CREATE_NO_WINDOW
# )
# # Split the output into lines
# lines = result.stdout.strip().split('\n')
# # Find the line that starts with "Name", it contains the header
# fl = 0
# while not lines[fl].startswith("Name"):
# fl += 1
# # Line fl has the header, we can find char positions for Id, Version, Available, and Source
# id_start = lines[fl].index("Id")
# version_start = lines[fl].index("Version")
# available_start = lines[fl].index("Available")
# source_start = lines[fl].index("Source")
# # Now cycle through the real packages and split accordingly
# upgrade_list = []

for line in lines[fl + 1:]:
# Stop processing when reaching the explicit targeting section
if line.startswith("The following packages have an upgrade available"):
break
if len(line) > (available_start + 1) and not line.startswith('-'):
name = line[:id_start].strip()
if name in self._winget_update_exclude:
continue
id = line[id_start:version_start].strip()
version = line[version_start:available_start].strip()
available = line[available_start:source_start].strip()
software = {
"name": name,
"id": id,
"version": version,
"available_version": available
}
upgrade_list.append(software)
# for line in lines[fl + 1:]:
# # Stop processing when reaching the explicit targeting section
# if line.startswith("The following packages have an upgrade available"):
# break
# if len(line) > (available_start + 1) and not line.startswith('-'):
# name = line[:id_start].strip()
# if name in self._winget_update_exclude:
# continue
# id = line[id_start:version_start].strip()
# version = line[version_start:available_start].strip()
# available = line[available_start:source_start].strip()
# software = {
# "name": name,
# "id": id,
# "version": version,
# "available_version": available
# }
# upgrade_list.append(software)

update_names = [f"{software['name']} ({software['id']}): {software['version']} -> {software['available_version']}" for software in upgrade_list]
count = len(upgrade_list)
return {"count": count, "names": update_names}
except OSError:
logging.error("No internet connection. Unable to check for winget updates.")
return {"count": 0, "names": []}
except subprocess.CalledProcessError as e:
logging.error(f"Error running winget upgrade: {e}")
return {"count": 0, "names": []}
except Exception as e:
logging.error(f"Unexpected error: {e}")
return {"count": 0, "names": []}
# update_names = [f"{software['name']} ({software['id']}): {software['version']} -> {software['available_version']}" for software in upgrade_list]
# count = len(upgrade_list)
# return {"count": count, "names": update_names}
# except OSError:
# logging.error("No internet connection. Unable to check for winget updates.")
# return {"count": 0, "names": []}
# except subprocess.CalledProcessError as e:
# logging.error(f"Error running winget upgrade: {e}")
# return {"count": 0, "names": []}
# except Exception as e:
# logging.error(f"Unexpected error: {e}")
# return {"count": 0, "names": []}

def stop_updates(self):
self._stop_event.set()
if self.windows_worker:
self.windows_worker.stop()
if self.winget_worker:
self.winget_worker.stop()

0 comments on commit a21276e

Please sign in to comment.