
Commit

Merge pull request #155 from US-GHG-Center/merge-veda-data-airflow
feat: support for scheduled ingest triggers
amarouane-ABDELHAK authored Jun 14, 2024
2 parents ccbfba9 + 486dd3d commit ff2c538
Showing 9 changed files with 103 additions and 19 deletions.
34 changes: 34 additions & 0 deletions dags/generate_dags.py
@@ -0,0 +1,34 @@
"""
Builds a DAG for each collection (indicated by a .json file) in the <BUCKET>/collections/ folder.
These DAGs are used to discover and ingest items for each collection.
"""
from airflow.models.variable import Variable


from veda_data_pipeline.veda_discover_pipeline import get_discover_dag


def generate_dags():
import boto3
import json

mwaa_stac_conf = Variable.get("MWAA_STACK_CONF", deserialize_json=True)
bucket = mwaa_stac_conf["EVENT_BUCKET"]

client = boto3.client("s3")
response = client.list_objects_v2(Bucket=bucket, Prefix="collections/")

for file_ in response["Contents"]:
key = file_["Key"]
if key.endswith("/"):
continue
result = client.get_object(Bucket=bucket, Key=key)
collection = result["Body"].read().decode()
collection = json.loads(collection)
if collection.get("schedule"):
get_discover_dag(
id=f"discover-{collection['collection']}", event=collection
)


generate_dags()
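For context, a minimal sketch of the kind of collection file this new module picks up: any object under collections/ whose JSON carries a "schedule" key becomes a scheduled discover-<collection> DAG. The bucket names, collection id, discovery fields, and cron expression below are illustrative assumptions, not values from this commit.

# Hypothetical example: publish a collection config that generate_dags() would
# turn into a scheduled "discover-no2-monthly" DAG. All names are assumptions.
import json

import boto3

collection_config = {
    "collection": "no2-monthly",        # used to build the generated DAG id
    "bucket": "veda-data-store",        # discovery parameters passed through as the DAG event
    "prefix": "no2-monthly/",
    "filename_regex": "^(.*).tif$",
    "schedule": "0 6 * * *",            # cron string handed to the DAG's schedule_interval
}

s3 = boto3.client("s3")
s3.put_object(
    Bucket="my-mwaa-event-bucket",      # stands in for the EVENT_BUCKET from MWAA_STACK_CONF
    Key="collections/no2-monthly.json",
    Body=json.dumps(collection_config).encode("utf-8"),
)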
2 changes: 1 addition & 1 deletion dags/requirements.txt
@@ -1,4 +1,4 @@
---constraint /usr/local/airflow/dags/requirements-constraints.txt
+#--constraint /usr/local/airflow/dags/requirements-constraints.txt
 affine==2.4.0
 netCDF4==1.6.2
 pydantic==1.10.4
15 changes: 11 additions & 4 deletions dags/veda_data_pipeline/groups/discover_group.py
@@ -13,11 +13,17 @@
 group_kwgs = {"group_id": "Discover", "tooltip": "Discover"}


-def discover_from_s3_task(ti):
+def discover_from_s3_task(ti, event={}, **kwargs):
     """Discover grouped assets/files from S3 in batches of 2800. Produce a list of such files stored on S3 to process.
     This task is used as part of the discover_group subdag and outputs data to EVENT_BUCKET.
     """
-    config = ti.dag_run.conf
+    config = {
+        **event,
+        **ti.dag_run.conf,
+    }
+    last_successful_execution = kwargs.get("prev_start_date_success")
+    if last_successful_execution:
+        config["last_successful_execution"] = last_successful_execution.isoformat()
     # (event, chunk_size=2800, role_arn=None, bucket_output=None):
     MWAA_STAC_CONF = Variable.get("MWAA_STACK_CONF", deserialize_json=True)
     read_assume_arn = Variable.get("ASSUME_ROLE_READ_ARN", default_var=None)
@@ -53,12 +59,13 @@ def vector_raster_choice(ti):
     return f"{group_kwgs['group_id']}.parallel_run_process_rasters"


-def subdag_discover():
+def subdag_discover(event):
     with TaskGroup(**group_kwgs) as discover_grp:
         discover_from_s3 = PythonOperator(
             task_id="discover_from_s3",
             python_callable=discover_from_s3_task,
-            op_kwargs={"text": "Discover from S3"},
+            op_kwargs={"text": "Discover from S3", "event": event},
+            provide_context=True,
         )

         raster_vector_branching = BranchPythonOperator(
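A rough illustration of the new precedence in discover_from_s3_task: values from a manual dag_run.conf override the scheduled event, and Airflow's prev_start_date_success (exposed through the task context) is recorded as last_successful_execution. The collection values and date below are invented.

# Sketch only; "event" stands in for the collection JSON and "dag_run_conf" for a manual trigger.
from datetime import datetime, timezone

event = {"collection": "no2-monthly", "prefix": "no2-monthly/"}
dag_run_conf = {"prefix": "no2-monthly/2024/"}          # manual override wins

config = {**event, **dag_run_conf}
prev_start_date_success = datetime(2024, 6, 1, tzinfo=timezone.utc)  # normally supplied by Airflow
if prev_start_date_success:
    config["last_successful_execution"] = prev_start_date_success.isoformat()

print(config)
# {'collection': 'no2-monthly', 'prefix': 'no2-monthly/2024/',
#  'last_successful_execution': '2024-06-01T00:00:00+00:00'}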
4 changes: 3 additions & 1 deletion dags/veda_data_pipeline/groups/processing_group.py
@@ -59,7 +59,9 @@ def subdag_process():
                         "environment": [
                             {
                                 "name": "EXTERNAL_ROLE_ARN",
-                                "value": Variable.get("ASSUME_ROLE_READ_ARN", default_var=''),
+                                "value": Variable.get(
+                                    "ASSUME_ROLE_READ_ARN", default_var=""
+                                ),
                             },
                             {
                                 "name": "BUCKET",
22 changes: 19 additions & 3 deletions dags/veda_data_pipeline/utils/s3_discovery.py
@@ -6,6 +6,8 @@
 from uuid import uuid4
 from pathlib import Path

+from datetime import datetime
+from dateutil.tz import tzlocal
 import boto3
 from smart_open import open as smrt_open

@@ -39,7 +41,9 @@ def get_s3_resp_iterator(bucket_name, prefix, s3_client, page_size=1000):
     )


-def discover_from_s3(response_iterator, filename_regex: str) -> dict:
+def discover_from_s3(
+    response_iterator, filename_regex: str, last_execution: datetime
+) -> dict:
     """Iterate through pages of S3 objects returned by a ListObjectsV2 operation.
     The discover_from_s3 function takes in an iterator over the pages of S3 objects returned
     by a ListObjectsV2 operation. It iterates through the pages and yields each S3 object in the page as a dictionary.
@@ -56,7 +60,11 @@ def discover_from_s3(response_iterator, filename_regex: str) -> dict:
     for page in response_iterator:
         for s3_object in page.get("Contents", {}):
             key = s3_object["Key"]
-            if re.match(filename_regex, key):
+            conditionals = [re.match(filename_regex, key)]
+            if last_execution:
+                last_modified = s3_object["LastModified"]
+                conditionals.append(last_modified > last_execution)
+            if all(conditionals):
                 yield s3_object


@@ -179,6 +187,12 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=None):
     id_template = event.get("id_template", "{}")
     date_fields = propagate_forward_datetime_args(event)
     dry_run = event.get("dry_run", False)
+    if process_from := event.get("process_from_yyyy_mm_dd"):
+        process_from = datetime.strptime(process_from, "%Y-%m-%d").replace(
+            tzinfo=tzlocal()
+        )
+    if last_execution := event.get("last_successful_execution"):
+        last_execution = datetime.fromisoformat(last_execution)
     if dry_run:
         print("Running discovery in dry run mode")

@@ -198,7 +212,9 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=None):
     )
     file_uris = [
         f"s3://{bucket}/{obj['Key']}"
-        for obj in discover_from_s3(s3_iterator, filename_regex)
+        for obj in discover_from_s3(
+            s3_iterator, filename_regex, last_execution=process_from or last_execution
+        )
     ]

     if len(file_uris) == 0:
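The effect of the new last_execution argument, sketched outside of Airflow: objects must still match filename_regex, and when a cutoff is supplied (either process_from_yyyy_mm_dd or the previous successful run) their LastModified must be newer. The keys and timestamps below are made up.

import re
from datetime import datetime, timezone

filename_regex = r"^(.*).tif$"
last_execution = datetime(2024, 6, 1, tzinfo=timezone.utc)   # assumed cutoff

s3_objects = [  # shaped like ListObjectsV2 "Contents" entries
    {"Key": "no2/old.tif", "LastModified": datetime(2024, 5, 20, tzinfo=timezone.utc)},
    {"Key": "no2/new.tif", "LastModified": datetime(2024, 6, 10, tzinfo=timezone.utc)},
    {"Key": "no2/readme.txt", "LastModified": datetime(2024, 6, 12, tzinfo=timezone.utc)},
]

for obj in s3_objects:
    conditionals = [re.match(filename_regex, obj["Key"])]
    if last_execution:
        conditionals.append(obj["LastModified"] > last_execution)
    if all(conditionals):
        print(obj["Key"])   # only no2/new.tif passes both checks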
29 changes: 23 additions & 6 deletions dags/veda_data_pipeline/veda_discover_pipeline.py
@@ -18,8 +18,10 @@
     "prefix": "s3-prefix/",
     "filename_regex": "^(.*).tif$",
     "id_regex": ".*_(.*).tif$",
+    "process_from_yyyy_mm_dd": "YYYY-MM-DD",
     "id_template": "example-id-prefix-{}",
     "datetime_range": "month",
+    "last_successful_execution": datetime(2015, 1, 1),
     "assets": {
         "asset1": {
             "title": "Asset type 1",
@@ -39,9 +41,9 @@

 dag_args = {
     "start_date": pendulum.today("UTC").add(days=-1),
-    "schedule_interval": None,
     "catchup": False,
     "doc_md": dag_doc_md,
+    "is_paused_upon_creation": False,
 }

 templat_dag_run_conf = {
@@ -66,10 +68,25 @@
     },
 }

-with DAG("veda_discover", params=templat_dag_run_conf, **dag_args) as dag:
-    start = DummyOperator(task_id="Start", dag=dag)
-    end = DummyOperator(task_id="End", trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag)
-
-    discover_grp = subdag_discover()
-
-    start >> discover_grp >> end
+def get_discover_dag(id, event={}):
+    params_dag_run_conf = event or templat_dag_run_conf
+    with DAG(
+        id,
+        schedule_interval=event.get("schedule"),
+        params=params_dag_run_conf,
+        **dag_args
+    ) as dag:
+        start = DummyOperator(task_id="Start", dag=dag)
+        end = DummyOperator(
+            task_id="End", trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag
+        )
+
+        discover_grp = subdag_discover(event)
+
+        start >> discover_grp >> end
+
+        return dag
+
+
+get_discover_dag("veda_discover")
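How the factory is expected to be used: the module-level call keeps the manually triggered veda_discover DAG (the default event is empty, so schedule_interval stays None), while generate_dags.py calls it once per scheduled collection. The collection id and cron string below are assumptions.

from veda_data_pipeline.veda_discover_pipeline import get_discover_dag

# Default, manually triggered DAG.
manual_dag = get_discover_dag("veda_discover")

# One scheduled DAG per collection file, as generate_dags.py does.
event = {"collection": "no2-monthly", "prefix": "no2-monthly/", "schedule": "0 6 * * *"}
scheduled_dag = get_discover_dag(id=f"discover-{event['collection']}", event=event)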
12 changes: 9 additions & 3 deletions dags/veda_data_pipeline/veda_process_vector_pipeline.py
@@ -53,8 +53,12 @@
     start = DummyOperator(task_id="Start", dag=dag)
     end = DummyOperator(task_id="End", trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag)

-    mwaa_stack_conf = Variable.get("MWAA_STACK_CONF", default_var={}, deserialize_json=True)
-    vector_ecs_conf = Variable.get("VECTOR_ECS_CONF", default_var={}, deserialize_json=True)
+    mwaa_stack_conf = Variable.get(
+        "MWAA_STACK_CONF", default_var={}, deserialize_json=True
+    )
+    vector_ecs_conf = Variable.get(
+        "VECTOR_ECS_CONF", default_var={}, deserialize_json=True
+    )

     ingest_vector = EcsRunTaskOperator(
         task_id="ingest_vector",
@@ -77,7 +81,9 @@
                 "environment": [
                     {
                         "name": "EXTERNAL_ROLE_ARN",
-                        "value": Variable.get("ASSUME_ROLE_READ_ARN", default_var=None),
+                        "value": Variable.get(
+                            "ASSUME_ROLE_READ_ARN", default_var=None
+                        ),
                     },
                     {
                         "name": "AWS_REGION",
2 changes: 1 addition & 1 deletion docker_tasks/build_stac/requirements.txt
@@ -3,7 +3,7 @@ awslambdaric
 boto3
 pystac==1.4.0
 python-cmr
-rasterio==1.3.0
+rasterio==1.3.6
 rio-stac>=0.8.0
 shapely
 smart-open==6.3.0
2 changes: 2 additions & 0 deletions docker_tasks/build_stac/utils/regex.py
@@ -41,6 +41,8 @@ def extract_dates(
     Extracts start & end or single date string from filename.
     """
     DATE_REGEX_STRATEGIES = [
+        (r"_(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})", "%Y-%m-%dT%H:%M:%S"),
+        (r"_(\d{8}T\d{6})", "%Y%m%dT%H%M%S"),
         (r"_(\d{4}_\d{2}_\d{2})", "%Y_%m_%d"),
         (r"_(\d{4}-\d{2}-\d{2})", "%Y-%m-%d"),
         (r"_(\d{8})", "%Y%m%d"),
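A standalone check of what the two new regex/format pairs accept, namely ISO-style "_YYYY-MM-DDTHH:MM:SS" and compact "_YYYYMMDDTHHMMSS" timestamps; the filenames below are invented.

import re
from datetime import datetime

new_strategies = [
    (r"_(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})", "%Y-%m-%dT%H:%M:%S"),
    (r"_(\d{8}T\d{6})", "%Y%m%dT%H%M%S"),
]

for filename in ["OCO2_GEOS_L3CO2_2024-06-01T00:00:00.tif", "no2_20240601T120000.tif"]:
    for regex, date_format in new_strategies:
        if match := re.search(regex, filename):
            print(filename, "->", datetime.strptime(match.group(1), date_format))
            break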
