Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create TaskFlow API to use WeaviateHook and use taskflow API for ask-astro-load.py #132

Merged
merged 27 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b16e431
fixes #105, #106, #107, #108
mpgreg Nov 13, 2023
33766a4
fixes #109
mpgreg Nov 13, 2023
5bc00bb
fixes #110
mpgreg Nov 13, 2023
41c28d8
removed test code
mpgreg Nov 14, 2023
76ae2ae
fixes #113, fixes #114, fixes #115, fixes #116
mpgreg Nov 14, 2023
9f9704f
fixes #126
mpgreg Nov 14, 2023
226af01
start of refactor for oss
mpgreg Nov 15, 2023
333c2f4
Create AskAstroWeaviateHook to inherit from OSS WeaviateHook
sunank200 Nov 16, 2023
3485a47
Add ingest_data for AskAstroWeaviateHook
sunank200 Nov 17, 2023
c8bc5ee
Fix pre-commit error
sunank200 Nov 17, 2023
c0ccb2e
Add ingest and upsert for weaviate
sunank200 Nov 20, 2023
4a9d428
Update dockerfile
sunank200 Nov 20, 2023
2ffceb8
Fix comments from Wei
sunank200 Nov 20, 2023
9945a3b
Add context manager for batch ingestion
sunank200 Nov 20, 2023
e2fc253
Remove the code which is not in scope for this PR
sunank200 Nov 21, 2023
b238278
Fix comments from Michael
sunank200 Nov 21, 2023
d4c9c31
Use AskAstroWeaviateHook for ask-astro-load-slack.py DAG (#161)
sunank200 Nov 22, 2023
0761bbc
Use AskAstroWeaviateHook for ask-astro-load-stackoverflow.py DAG (#160)
sunank200 Nov 22, 2023
efcf8f9
Use AskAstroWeaviateHook for ask-astro-load-registry.py DAG (#159)
sunank200 Nov 22, 2023
2849ec6
Use AskAstroWeaviateHook for ask-astro-load-github.py DAG (#158)
sunank200 Nov 22, 2023
d8f4fd6
Use AskAstroWeaviateHook for ask-astro-load-blogs.py (#156)
sunank200 Nov 22, 2023
3e494fa
Use AskAstroWeaviateHook for ask_astro_load_airflow_docs DAG (#155)
sunank200 Nov 22, 2023
f5ddc1e
Fix the comments from Wei
sunank200 Nov 22, 2023
63bcb55
Fix comments from Michael
sunank200 Nov 22, 2023
0d10815
Resolve comments from Michael
sunank200 Nov 23, 2023
1bfd4ca
Add a note to use directly in future releases
sunank200 Nov 23, 2023
93ea3d8
Update airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py
sunank200 Nov 23, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion airflow/.dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,3 @@ astro
.env
airflow_settings.yaml
logs/
dags/
6 changes: 1 addition & 5 deletions airflow/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
# syntax=quay.io/astronomer/airflow-extensions:latest

FROM quay.io/astronomer/astro-runtime:9.5.0-base

COPY include/airflow_provider_weaviate-0.0.1-py3-none-any.whl /tmp
FROM quay.io/astronomer/astro-runtime:9.5.0
55 changes: 55 additions & 0 deletions airflow/dags/ingestion/ask-astro-load-airflow-docs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import os
from datetime import datetime

from include.tasks import split
from include.tasks.extract import airflow_docs
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

from airflow.decorators import dag, task

# Environment selector: "dev" (default) or "prod"; drives the connection id,
# the target Weaviate class default, and the DAG schedule below.
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

# Weaviate connection id is environment-specific, e.g. "weaviate_dev".
_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")

ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

# Root URL from which Airflow documentation pages are crawled.
airflow_docs_base_url = "https://airflow.apache.org/docs/"

default_args = {"retries": 3, "retry_delay": 30}

# Run nightly at 05:00 in prod; unscheduled (manual trigger only) elsewhere.
schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None


@dag(
    schedule_interval=schedule_interval,
    start_date=datetime(2023, 9, 27),
    catchup=False,
    is_paused_upon_creation=True,
    default_args=default_args,
)
def ask_astro_load_airflow_docs():
    """
    Incrementally ingest newly published Airflow docs into Weaviate.

    The bulk of the corpus is loaded once by ask_astro_load_bulk from a
    point-in-time capture; this DAG picks up subsequent changes. Upsert
    semantics remove and re-add any existing document that has been updated.
    """
    # Crawl the documentation site rooted at airflow_docs_base_url.
    raw_docs = task(airflow_docs.extract_airflow_docs)(docs_base_url=airflow_docs_base_url)

    # Chunk the extracted HTML pages into embedding-sized pieces.
    html_chunks = task(split.split_html).expand(dfs=[raw_docs])

    # Upsert the chunks into Weaviate, keyed on docLink, in batches of 1000.
    ingest = task(ask_astro_weaviate_hook.ingest_data, retries=10)
    ingest.partial(
        class_name=WEAVIATE_CLASS,
        existing="upsert",
        doc_key="docLink",
        batch_params={"batch_size": 1000},
        verbose=True,
    ).expand(dfs=[html_chunks])


ask_astro_load_airflow_docs()
47 changes: 32 additions & 15 deletions airflow/dags/ingestion/ask-astro-load-blogs.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,54 @@
import datetime
import os
from datetime import datetime

from include.tasks import ingest, split
from include.tasks import split
from include.tasks.extract import blogs
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

from airflow.decorators import dag, task

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "")
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

blog_cutoff_date = datetime.strptime("2023-01-19", "%Y-%m-%d")
blog_cutoff_date = datetime.date(2023, 1, 19)

default_args = {"retries": 3, "retry_delay": 30}

@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)
schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None


@dag(
schedule_interval=schedule_interval,
start_date=datetime.datetime(2023, 9, 27),
catchup=False,
is_paused_upon_creation=True,
default_args=default_args,
)
def ask_astro_load_blogs():
"""
This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""

blogs_docs = task(blogs.extract_astro_blogs, retries=3)(blog_cutoff_date=blog_cutoff_date)
blogs_docs = task(blogs.extract_astro_blogs)(blog_cutoff_date=blog_cutoff_date)

split_md_docs = task(split.split_markdown).expand(dfs=[blogs_docs])

task.weaviate_import(
ingest.import_upsert_data,
weaviate_conn_id=_WEAVIATE_CONN_ID,
retries=10,
retry_delay=30,
).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs])
_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[split_md_docs])
)


ask_astro_load_blogs()
70 changes: 36 additions & 34 deletions airflow/dags/ingestion/ask-astro-load-github.py
Original file line number Diff line number Diff line change
@@ -1,79 +1,81 @@
import datetime
import os
from datetime import datetime

from include.tasks import ingest, split
from include.tasks import split
from include.tasks.extract import github
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

from airflow.decorators import dag, task

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "")
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
_GITHUB_CONN_ID = "github_ro"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")

ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

markdown_docs_sources = [
{"doc_dir": "learn", "repo_base": "astronomer/docs"},
{"doc_dir": "astro", "repo_base": "astronomer/docs"},
{"doc_dir": "", "repo_base": "OpenLineage/docs"},
{"doc_dir": "", "repo_base": "OpenLineage/OpenLineage"},
]
rst_docs_sources = [
{"doc_dir": "docs", "repo_base": "apache/airflow", "exclude_docs": ["changelog.rst", "commits.rst"]},
]
code_samples_sources = [
{"doc_dir": "code-samples", "repo_base": "astronomer/docs"},
]
issues_docs_sources = [
"apache/airflow",
]

default_args = {"retries": 3, "retry_delay": 30}

schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None

@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)

@dag(
schedule_interval=schedule_interval,
start_date=datetime.datetime(2023, 9, 27),
catchup=False,
is_paused_upon_creation=True,
default_args=default_args,
)
def ask_astro_load_github():
"""
This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""

md_docs = (
task(github.extract_github_markdown, retries=3)
task(github.extract_github_markdown)
.partial(github_conn_id=_GITHUB_CONN_ID)
.expand(source=markdown_docs_sources)
)

rst_docs = (
task(github.extract_github_rst, retries=3)
.partial(github_conn_id=_GITHUB_CONN_ID)
.expand(source=rst_docs_sources)
)
sunank200 marked this conversation as resolved.
Show resolved Hide resolved

issues_docs = (
task(github.extract_github_issues, retries=3)
.partial(github_conn_id=_GITHUB_CONN_ID)
.expand(repo_base=issues_docs_sources)
task(github.extract_github_issues).partial(github_conn_id=_GITHUB_CONN_ID).expand(repo_base=issues_docs_sources)
)

code_samples = (
task(github.extract_github_python, retries=3)
.partial(github_conn_id=_GITHUB_CONN_ID)
.expand(source=code_samples_sources)
task(github.extract_github_python).partial(github_conn_id=_GITHUB_CONN_ID).expand(source=code_samples_sources)
)

markdown_tasks = [md_docs, rst_docs, issues_docs]

split_md_docs = task(split.split_markdown).expand(dfs=markdown_tasks)
split_md_docs = task(split.split_markdown).expand(dfs=[md_docs, issues_docs])

split_code_docs = task(split.split_python).expand(dfs=[code_samples])

task.weaviate_import(
ingest.import_upsert_data,
weaviate_conn_id=_WEAVIATE_CONN_ID,
retries=10,
retry_delay=30,
).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs, split_code_docs])

issues_docs >> md_docs >> rst_docs >> code_samples
_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[split_md_docs, split_code_docs])
)


ask_astro_load_github()
46 changes: 32 additions & 14 deletions airflow/dags/ingestion/ask-astro-load-registry.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,57 @@
import os
from datetime import datetime

from include.tasks import ingest, split
from include.tasks import split
from include.tasks.extract import registry
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

from airflow.decorators import dag, task

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "")
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")

ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)
default_args = {"retries": 3, "retry_delay": 30}

schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None


@dag(
schedule_interval=schedule_interval,
start_date=datetime(2023, 9, 27),
catchup=False,
is_paused_upon_creation=True,
default_args=default_args,
)
def ask_astro_load_registry():
"""
This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
This DAG performs incremental load for any new docs. Initial load via ask_astro_load_bulk imported
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""

registry_cells_docs = task(registry.extract_astro_registry_cell_types, retries=3)()
registry_cells_docs = task(registry.extract_astro_registry_cell_types)()

registry_dags_docs = task(registry.extract_astro_registry_dags, retries=3)()
registry_dags_docs = task(registry.extract_astro_registry_dags)()

split_md_docs = task(split.split_markdown).expand(dfs=[registry_cells_docs])

split_code_docs = task(split.split_python).expand(dfs=[registry_dags_docs])

task.weaviate_import(
ingest.import_upsert_data,
weaviate_conn_id=_WEAVIATE_CONN_ID,
retries=10,
retry_delay=30,
).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs, split_code_docs])
_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[split_md_docs, split_code_docs])
)


ask_astro_load_registry()
43 changes: 30 additions & 13 deletions airflow/dags/ingestion/ask-astro-load-slack.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import os
from datetime import datetime

from include.tasks import ingest, split
from include.tasks import split
from include.tasks.extract import slack
from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook

from airflow.decorators import dag, task

ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "")
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsProd")
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)
slack_channel_sources = [
{
"channel_name": "troubleshooting",
Expand All @@ -20,25 +22,40 @@
}
]

default_args = {"retries": 3, "retry_delay": 30}

@dag(schedule_interval="0 5 * * *", start_date=datetime(2023, 9, 27), catchup=False, is_paused_upon_creation=True)
schedule_interval = "0 5 * * *" if ask_astro_env == "prod" else None


@dag(
schedule_interval=schedule_interval,
start_date=datetime(2023, 9, 27),
catchup=False,
is_paused_upon_creation=True,
default_args=default_args,
)
def ask_astro_load_slack():
"""
This DAG performs incremental load for any new slack threads. The slack archive is a point-in-time capture. This
DAG should run nightly to capture threads between archive periods. By using the upsert logic of the
This DAG performs incremental load for any new slack threads. The slack archive is a point-in-time capture. This
DAG should run nightly to capture threads between archive periods. By using the upsert logic of the
weaviate_import decorator any existing documents that have been updated will be removed and re-added.
"""

slack_docs = task(slack.extract_slack, retries=3).expand(source=slack_channel_sources)
slack_docs = task(slack.extract_slack).expand(source=slack_channel_sources)

split_md_docs = task(split.split_markdown).expand(dfs=[slack_docs])

task.weaviate_import(
ingest.import_upsert_data,
weaviate_conn_id=_WEAVIATE_CONN_ID,
retries=10,
retry_delay=30,
).partial(class_name=WEAVIATE_CLASS, primary_key="docLink").expand(dfs=[split_md_docs])
_import_data = (
task(ask_astro_weaviate_hook.ingest_data, retries=10)
.partial(
class_name=WEAVIATE_CLASS,
existing="upsert",
doc_key="docLink",
batch_params={"batch_size": 1000},
verbose=True,
)
.expand(dfs=[split_md_docs])
)


ask_astro_load_slack()
Loading
Loading