Skip to content

Commit

Permalink
Use AskAstroWeaviateHook for ask-astro-load-blogs.py (#156)
Browse files Browse the repository at this point in the history
- Use `AskAstroWeaviateHook` as the taskflow API for
[ask-astro-load-blogs.py](https://github.com/astronomer/ask-astro/blob/main/airflow/dags/ingestion/ask-astro-load-blogs.py)

closes: #135
  • Loading branch information
sunank200 committed Nov 23, 2023
1 parent 2849ec6 commit d8f4fd6
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions airflow/dags/ingestion/ask-astro-load-blogs.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import datetime
import os

from include.tasks import ingest, split
from include.tasks import split
from include.tasks.extract import blogs
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

from airflow.decorators import dag, task

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "")
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

blog_cutoff_date = datetime.date(2023, 1, 19)

Expand All @@ -36,12 +38,17 @@ def ask_astro_load_blogs():

split_md_docs = task(split.split_markdown).expand(dfs=[blogs_docs])

task.weaviate_import(
ingest.import_upsert_data,
weaviate_conn_id=_WEAVIATE_CONN_ID,
).partial(
class_name=WEAVIATE_CLASS, primary_key="docLink"
).expand(dfs=[split_md_docs])
_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[split_md_docs])
)


ask_astro_load_blogs()

0 comments on commit d8f4fd6

Please sign in to comment.