clean html extractor #237

Merged 6 commits on Jan 11, 2024
21 changes: 17 additions & 4 deletions airflow/dags/ingestion/ask-astro-load-airflow-docs.py
@@ -1,6 +1,7 @@
import os
from datetime import datetime

import pandas as pd
from include.utils.slack import send_failure_notification

from airflow.decorators import dag, task
@@ -19,6 +20,21 @@
schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None


@task
def split_docs(urls: str, chunk_size: int = 100) -> list[list[pd.DataFrame]]:
"""
Split the URLs in chunk and get dataframe for the content

param urls: List for HTTP URL
param chunk_size: Max number of document in split chunk
"""
from include.tasks import split
from include.tasks.extract.utils.html_utils import urls_to_dataframe

chunked_urls = split.split_list(list(urls), chunk_size=chunk_size)
return [[urls_to_dataframe(chunk_url)] for chunk_url in chunked_urls]


@dag(
schedule_interval=schedule_interval,
start_date=datetime(2023, 9, 27),
@@ -35,13 +51,10 @@ def ask_astro_load_airflow_docs():
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks.extract import airflow_docs

extracted_airflow_docs = task(airflow_docs.extract_airflow_docs)(docs_base_url=airflow_docs_base_url)

split_md_docs = task(split.split_html).expand(dfs=[extracted_airflow_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
existing="replace",
@@ -50,7 +63,7 @@ def ask_astro_load_airflow_docs():
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
).expand(input_data=[split_md_docs])
).expand(input_data=split_docs(extracted_airflow_docs, chunk_size=100))


ask_astro_load_airflow_docs()
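
For context on the new mapping: the DAG now expands WeaviateDocumentIngestOperator over the output of split_docs, which chunks the extracted URL list and turns each chunk into a single-element list of dataframes. Below is a minimal sketch of the chunking this relies on, assuming split.split_list (not shown in this diff) simply yields fixed-size slices of its input; the URLs are hypothetical.

from typing import Any


def split_list(items: list[Any], chunk_size: int = 100) -> list[list[Any]]:
    # Assumed behaviour of include.tasks.split.split_list:
    # consecutive slices of at most chunk_size elements.
    return [items[i : i + chunk_size] for i in range(0, len(items), chunk_size)]


urls = [f"https://airflow.apache.org/docs/page-{i}.html" for i in range(250)]  # hypothetical
print([len(chunk) for chunk in split_list(urls, chunk_size=100)])  # [100, 100, 50]

Each chunk then becomes one urls_to_dataframe(...) call, so the dynamic task mapping fans out per chunk of pages rather than per page.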
55 changes: 4 additions & 51 deletions airflow/include/tasks/extract/airflow_docs.py
@@ -1,36 +1,13 @@
from __future__ import annotations

import re
import urllib.parse

import pandas as pd
import requests
from bs4 import BeautifulSoup
from weaviate.util import generate_uuid5

from include.tasks.extract.utils.html_helpers import get_all_links
from include.tasks.extract.utils.html_utils import get_internal_links


def extract_airflow_docs(docs_base_url: str) -> list[pd.DataFrame]:
"""
This task scrapes docs from the Airflow website and returns a list of pandas dataframes. Return
type is a list in order to map to upstream dynamic tasks. The code recursively generates a list
of html files relative to 'docs_base_url' and then extracts each as text.

Note: Only the (class_: body, role: main) tag and children are extracted.

Note: This code will also pickup https://airflow.apache.org/howto/*
which are also referenced in the docs pages. These are useful for Ask Astro and also they are relatively few
pages so we leave them in.

param docs_base_url: Base URL to start extract.
type docs_base_url: str

The returned data includes the following fields:
'docSource': 'apache/airflow/docs'
'docLink': URL for the page
'content': HTML content of the page
'sha': A UUID from the other fields
This task returns all internal URLs for the Airflow docs.
"""

# we exclude the following docs which are not useful and/or too large for easy processing.
@@ -48,30 +25,6 @@ def extract_airflow_docs(docs_base_url: str) -> list[pd.DataFrame]:
"cli-and-env-variables-ref.html",
]

docs_url_parts = urllib.parse.urlsplit(docs_base_url)
docs_url_base = f"{docs_url_parts.scheme}://{docs_url_parts.netloc}"

all_links = {docs_base_url}
get_all_links(url=list(all_links)[0], all_links=all_links, exclude_docs=exclude_docs)

# make sure we didn't accidentally pickup any unrelated links in recursion
non_doc_links = {link if docs_url_base not in link else "" for link in all_links}
docs_links = all_links - non_doc_links

df = pd.DataFrame(docs_links, columns=["docLink"])

df["html_content"] = df["docLink"].apply(lambda x: requests.get(x).content)

df["content"] = df["html_content"].apply(
lambda x: str(BeautifulSoup(x, "html.parser").find(class_="body", role="main"))
)
df["content"] = df["content"].apply(lambda x: re.sub("¶", "", x))

df["sha"] = df["content"].apply(generate_uuid5)
df["docSource"] = "apache/airflow/docs"
df.reset_index(drop=True, inplace=True)

# column order matters for uuid generation
df = df[["docSource", "sha", "content", "docLink"]]
all_links = get_internal_links(docs_base_url, exclude_literal=exclude_docs)

return [df]
return all_links
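
The crawl that used to live here (get_all_links plus the requests/BeautifulSoup post-processing removed above) is now delegated to get_internal_links in html_utils, which this diff does not show. A rough sketch, assuming it mirrors the deleted logic: crawl breadth-first from the base URL, keep only links on the same host, and skip any URL containing one of the exclude_literal substrings.

import urllib.parse
from collections import deque

import requests
from bs4 import BeautifulSoup


def get_internal_links(base_url: str, exclude_literal: list[str]) -> set[str]:
    # Hypothetical reconstruction -- the real helper lives in
    # include/tasks/extract/utils/html_utils.py and may differ.
    parts = urllib.parse.urlsplit(base_url)
    host = f"{parts.scheme}://{parts.netloc}"
    seen, queue = {base_url}, deque([base_url])
    while queue:
        page = queue.popleft()
        soup = BeautifulSoup(requests.get(page).content, "html.parser")
        for anchor in soup.find_all("a", href=True):
            link = urllib.parse.urljoin(page, anchor["href"]).split("#")[0]
            if link.startswith(host) and link not in seen and not any(e in link for e in exclude_literal):
                seen.add(link)
                queue.append(link)
    return seen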
93 changes: 4 additions & 89 deletions airflow/include/tasks/extract/astro_forum_docs.py
@@ -7,7 +7,8 @@
import pytz
import requests
from bs4 import BeautifulSoup
from weaviate.util import generate_uuid5

from include.tasks.extract.utils.html_utils import fetch_page_content, urls_to_dataframe

cutoff_date = datetime(2022, 1, 1, tzinfo=pytz.UTC)

@@ -77,105 +78,19 @@ def get_cutoff_questions(forum_url: str) -> set[str]:
page_url = f"{base_url}{page_number}"
logger.info(page_url)
page_number = page_number + 1
html_content = requests.get(page_url).content
html_content = fetch_page_content(page_url)
questions_urls = get_questions_urls(html_content)
if not questions_urls:  # reached the end of the pages
return set(all_valid_url)
filter_questions_urls = filter_cutoff_questions(questions_urls)
all_valid_url.extend(filter_questions_urls)


def truncate_tokens(text: str, encoding_name: str, max_length: int = 8192) -> str:
"""
Truncates a text string based on the maximum number of tokens.

param string (str): The input text string to be truncated.
param encoding_name (str): The name of the encoding model.
param max_length (int): The maximum number of tokens allowed. Default is 8192.
"""
import tiktoken

try:
encoding = tiktoken.encoding_for_model(encoding_name)
except ValueError as e:
raise ValueError(f"Invalid encoding_name: {e}")

encoded_string = encoding.encode(text)
num_tokens = len(encoded_string)

if num_tokens > max_length:
text = encoding.decode(encoded_string[:max_length])

return text


def clean_content(row_content: str) -> str | None:
"""
Cleans and extracts text content from HTML.

param row_content (str): The HTML content to be cleaned.
"""
soup = BeautifulSoup(row_content, "html.parser").find("body")

if soup is None:
return
# Remove script and style tags
for script_or_style in soup(["script", "style"]):
script_or_style.extract()

# Get text and handle whitespaces
text = " ".join(soup.stripped_strings)
# Need to truncate because in some cases the token size
# exceeding the max token size. Better solution can be get summary and ingest it.
return truncate_tokens(text, "gpt-3.5-turbo", 7692)


def fetch_url_content(url) -> str | None:
"""
Fetches the content of a URL.

param url (str): The URL to fetch content from.
"""
try:
response = requests.get(url)
response.raise_for_status() # Raise an HTTPError for bad responses
return response.content
except requests.RequestException:
logger.info("Error fetching content for %s: %s", url, url)
return None


def process_url(url: str, doc_source: str = "") -> dict | None:
"""
Process a URL by fetching its content, cleaning it, and generating a unique identifier (SHA) based on the cleaned content.

param url (str): The URL to be processed.
"""
content = fetch_url_content(url)
if content is not None:
cleaned_content = clean_content(content)
sha = generate_uuid5(cleaned_content)
return {"docSource": doc_source, "sha": sha, "content": cleaned_content, "docLink": url}


def url_to_df(urls: set[str], doc_source: str = "") -> pd.DataFrame:
"""
Create a DataFrame from a list of URLs by processing each URL and organizing the results.

param urls (list): A list of URLs to be processed.
"""
df_data = [process_url(url, doc_source) for url in urls]
df_data = [entry for entry in df_data if entry is not None] # Remove failed entries
df = pd.DataFrame(df_data)
df = df[["docSource", "sha", "content", "docLink"]] # Reorder columns if needed
return df


def get_forum_df() -> list[pd.DataFrame]:
"""
Retrieves question links from a forum, converts them into a DataFrame, and returns a list containing the DataFrame.
"""
questions_links = get_cutoff_questions("https://forum.astronomer.io/latest")
logger.info(questions_links)
df = url_to_df(questions_links, "astro-forum")
df = urls_to_dataframe(questions_links, "astro-forum")
return [df]
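
The helpers deleted above (fetch_url_content, clean_content, process_url, url_to_df) are replaced by the shared urls_to_dataframe in html_utils, which is also not part of this diff. A hedged sketch of what such a helper would do if it mirrors the removed code: fetch each URL, reduce the HTML to text, hash the content with generate_uuid5, and keep the docSource/sha/content/docLink column order.

import pandas as pd
import requests
from bs4 import BeautifulSoup
from weaviate.util import generate_uuid5


def urls_to_dataframe(urls: set[str], doc_source: str = "") -> pd.DataFrame:
    # Hypothetical sketch; the real implementation in html_utils.py may differ.
    rows = []
    for url in urls:
        resp = requests.get(url)
        if not resp.ok:
            continue  # skip pages that fail to load
        body = BeautifulSoup(resp.content, "html.parser").find("body")
        content = " ".join(body.stripped_strings) if body else ""
        rows.append({"docSource": doc_source, "sha": generate_uuid5(content), "content": content, "docLink": url})
    # column order matters for downstream uuid generation
    return pd.DataFrame(rows, columns=["docSource", "sha", "content", "docLink"])

With that in place, get_forum_df only needs to pass the question URLs and a doc_source label.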
6 changes: 3 additions & 3 deletions airflow/include/tasks/extract/astro_sdk_docs.py
@@ -4,7 +4,7 @@

import pandas as pd

from include.tasks.extract.utils.html_url_extractor import extract_internal_url, url_to_df
from include.tasks.extract.utils.html_utils import get_internal_links, urls_to_dataframe

logger = logging.getLogger("airflow.task")

@@ -13,12 +13,12 @@ def extract_astro_sdk_docs() -> list[pd.DataFrame]:
exclude_docs = ["autoapi", "genindex.html", "py-modindex.html", ".md", ".py"]
base_url = "https://astro-sdk-python.readthedocs.io/en/stable/"

urls = extract_internal_url(base_url, exclude_docs)
urls = get_internal_links(base_url, exclude_docs)

new_urls = [url for url in urls if "stable" in url]
logger.info("******ingesting****")
logger.info(new_urls)
logger.info("*********************")
df = url_to_df(new_urls, "astro-sdk")
df = urls_to_dataframe(new_urls, "astro-sdk")

return [df]
6 changes: 3 additions & 3 deletions airflow/include/tasks/extract/astronomer_providers_docs.py
@@ -2,13 +2,13 @@

import pandas as pd

from include.tasks.extract.utils.html_url_extractor import extract_internal_url, url_to_df
from include.tasks.extract.utils.html_utils import get_internal_links, urls_to_dataframe


def extract_provider_docs() -> list[pd.DataFrame]:
exclude_docs = ["_api", "_modules", "_sources", "changelog.html", "genindex.html", "py-modindex.html", "#"]
base_url = "https://astronomer-providers.readthedocs.io/en/stable/"

urls = extract_internal_url(base_url, exclude_docs)
df = url_to_df(urls, "astronomer-providers")
urls = get_internal_links(base_url, exclude_docs)
df = urls_to_dataframe(urls, "astronomer-providers")
return [df]
77 changes: 0 additions & 77 deletions airflow/include/tasks/extract/utils/html_helpers.py

This file was deleted.
