
Commit

create validate and extract dcatus catalog workflow.
rshewitt committed Aug 21, 2023
1 parent 8b8e1f0 commit 685c40d
Showing 1 changed file with 39 additions and 41 deletions.
80 changes: 39 additions & 41 deletions airflow/dags/etl_pipeline.py
@@ -1,32 +1,39 @@
from airflow import DAG

Check failure on line 1, GitHub Actions / Python Lint, Ruff (F401): airflow/dags/etl_pipeline.py:1:21: `airflow.DAG` imported but unused
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta
from airflow.decorators import dag, task
import time

Check failure on line 4, GitHub Actions / Python Lint, Ruff (F401): airflow/dags/etl_pipeline.py:4:8: `time` imported but unused
import json
from uuid import uuid4
import os
from load import create_s3_client, create_s3_upload_data, get_s3_object, upload_to_S3

# from uuid import uuid4
from pathlib import Path
from jsonschema import Draft202012Validator
import requests

BUCKET_NAME = "test-bucket"
# s3 stuff

S3_CONFIG = {
    "aws_access_key_id": "_placeholder",
    "aws_secret_access_key": "_placeholder",
    "region_name": "us-east-1",
    "endpoint_url": "http://127.0.0.1:4566",
}

S3_CLIENT = create_s3_client(S3_CONFIG)
FILEKEY = str(uuid4())
# BUCKET_NAME = "test-bucket"
# S3_CONFIG = {
# "aws_access_key_id": "_placeholder",
# "aws_secret_access_key": "_placeholder",
# "region_name": "us-east-1",
# "endpoint_url": "http://127.0.0.1:4566",
# }
# S3_CLIENT = create_s3_client(S3_CONFIG)
# FILEKEY = str(uuid4())

"""
- test passing data via xcom for all tasks
- don't worry about s3 for right now
"""

ROOT_DIR = Path(__file__).parents[2]
SCHEMA_DIR = ROOT_DIR / "schemas"
CATALOG_SCHEMA = SCHEMA_DIR / "catalog.json"
DATASET_SCHEMA = SCHEMA_DIR / "dataset.json"

with open(DATASET_SCHEMA) as json_file:
    dcatus_dataset_schema = json.load(json_file)


@dag(
"datagov_etl_pipeline",
Expand All @@ -38,11 +45,12 @@
def etl_pipeline():
    @task(task_id="extract_dcatus")
    def extract():
        url = "https://data.ny.gov/api/views/5xaw-6ayf/rows.json?accessType=DOWNLOAD"
        url = "https://github.com/GSA/ckanext-datajson/blob/main/ckanext/datajson/tests/datajson-samples/large-spatial.data.json"
        # url = "https://github.com/GSA/ckanext-datajson/blob/main/ckanext/datajson/tests/datajson-samples/ny.data.json"
        res = requests.get(url)
        if res.status_code == 200:
            dcatus = res.json()
            return dcatus["dataset"]
            data = res.json()["payload"]["blob"]["rawLines"]
            return json.loads("".join(data))["dataset"]

        # try:
        #     catalog = download_json(job_info["url"])
@@ -64,27 +72,14 @@ def extract():

        # return output

        return

    @task(task_id="validate_dcatus")
    def validate():
        # success = None
        # error_message = ""

        # validator = Draft202012Validator(dataset_schema)

        # try:
        #     validator.validate(json_data)
        #     success = True
        #     error_message = "no errors"
        # except ValidationError:
        #     success = False
        #     errors = validator.iter_errors(json_data)
        #     error_message = parse_errors(errors)

        # return success, error_message

        return
@task(task_id="validate_dcatus")
def validate(dcatus_record):
validator = Draft202012Validator(dcatus_dataset_schema)
try:
validator.validate(dcatus_record)
return dcatus_record
except:

Check failure on line 81, GitHub Actions / Python Lint, Ruff (E722): airflow/dags/etl_pipeline.py:81:9: Do not use bare `except`
            return False

@task(task_id="transform_dcatus")
def transform():
Expand All @@ -101,9 +96,12 @@ def load():

    # extract() >> transform() >> load()

    list_filenames = extract()
    validated = list_filenames.output.map(validate)
    validated.output.map(load)
    # list_filenames = extract()
    # validated = list_filenames.output.map(validate)

    validate.expand(dcatus_record=extract())

    # validated.output.map(load)


_ = etl_pipeline()
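
For readers unfamiliar with the `validate.expand(dcatus_record=extract())` wiring above: it uses Airflow's TaskFlow XCom passing together with dynamic task mapping (Airflow 2.3+), where `extract()` returns a list via XCom and one `validate` task instance is created per element. The sketch below is illustrative only and is not part of this commit: the DAG id, schema literal, and sample records are placeholders, the `schedule=None` argument assumes Airflow 2.4+ syntax, and it catches `ValidationError` explicitly instead of using a bare `except`.

from datetime import datetime

from airflow.decorators import dag, task
from jsonschema import Draft202012Validator
from jsonschema.exceptions import ValidationError

EXAMPLE_SCHEMA = {"type": "object", "required": ["title"]}  # placeholder schema


@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False)
def xcom_mapping_sketch():
    @task
    def extract() -> list[dict]:
        # In the real DAG this would be the dcatus catalog's "dataset" list.
        return [{"title": "a"}, {"title": "b"}, {"no_title": True}]

    @task
    def validate(dcatus_record: dict):
        # Return the record if it passes, False otherwise (mirrors the diff).
        validator = Draft202012Validator(EXAMPLE_SCHEMA)
        try:
            validator.validate(dcatus_record)
            return dcatus_record
        except ValidationError:
            return False

    # One mapped validate task instance per record returned by extract().
    validate.expand(dcatus_record=extract())


_ = xcom_mapping_sketch()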

1 comment on commit 685c40d

@github-actions

Coverage

Coverage Report
File                      Stmts   Miss   Cover
harvester
   __init__.py                3      0    100%
harvester/extract
   __init__.py               19      2     89%
   dcatus.py                 11      2     82%
harvester/utils
   __init__.py                0      0    100%
   json.py                   22      6     73%
   pg.py                     35      4     89%
   s3.py                     24      6     75%
harvester/validate
   __init__.py                0      0    100%
   dcat_us.py                24      0    100%
TOTAL                       138     20     86%

Tests Skipped Failures Errors Time
22 0 💤 0 ❌ 0 🔥 25.014s ⏱️
