Skip to content

Commit

Permalink
clean html extractor (#237)
Browse files Browse the repository at this point in the history
closes: #164

Currently, We have some duplicate code in the HTML extractor, this PR
aims to remove the duplicate code and reuse it from html_utils.
  • Loading branch information
pankajastro authored Jan 11, 2024
1 parent 17040cf commit 28f1de8
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 372 deletions.
21 changes: 17 additions & 4 deletions airflow/dags/ingestion/ask-astro-load-airflow-docs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from datetime import datetime

import pandas as pd
from include.utils.slack import send_failure_notification

from airflow.decorators import dag, task
Expand All @@ -19,6 +20,21 @@
schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None


@task
def split_docs(urls: str, chunk_size: int = 100) -> list[list[pd.DataFrame]]:
"""
Split the URLs in chunk and get dataframe for the content
param urls: List for HTTP URL
param chunk_size: Max number of document in split chunk
"""
from include.tasks import split
from include.tasks.extract.utils.html_utils import urls_to_dataframe

chunked_urls = split.split_list(list(urls), chunk_size=chunk_size)
return [[urls_to_dataframe(chunk_url)] for chunk_url in chunked_urls]


@dag(
schedule_interval=schedule_interval,
start_date=datetime(2023, 9, 27),
Expand All @@ -35,13 +51,10 @@ def ask_astro_load_airflow_docs():
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks.extract import airflow_docs

extracted_airflow_docs = task(airflow_docs.extract_airflow_docs)(docs_base_url=airflow_docs_base_url)

split_md_docs = task(split.split_html).expand(dfs=[extracted_airflow_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
existing="replace",
Expand All @@ -50,7 +63,7 @@ def ask_astro_load_airflow_docs():
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
).expand(input_data=[split_md_docs])
).expand(input_data=split_docs(extracted_airflow_docs, chunk_size=100))


ask_astro_load_airflow_docs()
55 changes: 4 additions & 51 deletions airflow/include/tasks/extract/airflow_docs.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,13 @@
from __future__ import annotations

import re
import urllib.parse

import pandas as pd
import requests
from bs4 import BeautifulSoup
from weaviate.util import generate_uuid5

from include.tasks.extract.utils.html_helpers import get_all_links
from include.tasks.extract.utils.html_utils import get_internal_links


def extract_airflow_docs(docs_base_url: str) -> list[pd.DataFrame]:
"""
This task scrapes docs from the Airflow website and returns a list of pandas dataframes. Return
type is a list in order to map to upstream dynamic tasks. The code recursively generates a list
of html files relative to 'docs_base_url' and then extracts each as text.
Note: Only the (class_: body, role: main) tag and children are extracted.
Note: This code will also pickup https://airflow.apache.org/howto/*
which are also referenced in the docs pages. These are useful for Ask Astro and also they are relatively few
pages so we leave them in.
param docs_base_url: Base URL to start extract.
type docs_base_url: str
The returned data includes the following fields:
'docSource': 'apache/airflow/docs'
'docLink': URL for the page
'content': HTML content of the page
'sha': A UUID from the other fields
This task return all internal url for Airflow docs
"""

# we exclude the following docs which are not useful and/or too large for easy processing.
Expand All @@ -48,30 +25,6 @@ def extract_airflow_docs(docs_base_url: str) -> list[pd.DataFrame]:
"cli-and-env-variables-ref.html",
]

docs_url_parts = urllib.parse.urlsplit(docs_base_url)
docs_url_base = f"{docs_url_parts.scheme}://{docs_url_parts.netloc}"

all_links = {docs_base_url}
get_all_links(url=list(all_links)[0], all_links=all_links, exclude_docs=exclude_docs)

# make sure we didn't accidentally pickup any unrelated links in recursion
non_doc_links = {link if docs_url_base not in link else "" for link in all_links}
docs_links = all_links - non_doc_links

df = pd.DataFrame(docs_links, columns=["docLink"])

df["html_content"] = df["docLink"].apply(lambda x: requests.get(x).content)

df["content"] = df["html_content"].apply(
lambda x: str(BeautifulSoup(x, "html.parser").find(class_="body", role="main"))
)
df["content"] = df["content"].apply(lambda x: re.sub("¶", "", x))

df["sha"] = df["content"].apply(generate_uuid5)
df["docSource"] = "apache/airflow/docs"
df.reset_index(drop=True, inplace=True)

# column order matters for uuid generation
df = df[["docSource", "sha", "content", "docLink"]]
all_links = get_internal_links(docs_base_url, exclude_literal=exclude_docs)

return [df]
return all_links
93 changes: 4 additions & 89 deletions airflow/include/tasks/extract/astro_forum_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import pytz
import requests
from bs4 import BeautifulSoup
from weaviate.util import generate_uuid5

from include.tasks.extract.utils.html_utils import fetch_page_content, urls_to_dataframe

cutoff_date = datetime(2022, 1, 1, tzinfo=pytz.UTC)

Expand Down Expand Up @@ -77,105 +78,19 @@ def get_cutoff_questions(forum_url: str) -> set[str]:
page_url = f"{base_url}{page_number}"
logger.info(page_url)
page_number = page_number + 1
html_content = requests.get(page_url).content
html_content = fetch_page_content(page_url)
questions_urls = get_questions_urls(html_content)
if not questions_urls: # reached at the end of page
return set(all_valid_url)
filter_questions_urls = filter_cutoff_questions(questions_urls)
all_valid_url.extend(filter_questions_urls)


def truncate_tokens(text: str, encoding_name: str, max_length: int = 8192) -> str:
"""
Truncates a text string based on the maximum number of tokens.
param string (str): The input text string to be truncated.
param encoding_name (str): The name of the encoding model.
param max_length (int): The maximum number of tokens allowed. Default is 8192.
"""
import tiktoken

try:
encoding = tiktoken.encoding_for_model(encoding_name)
except ValueError as e:
raise ValueError(f"Invalid encoding_name: {e}")

encoded_string = encoding.encode(text)
num_tokens = len(encoded_string)

if num_tokens > max_length:
text = encoding.decode(encoded_string[:max_length])

return text


def clean_content(row_content: str) -> str | None:
"""
Cleans and extracts text content from HTML.
param row_content (str): The HTML content to be cleaned.
"""
soup = BeautifulSoup(row_content, "html.parser").find("body")

if soup is None:
return
# Remove script and style tags
for script_or_style in soup(["script", "style"]):
script_or_style.extract()

# Get text and handle whitespaces
text = " ".join(soup.stripped_strings)
# Need to truncate because in some cases the token size
# exceeding the max token size. Better solution can be get summary and ingest it.
return truncate_tokens(text, "gpt-3.5-turbo", 7692)


def fetch_url_content(url) -> str | None:
"""
Fetches the content of a URL.
param url (str): The URL to fetch content from.
"""
try:
response = requests.get(url)
response.raise_for_status() # Raise an HTTPError for bad responses
return response.content
except requests.RequestException:
logger.info("Error fetching content for %s: %s", url, url)
return None


def process_url(url: str, doc_source: str = "") -> dict | None:
"""
Process a URL by fetching its content, cleaning it, and generating a unique identifier (SHA) based on the cleaned content.
param url (str): The URL to be processed.
"""
content = fetch_url_content(url)
if content is not None:
cleaned_content = clean_content(content)
sha = generate_uuid5(cleaned_content)
return {"docSource": doc_source, "sha": sha, "content": cleaned_content, "docLink": url}


def url_to_df(urls: set[str], doc_source: str = "") -> pd.DataFrame:
"""
Create a DataFrame from a list of URLs by processing each URL and organizing the results.
param urls (list): A list of URLs to be processed.
"""
df_data = [process_url(url, doc_source) for url in urls]
df_data = [entry for entry in df_data if entry is not None] # Remove failed entries
df = pd.DataFrame(df_data)
df = df[["docSource", "sha", "content", "docLink"]] # Reorder columns if needed
return df


def get_forum_df() -> list[pd.DataFrame]:
"""
Retrieves question links from a forum, converts them into a DataFrame, and returns a list containing the DataFrame.
"""
questions_links = get_cutoff_questions("https://forum.astronomer.io/latest")
logger.info(questions_links)
df = url_to_df(questions_links, "astro-forum")
df = urls_to_dataframe(questions_links, "astro-forum")
return [df]
6 changes: 3 additions & 3 deletions airflow/include/tasks/extract/astro_sdk_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import pandas as pd

from include.tasks.extract.utils.html_url_extractor import extract_internal_url, url_to_df
from include.tasks.extract.utils.html_utils import get_internal_links, urls_to_dataframe

logger = logging.getLogger("airflow.task")

Expand All @@ -13,12 +13,12 @@ def extract_astro_sdk_docs() -> list[pd.DataFrame]:
exclude_docs = ["autoapi", "genindex.html", "py-modindex.html", ".md", ".py"]
base_url = "https://astro-sdk-python.readthedocs.io/en/stable/"

urls = extract_internal_url(base_url, exclude_docs)
urls = get_internal_links(base_url, exclude_docs)

new_urls = [url for url in urls if "stable" in url]
logger.info("******ingesting****")
logger.info(new_urls)
logger.info("*********************")
df = url_to_df(new_urls, "astro-sdk")
df = urls_to_dataframe(new_urls, "astro-sdk")

return [df]
6 changes: 3 additions & 3 deletions airflow/include/tasks/extract/astronomer_providers_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

import pandas as pd

from include.tasks.extract.utils.html_url_extractor import extract_internal_url, url_to_df
from include.tasks.extract.utils.html_utils import get_internal_links, urls_to_dataframe


def extract_provider_docs() -> list[pd.DataFrame]:
exclude_docs = ["_api", "_modules", "_sources", "changelog.html", "genindex.html", "py-modindex.html", "#"]
base_url = "https://astronomer-providers.readthedocs.io/en/stable/"

urls = extract_internal_url(base_url, exclude_docs)
df = url_to_df(urls, "astronomer-providers")
urls = get_internal_links(base_url, exclude_docs)
df = urls_to_dataframe(urls, "astronomer-providers")
return [df]
77 changes: 0 additions & 77 deletions airflow/include/tasks/extract/utils/html_helpers.py

This file was deleted.

Loading

0 comments on commit 28f1de8

Please sign in to comment.