diff --git a/.gitignore b/.gitignore index b1547bb..3e4661f 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,4 @@ build/ # Package specific psi*.json psi*.xlsx +psi.log diff --git a/README.md b/README.md index 61f526a..fedcbb5 100644 --- a/README.md +++ b/README.md @@ -214,7 +214,6 @@ Example: Full list of available metrics: -* `observedTotalCumulativeLayoutShift` * `observedCumulativeLayoutShift` * `observedLargestContentfulPaintAllFrames` * `maxPotentialFID` diff --git a/src/pyspeedinsights/api/keys.py b/src/pyspeedinsights/api/keys.py index 292cc29..d0140cf 100644 --- a/src/pyspeedinsights/api/keys.py +++ b/src/pyspeedinsights/api/keys.py @@ -7,9 +7,21 @@ If no key is set in keyring, the user will be manually prompted for their key. """ +import logging + import keyring from keyring.errors import KeyringError +logger = logging.getLogger(__name__) + + +class RetryLimitExceededError(KeyringError): + """Exception if the user has exceeded the retry limit for entering an API key.""" + + +class InputTerminatedError(KeyringError): + """Exception if the user manually terminates retry for entering an API key.""" + def get_api_key() -> str: """Gets the user's PSI API key from their keyring store. @@ -21,29 +33,38 @@ def get_api_key() -> str: Returns: A str representing the PSI API key. Raises: - SystemExit: The user terminated the reprompt. + InputTerminatedError: The user terminated the reprompt. + RetryLimitExceededError: The reprompt limit was exceeded. """ + logger.info("Attempting to retrieve API key from keystore.") try: # get_password() returns None for an empty key psi_api_key = keyring.get_password("system", "psikey") - except KeyringError: - print("There was an error retrieving your API key from the keystore.") + except KeyringError as err: + logger.error( + f"There was an error retrieving your API key from the keystore: {err}", + exc_info=True, + ) psi_api_key = None if psi_api_key is None: - psi_api_key = input("No API key found. Enter it manually:\n") + logger.warning("No API key found. Defaulting to manual input.") + psi_api_key = input("No API key found. Enter your key manually:\n") retry_limit = 5 while not psi_api_key: + logger.warning("Empty API key supplied. Reprompting user for key.") reprompt = input( "Empty API key supplied. Please re-enter your key or Q to quit:\n" ) + # Below errors logged as CRITICAL in main() before exit if reprompt in ("Q", "q"): - raise SystemExit + raise InputTerminatedError("API key input cancelled.") elif retry_limit < 1: - raise SystemExit("Retry limit exceeded.") + raise RetryLimitExceededError("Retry limit for entering API key exceeded.") else: psi_api_key = reprompt retry_limit -= 1 + logger.info("API key retrieval successful.") return psi_api_key diff --git a/src/pyspeedinsights/api/request.py b/src/pyspeedinsights/api/request.py index 43c7347..bf4bf3b 100644 --- a/src/pyspeedinsights/api/request.py +++ b/src/pyspeedinsights/api/request.py @@ -1,19 +1,23 @@ """Async request preparation and processing for PSI API calls.""" import asyncio +import logging +import ssl from collections import Counter from typing import Any, Coroutine, Optional, Union import aiohttp from ..utils.generic import remove_nonetype_dict_items -from ..utils.urls import validate_url -from .keys import get_api_key +from ..utils.urls import InvalidURLError, validate_url +from .keys import KeyringError, get_api_key +logger = logging.getLogger(__name__) next_delay = 1 # Global for applying a 1s delay between requests async def get_response( + key: str, url: str, category: Optional[str] = None, locale: Optional[str] = None, @@ -34,6 +38,7 @@ async def get_response( base_url = "https://www.googleapis.com/pagespeedonline/v5/runPagespeed" url = validate_url(url) params = { + "key": key, "url": url, "category": category, "locale": locale, @@ -44,15 +49,15 @@ async def get_response( } # Use API defaults instead of passing None values as query params. params = remove_nonetype_dict_items(params) - params["key"] = get_api_key() req_url = params["url"] # Add a 1s delay between calls to avoid 500 errors from server. global next_delay next_delay += 1 + logger.debug("Sleeping request to prevent server errors.") await asyncio.sleep(next_delay) - print(f"Sending request... ({req_url})") + logger.info(f"Sending request... ({req_url})") # Make async call with query params to PSI API and await response. async with aiohttp.ClientSession() as session: async with session.get(url=base_url, params=params) as resp: @@ -62,18 +67,18 @@ async def get_response( try: resp.raise_for_status() json_resp = await resp.json() - print(f"Request successful! ({req_url})") + logger.info(f"Request successful! ({req_url})") except aiohttp.ClientError as err_c: if retry_attempts < 1: - print(err_c) - print(f"Retry limit reached. Skipping ({req_url})") + logger.error(err_c, exc_info=True) + logger.warning( + f"Retry limit for URL reached. Skipping ({req_url})" + ) raise aiohttp.ClientError(err_c) else: retry_attempts -= 1 - print( - "Request failed. Retrying... ", - f"{retry_attempts} retries left ({req_url})", - ) + logger.warning("Request failed. Retrying.") + logger.info(f"{retry_attempts} retries left ({req_url})") await asyncio.sleep(1) return json_resp @@ -91,15 +96,39 @@ def run_requests( async def gather_responses(tasks: list[Coroutine]) -> list[dict]: """Gathers tasks and awaits the return of the responses for processing.""" - print(f"Preparing {len(tasks)} URL(s)...") + logger.info(f"Gathering {len(tasks)} URL(s) and scheduling tasks.") responses = await asyncio.gather(*tasks, return_exceptions=True) + # Generator expression for bubbling up critical exceptions (handled in main()) + # Purposefully explicit here to avoid raising exceptions for aiohttp.ClientError, + # as we don't want a single client failure to invalidate the entire run. + # OSError and ssl errors are all subclassed by aiohttp exceptions. + critical_exception = next( + ( + r + for r in responses + if isinstance( + r, + ( + KeyringError, + InvalidURLError, + OSError, + ssl.SSLError, + ssl.CertificateError, + ), + ) + ), + None, + ) + + if critical_exception is not None: + raise critical_exception + type_counts = Counter(type(r) for r in responses) c_success, c_fail = type_counts[dict], type_counts[aiohttp.ClientError] - print( - f"{c_success}/{len(tasks)} URL(s) processed successfully. ", - f"{c_fail} skipped due to errors.", - ) + logger.info(f"{c_success}/{len(tasks)} URL(s) processed successfully. ") + logger.warning(f"{c_fail} skipped due to errors. Removing failed URL(s).") + # Remove failures for response processing responses = [r for r in responses if type(r) == dict] return responses @@ -109,8 +138,11 @@ def get_tasks( request_urls: list[str], api_args_dict: dict[str, Any] ) -> list[Coroutine]: """Creates a list of tasks that call get_response() with request params.""" + logger.info("Creating list of tasks based on parsed URL(s).") + key = get_api_key() tasks = [] for url in request_urls: api_args_dict["url"] = url + api_args_dict["key"] = key tasks.append(get_response(**api_args_dict)) return tasks diff --git a/src/pyspeedinsights/api/response.py b/src/pyspeedinsights/api/response.py index b083324..a9e843f 100644 --- a/src/pyspeedinsights/api/response.py +++ b/src/pyspeedinsights/api/response.py @@ -2,12 +2,16 @@ import copy import json +import logging from datetime import datetime from typing import Optional, Union from ..cli.choices import COMMAND_CHOICES +from ..utils.exceptions import JSONKeyError from ..utils.generic import sort_dict_alpha +logger = logging.getLogger(__name__) + def process_json(json_resp: dict, category: str, strategy: str) -> None: """Dumps raw json response to a file in the working directory. @@ -20,7 +24,7 @@ def process_json(json_resp: dict, category: str, strategy: str) -> None: with open(filename, "w", encoding="utf-8") as f: json.dump(json_resp, f, ensure_ascii=False, indent=4) - print("JSON processed. Check your current directory.") + logger.info("JSON processed. Check your current working directory.") def process_excel( @@ -31,12 +35,23 @@ def process_excel( Called within main() in pyspeedinsights.app. Metrics results are only included if the category is performance. """ - audits_base = _get_audits_base(json_resp) # Location of audits in json response - metadata = _parse_metadata(json_resp, category) - audit_results = _parse_audits(audits_base) + json_err = "Malformed JSON response. Skipping Excel processing for URL: " + try: + audits_base = _get_audits_base(json_resp) # Location of audits in json response + metadata = _parse_metadata(json_resp, category) + audit_results = _parse_audits(audits_base) + except JSONKeyError as err: + logger.error(f"{json_err}{err}", exc_info=True) + return {} + if metrics is not None and category == "performance": - metrics_results = _parse_metrics(audits_base, metrics) + try: + metrics_results = _parse_metrics(audits_base, metrics) + except JSONKeyError as err: + logger.error(f"{json_err}{err}", exc_info=True) + return {} else: + logger.warning("Skipping metrics (not chosen or non-performance category)") metrics_results = None results: dict[str, Union[dict, None]] = { @@ -44,11 +59,14 @@ def process_excel( "audit_results": audit_results, "metrics_results": metrics_results, } + logger.info("Response data processed for URL.") return results def _parse_metadata(json_resp: dict, category: str) -> dict[str, Union[str, int]]: """Parses various metadata from the JSON response to write to Excel.""" + logger.info("Parsing metadata from JSON response.") + json_base = json_resp["lighthouseResult"] strategy = json_base["configSettings"]["formFactor"] category_score = json_base["categories"][category]["score"] @@ -72,6 +90,8 @@ def _parse_audits(audits_base: dict) -> dict[str, tuple[Union[int, float]]]: A dict of audit results with audit names as keys and tuples of length 2 as values containing the audit scores and numeric values, respectively. """ + logger.info("Parsing audit data from JSON response.") + audit_results = {} # Create results dict with scores and numerical values for each audit. for k in audits_base.keys(): @@ -97,11 +117,13 @@ def _parse_metrics( and int or float metrics as values. """ if "all" in metrics: + logger.info("Parsing all metrics.") metrics_to_use = copy.copy(COMMAND_CHOICES["metrics"]) # Remove 'all' to avoid key errors, as it doesn't exist in JSON resp. if type(metrics_to_use) == list: metrics_to_use.remove("all") else: + logger.info("Parsing chosen metrics.") metrics_to_use = metrics # Create new dict of metrics based on user's chosen metrics. @@ -125,10 +147,19 @@ def _get_timestamp(json_resp: dict) -> str: Returns: A str in format year-month-day_hour.minute.second. """ - timestamp = json_resp["analysisUTCTimestamp"] + logger.info("Parsing timestamp from JSON response.") + time_format = "%Y-%m-%d_%H.%M.%S" + + try: + timestamp = json_resp["analysisUTCTimestamp"] + except JSONKeyError: + logger.warning("Unable to parse timestamp. Falling back to local time.") + date = datetime.now().strftime(time_format) + return date + ts_no_fractions = timestamp.split(".")[0] # Remove fraction if ts_no_fractions[-1] != "Z": ts_no_fractions += "Z" # Add Z back after fraction removal dt_object = datetime.strptime(ts_no_fractions, "%Y-%m-%dT%H:%M:%SZ") - date = dt_object.strftime("%Y-%m-%d_%H.%M.%S") + date = dt_object.strftime(time_format) return date diff --git a/src/pyspeedinsights/app.py b/src/pyspeedinsights/app.py index cfb5a51..415a24c 100644 --- a/src/pyspeedinsights/app.py +++ b/src/pyspeedinsights/app.py @@ -1,9 +1,22 @@ +import logging +import ssl +import sys + +from keyring.errors import KeyringError + from .api.request import run_requests from .api.response import process_excel, process_json from .cli.commands import arg_group_to_dict, create_arg_groups, set_up_arg_parser from .core.excel import ExcelWorkbook -from .core.sitemap import process_sitemap, request_sitemap +from .core.sitemap import ( + SitemapError, + process_sitemap, + request_sitemap, + validate_sitemap_url, +) +from .utils.exceptions import JSONKeyError from .utils.generic import remove_dupes_from_list +from .utils.urls import InvalidURLError def main() -> None: @@ -14,12 +27,23 @@ def main() -> None: Prepares async API calls, awaits and returns their responses. Iterates through each response and writes them to the chosen format. """ + logging.basicConfig( + level=logging.INFO, + style="{", + format="[{asctime}] {levelname:^8s} --- {message} ({filename}:{lineno})", + handlers=(logging.FileHandler("psi.log"), logging.StreamHandler()), + ) + logger = logging.getLogger(__name__) + logger.info("---Starting---") + parser = set_up_arg_parser() args = parser.parse_args() arg_groups = create_arg_groups(parser, args) api_args_dict = arg_group_to_dict(arg_groups, "API Group") proc_args_dict = arg_group_to_dict(arg_groups, "Processing Group") + logger.info("Parsing CLI arguments.") + format = proc_args_dict.get("format") metrics = proc_args_dict.get("metrics") @@ -35,32 +59,79 @@ def main() -> None: excel_output = format in ("excel", "sitemap") if metrics is not None and json_output or category != "performance": - print("Warning: metrics are only configurable for excel performance reports.") - print("JSON performance reports will include all metrics by default.") + logger.warning( + "Metrics are only configurable for excel performance reports. " + "JSON performance reports will include all metrics by default." + ) if format == "sitemap" and url is not None: - sitemap = request_sitemap(url) - request_urls = remove_dupes_from_list(process_sitemap(sitemap)) + try: + sitemap = request_sitemap(url) + request_urls = remove_dupes_from_list(process_sitemap(sitemap)) + # Let these exceptions bubble up from `core/sitemap.py` + except (SitemapError, InvalidURLError) as err: + logger.critical(err, exc_info=True) + sys.exit(1) elif url is not None: - request_urls = [url] # Only request a single page's URL + logger.info("Sitemap format not specified. Only 1 URL to process.") + if validate_sitemap_url(url): + logger.critical( + "Sitemaps can only be processed if sitemap format is specified." + ) + sys.exit(1) + request_urls = [url] - responses = run_requests(request_urls, api_args_dict) + try: + responses = run_requests(request_urls, api_args_dict) + # Let these exceptions bubble up from `api/request.py` + except ( + KeyringError, + InvalidURLError, + OSError, + ssl.SSLError, + ssl.CertificateError, + ) as err: + logger.critical(err, exc_info=True) + sys.exit(1) - for num, response in enumerate(responses): + logger.info("Processing response data.") + + resp_num = 1 # use instead of enumerate for more control over skipping failures + for response in responses: if json_output: + logger.info("JSON format selected. Processing JSON.") process_json(response, category, strategy) elif excel_output: excel_results = process_excel(response, category, metrics) - final_url = response["lighthouseResult"]["finalUrl"] + try: + final_url = response["lighthouseResult"]["finalUrl"] + except JSONKeyError as err: + # For now, log this as critical and exit. + # A better solution would be to preserve the original request URL + # as a fallback. Temporarily, this is more helpful than just KeyError. + logger.critical( + f"The response data contains no URL: {err}", exc_info=True + ) + sys.exit(1) + + if not excel_results: + # Empty results from process_excel() means skip due to processing issue. + # Don't increment resp_num here in case error occurs on first response + # (would result in the workbook not existing for subsequent responses). + logger.warning( + f"Skipping Excel processing for {final_url} due to malformed JSON." + ) + continue + metadata = excel_results.get("metadata") audit_results = excel_results.get("audit_results") metrics_results = excel_results.get("metrics_results") - first_resp = num == 0 + first_resp = resp_num == 1 if metadata is not None and audit_results is not None: if first_resp: - print("Creating Excel workbook...") + logger.info("Excel format selected. Creating Excel workbook.") workbook = ExcelWorkbook( final_url, metadata, audit_results, metrics_results ) @@ -71,10 +142,14 @@ def main() -> None: workbook.metadata = metadata workbook.audit_results = audit_results workbook.metrics_results = metrics_results + logger.info("Updating workbook to process next URL.") workbook.write_to_worksheet(first_resp) + resp_num += 1 else: - raise ValueError("Invalid format specified. Please try again.") + f_err = "Invalid report format specified. Please try again." + logger.critical(f_err) + sys.exit(1) if excel_output: workbook.finalize_and_save() diff --git a/src/pyspeedinsights/cli/choices.py b/src/pyspeedinsights/cli/choices.py index 9ae0d5c..99205e1 100644 --- a/src/pyspeedinsights/cli/choices.py +++ b/src/pyspeedinsights/cli/choices.py @@ -55,7 +55,6 @@ # This must be a list instead of tuple so "all" can be removed from it "metrics": [ "all", - "observedTotalCumulativeLayoutShift", "observedCumulativeLayoutShift", "observedLargestContentfulPaintAllFrames", "maxPotentialFID", diff --git a/src/pyspeedinsights/cli/commands.py b/src/pyspeedinsights/cli/commands.py index deec66e..970fa58 100644 --- a/src/pyspeedinsights/cli/commands.py +++ b/src/pyspeedinsights/cli/commands.py @@ -7,12 +7,14 @@ arg_group_dict = arg_group_to_dict(arg_groups, "Group Name") """ +import logging from argparse import ArgumentParser, Namespace from typing import Any, TypeAlias, Union from .choices import COMMAND_CHOICES ArgGroups: TypeAlias = dict[str, Namespace] +logger = logging.getLogger(__name__) def set_up_arg_parser() -> ArgumentParser: @@ -23,6 +25,7 @@ def set_up_arg_parser() -> ArgumentParser: Returns: An argparse.ArgumentParser instance with 2 argument groups. """ + logger.info("Setting up CLI arguments.") parser = ArgumentParser(prog="pyspeedinsights") # Add argument options for default API call query params. @@ -122,6 +125,7 @@ def create_arg_groups(parser: ArgumentParser, args: Namespace) -> ArgGroups: Returns: A dict with group names as keys and argparse.Namespace instances as values. """ + logger.info("Creating CLI argument groups.") arg_groups = {} for group in parser._action_groups: group_dict = {a.dest: getattr(args, a.dest, None) for a in group._group_actions} diff --git a/src/pyspeedinsights/core/excel.py b/src/pyspeedinsights/core/excel.py index 51506a4..4117663 100644 --- a/src/pyspeedinsights/core/excel.py +++ b/src/pyspeedinsights/core/excel.py @@ -1,5 +1,6 @@ """Excel workbook operations for writing PSI API results to Excel.""" +import logging from dataclasses import dataclass, field from typing import Any, TypeAlias, Union, cast @@ -9,6 +10,8 @@ AuditResults: TypeAlias = dict[str, tuple[Union[int, float]]] MetricsResults: TypeAlias = Union[dict[str, Union[int, float]], None] +logger = logging.getLogger(__name__) + @dataclass class ExcelWorkbook: @@ -27,12 +30,15 @@ def set_up_worksheet(self) -> None: """Creates the workbook, adds a worksheet, and sets up column headings.""" self._create_workbook() self.worksheet = self.workbook.add_worksheet() + logger.info("Excel worksheet created in workbook.") + self.cur_cell = [0, 0] row, col = self.cur_cell # First cell column_format = self._column_format() metadata_format = self._metadata_format() # Add metadata to the first cell of the Excel sheet. + logger.info("Writing metadata to worksheet.") category = self.metadata["category"].upper() strategy = self.metadata["strategy"].upper() metadata_value = f"{strategy} {category} REPORT" @@ -54,22 +60,24 @@ def set_up_worksheet(self) -> None: def write_to_worksheet(self, first_resp: bool) -> None: """Writes the URL, OVR page score, audit and metrics results to the sheet.""" - if self.worksheet is not None: - self._write_page_url() - self._write_overall_category_score() - self._write_audit_results(self.audit_results, first_resp) - self._write_metrics_results(self.metrics_results, first_resp) - - self.cur_cell[0] += 1 # Move down 1 row for next page's results - self.cur_cell[1] = 5 # Reset to first results column - else: - raise ValueError("The worksheet is not set up.") + if self.worksheet is None: + # Fallback if worksheet doesn't exist for some reason + logger.warning("Worksheet not found. Creating.") + self.set_up_worksheet() + + self._write_page_url() + self._write_overall_category_score() + self._write_audit_results(self.audit_results, first_resp) + self._write_metrics_results(self.metrics_results, first_resp) + + self.cur_cell[0] += 1 # Move down 1 row for next page's results + self.cur_cell[1] = 5 # Reset to first results column def finalize_and_save(self) -> None: - """Writes the average score to the workbook and saves/closes it.""" + """Writes the average score to the worksheet and saves/closes it.""" self._write_average_score() self.workbook.close() - print("Workbook saved. Check your current directory!") + logger.info("Workbook saved. Check your current directory!") def _create_workbook(self) -> None: """Creates an Excel workbook with a unique and descriptive name.""" @@ -77,9 +85,11 @@ def _create_workbook(self) -> None: category = self.metadata["category"] date = self.metadata["timestamp"] self.workbook = Workbook(f"psi-s-{strategy}-c-{category}-{date}.xlsx") + logger.info("Excel workbook created.") def _write_page_url(self) -> None: """Writes the requested page URL being analyzed to the sheet.""" + logger.info("Writing page URL to worksheet.") url_format = self._url_format() row = self.cur_cell[0] + 2 self.worksheet.merge_range( @@ -88,6 +98,7 @@ def _write_page_url(self) -> None: def _write_overall_category_score(self) -> None: """Writes the OVR category score for the page to the sheet.""" + logger.info("Writing overall category score to worksheet.") category_score = self.metadata["category_score"] * 100 cat_score_format = self._score_format(category_score) @@ -100,6 +111,7 @@ def _write_overall_category_score(self) -> None: def _write_average_score(self) -> None: """Writes the average score next to the worksheet metadata.""" + logger.info("Writing average score to worksheet.") scores = self.category_scores avg_score = sum(scores) / len(scores) avg_score = round(avg_score, 2) @@ -113,12 +125,14 @@ def _write_audit_results( column_format = self._column_format() row, col = self.cur_cell + logger.info("Writing audit results to worksheet.") for title, scores in audit_results.items(): self.worksheet.set_column(col, col + 1, 15) # cast() is a mypy workaround for issue #1178 score, value = cast(tuple[Any, Any], scores) score_format = self._score_format(score) if first_resp: + logger.debug("Writing audit result headings to worksheet.") self._write_results_headings( row, col, title, column_format, result_type="audit" ) @@ -137,9 +151,11 @@ def _write_metrics_results( row, col = self.cur_cell if metrics_results is not None: + logger.info("Writing metrics results to worksheet.") for title, score in metrics_results.items(): self.worksheet.set_column(col, col, 30) if first_resp: + logger.debug("Writing metrics result headings to worksheet.") self._write_results_headings( row, col, title, column_format, result_type="metrics" ) diff --git a/src/pyspeedinsights/core/sitemap.py b/src/pyspeedinsights/core/sitemap.py index 1ec1299..f861e60 100644 --- a/src/pyspeedinsights/core/sitemap.py +++ b/src/pyspeedinsights/core/sitemap.py @@ -3,6 +3,7 @@ Includes support for recursive parsing of multiple sitemaps via a sitemap index. """ +import logging from os.path import splitext from typing import Any, Optional, TypeAlias from urllib.parse import urlsplit @@ -13,6 +14,19 @@ from ..utils.urls import validate_url XMLElement: TypeAlias = Any +logger = logging.getLogger(__name__) + + +class SitemapError(Exception): + """A base class for Sitemap exceptions.""" + + +class SitemapRetrievalError(SitemapError): + """A class for Sitemap retrieval exceptions.""" + + +class SitemapParseError(SitemapError): + """A class for Sitemap parsing exceptions.""" def request_sitemap(url: str) -> str: @@ -24,7 +38,7 @@ def request_sitemap(url: str) -> str: Returns: A str with XML sitemap text content. Raises: - SystemExit: The sitemap URL is invalid or a request failed. + SitemapRetrievalError: The sitemap URL is invalid or a request failed. """ url = validate_url(url) # Set a dummy user agent to avoid bot detection by firewalls @@ -36,26 +50,28 @@ def request_sitemap(url: str) -> str: ) headers = {"user-agent": dummy_user_agent} + # Below errors logged as CRITICAL in main() before exit + logger.info("Checking if sitemap URL is a valid sitemap URL.") if not validate_sitemap_url(url): err = ( "Invalid sitemap URL provided. Please provide a URL to a valid XML sitemap." ) - raise SystemExit(err) + raise SitemapRetrievalError(err) try: - print(f"Requesting sitemap... ({url})") + logger.info(f"Requesting sitemap ({url})") resp = requests.get(url, headers=headers, timeout=(3.05, 5)) resp.raise_for_status() except requests.exceptions.HTTPError as errh: - raise SystemExit(f"HTTP Error: {errh}") + raise SitemapRetrievalError(f"HTTP Error: {errh}") except requests.exceptions.ConnectionError as errc: - raise SystemExit(f"Connection Error: {errc}") + raise SitemapRetrievalError(f"Connection Error: {errc}") except requests.exceptions.Timeout as errt: - raise SystemExit(f"Timeout Error: {errt}") + raise SitemapRetrievalError(f"Timeout Error: {errt}") except requests.exceptions.RequestException as err: - raise SystemExit(f"Request Error: {err}") + raise SitemapRetrievalError(f"Request Error: {err}") sitemap = resp.text - print("Sitemap retrieval successful!") + logger.info("Sitemap retrieval successful.") return sitemap @@ -68,11 +84,13 @@ def validate_sitemap_url(url: str) -> bool: def get_sitemap_root(sitemap: str) -> XMLElement: """Gets the root element of the sitemap.""" + logger.info("Locating sitemap root.") return ET.fromstring(sitemap) def get_sitemap_type(root: XMLElement) -> str: """Gets the tag value of the root element to determine sitemap type.""" + logger.info("Getting sitemap type.") return root.tag.split("}")[-1] @@ -84,19 +102,21 @@ def process_sitemap(sitemap: str) -> list[Optional[str]]: Returns: A full list of request URLs for use in requests. Raises: - SystemExit: The sitemap type couldn't be parsed from the root element. + SitemapParseError: The sitemap type couldn't be parsed from the root element. The sitemap type parsed from the root element is invalid. No URLs were parsed from the sitemap(s) successfully. """ err = "Sitemap format invalid." + logger.info("Processing sitemap.") try: root = get_sitemap_root(sitemap) sitemap_type = get_sitemap_type(root) except ET.ParseError: - raise SystemExit(err) + raise SitemapParseError(err) # Logged in main() if sitemap_type == "sitemapindex": + logger.info("Sitemap index detected. Recursively parsing children.") request_urls = [] sitemap_urls = _parse_sitemap_index(root) for sm_url in sitemap_urls: @@ -104,24 +124,25 @@ def process_sitemap(sitemap: str) -> list[Optional[str]]: sitemap = request_sitemap(sm_url) request_urls.extend(process_sitemap(sitemap)) elif sitemap_type == "urlset": + logger.info("Standard sitemap detected.") request_urls = _parse_sitemap_urls(root) else: - raise SystemExit(err) + raise SitemapParseError(err) # Logged in main() if not request_urls: - raise SystemExit("No URLs found in the sitemap(s).") + raise SitemapParseError("No URLs found in the sitemap(s).") # Logged in main() return request_urls def _parse_sitemap_index(root: XMLElement) -> list[Optional[str]]: """Parse sitemap URLs from the sitemap index and return them as a list.""" - print("Sitemap index found. Parsing sitemap URLs...") + logger.info("Parsing child sitemap URLs from index.") return _parse_urls_from_root(root, type="sitemap") def _parse_sitemap_urls(root: XMLElement) -> list[Optional[str]]: """Parse URLs from the XML sitemap and return a list of request URLs.""" - print("Parsing URLs from sitemap...") + logger.info("Parsing URLs from sitemap.") return _parse_urls_from_root(root) diff --git a/src/pyspeedinsights/utils/exceptions.py b/src/pyspeedinsights/utils/exceptions.py new file mode 100644 index 0000000..205f972 --- /dev/null +++ b/src/pyspeedinsights/utils/exceptions.py @@ -0,0 +1,6 @@ +"""Custom exceptions used across multiple modules. +""" + + +class JSONKeyError(KeyError): + """A custom exception for KeyErrors thrown while accessing JSON response dicts.""" diff --git a/src/pyspeedinsights/utils/urls.py b/src/pyspeedinsights/utils/urls.py index 93c5d07..5c3c254 100644 --- a/src/pyspeedinsights/utils/urls.py +++ b/src/pyspeedinsights/utils/urls.py @@ -1,7 +1,14 @@ """Utilities for URL processing.""" +import logging from urllib.parse import urlsplit +logger = logging.getLogger(__name__) + + +class InvalidURLError(Exception): + """A base exception for an invalid URL format.""" + def validate_url(url: str) -> str: """Checks that a URL is valid and fully-qualified. @@ -14,10 +21,12 @@ def validate_url(url: str) -> str: Returns: The sanitized URL as a str. Raises: - SystemExit: The user entered a URL without a path that has no domain + InvalidURLError: The user entered a URL without a path that has no domain or a URL whose path precedes the hostname (e.g. path/example.com). """ + logger.info("Checking if request URL is a valid URL.") err = "Invalid URL. Please enter a valid fully-qualified URL." + replacements = {} u = urlsplit(url) @@ -28,10 +37,13 @@ def validate_url(url: str) -> str: path_before_host = p_sep < dot and p_sep > 0 no_host = "." not in u.path if path_before_host or no_host: - raise SystemExit(err) + raise InvalidURLError(err) # Logged as CRITICAL in main() else: replacements["netloc"] = u.path replacements["path"] = "" + elif "." not in u.netloc: + # URLs with schemes but no TLD + raise InvalidURLError(err) # Logged as CRITICAL in main() replacements["fragment"] = "" replacements["query"] = "" diff --git a/tests/api/conftest.py b/tests/api/conftest.py index bbeb45e..646c288 100644 --- a/tests/api/conftest.py +++ b/tests/api/conftest.py @@ -45,7 +45,6 @@ def all_metrics(): "firstMeaningfulPaint": 235, "observedNavigationStart": 0, "firstContentfulPaint": 235, - "observedTotalCumulativeLayoutShift": 0, "observedSpeedIndex": 111, "observedNavigationStartTs": 6367780284457, "observedLargestContentfulPaint": 115, diff --git a/tests/api/test_keys.py b/tests/api/test_keys.py index 7368a01..9aafff6 100644 --- a/tests/api/test_keys.py +++ b/tests/api/test_keys.py @@ -1,4 +1,3 @@ -import keyring import pytest from keyring.errors import KeyringError @@ -9,8 +8,8 @@ class TestKeyRetrieval: def throw_keyringerror(self): raise KeyringError - def throws_systemexit(self): - with pytest.raises(SystemExit): + def throws_keyringerror(self): + with pytest.raises(KeyringError): get_api_key() def test_key_found(self, patch_keyring): @@ -18,16 +17,6 @@ def test_key_found(self, patch_keyring): patch_keyring(mock_pw) assert get_api_key() == mock_pw - def test_key_not_found_exception(self, monkeypatch, patch_input, capsys): - monkeypatch.setattr( - keyring, "get_password", lambda *args, **kwargs: self.throw_keyringerror() - ) - patch_input(mock_input="secret") # Placeholder to avoid stdin errors - - get_api_key() - out = capsys.readouterr().out # Printed when KeyringError is caught - assert out == "There was an error retrieving your API key from the keystore.\n" - def test_key_not_found_fallback_success(self, patch_keyring, patch_input): patch_keyring(mock_pw=None) mock_input = "secret" @@ -47,9 +36,9 @@ def test_key_not_found_fallback_retry_quit(self, patch_iter_input, patch_keyring patch_keyring(mock_pw=None) inputs = iter(["", "Q"]) # Retry then quit patch_iter_input(inputs) - self.throws_systemexit() + self.throws_keyringerror() def test_key_not_found_fallback_retry_limit_hit(self, patch_input, patch_keyring): patch_keyring(mock_pw=None) patch_input(mock_input="") # Empty input will eventually hit limit - self.throws_systemexit() + self.throws_keyringerror() diff --git a/tests/excel/sample_data.py b/tests/excel/sample_data.py index 6739e17..feb8f9b 100644 --- a/tests/excel/sample_data.py +++ b/tests/excel/sample_data.py @@ -42,7 +42,6 @@ "observedSpeedIndexTs": 349111765971, "observedTimeOrigin": 0, "observedTimeOriginTs": 349111281936, - "observedTotalCumulativeLayoutShift": 0, "observedTraceEnd": 3392, "observedTraceEndTs": 349114673907, "speedIndex": 2610, diff --git a/tests/sitemap/test_processing.py b/tests/sitemap/test_processing.py index d1171e6..fdd43a8 100644 --- a/tests/sitemap/test_processing.py +++ b/tests/sitemap/test_processing.py @@ -1,6 +1,7 @@ import pytest from pyspeedinsights.core.sitemap import ( + SitemapParseError, _parse_sitemap_index, _parse_sitemap_urls, _parse_urls_from_root, @@ -31,8 +32,8 @@ def test_sitemap_index_wrong_type_url_not_found(self, sitemap_index_root): class TestProcessSitemap: """Tests processing of URLs from sitemap.""" - def sitemap_raises_system_exit(self, sitemap): - with pytest.raises(SystemExit): + def sitemap_raises_parse_error(self, sitemap): + with pytest.raises(SitemapParseError): process_sitemap(sitemap) def test_regular_sitemap_processed(self, sitemap, sitemap_url): @@ -54,10 +55,10 @@ def test_sitemap_index_processed( assert len(sitemap_urls) == num_sitemaps * num_urls def test_invalid_sitemap_exits(self, sitemap_invalid): - self.sitemap_raises_system_exit(sitemap_invalid) + self.sitemap_raises_parse_error(sitemap_invalid) def test_invalid_sitemap_tag_exits(self, sitemap_invalid_tag): - self.sitemap_raises_system_exit(sitemap_invalid_tag) + self.sitemap_raises_parse_error(sitemap_invalid_tag) def test_sitemap_no_urls_exits(self, sitemap_no_urls): - self.sitemap_raises_system_exit(sitemap_no_urls) + self.sitemap_raises_parse_error(sitemap_no_urls) diff --git a/tests/sitemap/test_requests.py b/tests/sitemap/test_requests.py index 5c09420..5fdc1ef 100644 --- a/tests/sitemap/test_requests.py +++ b/tests/sitemap/test_requests.py @@ -1,14 +1,14 @@ import pytest from requests.exceptions import ConnectionError, HTTPError, RequestException, Timeout -from pyspeedinsights.core.sitemap import request_sitemap +from pyspeedinsights.core.sitemap import SitemapRetrievalError, request_sitemap class TestRequestSitemap: """Tests requesting sitemap text from a sitemap URL.""" - def raises_system_exit(self, url): - with pytest.raises(SystemExit): + def raises_retrieval_error(self, url): + with pytest.raises(SitemapRetrievalError): request_sitemap(url) def test_200_no_exception_returns_sitemap( @@ -19,24 +19,24 @@ def test_200_no_exception_returns_sitemap( def test_200_no_exception_invalid_url_exits(self, patch_response, invalid_url): patch_response(status_code=200, exception=None) - self.raises_system_exit(invalid_url) + self.raises_retrieval_error(invalid_url) def test_404_client_error_exits(self, patch_response, request_url): patch_response(status_code=404, exception=HTTPError) - self.raises_system_exit(request_url) + self.raises_retrieval_error(request_url) def test_500_server_error_exits(self, patch_response, request_url): patch_response(status_code=500, exception=HTTPError) - self.raises_system_exit(request_url) + self.raises_retrieval_error(request_url) def test_connection_error_exits(self, patch_response, request_url): patch_response(status_code=None, exception=ConnectionError) - self.raises_system_exit(request_url) + self.raises_retrieval_error(request_url) def test_timeout_exits(self, patch_response, request_url): patch_response(status_code=None, exception=Timeout) - self.raises_system_exit(request_url) + self.raises_retrieval_error(request_url) def test_request_exception_exits(self, patch_response, request_url): patch_response(status_code=None, exception=RequestException) - self.raises_system_exit(request_url) + self.raises_retrieval_error(request_url) diff --git a/tests/utils/test_utils.py b/tests/utils/test_utils.py index 38dab20..0d048a8 100644 --- a/tests/utils/test_utils.py +++ b/tests/utils/test_utils.py @@ -5,24 +5,27 @@ remove_nonetype_dict_items, sort_dict_alpha, ) -from pyspeedinsights.utils.urls import validate_url +from pyspeedinsights.utils.urls import InvalidURLError, validate_url class TestValidateUrl: """Tests validation of URLs.""" - def raises_system_exit(self, url): - with pytest.raises(SystemExit): + def raises_invalid_url(self, url): + with pytest.raises(InvalidURLError): validate_url(url) def test_url_without_tld_exits(self): - self.raises_system_exit("badurl") + self.raises_invalid_url("badurl") + + def test_url_without_tld_with_scheme_exits(self): + self.raises_invalid_url("https://badurl") def test_url_without_tld_with_path_exits(self): - self.raises_system_exit("badurl/path") + self.raises_invalid_url("badurl/path") def test_url_with_path_and_extension_without_tld_exits(self): - self.raises_system_exit("badurl/path.html") + self.raises_invalid_url("badurl/path.html") def test_valid_urls_are_not_modified(self): urls = [