diff --git a/airflow/dags/ingestion/ask-astro-forum-load.py b/airflow/dags/ingestion/ask-astro-forum-load.py
index 0d0496c7..7e4d4bd1 100644
--- a/airflow/dags/ingestion/ask-astro-forum-load.py
+++ b/airflow/dags/ingestion/ask-astro-forum-load.py
@@ -44,7 +44,7 @@ def ask_astro_load_astro_forum():
         class_name=WEAVIATE_CLASS,
         existing="replace",
         document_column="docLink",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
         conn_id=_WEAVIATE_CONN_ID,
         task_id="WeaviateDocumentIngestOperator",
diff --git a/airflow/dags/ingestion/ask-astro-load-airflow-docs.py b/airflow/dags/ingestion/ask-astro-load-airflow-docs.py
index 391176c8..bb3ec557 100644
--- a/airflow/dags/ingestion/ask-astro-load-airflow-docs.py
+++ b/airflow/dags/ingestion/ask-astro-load-airflow-docs.py
@@ -1,7 +1,6 @@
 import os
 from datetime import datetime
 
-import pandas as pd
 from include.utils.slack import send_failure_notification
 
 from airflow.decorators import dag, task
@@ -20,21 +19,6 @@
 schedule_interval = os.environ.get("INGESTION_SCHEDULE", "0 5 * * 2") if ask_astro_env == "prod" else None
 
 
-@task
-def split_docs(urls: str, chunk_size: int = 100) -> list[list[pd.DataFrame]]:
-    """
-    Split the URLs in chunk and get dataframe for the content
-
-    param urls: List for HTTP URL
-    param chunk_size: Max number of document in split chunk
-    """
-    from include.tasks import split
-    from include.tasks.extract.utils.html_utils import urls_to_dataframe
-
-    chunked_urls = split.split_list(list(urls), chunk_size=chunk_size)
-    return [[urls_to_dataframe(chunk_url)] for chunk_url in chunked_urls]
-
-
 @dag(
     schedule_interval=schedule_interval,
     start_date=datetime(2023, 9, 27),
@@ -51,19 +35,22 @@ def ask_astro_load_airflow_docs():
     data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
     any existing documents that have been updated will be removed and re-added.
     """
+    from include.tasks import split
     from include.tasks.extract import airflow_docs
 
-    extracted_airflow_docs = task(airflow_docs.extract_airflow_docs)(docs_base_url=airflow_docs_base_url)
+    extracted_airflow_docs = task(split.split_html).expand(
+        dfs=[airflow_docs.extract_airflow_docs(docs_base_url=airflow_docs_base_url)]
+    )
 
     _import_data = WeaviateDocumentIngestOperator.partial(
         class_name=WEAVIATE_CLASS,
         existing="replace",
         document_column="docLink",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
         conn_id=_WEAVIATE_CONN_ID,
         task_id="WeaviateDocumentIngestOperator",
-    ).expand(input_data=split_docs(extracted_airflow_docs, chunk_size=100))
+    ).expand(input_data=[extracted_airflow_docs])
 
 
 ask_astro_load_airflow_docs()
diff --git a/airflow/dags/ingestion/ask-astro-load-astro-cli.py b/airflow/dags/ingestion/ask-astro-load-astro-cli.py
index 80658d9f..20fd2da1 100644
--- a/airflow/dags/ingestion/ask-astro-load-astro-cli.py
+++ b/airflow/dags/ingestion/ask-astro-load-astro-cli.py
@@ -42,7 +42,7 @@ def ask_astro_load_astro_cli_docs():
         class_name=WEAVIATE_CLASS,
         existing="replace",
         document_column="docLink",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
         conn_id=_WEAVIATE_CONN_ID,
         task_id="WeaviateDocumentIngestOperator",
diff --git a/airflow/dags/ingestion/ask-astro-load-astro-sdk.py b/airflow/dags/ingestion/ask-astro-load-astro-sdk.py
index 9b4e1b6d..11d2f7d9 100644
--- a/airflow/dags/ingestion/ask-astro-load-astro-sdk.py
+++ b/airflow/dags/ingestion/ask-astro-load-astro-sdk.py
@@ -41,7 +41,7 @@ def ask_astro_load_astro_sdk():
         class_name=WEAVIATE_CLASS,
         existing="replace",
         document_column="docLink",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
         conn_id=_WEAVIATE_CONN_ID,
         task_id="WeaviateDocumentIngestOperator",
diff --git a/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py b/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py
index 175b5a6f..bf6b26cd 100644
--- a/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py
+++ b/airflow/dags/ingestion/ask-astro-load-astronomer-docs.py
@@ -36,17 +36,17 @@ def ask_astro_load_astronomer_docs():
 
     astro_docs = task(extract_astro_docs)()
 
-    split_md_docs = task(split.split_markdown).expand(dfs=[astro_docs])
+    split_html_docs = task(split.split_html).expand(dfs=[astro_docs])
 
     _import_data = WeaviateDocumentIngestOperator.partial(
         class_name=WEAVIATE_CLASS,
         existing="replace",
         document_column="docLink",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
         conn_id=_WEAVIATE_CONN_ID,
         task_id="WeaviateDocumentIngestOperator",
-    ).expand(input_data=[split_md_docs])
+    ).expand(input_data=[split_html_docs])
 
 
 ask_astro_load_astronomer_docs()
diff --git a/airflow/dags/ingestion/ask-astro-load-astronomer-provider.py b/airflow/dags/ingestion/ask-astro-load-astronomer-provider.py
index 729ce291..f5d5c65d 100644
--- a/airflow/dags/ingestion/ask-astro-load-astronomer-provider.py
+++ b/airflow/dags/ingestion/ask-astro-load-astronomer-provider.py
@@ -47,7 +47,7 @@ def ask_astro_load_astronomer_providers():
         class_name=WEAVIATE_CLASS,
         existing="replace",
         document_column="docLink",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
         conn_id=_WEAVIATE_CONN_ID,
         task_id="WeaviateDocumentIngestOperator",
diff --git a/airflow/dags/ingestion/ask-astro-load-blogs.py b/airflow/dags/ingestion/ask-astro-load-blogs.py
index 91a81d58..84f78019 100644
--- a/airflow/dags/ingestion/ask-astro-load-blogs.py
+++ b/airflow/dags/ingestion/ask-astro-load-blogs.py
@@ -45,7 +45,7 @@ def ask_astro_load_blogs():
         class_name=WEAVIATE_CLASS,
         existing="replace",
         document_column="docLink",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
         conn_id=_WEAVIATE_CONN_ID,
         task_id="WeaviateDocumentIngestOperator",
diff --git a/airflow/dags/ingestion/ask-astro-load-github.py b/airflow/dags/ingestion/ask-astro-load-github.py
index 6aa1a48c..e68055e2 100644
--- a/airflow/dags/ingestion/ask-astro-load-github.py
+++ b/airflow/dags/ingestion/ask-astro-load-github.py
@@ -64,7 +64,7 @@ def ask_astro_load_github():
         class_name=WEAVIATE_CLASS,
         existing="replace",
         document_column="docLink",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
         conn_id=_WEAVIATE_CONN_ID,
         task_id="WeaviateDocumentIngestOperator",
diff --git a/airflow/dags/ingestion/ask-astro-load-registry.py b/airflow/dags/ingestion/ask-astro-load-registry.py
index 33282ab4..480a70cb 100644
--- a/airflow/dags/ingestion/ask-astro-load-registry.py
+++ b/airflow/dags/ingestion/ask-astro-load-registry.py
@@ -47,7 +47,7 @@ def ask_astro_load_registry():
         class_name=WEAVIATE_CLASS,
         existing="replace",
         document_column="docLink",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
         conn_id=_WEAVIATE_CONN_ID,
         task_id="WeaviateDocumentIngestOperator",
diff --git a/airflow/dags/ingestion/ask-astro-load-slack.py b/airflow/dags/ingestion/ask-astro-load-slack.py
index 4435da4d..8fbf1b44 100644
--- a/airflow/dags/ingestion/ask-astro-load-slack.py
+++ b/airflow/dags/ingestion/ask-astro-load-slack.py
@@ -53,7 +53,7 @@ def ask_astro_load_slack():
         class_name=WEAVIATE_CLASS,
         existing="replace",
         document_column="docLink",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
         conn_id=_WEAVIATE_CONN_ID,
         task_id="WeaviateDocumentIngestOperator",
diff --git a/airflow/dags/ingestion/ask-astro-load-stackoverflow.py b/airflow/dags/ingestion/ask-astro-load-stackoverflow.py
index 12f553a1..6135f82d 100644
--- a/airflow/dags/ingestion/ask-astro-load-stackoverflow.py
+++ b/airflow/dags/ingestion/ask-astro-load-stackoverflow.py
@@ -53,7 +53,7 @@ def ask_astro_load_stackoverflow():
         class_name=WEAVIATE_CLASS,
         existing="replace",
         document_column="docLink",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
         conn_id=_WEAVIATE_CONN_ID,
         task_id="WeaviateDocumentIngestOperator",
diff --git a/airflow/dags/ingestion/ask-astro-load.py b/airflow/dags/ingestion/ask-astro-load.py
index 707ca65e..a71fd090 100644
--- a/airflow/dags/ingestion/ask-astro-load.py
+++ b/airflow/dags/ingestion/ask-astro-load.py
@@ -206,7 +206,7 @@ def extract_airflow_docs():
         else:
             raise Exception("Parquet file exists locally but is not readable.")
     else:
-        df = airflow_docs.extract_airflow_docs(docs_base_url=airflow_docs_base_url)[0]
+        df = airflow_docs.extract_airflow_docs.function(docs_base_url=airflow_docs_base_url)[0]
         df.to_parquet(parquet_file)
 
     return [df]
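Note on the `.function` call above: `extract_airflow_docs` is now decorated with `@task` (see the `airflow_docs.py` hunk below), so calling it normally inside a DAG file builds a task and returns an XComArg instead of running the extraction; Airflow's task decorator keeps the original callable on its `.function` attribute. A small illustration with a made-up task:

```python
from airflow.decorators import task


@task
def add(x: int, y: int) -> int:
    return x + y


# Inside a DAG, add(1, 2) would create a task and return an XComArg.
# add.function is the undecorated Python function, so this runs immediately:
assert add.function(1, 2) == 3
```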
@@ -442,7 +442,7 @@ def import_baseline(
         class_name=WEAVIATE_CLASS,
         existing="replace",
         document_column="docLink",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
         conn_id=_WEAVIATE_CONN_ID,
         task_id="WeaviateDocumentIngestOperator",
@@ -455,7 +455,7 @@ def import_baseline(
         document_column="docLink",
         uuid_column="id",
         vector_column="vector",
-        batch_config_params={"batch_size": 1000},
+        batch_config_params={"batch_size": 7, "dynamic": False},
         verbose=True,
     )
 
diff --git a/airflow/include/tasks/extract/airflow_docs.py b/airflow/include/tasks/extract/airflow_docs.py
index fb9de592..7869745e 100644
--- a/airflow/include/tasks/extract/airflow_docs.py
+++ b/airflow/include/tasks/extract/airflow_docs.py
@@ -8,9 +8,11 @@
 from bs4 import BeautifulSoup
 from weaviate.util import generate_uuid5
 
+from airflow.decorators import task
 from include.tasks.extract.utils.html_utils import get_internal_links
 
 
+@task
 def extract_airflow_docs(docs_base_url: str) -> list[pd.DataFrame]:
     """
     This task return all internal url for Airflow docs
@@ -36,7 +38,10 @@ def extract_airflow_docs(docs_base_url: str) -> list[pd.DataFrame]:
     docs_url_parts = urllib.parse.urlsplit(docs_base_url)
     docs_url_base = f"{docs_url_parts.scheme}://{docs_url_parts.netloc}"
     # make sure we didn't accidentally pickup any unrelated links in recursion
-    non_doc_links = {link if docs_url_base not in link else "" for link in all_links}
+    old_version_doc_pattern = r"/(\d+\.)*\d+/"
+    non_doc_links = {
+        link if (docs_url_base not in link) or re.search(old_version_doc_pattern, link) else "" for link in all_links
+    }
     docs_links = all_links - non_doc_links
 
     df = pd.DataFrame(docs_links, columns=["docLink"])
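Note on the `old_version_doc_pattern` added to `airflow_docs.py`: besides the existing off-domain check, any crawled link whose path contains a dotted version segment is now treated as an old docs version and excluded. A standalone check with made-up URLs:

```python
import re

old_version_doc_pattern = r"/(\d+\.)*\d+/"

links = {
    "https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html",
    "https://airflow.apache.org/docs/apache-airflow/2.7.3/core-concepts/dags.html",
    "https://airflow.apache.org/docs/apache-airflow-providers-amazon/8.13.0/index.html",
}

# re.search matches version segments such as /2.7.3/ or /8.13.0/ anywhere in the path.
kept = {link for link in links if not re.search(old_version_doc_pattern, link)}
print(kept)  # only the /stable/ URL survives
```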
diff --git a/airflow/include/tasks/extract/astro_docs.py b/airflow/include/tasks/extract/astro_docs.py
index 5cec7a54..e984d753 100644
--- a/airflow/include/tasks/extract/astro_docs.py
+++ b/airflow/include/tasks/extract/astro_docs.py
@@ -1,83 +1,58 @@
 from __future__ import annotations
 
-import logging
-from urllib.parse import urldefrag, urljoin
+import re
 
 import pandas as pd
-import requests
 from bs4 import BeautifulSoup
 from weaviate.util import generate_uuid5
 
+from include.tasks.extract.utils.html_utils import fetch_page_content, get_internal_links
+
 base_url = "https://docs.astronomer.io/"
 
 
-def fetch_page_content(url: str) -> str:
-    """
-    Fetches the content of a given URL.
+def process_astro_doc_page_content(page_content: str) -> str:
+    soup = BeautifulSoup(page_content, "html.parser")
 
-    :param url: URL of the page to fetch.
-    :return: HTML content of the page.
-    """
-    try:
-        response = requests.get(url, headers={"User-Agent": "Mozilla/5.0"})
-        if response.status_code == 200:
-            return response.content
-    except requests.RequestException as e:
-        logging.error(f"Error fetching {url}: {e}")
-    return ""
+    # Find the main article container
+    main_container = soup.find("main", class_="docMainContainer_TBSr")
+    content_of_interest = main_container if main_container else soup
+
+    for nav_tag in content_of_interest.find_all("nav"):
+        nav_tag.decompose()
 
-
-def extract_links(soup: BeautifulSoup, base_url: str) -> list[str]:
-    """
-    Extracts all valid links from a BeautifulSoup object.
+    for script_or_style in content_of_interest.find_all(["script", "style", "button", "img", "svg"]):
+        script_or_style.decompose()
 
-    :param soup: BeautifulSoup object to extract links from.
-    :param base_url: Base URL for relative links.
-    :return: List of extracted URLs.
-    """
-    links = []
-    for link in soup.find_all("a", href=True):
-        href = link["href"]
-        if not href.startswith("http"):
-            href = urljoin(base_url, href)
-        if href.startswith(base_url):
-            links.append(href)
-    return links
+    feedback_widget = content_of_interest.find("div", id="feedbackWidget")
+    if feedback_widget:
+        feedback_widget.decompose()
+
+    newsletter_form = content_of_interest.find("form", id="newsletterForm")
+    if newsletter_form:
+        newsletter_form.decompose()
 
-
-def scrape_page(url: str, visited_urls: set, docs_data: list) -> None:
-    """
-    Recursively scrapes a webpage and its subpages.
+    sidebar = content_of_interest.find("ul", class_=lambda value: value and "table-of-contents" in value)
+    if sidebar:
+        sidebar.decompose()
 
-    :param url: URL of the page to scrape.
-    :param visited_urls: Set of URLs already visited.
-    :param docs_data: List to append extracted data to.
-    """
-    if url in visited_urls or not url.startswith(base_url):
-        return
-
-    # Normalize URL by stripping off the fragment
-    base_url_no_fragment, frag = urldefrag(url)
+    footers = content_of_interest.find_all("footer")
+    for footer in footers:
+        footer.decompose()
 
-    # If the URL is the base URL plus a fragment, ignore it
-    if base_url_no_fragment == base_url and frag:
-        return
+    # The actual article in almost all pages of Astro Docs website is in the following HTML container
+    container_div = content_of_interest.find("div", class_=lambda value: value and "container" in value)
 
-    visited_urls.add(url)
+    if container_div:
+        row_div = container_div.find("div", class_="row")
 
-    logging.info(f"Scraping : {url}")
+        if row_div:
+            col_div = row_div.find("div", class_=lambda value: value and "col" in value)
 
-    page_content = fetch_page_content(url)
-    if not page_content:
-        return
+            if col_div:
+                content_of_interest = str(col_div)
 
-    soup = BeautifulSoup(page_content, "lxml")
-    content = soup.get_text(strip=True)
-    sha = generate_uuid5(content)
-    docs_data.append({"docSource": "astro docs", "sha": sha, "content": content, "docLink": url})
-    # Recursively scrape linked pages
-    for link in extract_links(soup, base_url):
-        scrape_page(link, visited_urls, docs_data)
+    return str(content_of_interest).strip()
 
 
 def extract_astro_docs(base_url: str = base_url) -> list[pd.DataFrame]:
@@ -86,13 +61,26 @@ def extract_astro_docs(base_url: str = base_url) -> list[pd.DataFrame]:
 
     :return: A list of pandas dataframes with extracted data.
     """
-    visited_urls = set()
-    docs_data = []
+    all_links = get_internal_links(base_url, exclude_literal=["learn/tags"])
+
+    # for software references, we only want latest docs, ones with version number (old) is removed
+    old_version_doc_pattern = r"^https://docs\.astronomer\.io/software/\d+\.\d+/.+$"
+    # remove duplicate xml files, we only want html pages
+    non_doc_links = {
+        link if link.endswith("xml") or re.match(old_version_doc_pattern, link) else "" for link in all_links
+    }
+    docs_links = all_links - non_doc_links
+
+    df = pd.DataFrame(docs_links, columns=["docLink"])
+
+    df["html_content"] = df["docLink"].apply(lambda url: fetch_page_content(url))
 
-    scrape_page(base_url, visited_urls, docs_data)
+    # Only keep the main article content
+    df["content"] = df["html_content"].apply(process_astro_doc_page_content)
 
-    df = pd.DataFrame(docs_data)
-    df.drop_duplicates(subset="sha", inplace=True)
+    df["sha"] = df["content"].apply(generate_uuid5)
+    df["docSource"] = "astro docs"
     df.reset_index(drop=True, inplace=True)
 
+    df = df[["docSource", "sha", "content", "docLink"]]
     return [df]
diff --git a/airflow/include/tasks/extract/utils/html_utils.py b/airflow/include/tasks/extract/utils/html_utils.py
index 1caac139..c8034676 100644
--- a/airflow/include/tasks/extract/utils/html_utils.py
+++ b/airflow/include/tasks/extract/utils/html_utils.py
@@ -7,12 +7,14 @@
 import pandas as pd
 import requests
 from bs4 import BeautifulSoup
+from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_exponential
 from weaviate.util import generate_uuid5
 
 logger = logging.getLogger("airflow.task")
 
-
+attempted_urls = set()
 internal_urls = set()
+internal_page_hashset = set()
 
 
 def is_valid_url(url: str) -> bool:
@@ -25,19 +27,29 @@ def is_valid_url(url: str) -> bool:
     return bool(parsed.netloc) and bool(parsed.scheme)
 
 
+def _fetch_page_content_retry_default_return(retry_state: RetryCallState) -> str:
+    logger.info(
+        "Error fetching content for %s. May be expected if making attempts to validate unknown URLs.",
+        retry_state.args[0],
+    )
+    return ""
+
+
+@retry(
+    retry=retry_if_exception_type(requests.RequestException),
+    stop=stop_after_attempt(3),
+    wait=wait_exponential(multiplier=1, max=10),
+    retry_error_callback=_fetch_page_content_retry_default_return,
+)
 def fetch_page_content(url: str) -> str:
     """
     Fetch the content of a html page
 
     param url: The url of a page
     """
-    try:
-        response = requests.get(url, headers={"User-agent": "Ask Astro"})
-        response.raise_for_status()  # Raise an HTTPError for bad responses
-        return response.content
-    except requests.RequestException:
-        logger.info("Error fetching content for %s: %s", url, url)
-        return ""
+    response = requests.get(url, headers={"User-agent": "Ask Astro"})
+    response.raise_for_status()  # Raise an HTTPError for bad responses
+    return response.content
 
 
 def is_excluded_url(url: str, exclude_literal: list[str]) -> bool:
@@ -98,16 +110,19 @@ def truncate_tokens(text: str, encoding_name: str = "gpt-3.5-turbo", max_length:
         logger.info(e)
 
 
-def get_page_links(url: str, exclude_literal: list[str]) -> set[str]:
+def get_page_links(url: str, current_page_content: bytes, exclude_literal: list[str]) -> None:
     """
-    Extract all valid and internal links from the given URL.
+    Recursively extract all valid and internal links from the given URL.
+    Deduplicates any links with the exact same page content in the process.
 
     param url (str): The URL to extract links from.
+    param current_page_content: Bytes of the content of the url passed in for hashing.
     param exclude_docs (list): List of strings to exclude from the URL path.
     """
-    urls = set()
     domain_name = urlparse(url).netloc
-    soup = BeautifulSoup(requests.get(url).content, "html.parser")
+    page_content_hash = generate_uuid5(current_page_content)
+    internal_page_hashset.add(page_content_hash)
+    soup = BeautifulSoup(current_page_content, "html.parser")
     for a_tag in soup.findAll("a"):
         href = a_tag.attrs.get("href")
         if href == "" or href is None:
@@ -117,16 +132,20 @@
         href = parsed_href.scheme + "://" + parsed_href.netloc + parsed_href.path
         if (
             not is_valid_url(href)
+            or not href.startswith("https")
             or href in internal_urls
+            or href in attempted_urls
             or domain_name not in href
             or is_excluded_url(href, exclude_literal)
         ):
             continue
-        urls.add(href)
+        attempted_urls.add(href)
+        new_page_content = fetch_page_content(href)
+        if (not new_page_content) or generate_uuid5(new_page_content) in page_content_hash:
+            continue
         logger.info(href)
         internal_urls.add(href)
-
-    return urls
+        get_page_links(href, new_page_content, exclude_literal)
 
 
 def get_internal_links(base_url: str, exclude_literal: list[str] | None = None) -> set[str]:
@@ -139,10 +158,9 @@
     if exclude_literal is None:
         exclude_literal = []
 
-    links = get_page_links(base_url, exclude_literal)
-
-    for link in links:
-        get_page_links(link, exclude_literal)
+    page_content = fetch_page_content(base_url)
+    get_page_links(base_url, page_content, exclude_literal)
+    internal_urls.add(base_url)
 
     return internal_urls
diff --git a/airflow/include/tasks/split.py b/airflow/include/tasks/split.py
index c5933bf3..ba387009 100644
--- a/airflow/include/tasks/split.py
+++ b/airflow/include/tasks/split.py
@@ -3,10 +3,10 @@
 import pandas as pd
 from langchain.schema import Document
 from langchain.text_splitter import (
-    HTMLHeaderTextSplitter,
     Language,
     RecursiveCharacterTextSplitter,
 )
+from langchain_community.document_transformers import Html2TextTransformer
 
 
 def split_markdown(dfs: list[pd.DataFrame]) -> pd.DataFrame:
@@ -86,18 +86,29 @@ def split_html(dfs: list[pd.DataFrame]) -> pd.DataFrame:
         'content': Chunked content in markdown format.
 
     """
-
-    headers_to_split_on = [
-        ("h2", "h2"),
-    ]
+    separators = ["
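The `split.py` hunk is cut off above, but the import swap from `HTMLHeaderTextSplitter` to `Html2TextTransformer` suggests `split_html` now converts raw HTML to plain text before chunking it with `RecursiveCharacterTextSplitter`. A hedged sketch of that flow; the separator list, chunk size, and overlap below are illustrative, not the values from this change:

```python
import pandas as pd
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_transformers import Html2TextTransformer

df = pd.DataFrame(
    {"docLink": ["https://example.invalid/docs"], "content": ["<h2>Install</h2><p>Run astro dev start.</p>"]}
)

# HTML -> markdown-style text first, then character-based chunking.
html_docs = [Document(page_content=content) for content in df["content"]]
text_docs = Html2TextTransformer().transform_documents(html_docs)

splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", " ", ""],  # illustrative separators
    chunk_size=4000,
    chunk_overlap=200,
)
chunks = splitter.split_documents(text_docs)
print(chunks[0].page_content)
```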