add exception handling for dynamic xlsx urls scraping
charlie-costanzo committed Dec 19, 2024
1 parent f2e8bb5 commit cc236da
Showing 1 changed file with 56 additions and 14 deletions.
70 changes: 56 additions & 14 deletions airflow/dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py
@@ -6,7 +6,9 @@
 
 import requests
 from bs4 import BeautifulSoup
-from pydantic import HttpUrl, parse_obj_as
+from pydantic import HttpUrl, ValidationError, parse_obj_as
+
+from airflow.exceptions import AirflowException
 
 xlsx_urls = {
     "ridership_url": "https://www.transit.dot.gov/ntd/data-product/monthly-module-raw-data-release",
@@ -18,8 +20,14 @@
 
 # pushes the scraped URL value to XCom
 def push_url_to_xcom(key, scraped_url, context):
-    task_instance = context["ti"]
-    task_instance.xcom_push(key=key, value=scraped_url)
+    try:
+        task_instance = context.get("ti")
+        if task_instance is None:
+            raise AirflowException("Task instance not found in context")
+        task_instance.xcom_push(key=key, value=scraped_url)
+    except Exception as e:
+        logging.error(f"Error pushing URL to XCom for key {key}: {e}")
+        raise AirflowException(f"Failed to push URL to XCom: {e}")
 
 
 # Look for an anchor tag where the href ends with '.xlsx' and starts with '/sites/fta.dot.gov/files/'
@@ -30,20 +38,54 @@ def href_matcher(href):
 
 
 def scrape_ntd_xlsx_urls(**context):
-    for key, value in xlsx_urls.items():
-        url = value
-        req = requests.get(url)
-        soup = BeautifulSoup(req.text, "html.parser")
+    for key, url in xlsx_urls.items():
+        try:
+            # Make HTTP request with proper error handling
+            try:
+                response = requests.get(url)
+                response.raise_for_status()  # Raises HTTPError for bad responses
+            except requests.exceptions.HTTPError as e:
+                logging.error(f"HTTP error occurred while fetching {url}: {e}")
+                raise AirflowException(f"HTTP error for {key}: {e}")
+            except requests.exceptions.RequestException as e:
+                logging.error(f"Error occurred while fetching {url}: {e}")
+                raise AirflowException(f"Request failed for {key}: {e}")
 
-        link = soup.find("a", href=href_matcher)
+            # Parse HTML with error handling
+            try:
+                soup = BeautifulSoup(response.text, "html.parser")
+            except Exception as e:
+                logging.error(f"Error parsing HTML for {url}: {e}")
+                raise AirflowException(f"HTML parsing failed for {key}: {e}")
 
-        # Extract the href if the link is found
-        file_link = link["href"] if link else None
+            # Find link with error handling
+            link = soup.find("a", href=href_matcher)
+            if not link:
+                error_msg = f"No XLSX download link found for {key} at {url}"
+                logging.error(error_msg)
+                raise AirflowException(error_msg)
 
-        updated_url = f"https://www.transit.dot.gov{file_link}"
+            # Extract href with error handling
+            file_link = link.get("href")
+            if not file_link:
+                error_msg = f"Found link for {key} but href attribute is missing"
+                logging.error(error_msg)
+                raise AirflowException(error_msg)
 
-        validated_url = parse_obj_as(HttpUrl, updated_url)
+            # Construct and validate URL
+            updated_url = f"https://www.transit.dot.gov{file_link}"
+            try:
+                validated_url = parse_obj_as(HttpUrl, updated_url)
+            except ValidationError as e:
+                logging.error(f"URL validation failed for {updated_url}: {e}")
+                raise AirflowException(f"Invalid URL constructed for {key}: {e}")
 
-        logging.info(f"Validated URL: {validated_url}.")
+            logging.info(f"Successfully validated URL for {key}: {validated_url}")
 
-        push_url_to_xcom(key=key, scraped_url=validated_url, context=context)
+            # Push to XCom
+            push_url_to_xcom(key=key, scraped_url=validated_url, context=context)
+
+        except Exception as e:
+            # Log any unhandled exceptions and re-raise as AirflowException
+            logging.error(f"Unexpected error processing {key}: {e}")
+            raise AirflowException(f"Failed to process {key}: {e}")
