Improve HTML Splitter, URL Fetching Logic & Astro Docs Ingestion (#293)
### Description
- This is the first part of a two-part effort to improve the scraping,
extraction, chunking, and tokenizing logic for Ask Astro's data ingestion
process (see details in issue #258).
- This PR mainly focuses on reducing noise in the ingestion of the Astro
Docs data source, along with related changes such as scraping only the
latest doc versions and adding automatic retry with exponential backoff
to the HTML fetch function.

### Closes the Following Issues
- #292 
- #270
- #209

### Partially Completes Issues
- #258 (two-part effort;
this PR completes part 1)
- #221 (addresses the token
limit in the HTML splitting logic; other parts still need work)

### Technical Details
- airflow/include/tasks/extract/astro_docs.py
  - Add function `process_astro_doc_page_content`: strips noisy, unhelpful
content such as the nav bar, header, and footer, and extracts only the main
article content of each page.
  - Remove the previous function `scrape_page` (which scraped a page's HTML
content AND crawled all of its sub-pages via the links it contained). This is
removed because: 1. there is already a centralized util function
`fetch_page_content()` that fetches each page's HTML; 2. there is already a
centralized util function `get_internal_links` that finds all internal links;
3. the scraping itself did not exclude noisy, unrelated content, which is now
handled by `process_astro_doc_page_content` described in the previous bullet.
- airflow/include/tasks/split.py
  - Modify function `split_html`: it previously split on specific HTML tags
using `HTMLHeaderTextSplitter`, which is not ideal because we do not want to
split that often and there is no guarantee that splitting on those tags
preserves semantic meaning. It now uses `RecursiveCharacterTextSplitter` with
a token limit: a chunk is ONLY split once it exceeds the specified number of
tokens, and if it still exceeds the limit, the splitter moves down the
separator list, eventually splitting on spaces and individual characters to
fit within the token limit. This retains better semantic meaning in each
chunk while enforcing the token limit (see the sketch after this list).
- airflow/include/tasks/extract/utils/html_utils.py
  - Change function `fetch_page_content` to add automatic retry with
exponential backoff using tenacity (see the sketch after this list).
  - Change function `get_page_links` to traverse a given page recursively and
find all links belonging to the website. This ensures no duplicate pages are
traversed and no pages are missed. Previously the logic missed some links,
likely because it used a single for loop rather than traversing recursively
until all links are exhausted (see the sketch after this list).
    - Note: this makes a huge difference in the URLs collected. Previously
many links looked like https://abc.com/abc#XXX and https://abc.com/abc#YYY,
where the fragment refers to a section of the same page, but the old logic
could not tell that they were the same page.
- airflow/requirements.txt: add required packages
- api/ask_astro/settings.py: remove unused variables
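
Below are minimal sketches of the three behaviors described above, for
reference. They are illustrative only: the helper names, chunk sizes,
separator list, retry counts, and backoff bounds are assumptions rather than
the exact values used in this PR's code.

Token-limited splitting with `RecursiveCharacterTextSplitter` (a chunk is
split only when it exceeds the token limit, falling back through the
separator list):

```python
import pandas as pd
from langchain.text_splitter import RecursiveCharacterTextSplitter


def split_html_sketch(df: pd.DataFrame) -> pd.DataFrame:
    """Split page content into chunks only when a chunk exceeds the token limit."""
    splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        encoding_name="cl100k_base",  # tokenizer used to count tokens (assumption)
        chunk_size=4000,              # max tokens per chunk (illustrative value)
        chunk_overlap=200,            # token overlap between chunks (illustrative value)
        # Try larger structural separators first, then fall back to spaces and characters.
        separators=["\n\n", "\n", ". ", " ", ""],
    )
    df = df.copy()
    df["content"] = df["content"].apply(splitter.split_text)  # list of chunks per doc
    return df.explode("content", ignore_index=True)
```

Automatic retry with exponential backoff on the HTML fetch using tenacity:

```python
import requests
from tenacity import retry, stop_after_attempt, wait_exponential


@retry(
    stop=stop_after_attempt(5),                          # give up after 5 attempts (assumption)
    wait=wait_exponential(multiplier=1, min=1, max=30),  # exponentially growing wait, capped at 30s
    reraise=True,
)
def fetch_page_content(url: str) -> str:
    """Fetch a page's HTML, retrying transient failures with exponential backoff."""
    response = requests.get(url, headers={"User-Agent": "Mozilla/5.0"}, timeout=30)
    response.raise_for_status()  # raise on HTTP errors so tenacity retries the call
    return response.text
```

Recursive internal-link traversal that strips `#fragment` suffixes so the
same page is not collected twice:

```python
from __future__ import annotations

from urllib.parse import urldefrag, urljoin

import requests
from bs4 import BeautifulSoup


def collect_internal_links(page_url: str, base_url: str, seen: set[str] | None = None) -> set[str]:
    """Recursively collect internal links, treating '#fragment' variants as the same page."""
    seen = set() if seen is None else seen
    page_url, _ = urldefrag(page_url)  # drop '#section' so each page is visited only once
    if page_url in seen or not page_url.startswith(base_url):
        return seen
    seen.add(page_url)

    html = requests.get(page_url, timeout=30).text
    for anchor in BeautifulSoup(html, "html.parser").find_all("a", href=True):
        link, _ = urldefrag(urljoin(page_url, anchor["href"]))
        if link not in seen and link.startswith(base_url):
            collect_internal_links(link, base_url, seen)
    return seen
```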

### Results
#### Astro Docs: Better URLs Fetched + Crawling Improvement + HTML
Splitter Improvement
1. Example of formatting and chunking
- Previously (near unreadable)

![image](https://github.com/astronomer/ask-astro/assets/26350341/90ff59f9-1401-4395-8add-cecd8bf08ac4)
- Now (cleaned!)

![image](https://github.com/astronomer/ask-astro/assets/26350341/b465a4fc-497c-4687-b601-aa03ba12fc15)
2. Example of URL differences
- Previously
    - Around 1000 links fetched; many had DUPLICATE content because they
point to the same page.
    - XML and other non-HTML content was fetched.
    - See old links:
[astro_docs_links_old.txt](https://github.com/astronomer/ask-astro/files/14146665/astro_docs_links_old.txt)
- Now
    - No more duplicate pages or unreleased pages.
    - No older versions of the software docs; only the latest docs are
ingested (e.g. the .../0.31... links are gone).

[new_astro_docs_links.txt](https://github.com/astronomer/ask-astro/files/14146669/new_astro_docs_links.txt)

#### Evaluation
- Overall improvement in answer and retrieval quality
- No degradation noted
- CSV posted in comments
davidgxue authored Feb 23, 2024
1 parent d6bc44b commit c43ffc1
Showing 18 changed files with 136 additions and 130 deletions.
2 changes: 1 addition & 1 deletion airflow/dags/ingestion/ask-astro-forum-load.py
@@ -44,7 +44,7 @@ def ask_astro_load_astro_forum():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
25 changes: 6 additions & 19 deletions airflow/dags/ingestion/ask-astro-load-airflow-docs.py
@@ -1,7 +1,6 @@
import os
from datetime import datetime

import pandas as pd
from include.utils.slack import send_failure_notification

from airflow.decorators import dag, task
@@ -20,21 +19,6 @@
schedule_interval = os.environ.get("INGESTION_SCHEDULE", "0 5 * * 2") if ask_astro_env == "prod" else None


@task
def split_docs(urls: str, chunk_size: int = 100) -> list[list[pd.DataFrame]]:
"""
Split the URLs in chunk and get dataframe for the content
param urls: List for HTTP URL
param chunk_size: Max number of document in split chunk
"""
from include.tasks import split
from include.tasks.extract.utils.html_utils import urls_to_dataframe

chunked_urls = split.split_list(list(urls), chunk_size=chunk_size)
return [[urls_to_dataframe(chunk_url)] for chunk_url in chunked_urls]


@dag(
schedule_interval=schedule_interval,
start_date=datetime(2023, 9, 27),
@@ -51,19 +35,22 @@ def ask_astro_load_airflow_docs():
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks.extract import airflow_docs

extracted_airflow_docs = task(airflow_docs.extract_airflow_docs)(docs_base_url=airflow_docs_base_url)
extracted_airflow_docs = task(split.split_html).expand(
dfs=[airflow_docs.extract_airflow_docs(docs_base_url=airflow_docs_base_url)]
)

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
).expand(input_data=split_docs(extracted_airflow_docs, chunk_size=100))
).expand(input_data=[extracted_airflow_docs])


ask_astro_load_airflow_docs()
2 changes: 1 addition & 1 deletion airflow/dags/ingestion/ask-astro-load-astro-cli.py
@@ -42,7 +42,7 @@ def ask_astro_load_astro_cli_docs():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
2 changes: 1 addition & 1 deletion airflow/dags/ingestion/ask-astro-load-astro-sdk.py
@@ -41,7 +41,7 @@ def ask_astro_load_astro_sdk():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
6 changes: 3 additions & 3 deletions airflow/dags/ingestion/ask-astro-load-astronomer-docs.py
@@ -36,17 +36,17 @@ def ask_astro_load_astronomer_docs():

astro_docs = task(extract_astro_docs)()

split_md_docs = task(split.split_markdown).expand(dfs=[astro_docs])
split_html_docs = task(split.split_html).expand(dfs=[astro_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
).expand(input_data=[split_md_docs])
).expand(input_data=[split_html_docs])


ask_astro_load_astronomer_docs()
airflow/dags/ingestion/ask-astro-load-astronomer-providers.py
@@ -47,7 +47,7 @@ def ask_astro_load_astronomer_providers():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
2 changes: 1 addition & 1 deletion airflow/dags/ingestion/ask-astro-load-blogs.py
@@ -45,7 +45,7 @@ def ask_astro_load_blogs():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
2 changes: 1 addition & 1 deletion airflow/dags/ingestion/ask-astro-load-github.py
@@ -64,7 +64,7 @@ def ask_astro_load_github():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
2 changes: 1 addition & 1 deletion airflow/dags/ingestion/ask-astro-load-registry.py
@@ -47,7 +47,7 @@ def ask_astro_load_registry():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
2 changes: 1 addition & 1 deletion airflow/dags/ingestion/ask-astro-load-slack.py
@@ -53,7 +53,7 @@ def ask_astro_load_slack():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
2 changes: 1 addition & 1 deletion airflow/dags/ingestion/ask-astro-load-stackoverflow.py
@@ -53,7 +53,7 @@ def ask_astro_load_stackoverflow():
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
6 changes: 3 additions & 3 deletions airflow/dags/ingestion/ask-astro-load.py
@@ -206,7 +206,7 @@ def extract_airflow_docs():
else:
raise Exception("Parquet file exists locally but is not readable.")
else:
df = airflow_docs.extract_airflow_docs(docs_base_url=airflow_docs_base_url)[0]
df = airflow_docs.extract_airflow_docs.function(docs_base_url=airflow_docs_base_url)[0]
df.to_parquet(parquet_file)

return [df]
@@ -442,7 +442,7 @@ def import_baseline(
class_name=WEAVIATE_CLASS,
existing="replace",
document_column="docLink",
batch_config_params={"batch_size": 1000},
batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
@@ -455,7 +455,7 @@ def import_baseline(
document_column="docLink",
uuid_column="id",
vector_column="vector",
batch_config_params={"batch_size": 1000},
batch_config_params={"batch_size": 7, "dynamic": False},
verbose=True,
)

7 changes: 6 additions & 1 deletion airflow/include/tasks/extract/airflow_docs.py
@@ -8,9 +8,11 @@
from bs4 import BeautifulSoup
from weaviate.util import generate_uuid5

from airflow.decorators import task
from include.tasks.extract.utils.html_utils import get_internal_links


@task
def extract_airflow_docs(docs_base_url: str) -> list[pd.DataFrame]:
"""
This task return all internal url for Airflow docs
@@ -36,7 +38,10 @@ def extract_airflow_docs(docs_base_url: str) -> list[pd.DataFrame]:
docs_url_parts = urllib.parse.urlsplit(docs_base_url)
docs_url_base = f"{docs_url_parts.scheme}://{docs_url_parts.netloc}"
# make sure we didn't accidentally pickup any unrelated links in recursion
non_doc_links = {link if docs_url_base not in link else "" for link in all_links}
old_version_doc_pattern = r"/(\d+\.)*\d+/"
non_doc_links = {
link if (docs_url_base not in link) or re.search(old_version_doc_pattern, link) else "" for link in all_links
}
docs_links = all_links - non_doc_links

df = pd.DataFrame(docs_links, columns=["docLink"])
114 changes: 51 additions & 63 deletions airflow/include/tasks/extract/astro_docs.py
@@ -1,83 +1,58 @@
from __future__ import annotations

import logging
from urllib.parse import urldefrag, urljoin
import re

import pandas as pd
import requests
from bs4 import BeautifulSoup
from weaviate.util import generate_uuid5

from include.tasks.extract.utils.html_utils import fetch_page_content, get_internal_links

base_url = "https://docs.astronomer.io/"


def fetch_page_content(url: str) -> str:
"""
Fetches the content of a given URL.
def process_astro_doc_page_content(page_content: str) -> str:
soup = BeautifulSoup(page_content, "html.parser")

:param url: URL of the page to fetch.
:return: HTML content of the page.
"""
try:
response = requests.get(url, headers={"User-Agent": "Mozilla/5.0"})
if response.status_code == 200:
return response.content
except requests.RequestException as e:
logging.error(f"Error fetching {url}: {e}")
return ""
# Find the main article container
main_container = soup.find("main", class_="docMainContainer_TBSr")

content_of_interest = main_container if main_container else soup
for nav_tag in content_of_interest.find_all("nav"):
nav_tag.decompose()

def extract_links(soup: BeautifulSoup, base_url: str) -> list[str]:
"""
Extracts all valid links from a BeautifulSoup object.
for script_or_style in content_of_interest.find_all(["script", "style", "button", "img", "svg"]):
script_or_style.decompose()

:param soup: BeautifulSoup object to extract links from.
:param base_url: Base URL for relative links.
:return: List of extracted URLs.
"""
links = []
for link in soup.find_all("a", href=True):
href = link["href"]
if not href.startswith("http"):
href = urljoin(base_url, href)
if href.startswith(base_url):
links.append(href)
return links
feedback_widget = content_of_interest.find("div", id="feedbackWidget")
if feedback_widget:
feedback_widget.decompose()

newsletter_form = content_of_interest.find("form", id="newsletterForm")
if newsletter_form:
newsletter_form.decompose()

def scrape_page(url: str, visited_urls: set, docs_data: list) -> None:
"""
Recursively scrapes a webpage and its subpages.
sidebar = content_of_interest.find("ul", class_=lambda value: value and "table-of-contents" in value)
if sidebar:
sidebar.decompose()

:param url: URL of the page to scrape.
:param visited_urls: Set of URLs already visited.
:param docs_data: List to append extracted data to.
"""
if url in visited_urls or not url.startswith(base_url):
return

# Normalize URL by stripping off the fragment
base_url_no_fragment, frag = urldefrag(url)
footers = content_of_interest.find_all("footer")
for footer in footers:
footer.decompose()

# If the URL is the base URL plus a fragment, ignore it
if base_url_no_fragment == base_url and frag:
return
# The actual article in almost all pages of Astro Docs website is in the following HTML container
container_div = content_of_interest.find("div", class_=lambda value: value and "container" in value)

visited_urls.add(url)
if container_div:
row_div = container_div.find("div", class_="row")

logging.info(f"Scraping : {url}")
if row_div:
col_div = row_div.find("div", class_=lambda value: value and "col" in value)

page_content = fetch_page_content(url)
if not page_content:
return
if col_div:
content_of_interest = str(col_div)

soup = BeautifulSoup(page_content, "lxml")
content = soup.get_text(strip=True)
sha = generate_uuid5(content)
docs_data.append({"docSource": "astro docs", "sha": sha, "content": content, "docLink": url})
# Recursively scrape linked pages
for link in extract_links(soup, base_url):
scrape_page(link, visited_urls, docs_data)
return str(content_of_interest).strip()


def extract_astro_docs(base_url: str = base_url) -> list[pd.DataFrame]:
@@ -86,13 +61,26 @@ def extract_astro_docs(base_url: str = base_url) -> list[pd.DataFrame]:
:return: A list of pandas dataframes with extracted data.
"""
visited_urls = set()
docs_data = []
all_links = get_internal_links(base_url, exclude_literal=["learn/tags"])

# for software references, we only want latest docs, ones with version number (old) is removed
old_version_doc_pattern = r"^https://docs\.astronomer\.io/software/\d+\.\d+/.+$"
# remove duplicate xml files, we only want html pages
non_doc_links = {
link if link.endswith("xml") or re.match(old_version_doc_pattern, link) else "" for link in all_links
}
docs_links = all_links - non_doc_links

df = pd.DataFrame(docs_links, columns=["docLink"])

df["html_content"] = df["docLink"].apply(lambda url: fetch_page_content(url))

scrape_page(base_url, visited_urls, docs_data)
# Only keep the main article content
df["content"] = df["html_content"].apply(process_astro_doc_page_content)

df = pd.DataFrame(docs_data)
df.drop_duplicates(subset="sha", inplace=True)
df["sha"] = df["content"].apply(generate_uuid5)
df["docSource"] = "astro docs"
df.reset_index(drop=True, inplace=True)

df = df[["docSource", "sha", "content", "docLink"]]
return [df]