Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(airflow): add astro-cli ingestion #200

Merged
merged 3 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions airflow/dags/ingestion/ask-astro-load-astro-cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import datetime
import os

from include.tasks import split
from include.tasks.extract import astro_cli_docs
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

from airflow.decorators import dag, task

# Environment selector ("dev", "prod", ...). BUG FIX: the env-var name previously
# contained a stray backtick ("`ASK_ASTRO_ENV"), so the lookup could never match
# and the value was always "dev" — which also silently disabled the prod schedule.
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

# Weaviate connection id is environment-specific, e.g. "weaviate_dev" / "weaviate_prod".
_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

default_args = {"retries": 3, "retry_delay": 30}

# Run daily at 05:00 in production; no automatic schedule in other environments.
schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None


@dag(
    schedule_interval=schedule_interval,
    start_date=datetime.datetime(2023, 9, 27),
    catchup=False,
    # Created paused: an operator must explicitly enable this ingestion DAG.
    is_paused_upon_creation=True,
    default_args=default_args,
)
def ask_astro_load_astro_cli_docs():
    """
    This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
    data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
    any existing documents that have been updated will be removed and re-added.
    """

    # Scrape the astro-cli documentation site; returns a single-element list of
    # DataFrames so it can be mapped over by downstream dynamic tasks.
    extract_astro_cli_docs = task(astro_cli_docs.extract_astro_cli_docs)()
    # Dynamically map the HTML splitter over the extracted frame(s).
    split_md_docs = task(split.split_html).expand(dfs=[extract_astro_cli_docs])

    # Upsert split documents into Weaviate. retries=10 overrides the default of 3
    # — presumably because remote ingestion is the most failure-prone step; confirm.
    _import_data = (
        task(ask_astro_weaviate_hook.ingest_data, retries=10)
        .partial(
            class_name=WEAVIATE_CLASS,
            existing="upsert",  # re-ingesting a known docLink replaces the old doc
            doc_key="docLink",
            batch_params={"batch_size": 1000},
            verbose=True,
        )
        .expand(dfs=[split_md_docs])
    )


# Instantiate the DAG at import time so the Airflow scheduler registers it.
ask_astro_load_astro_cli_docs()
16 changes: 14 additions & 2 deletions airflow/dags/ingestion/ask-astro-load.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import pandas as pd
from include.tasks import split
from include.tasks.extract import airflow_docs, blogs, github, registry, stack_overflow
from include.tasks.extract import airflow_docs, astro_cli_docs, blogs, github, registry, stack_overflow
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

from airflow.decorators import dag, task
Expand Down Expand Up @@ -192,6 +192,17 @@ def extract_airflow_docs():

return [df]

@task(trigger_rule="none_failed")
def extract_astro_cli_docs():
    """Return the astro-cli docs as a single-element list of DataFrames.

    Prefers a cached parquet snapshot; on any read failure (missing or
    unreadable file) falls back to a live scrape and caches the result
    for subsequent runs.
    """
    astro_cli_parquet_path = "include/data/astronomer/docs/astro-cli.parquet"
    try:
        df = pd.read_parquet(astro_cli_parquet_path)
    except Exception:
        # Deliberate broad catch: any cache miss/corruption triggers a re-scrape.
        df = astro_cli_docs.extract_astro_cli_docs()[0]
        df.to_parquet(astro_cli_parquet_path)

    # List return type matches the shape expected by upstream dynamic mapping.
    return [df]

@task(trigger_rule="none_failed")
def extract_stack_overflow(tag: str, stackoverflow_cutoff_date: str = stackoverflow_cutoff_date):
parquet_file = "include/data/stack_overflow/base.parquet"
Expand Down Expand Up @@ -277,6 +288,7 @@ def extract_astro_blogs():
registry_dags_docs = extract_astro_registry_dags()
code_samples = extract_github_python.expand(source=code_samples_sources)
_airflow_docs = extract_airflow_docs()
_astro_cli_docs = extract_astro_cli_docs()

_get_schema = get_schema_and_process(schema_file="include/data/schema.json")
_check_schema = check_schema(class_objects=_get_schema)
Expand All @@ -291,7 +303,7 @@ def extract_astro_blogs():
registry_cells_docs,
]

html_tasks = [_airflow_docs]
html_tasks = [_airflow_docs, _astro_cli_docs]

python_code_tasks = [registry_dags_docs, code_samples]

Expand Down
46 changes: 46 additions & 0 deletions airflow/include/tasks/extract/astro_cli_docs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from __future__ import annotations

import re

import pandas as pd
import requests
from bs4 import BeautifulSoup
from weaviate.util import generate_uuid5


def extract_astro_cli_docs() -> list[pd.DataFrame]:
    """
    Download every astro-cli page from the Astronomer documentation website and
    return it as a single-element list of pandas DataFrames. Return type is a
    list in order to map to upstream dynamic tasks.

    The returned data includes the following fields:
    'docSource': 'astronomer/docs/astro-cli'
    'docLink': URL for the page
    'content': HTML content of the page's <body>
    'sha': A UUID generated from the content
    """
    astronomer_base_url = "https://docs.astronomer.io"
    astro_cli_overview_endpoint = "/astro/cli/overview"

    # The overview page links to every other astro-cli documentation page.
    # (Endpoint already starts with "/", so no extra separator — the previous
    # f"{base}/{endpoint}" join produced a double slash in the URL.)
    response = requests.get(f"{astronomer_base_url}{astro_cli_overview_endpoint}")
    soup = BeautifulSoup(response.text, "lxml")
    # link.get("href") is None for anchors without an href; the previous code
    # called .startswith() on it unconditionally and could raise AttributeError.
    astro_cli_links = {
        f"{astronomer_base_url}{href}"
        for link in soup.find_all("a")
        if (href := link.get("href")) and href.startswith("/astro/cli")
    }

    df = pd.DataFrame(astro_cli_links, columns=["docLink"])
    df["html_content"] = df["docLink"].apply(lambda url: requests.get(url).content)

    # Keep only the <body> of each page and strip pilcrow (¶) heading anchors.
    df["content"] = df["html_content"].apply(lambda html: str(BeautifulSoup(html, "html.parser").find("body")))
    df["content"] = df["content"].apply(lambda text: re.sub("¶", "", text))

    df["sha"] = df["content"].apply(generate_uuid5)
    df["docSource"] = "astronomer/docs/astro-cli"
    df.reset_index(drop=True, inplace=True)

    # column order matters for uuid generation
    df = df[["docSource", "sha", "content", "docLink"]]

    return [df]
Loading