From fe2c0f9ca4105b3ea0f283c9023c60bfda1ed37f Mon Sep 17 00:00:00 2001
From: Wei Lee
Date: Mon, 4 Dec 2023 17:12:58 +0800
Subject: [PATCH 1/3] feat(airflow): add astro-cli ingestion

Closes: #190
---
 .../ingestion/ask-astro-load-astro-cli.py     | 51 +++++++++++++++++++
 .../include/tasks/extract/astro_cli_docs.py   | 36 +++++++++++++
 2 files changed, 87 insertions(+)
 create mode 100644 airflow/dags/ingestion/ask-astro-load-astro-cli.py
 create mode 100644 airflow/include/tasks/extract/astro_cli_docs.py

diff --git a/airflow/dags/ingestion/ask-astro-load-astro-cli.py b/airflow/dags/ingestion/ask-astro-load-astro-cli.py
new file mode 100644
index 00000000..ee44c4bd
--- /dev/null
+++ b/airflow/dags/ingestion/ask-astro-load-astro-cli.py
@@ -0,0 +1,51 @@
+import datetime
+import os
+
+from include.tasks import split
+from include.tasks.extract import astro_cli_docs
+from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook
+
+from airflow.decorators import dag, task
+
+ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")
+
+_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
+WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
+ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)
+
+default_args = {"retries": 3, "retry_delay": 30}
+
+schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None
+
+
+@dag(
+    schedule_interval=schedule_interval,
+    start_date=datetime.datetime(2023, 9, 27),
+    catchup=False,
+    is_paused_upon_creation=True,
+    default_args=default_args,
+)
+def ask_astro_load_astro_cli_docs():
+    """
+    This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
+    data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
+    any existing documents that have been updated will be removed and re-added.
+    """
+
+    extract_astro_cli_docs = task(astro_cli_docs.extract_astro_cli_docs)()
+    split_md_docs = task(split.split_html).expand(dfs=[extract_astro_cli_docs])
+
+    _import_data = (
+        task(ask_astro_weaviate_hook.ingest_data, retries=10)
+        .partial(
+            class_name=WEAVIATE_CLASS,
+            existing="upsert",
+            doc_key="docLink",
+            batch_params={"batch_size": 1000},
+            verbose=True,
+        )
+        .expand(dfs=[split_md_docs])
+    )
+
+
+ask_astro_load_astro_cli_docs()
diff --git a/airflow/include/tasks/extract/astro_cli_docs.py b/airflow/include/tasks/extract/astro_cli_docs.py
new file mode 100644
index 00000000..fb639d90
--- /dev/null
+++ b/airflow/include/tasks/extract/astro_cli_docs.py
@@ -0,0 +1,36 @@
+from __future__ import annotations
+
+import re
+
+import pandas as pd
+import requests
+from bs4 import BeautifulSoup
+from weaviate.util import generate_uuid5
+
+
+def extract_astro_cli_docs() -> list[pd.DataFrame]:
+    astronomer_base_url = "https://docs.astronomer.io"
+    astro_cli_overview_endpoint = "/astro/cli/overview"
+
+    response = requests.get(f"{astronomer_base_url}/{astro_cli_overview_endpoint}")
+    soup = BeautifulSoup(response.text, "lxml")
+    astro_cli_links = {
+        f"{astronomer_base_url}{link.get('href')}"
+        for link in soup.find_all("a")
+        if link.get("href").startswith("/astro/cli")
+    }
+
+    df = pd.DataFrame(astro_cli_links, columns=["docLink"])
+    df["html_content"] = df["docLink"].apply(lambda x: requests.get(x).content)
+
+    df["content"] = df["html_content"].apply(lambda x: str(BeautifulSoup(x, "html.parser").find("body")))
+    df["content"] = df["content"].apply(lambda x: re.sub("¶", "", x))
+
+    df["sha"] = df["content"].apply(generate_uuid5)
+    df["docSource"] = "apache/airflow/docs"
+    df.reset_index(drop=True, inplace=True)
+
+    # column order matters for uuid generation
+    df = df[["docSource", "sha", "content", "docLink"]]
+
+    return [df]

From 2e74f2dcf90b6389812f3fa4237993609f97151e Mon Sep 17 00:00:00 2001
From: Wei Lee
Date: Tue, 5 Dec 2023 16:32:09 +0800
Subject: [PATCH 2/3] feat(airflow): add astro cli bulk load

---
 airflow/dags/ingestion/ask-astro-load.py        | 16 ++++++++++++++--
 airflow/include/tasks/extract/astro_cli_docs.py |  2 +-
 2 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/airflow/dags/ingestion/ask-astro-load.py b/airflow/dags/ingestion/ask-astro-load.py
index 0c165d7d..483230d1 100644
--- a/airflow/dags/ingestion/ask-astro-load.py
+++ b/airflow/dags/ingestion/ask-astro-load.py
@@ -8,7 +8,7 @@
 import pandas as pd
 
 from include.tasks import split
-from include.tasks.extract import airflow_docs, blogs, github, registry, stack_overflow
+from include.tasks.extract import airflow_docs, astro_cli_docs, blogs, github, registry, stack_overflow
 from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook
 
 from airflow.decorators import dag, task
@@ -192,6 +192,17 @@ def extract_airflow_docs():
 
         return [df]
 
+    @task(trigger_rule="none_failed")
+    def extract_astro_cli_docs():
+        astro_cli_parquet_path = "include/data/astronomer/docs/astro-cli.parquet"
+        try:
+            df = pd.read_parquet(astro_cli_parquet_path)
+        except Exception:
+            df = astro_cli_docs.extract_astro_cli_docs()[0]
+            df.to_parquet(astro_cli_parquet_path)
+
+        return [df]
+
     @task(trigger_rule="none_failed")
     def extract_stack_overflow(tag: str, stackoverflow_cutoff_date: str = stackoverflow_cutoff_date):
         parquet_file = "include/data/stack_overflow/base.parquet"
@@ -277,6 +288,7 @@ def extract_astro_blogs():
     registry_dags_docs = extract_astro_registry_dags()
     code_samples = extract_github_python.expand(source=code_samples_sources)
     _airflow_docs = extract_airflow_docs()
+    _astro_cli_docs = extract_astro_cli_docs()
 
     _get_schema = get_schema_and_process(schema_file="include/data/schema.json")
     _check_schema = check_schema(class_objects=_get_schema)
@@ -291,7 +303,7 @@ def extract_astro_blogs():
         registry_cells_docs,
     ]
 
-    html_tasks = [_airflow_docs]
+    html_tasks = [_airflow_docs, _astro_cli_docs]
     python_code_tasks = [registry_dags_docs, code_samples]
 
diff --git a/airflow/include/tasks/extract/astro_cli_docs.py b/airflow/include/tasks/extract/astro_cli_docs.py
index fb639d90..2765d5d0 100644
--- a/airflow/include/tasks/extract/astro_cli_docs.py
+++ b/airflow/include/tasks/extract/astro_cli_docs.py
@@ -27,7 +27,7 @@ def extract_astro_cli_docs() -> list[pd.DataFrame]:
     df["content"] = df["content"].apply(lambda x: re.sub("¶", "", x))
 
     df["sha"] = df["content"].apply(generate_uuid5)
-    df["docSource"] = "apache/airflow/docs"
+    df["docSource"] = "astronomer/docs/astro-cli"
     df.reset_index(drop=True, inplace=True)
 
     # column order matters for uuid generation

From e8a77cd50cb23b1c04c456f94973fa520e0c68f7 Mon Sep 17 00:00:00 2001
From: Wei Lee
Date: Tue, 12 Dec 2023 07:47:04 +0800
Subject: [PATCH 3/3] docs(airflow): add docstring to extract_astro_cli_docs

---
 airflow/include/tasks/extract/astro_cli_docs.py | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/airflow/include/tasks/extract/astro_cli_docs.py b/airflow/include/tasks/extract/astro_cli_docs.py
index 2765d5d0..b8ddd749 100644
--- a/airflow/include/tasks/extract/astro_cli_docs.py
+++ b/airflow/include/tasks/extract/astro_cli_docs.py
@@ -9,6 +9,16 @@
 
 
 def extract_astro_cli_docs() -> list[pd.DataFrame]:
+    """
+    This task downloads pages from the astro-cli documentation website and returns a list of pandas dataframes.
+    Return type is a list in order to map to upstream dynamic tasks.
+
+    The returned data includes the following fields:
+    'docSource': 'astronomer/docs/astro-cli'
+    'docLink': URL for the page
+    'content': HTML content of the page
+    'sha': A UUID from the other fields
+    """
     astronomer_base_url = "https://docs.astronomer.io"
     astro_cli_overview_endpoint = "/astro/cli/overview"