From 96cc7d7cc16a7effec2fa433aafedf9d9f9d7baf Mon Sep 17 00:00:00 2001 From: Ankit Chaurasia <8670962+sunank200@users.noreply.github.com> Date: Thu, 23 Nov 2023 16:28:47 +0545 Subject: [PATCH] Update airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py --- airflow/dags/ingestion/ask-astro-load-github.py | 9 ++------- .../dags/ingestion/ask-astro-load-stackoverflow.py | 2 +- airflow/dags/ingestion/ask-astro-load.py | 13 ------------- .../utils/weaviate/ask_astro_weaviate_hook.py | 2 +- 4 files changed, 4 insertions(+), 22 deletions(-) diff --git a/airflow/dags/ingestion/ask-astro-load-github.py b/airflow/dags/ingestion/ask-astro-load-github.py index 156dc109..8d19741f 100644 --- a/airflow/dags/ingestion/ask-astro-load-github.py +++ b/airflow/dags/ingestion/ask-astro-load-github.py @@ -1,7 +1,6 @@ import datetime import os -from dateutil.relativedelta import relativedelta from include.tasks import split from include.tasks.extract import github from include.tasks.extract.utils.weaviate.ask_astro_weaviate_hook import AskAstroWeaviateHook @@ -26,11 +25,7 @@ {"doc_dir": "code-samples", "repo_base": "astronomer/docs"}, ] issues_docs_sources = [ - { - "repo_base": "apache/airflow", - "cutoff_date": datetime.date.today() - relativedelta(months=1), - "cutoff_issue_number": 30000, - } + "apache/airflow", ] default_args = {"retries": 3, "retry_delay": 30} @@ -59,7 +54,7 @@ def ask_astro_load_github(): ) issues_docs = ( - task(github.extract_github_issues).partial(github_conn_id=_GITHUB_CONN_ID).expand(source=issues_docs_sources) + task(github.extract_github_issues).partial(github_conn_id=_GITHUB_CONN_ID).expand(repo_base=issues_docs_sources) ) code_samples = ( diff --git a/airflow/dags/ingestion/ask-astro-load-stackoverflow.py b/airflow/dags/ingestion/ask-astro-load-stackoverflow.py index 533ece43..66f24a33 100644 --- a/airflow/dags/ingestion/ask-astro-load-stackoverflow.py +++ b/airflow/dags/ingestion/ask-astro-load-stackoverflow.py @@ -10,7 +10,7 @@ ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev") _WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}" -WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDevAnkit") +WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev") ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID) stackoverflow_cutoff_date = "2021-09-01" diff --git a/airflow/dags/ingestion/ask-astro-load.py b/airflow/dags/ingestion/ask-astro-load.py index 817659be..0cdb554c 100644 --- a/airflow/dags/ingestion/ask-astro-load.py +++ b/airflow/dags/ingestion/ask-astro-load.py @@ -140,7 +140,6 @@ def check_seed_baseline(seed_baseline_url: str = None) -> str | set: "extract_github_markdown", "extract_airflow_docs", "extract_stack_overflow", - # "extract_slack_archive", "extract_astro_registry_cell_types", "extract_github_issues", "extract_astro_blogs", @@ -190,16 +189,6 @@ def extract_stack_overflow(tag: str, stackoverflow_cutoff_date: str = stackoverf return df - # @task(trigger_rule="none_failed") - # def extract_slack_archive(source: dict): - # try: - # df = pd.read_parquet("include/data/slack/troubleshooting.parquet") - # except Exception: - # df = slack.extract_slack_archive(source) - # df.to_parquet("include/data/slack/troubleshooting.parquet") - # - # return df - @task(trigger_rule="none_failed") def extract_github_issues(repo_base: str): try: @@ -243,7 +232,6 @@ def extract_astro_blogs(): md_docs = extract_github_markdown.expand(source=markdown_docs_sources) issues_docs = extract_github_issues.expand(repo_base=issues_docs_sources) stackoverflow_docs = extract_stack_overflow.expand(tag=stackoverflow_tags) - # slack_docs = extract_slack_archive.expand(source=slack_channel_sources) registry_cells_docs = extract_astro_registry_cell_types() blogs_docs = extract_astro_blogs() registry_dags_docs = extract_astro_registry_dags() @@ -259,7 +247,6 @@ def extract_astro_blogs(): md_docs, issues_docs, stackoverflow_docs, - # slack_docs, blogs_docs, registry_cells_docs, ] diff --git a/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py b/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py index a56e8444..34764ad8 100644 --- a/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py +++ b/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py @@ -64,7 +64,7 @@ def is_class_missing(self, class_object: dict) -> bool: """ try: class_schema = self.client.schema.get(class_object.get("class", "")) - return self.compare_schema_subset(class_object=class_object, class_schema=class_schema) + return not self.compare_schema_subset(class_object=class_object, class_schema=class_schema) except UnexpectedStatusCodeException as e: return e.status_code == 404 and "with response body: None." in e.message except Exception as e: