V0.3.2 Minor Model Improvements + DAG Fixes Due to Data Source Change (#326)

### Bug Fixes
- DAG: `ask_astro_load_astro_cli_docs` failure (see the first sketch after this list)
```
df["content"] = df["content"].apply(enforce_max_token_len)
KeyError: 'content'
```
- DAG: `ask_astro_load_stackoverflow` failure (see the second sketch after this list)
```
page above 25 requires access token or app key
```
- DAG: `ask_astro_load_blogs` failure
```
  File "/usr/local/airflow/include/tasks/extract/blogs.py", line 56, in <lambda>
    lambda x: BeautifulSoup(x, "lxml").find(class_="post-card__meta").find(class_="title").get_text()
AttributeError: 'NoneType' object has no attribute 'find'
```
The Astro blog's page markup has changed, so the old selectors no longer match (see the third sketch after this list).

- Astro Docs ingest DAG
We had been scraping the outdated URL docs.astronomer.io, but Astronomer has moved its documentation to www.astronomer.io/docs.
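
A minimal sketch of the `ask_astro_load_astro_cli_docs` failure mode, assuming the extract step scraped zero pages after the docs moved (the `records` list and the guard are illustrative, not the DAG's actual code):

```python
import pandas as pd

# Illustrative: if the scrape finds no doc links, the extract step produces an
# empty records list, and the resulting frame has no columns at all.
records = []  # each record would look like {"docLink": ..., "content": ...}
df = pd.DataFrame(records)
print(df.columns.tolist())  # [] -- so df["content"] raises KeyError: 'content'

# A defensive guard before chunking makes the failure mode explicit:
if df.empty or "content" not in df.columns:
    raise SystemExit("extraction returned no documents; check the source URLs")
```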
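For the Stack Overflow failure, the Stack Exchange API rejects pages above 25 for anonymous clients. A hedged sketch of paging with a registered app key; the tag, page bound, and `SO_APP_KEY` placeholder are assumptions, and the DAG's actual query may differ:

```python
import requests

SO_APP_KEY = "YOUR_APP_KEY"  # placeholder: register an app at stackapps.com
url = "https://api.stackexchange.com/2.3/questions"

page, has_more = 1, True
while has_more and page <= 30:  # pages above 25 require a key or access token
    payload = requests.get(
        url,
        params={
            "site": "stackoverflow",
            "tagged": "airflow",
            "pagesize": 100,
            "page": page,
            "key": SO_APP_KEY,
        },
        timeout=30,
    ).json()
    has_more = payload.get("has_more", False)
    page += 1
```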
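And for the blogs failure, chained `.find()` calls raise `AttributeError` as soon as a class name disappears from the markup. A defensive variant of the title extraction, using the `hero__title` class this commit switches to:

```python
from bs4 import BeautifulSoup

def extract_title(html: bytes, url: str) -> str:
    soup = BeautifulSoup(html, "lxml")
    node = soup.find(class_="hero__title")  # class name as of this commit
    if node is None:
        # Fail with the offending URL instead of a bare AttributeError.
        raise ValueError(f"blog layout changed again? no title node on {url}")
    return node.get_text(strip=True)
```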

### Minor Improvements
- Remove ingest of GitHub issues from the ingest sources
    - This was adding nothing but noise: most closed issues are bug reports that have already been fixed, and retrieving them causes the LLM to think the bugs still persist.
- GitHub Registry docs reformat
The registry content Ask Astro previously ingested gave the LLM no usable insight at all.
    - How would the LLM know how to use a module from this format alone?
    - Add operator usage and parameter type details (an illustration of the new format follows this list).

Example of what we had before:

```
# Registry
## Provider: astro-sdk-python
Version: 1.8.0
Module: dataframe
Module Description: This decorator will allow users to write python functions while treating SQL
tables as dataframes.
```

- Upgrade from Cohere Rerank 2 to Rerank 3
    - Cohere emailed asking us to move to Rerank 3; it's cheaper, better, and faster (a usage sketch follows this list).
- Upgrade from GPT-4 Turbo to GPT-4o
- System Prompt Changes
    - Better LLM filter as a last step to discard unhelpful documents
    - Ask the model not to include URLs that do not explicitly appear in the context
    - Ask the LLM to explicitly cite sources whenever possible, overriding the LLM stuffing template and function in LangChain so that each document's DocLink and Document # are passed to the LLM (a sketch of this stuffing format follows this list)
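
To make the registry reformat concrete, the new per-module document produced by `get_individual_module_detail` (see the registry.py diff below) looks roughly like this; the module and parameters shown are made up for illustration:

```
Module Name: DataframeOperator
Version: 1.8.0
Provider Name: astro-sdk-python
Import Statement: `from astro.sql import DataframeOperator`
Module Description: This decorator will allow users to write python functions while treating SQL tables as dataframes.
Parameters:
	conn_id (str): (REQUIRED) The connection to run the SQL against
	database (str): The target database name
Usage Example:
f = DataframeOperator(
	conn_id=MY_CONN_ID,
	database=MY_DATABASE
)
```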
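A hedged sketch of the rerank upgrade, which is essentially a model-string change; the query and documents are placeholders, and `rerank-english-v3.0` is Cohere's Rerank 3 English model:

```python
import cohere

co = cohere.Client("YOUR_COHERE_API_KEY")  # placeholder key
response = co.rerank(
    model="rerank-english-v3.0",  # was a Rerank 2 model before this change
    query="How do I backfill an Airflow DAG?",
    documents=[
        "Backfill runs a DAG for a historical date range.",
        "Astro CLI installation instructions.",
    ],
    top_n=1,
)
for hit in response.results:
    print(hit.index, hit.relevance_score)
```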
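And a minimal sketch of the citation-friendly stuffing format, assuming a stand-in `Document` class; the real change overrides LangChain's stuff-documents template and formatting function so the model sees each chunk's number and DocLink:

```python
from dataclasses import dataclass, field

@dataclass
class Document:  # stand-in for LangChain's Document
    page_content: str
    metadata: dict = field(default_factory=dict)

def stuff_documents(docs: list[Document]) -> str:
    """Number each chunk and surface its docLink so answers can cite both."""
    blocks = []
    for i, doc in enumerate(docs, start=1):
        link = doc.metadata.get("docLink", "unknown source")
        blocks.append(f"Document #{i} (source: {link})\n{doc.page_content}")
    return "\n\n".join(blocks)

print(stuff_documents([
    Document("Backfill with `airflow dags backfill`.",
             {"docLink": "https://www.astronomer.io/docs"}),
]))
```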
davidgxue authored Jun 18, 2024
1 parent 5a4fed4 commit eb58b23
Showing 17 changed files with 178 additions and 217 deletions.
#### `airflow/dags/ingestion/ask-astro-load-github.py` (1 addition, 7 deletions)

```diff
@@ -52,13 +52,7 @@ def ask_astro_load_github():
         .expand(source=markdown_docs_sources)
     )
 
-    issues_docs = (
-        task(github.extract_github_issues)
-        .partial(github_conn_id=_GITHUB_CONN_ID, cutoff_date=_GITHUB_ISSUE_CUTOFF_DATE)
-        .expand(repo_base=issues_docs_sources)
-    )
-
-    split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[md_docs, issues_docs])
+    split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[md_docs])
 
     _import_data = WeaviateDocumentIngestOperator.partial(
         class_name=WEAVIATE_CLASS,
```
#### `airflow/dags/ingestion/ask-astro-load.py` (1 addition, 24 deletions)

```diff
@@ -21,16 +21,13 @@
 _WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
 _GITHUB_CONN_ID = "github_ro"
 WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
-_GITHUB_ISSUE_CUTOFF_DATE = os.environ.get("GITHUB_ISSUE_CUTOFF_DATE", "2022-1-1")
 
 markdown_docs_sources = [
     {"doc_dir": "", "repo_base": "OpenLineage/docs"},
     {"doc_dir": "", "repo_base": "OpenLineage/OpenLineage"},
 ]
 
-issues_docs_sources = [
-    "apache/airflow",
-]
-
 slack_channel_sources = [
     {
         "channel_name": "troubleshooting",
@@ -150,7 +147,6 @@ def check_seed_baseline(seed_baseline_url: str = None) -> str | set:
         "extract_airflow_docs",
         "extract_stack_overflow",
         "extract_astro_registry_cell_types",
-        "extract_github_issues",
         "extract_astro_blogs",
         "extract_astro_registry_dags",
         "extract_astro_cli_docs",
@@ -262,23 +258,6 @@ def extract_astro_forum_doc():
 
         return [df]
 
-    @task(trigger_rule=TriggerRule.NONE_FAILED)
-    def extract_github_issues(repo_base: str):
-        from include.tasks.extract import github
-
-        parquet_file = f"include/data/{repo_base}/issues.parquet"
-
-        if os.path.isfile(parquet_file):
-            if os.access(parquet_file, os.R_OK):
-                df = pd.read_parquet(parquet_file)
-            else:
-                raise Exception("Parquet file exists locally but is not readable.")
-        else:
-            df = github.extract_github_issues(repo_base, _GITHUB_CONN_ID, _GITHUB_ISSUE_CUTOFF_DATE)
-            df.to_parquet(parquet_file)
-
-        return df
-
     @task(trigger_rule=TriggerRule.NONE_FAILED)
     def extract_astro_registry_cell_types():
         from include.tasks.extract import registry
@@ -396,7 +375,6 @@ def import_baseline(
     )
 
     md_docs = extract_github_markdown.expand(source=markdown_docs_sources)
-    issues_docs = extract_github_issues.expand(repo_base=issues_docs_sources)
     stackoverflow_docs = extract_stack_overflow.expand(tag=stackoverflow_tags)
     registry_cells_docs = extract_astro_registry_cell_types()
     blogs_docs = extract_astro_blogs()
@@ -415,7 +393,6 @@
 
     markdown_tasks = [
         md_docs,
-        issues_docs,
         stackoverflow_docs,
         blogs_docs,
         registry_cells_docs,
```
#### `airflow/include/tasks/extract/astro_cli_docs.py` (3 additions, 3 deletions)

```diff
@@ -19,15 +19,15 @@ def extract_astro_cli_docs() -> list[pd.DataFrame]:
         'content': HTML content of the page
         'sha': A UUID from the other fields
     """
-    astronomer_base_url = "https://docs.astronomer.io"
+    astronomer_base_url = "https://www.astronomer.io/docs"
     astro_cli_overview_endpoint = "/astro/cli/overview"
 
-    response = requests.get(f"{astronomer_base_url}/{astro_cli_overview_endpoint}")
+    response = requests.get(f"{astronomer_base_url}{astro_cli_overview_endpoint}")
     soup = BeautifulSoup(response.text, "lxml")
     astro_cli_links = {
         f"{astronomer_base_url}{link.get('href')}"
         for link in soup.find_all("a")
-        if link.get("href").startswith("/astro/cli")
+        if link.get("href").startswith("/docs/astro/cli")
     }
 
     df = pd.DataFrame(astro_cli_links, columns=["docLink"])
```
#### `airflow/include/tasks/extract/astro_docs.py` (6 additions, 5 deletions)

```diff
@@ -8,7 +8,7 @@
 
 from include.tasks.extract.utils.html_utils import fetch_page_content, get_internal_links
 
-base_url = "https://docs.astronomer.io/"
+base_url = "https://www.astronomer.io/docs"
 
 
 def process_astro_doc_page_content(page_content: str) -> str:
@@ -57,17 +57,18 @@ def process_astro_doc_page_content(page_content: str) -> str:
 
 def extract_astro_docs(base_url: str = base_url) -> list[pd.DataFrame]:
     """
-    Extract documentation pages from docs.astronomer.io and its subdomains.
+    Extract documentation pages from www.astronomer.io/docs and its subdomains.
     :return: A list of pandas dataframes with extracted data.
     """
-    all_links = get_internal_links(base_url, exclude_literal=["learn/tags"])
+    all_links = get_internal_links(base_url=base_url, exclude_literal=["learn/tags"], prefix_url=base_url)
 
     # for software references, we only want latest docs, ones with version number (old) is removed
-    old_version_doc_pattern = r"^https://docs\.astronomer\.io/software/\d+\.\d+/.+$"
+    old_version_doc_pattern = r"^https://www\.astronomer\.io/docs/software/\d+\.\d+/.+$"
     # remove duplicate xml files, we only want html pages
     non_doc_links = {
-        link if link.endswith("xml") or re.match(old_version_doc_pattern, link) else "" for link in all_links
+        link if link.endswith("xml") or re.match(old_version_doc_pattern, link) or not link.startswith(base_url) else ""
+        for link in all_links
     }
     docs_links = all_links - non_doc_links
 
```
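The filter in the hunk above builds a set of rejects (mapping keepers to the empty string) and subtracts it from the full link set. A small self-contained illustration with made-up links:

```python
import re

base_url = "https://www.astronomer.io/docs"
old_version = r"^https://www\.astronomer\.io/docs/software/\d+\.\d+/.+$"
all_links = {
    "https://www.astronomer.io/docs/astro/cli/overview",
    "https://www.astronomer.io/docs/software/0.32/overview",  # old versioned doc
    "https://www.astronomer.io/sitemap.xml",                  # not an HTML doc page
    "https://example.com/learn",                              # off-site link
}
# Keepers map to "", rejects map to themselves; the set difference then
# leaves only current, on-site HTML doc pages.
non_doc_links = {
    link
    if link.endswith("xml") or re.match(old_version, link) or not link.startswith(base_url)
    else ""
    for link in all_links
}
print(all_links - non_doc_links)
# {'https://www.astronomer.io/docs/astro/cli/overview'}
```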
#### `airflow/include/tasks/extract/blogs.py` (1 addition, 3 deletions)

```diff
@@ -52,9 +52,7 @@ def extract_astro_blogs(blog_cutoff_date: datetime) -> list[pd.DataFrame]:
     df = pd.DataFrame(links, columns=["docLink"])
     df.drop_duplicates(inplace=True)
     df["content"] = df["docLink"].apply(lambda x: requests.get(x).content)
-    df["title"] = df["content"].apply(
-        lambda x: BeautifulSoup(x, "lxml").find(class_="post-card__meta").find(class_="title").get_text()
-    )
+    df["title"] = df["content"].apply(lambda x: BeautifulSoup(x, "html").find(class_="hero__title").get_text())
 
     df["content"] = df["content"].apply(lambda x: BeautifulSoup(x, "lxml").find(class_="prose").get_text())
     df["content"] = df.apply(lambda x: blog_format.format(title=x.title, content=x.content), axis=1)
```
#### `airflow/include/tasks/extract/github.py` (0 additions, 110 deletions)

```diff
@@ -1,14 +1,11 @@
 from __future__ import annotations
 
 import re
-from datetime import datetime
 from pathlib import Path
-from textwrap import dedent
 
 import pandas as pd
 import pypandoc
 from bs4 import BeautifulSoup
-from weaviate.util import generate_uuid5
 
 from airflow.providers.github.hooks.github import GithubHook
@@ -179,110 +176,3 @@ def extract_github_python(source: dict, github_conn_id: str) -> pd.DataFrame:
     df = df[["docSource", "sha", "content", "docLink"]]
 
     return df
-
-
-def extract_github_issues(repo_base: str, github_conn_id: str, cutoff_date: str = "2022-1-1") -> pd.DataFrame:
-    """
-    This task downloads github issues as markdown documents in a pandas dataframe. Text from templated
-    auto responses for issues are removed while building a markdown document for each issue.
-
-    param repo_base: The name of organization/repository (ie. "apache/airflow") from which to extract
-    issues.
-    type repo_base: str
-
-    param github_conn_id: The connection ID to use with the GithubHook
-    param github_conn_id: str
-
-    param cutoff_date: The cutoff date (format: Y-m-d) to extract issues
-
-    The returned data includes the following fields:
-    'docSource': ie. 'astronomer/docs/astro', 'astronomer/docs/learn', etc.
-    'sha': the github sha for the document
-    'docLink': URL for the specific document in github.
-    'content': Entire document content in markdown format.
-    """
-
-    gh_hook = GithubHook(github_conn_id)
-
-    repo = gh_hook.client.get_repo(repo_base)
-    issues = repo.get_issues(state="all", since=datetime.strptime(cutoff_date, "%Y-%m-%d"))
-
-    issue_autoresponse_text = "Thanks for opening your first issue here!"
-    pr_autoresponse_text = "Congratulations on your first Pull Request and welcome to the Apache Airflow community!"
-    drop_content = [issue_autoresponse_text, pr_autoresponse_text]
-
-    issues_drop_text = [
-        dedent(
-            """ <\\!--\r
-            .*?Licensed to the Apache Software Foundation \\(ASF\\) under one.*?under the License\\.\r
-            -->"""
-        ),
-        "<!-- Please keep an empty line above the dashes. -->",
-        "<!--\r\nThank you.*?http://chris.beams.io/posts/git-commit/\r\n-->",
-        r"\*\*\^ Add meaningful description above.*?newsfragments\)\.",
-    ]
-
-    issue_markdown_template = dedent(
-        """
-        ## ISSUE TITLE: {title}
-        DATE: {date}
-        BY: {user}
-        STATE: {state}
-        {body}
-        {comments}"""
-    )
-
-    comment_markdown_template = dedent(
-        """
-        #### COMMENT: {user} on {date}
-        {body}\n"""
-    )
-
-    downloaded_docs = []
-    page_num = 0
-
-    page = issues.get_page(page_num)
-
-    while page:
-        for issue in page:
-            print(issue.number)
-            comments = []
-            for comment in issue.get_comments():
-                if not any(substring in comment.body for substring in drop_content):
-                    comments.append(
-                        comment_markdown_template.format(
-                            user=comment.user.login, date=issue.created_at.strftime("%m-%d-%Y"), body=comment.body
-                        )
-                    )
-            downloaded_docs.append(
-                {
-                    "docLink": issue.html_url,
-                    "sha": "",
-                    "content": issue_markdown_template.format(
-                        title=issue.title,
-                        date=issue.created_at.strftime("%m-%d-%Y"),
-                        user=issue.user.login,
-                        state=issue.state,
-                        body=issue.body,
-                        comments="\n".join(comments),
-                    ),
-                    "docSource": f"{repo_base}/issues",
-                }
-            )
-        page_num = page_num + 1
-        page = issues.get_page(page_num)
-
-    df = pd.DataFrame(downloaded_docs)
-
-    for _text in issues_drop_text:
-        df["content"] = df["content"].apply(lambda x: re.sub(_text, "", x, flags=re.DOTALL))
-
-    df["content"] = df["content"].apply(lambda x: re.sub(r"\r\n+", "\n\n", x).strip())
-    df["content"] = df["content"].apply(lambda x: re.sub(r"\n+", "\n\n", x).strip())
-
-    df["sha"] = df.apply(generate_uuid5, axis=1)
-
-    # column order matters for uuid generation
-    df = df[["docSource", "sha", "content", "docLink"]]
-
-    return df
```
#### `airflow/include/tasks/extract/registry.py` (50 additions, 13 deletions)

```diff
@@ -8,18 +8,59 @@
 modules_url = "https://api.astronomer.io/registryV2/v1alpha1/organizations/public/modules?limit=1000"
 modules_link_template = "https://registry.astronomer.io/providers/{providerName}/versions/{version}/modules/{_name}"
+module_info_url_template = "https://api.astronomer.io/registryV2/v1alpha1/organizations/public/providers/{provider_name}/versions/latest/modules/{module_name}"
 
 dags_url = "https://api.astronomer.io/registryV2/v1alpha1/organizations/public/dags?limit=1000"
 dags_link_template = "https://registry.astronomer.io/dags/{_name}/versions/{version}"
 
-registry_cell_md_template = dedent(
-    """
-    # Registry
-    ## Provider: {providerName}
-    Version: {version}
-    Module: {module}
-    Module Description: {description}"""
-)
+
+def get_individual_module_detail(provider_name, module_name):
+    data = requests.get(module_info_url_template.format(provider_name=provider_name, module_name=module_name)).json()
+    import_path = data["importPath"]
+
+    module_name = data["name"]
+    version = data["version"]
+    provider_name = data["providerName"]
+    description = html2text(data["description"]).strip() if data["description"] else "No Description"
+    description = description.replace("\n", " ")
+    parameters = data["parameters"]
+
+    param_details = []
+    param_usage = []
+
+    for param in parameters:
+        param_name = param["name"]
+        param_type = param.get("type", "UNKNOWN")
+        if param_type == "UNKNOWN" and "typeDef" in param and "rawAnnotation" in param["typeDef"]:
+            param_type = param["typeDef"]["rawAnnotation"]
+        required = "(REQUIRED) " if param["required"] else ""
+        param_details.append(
+            f"{param_name} ({param_type}): {required}{param.get('description', 'No Param Description')}"
+        )
+        param_usage.append(f"\t{param_name}=MY_{param_name.upper()}")
+
+    param_details_str = "\n\t".join(param_details)
+    param_usage_str = ",\n\t".join(param_usage)
+
+    # Format the final string
+    module_info = dedent(
+        f"""
+        Module Name: {module_name}
+        Version: {version}
+        Provider Name: {provider_name}
+        Import Statement: `from {import_path} import {module_name}`
+        Module Description: {description}
+        Parameters:
+        {param_details_str}
+        Usage Example:
+        f = AsyncKubernetesHook(
+        {param_usage_str}
+        )"""
+    )
+
+    return module_info
@@ -51,12 +92,8 @@ def extract_astro_registry_cell_types() -> list[pd.DataFrame]:
     df["docSource"] = "astronomer registry modules"
 
     df["description"] = df["description"].apply(lambda x: html2text(x) if x else "No Description")
-    df["content"] = df.apply(
-        lambda x: registry_cell_md_template.format(
-            providerName=x.providerName, version=x.version, module=x["name"], description=x.description
-        ),
-        axis=1,
-    )
+
+    df["content"] = df.apply(lambda x: pd.Series(get_individual_module_detail(x.providerName, x["name"])), axis=1)
 
     # column order matters for uuid generation
     df = df[["docSource", "sha", "content", "docLink"]]
```
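A quick hedged smoke test of the endpoint the new formatter depends on, using the `module_info_url_template` from the hunk above; the provider and module names below are illustrative, and the fields printed are the ones the formatter reads:

```python
import requests

# Illustrative provider/module pair; substitute any module from the registry.
url = (
    "https://api.astronomer.io/registryV2/v1alpha1/organizations/public/"
    "providers/{provider_name}/versions/latest/modules/{module_name}"
).format(provider_name="apache-airflow-providers-http", module_name="simplehttpoperator")

data = requests.get(url, timeout=30).json()
print(data.get("name"), data.get("version"))
print((data.get("description") or "No Description")[:120])
```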
(Diffs for the remaining changed files are not shown.)
