Commit 601946a
working DAG; expanded validation task
btylerburton committed Aug 21, 2023
1 parent d73aecc commit 601946a
Showing 9 changed files with 847 additions and 61 deletions.
10 changes: 10 additions & 0 deletions .devcontainer/devcontainer.json
@@ -0,0 +1,10 @@
// .devcontainer/devcontainer.json
{
  "name": "Airflow Container",
  "dockerComposeFile": "docker-compose.yaml",
  "service": "airflow",
  "workspaceFolder": "/home/airflow",
  "extensions": [
    "ms-python.python"
  ]
}
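
Note: a top-level "extensions" key is the legacy Dev Containers format; the current spec nests editor extensions under "customizations". A minimal sketch of the equivalent modern form, assuming the same compose file and service (this modernized variant is illustrative, not part of the commit):

// .devcontainer/devcontainer.json (hypothetical modernized form)
{
  "name": "Airflow Container",
  "dockerComposeFile": "docker-compose.yaml",
  "service": "airflow",
  "workspaceFolder": "/home/airflow",
  "customizations": {
    "vscode": {
      "extensions": ["ms-python.python"]
    }
  }
}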
50 changes: 50 additions & 0 deletions airflow/dags/dcatus_pipeline.py
@@ -0,0 +1,50 @@
import pendulum
import requests

from pathlib import Path
from airflow import Dataset
from airflow.decorators import dag, task
from datetime import datetime, timedelta
from includes.validate.dcat_us import validate_json_schema
from includes.utils.json_util import open_json

SRC = Dataset(
    "https://raw.githubusercontent.com/GSA/catalog.data.gov/main/tests/harvest-sources/data.json"
)
now = pendulum.now()


def make_path(file_name):
    # Resolve a schema file path under airflow/data/schemas
    return Path(__file__).parents[1] / "data" / "schemas" / file_name


@dag(
    "dcatus_pipeline",
    start_date=datetime(year=2023, month=2, day=5),
    catchup=False,
    tags=["etl"],
    schedule_interval=timedelta(minutes=2),
)
def dcatus_pipeline():
    @task(task_id="extract_dcatus")
    def extract(src: Dataset) -> list:
        # Fetch the DCAT-US catalog and return its list of dataset records
        resp = requests.get(url=src.uri)
        data = resp.json()
        print(data)
        return data["dataset"]

    @task(task_id="validate_dcatus")
    def validate(dataset: dict) -> bool:
        # Validate a single dataset record against the DCAT-US dataset schema
        file_path = make_path("dataset.json")
        print(f"file path ::: {file_path}")
        dataset_schema = open_json(file_path)
        success, error = validate_json_schema(dataset, dataset_schema)
        print(f"success: {success}, error: {error}")
        return bool(success)

    dataset = extract(SRC)
    # Map validate over each record; the per-record results are not
    # consumed downstream, so the return value is not assigned
    validate.expand(dataset=dataset)


dcatus_pipeline()
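
The `open_json` and `validate_json_schema` helpers imported above come from the `includes` package, which is not shown in this diff. A minimal sketch of what they might look like, inferred only from the call sites (the (success, error) return shape and a `jsonschema`-based validator are assumptions, not confirmed by the repository):

# includes/utils/json_util.py (hypothetical sketch)
import json

def open_json(file_path):
    # Read and parse a JSON file from disk
    with open(file_path) as f:
        return json.load(f)

# includes/validate/dcat_us.py (hypothetical sketch)
from jsonschema import validate
from jsonschema.exceptions import ValidationError

def validate_json_schema(json_data, schema):
    # Return a (success, error) pair instead of raising, so the
    # Airflow task can log the outcome and return a boolean
    try:
        validate(instance=json_data, schema=schema)
        return True, None
    except ValidationError as e:
        return False, e.message

Note that `validate.expand(dataset=dataset)` uses Airflow's dynamic task mapping (available since Airflow 2.3): one mapped `validate_dcatus` task instance is created per record in the list returned by `extract_dcatus`.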
16 changes: 0 additions & 16 deletions airflow/dags/etl_pipeline.py

This file was deleted.

23 changes: 0 additions & 23 deletions airflow/dags/extract.py

This file was deleted.

21 changes: 0 additions & 21 deletions airflow/dags/transform.py

This file was deleted.

(Diffs for the remaining 4 changed files did not load.)

1 comment on commit 601946a

@github-actions

Coverage Report

File                    Stmts   Miss   Cover
--------------------------------------------
harvester
  __init__.py               3      0    100%
harvester/extract
  __init__.py              19      2     89%
  dcatus.py                11      2     82%
harvester/utils
  __init__.py               0      0    100%
  json.py                  22      6     73%
  pg.py                    35      4     89%
  s3.py                    24      6     75%
harvester/validate
  __init__.py               0      0    100%
  dcat_us.py               24      0    100%
--------------------------------------------
TOTAL                     138     20     86%

Tests: 22   Skipped: 0 💤   Failures: 0 ❌   Errors: 0 🔥   Time: 24.375s ⏱️
