Skip to content

Commit

Permalink
Use AskAstroWeaviateHook for ask_astro_load_airflow_docs DAG (#155)
Browse files Browse the repository at this point in the history
- Implement AskAstroWeaviateHook which inherits from WeavaiteHook from
OSS.
- Use AskAstroWeaviateHook as the taskflow API for
[ask-astro-load-airflow-docs.py](https://github.com/astronomer/ask-astro/blob/45ce6543d044a977d97e9314e443f629604e84a9/airflow/dags/ingestion/ask-astro-load-airflow-docs.py)


closes: #141 

Note: Merge this PR before merging
[132](#132)
  • Loading branch information
sunank200 committed Nov 23, 2023
1 parent d8f4fd6 commit 3e494fa
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions airflow/dags/ingestion/ask-astro-load-airflow-docs.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import os
from datetime import datetime

from include.tasks import ingest, split
from include.tasks import split
from include.tasks.extract import airflow_docs
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

from airflow.decorators import dag, task

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "")
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")

ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

airflow_docs_base_url = "https://airflow.apache.org/docs/"

Expand All @@ -36,12 +39,17 @@ def ask_astro_load_airflow_docs():

split_md_docs = task(split.split_html).expand(dfs=[extracted_airflow_docs])

task.weaviate_import(
ingest.import_upsert_data,
weaviate_conn_id=_WEAVIATE_CONN_ID,
).partial(
class_name=WEAVIATE_CLASS, primary_key="docLink"
).expand(dfs=[split_md_docs])
_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[split_md_docs])
)


ask_astro_load_airflow_docs()

0 comments on commit 3e494fa

Please sign in to comment.