Skip to content

Commit

Permalink
added a function to generate DAGs
Browse files Browse the repository at this point in the history
Each designated Landsat8 area shall have its own DAG. The DAG
gets generated by the new generate_dag function.
  • Loading branch information
ricardogsilva committed Sep 5, 2017
1 parent 59b66e2 commit dd4f1cf
Showing 1 changed file with 108 additions and 92 deletions.
200 changes: 108 additions & 92 deletions airflow/dags/landsat8/search_download_daraa.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from collections import namedtuple
from datetime import datetime
from datetime import timedelta
import logging

from airflow.models import DAG
from airflow.operators import BaseOperator
from airflow.operators import BashOperator
from airflow.operators import GDALAddoOperator
from airflow.operators import GDALTranslateOperator
from airflow.operators import Landsat8DownloadOperator
Expand All @@ -16,12 +15,112 @@

from landsat8.secrets import postgresql_credentials

# Immutable record describing one Landsat8 area of interest. Instances are
# handed to generate_dag(), which uses ``name`` to build the DAG and task ids
# and passes the whole record on to the search and download operators.
Landsat8Area = namedtuple(
    "Landsat8Area",
    ("name", "path", "row", "bands"),
)

daraa_dag = DAG(
'Search_daraa_Landsat8',
description='DAG for searching Daraa AOI in Landsat8 data from '
'scenes_list',
default_args={

def generate_dag(area, default_args):
    """Generate the Landsat8 search-and-ingestion DAG for a single area.

    Parameters
    ----------
    area: Landsat8Area
        Configuration parameters for the Landsat8 area to be downloaded
    default_args: dict
        Default arguments for all tasks in the DAG.

    Returns
    -------
    DAG
        A DAG named ``Search_<area.name>_Landsat8`` whose tasks are chained
        search -> download -> translate -> product json ->
        product thumbnail -> product description -> product zip.

    """

    dag = DAG(
        "Search_{}_Landsat8".format(area.name),
        description="DAG for searching and ingesting {} AOI in Landsat8 data "
                    "from scene_list".format(area.name),
        default_args=default_args,
        dagrun_timeout=timedelta(hours=1),
        schedule_interval=timedelta(days=1),
        catchup=False,
        params={
            "area": area,
        }
    )
    search_task = Landsat8SearchOperator(
        task_id='landsat8_search_{}'.format(area.name),
        area=area,
        cloud_coverage=90.9,
        db_credentials=postgresql_credentials,
        dag=dag
    )
    download_task = Landsat8DownloadOperator(
        task_id='landsat8_download_{}'.format(area.name),
        download_dir="/var/data/download",
        area=area,
        get_inputs_from=search_task.task_id,
        dag=dag
    )
    translate_task = GDALTranslateOperator(
        task_id='landsat8_translate_{}'.format(area.name),
        get_inputs_from=download_task.task_id,
        dag=dag
    )
    # TODO: Work-in-progress
    # The overview (gdaladdo) step is not instantiated yet, so it must not be
    # referenced when wiring the tasks below. Once it is enabled, insert it
    # between translate_task and product_json_task.
    #addo_task = GDALAddoOperator(
    #    task_id='landsat8_addo_{}'.format(area.name),
    #    xcom=XComPull(
    #        dag_id=dag.dag_id,
    #        task_id="landsat8_translate_{}".format(area.name),
    #        key_srcfile="translated_scenes_dir"
    #    ),
    #    resampling_method="average",
    #    max_overview_level=128,
    #    compress_overview="PACKBITS",
    #    photometric_overview="MINISBLACK",
    #    interleave_overview="",
    #    dag=dag
    #)
    product_json_task = Landsat8MTLReaderOperator(
        task_id='landsat8_product_json',
        # Per-area coverage directory, so DAGs generated for different areas
        # do not all point at the same location.
        loc_base_dir='/efs/geoserver_data/coverages/landsat8/{}'.format(
            area.name),
        metadata_xml_path='./geo-solutions-work/evo-odas/metadata-ingestion/'
                          'templates/metadata.xml',
        dag=dag
    )
    product_thumbnail_task = Landsat8ThumbnailOperator(
        task_id='landsat8_product_thumbnail',
        thumb_size_x="64",
        thumb_size_y="64",
        dag=dag
    )
    product_description_task = Landsat8ProductDescriptionOperator(
        description_template='./geo-solutions-work/evo-odas/metadata-ingestion/'
                             'templates/product_abstract.html',
        task_id='landsat8_product_description',
        dag=dag
    )

    product_zip_task = Landsat8ProductZipFileOperator(
        task_id='landsat8_product_zip',
        dag=dag
    )
    download_task.set_upstream(search_task)
    translate_task.set_upstream(download_task)
    # addo_task is still commented out above; chain the product tasks
    # directly after the translate step to avoid a NameError.
    product_json_task.set_upstream(translate_task)
    product_thumbnail_task.set_upstream(product_json_task)
    product_description_task.set_upstream(product_thumbnail_task)
    product_zip_task.set_upstream(product_description_task)
    return dag


# Landsat8 areas to ingest; the loop that follows generates one DAG per entry.
AREAS = [
    Landsat8Area(
        name="daraa",
        path=174,
        row=37,
        bands=[1],
    ),
]

for area in AREAS:
dag = generate_dag(area, default_args={
'start_date': datetime(2017, 1, 1),
'owner': 'airflow',
'depends_on_past': False,
Expand All @@ -31,88 +130,5 @@
'email_on_retry': False,
'retries': 1,
'max_threads': 1,
},
dagrun_timeout=timedelta(hours=1),
schedule_interval=timedelta(days=1),
catchup=False
)

# Module-level task definitions hard-wired to the Daraa area. Every task is
# attached to ``daraa_dag`` and chained at the bottom of this section.

# Search step: fixed path/row (174/37) and cloud coverage value, with the
# PostgreSQL connection settings passed one by one from postgresql_credentials.
search_daraa_task = Landsat8SearchOperator(
task_id='landsat8_search_daraa_task',
cloud_coverage=90.9,
path=174,
row=37,
pgdbname=postgresql_credentials['dbname'],
pghostname=postgresql_credentials['hostname'],
pgport=postgresql_credentials['port'],
pgusername=postgresql_credentials['username'],
pgpassword=postgresql_credentials['password'],
dag=daraa_dag
)

# Download step: band 1 only, written under /var/data/download.
download_daraa_task = Landsat8DownloadOperator(
task_id='landsat8_download_daraa_task',
download_dir="/var/data/download",
bands=[1],
dag=daraa_dag
)

# Translate step: works in the download directory; the option names look like
# gdal_translate settings (tiling, block size, band, output type/format) --
# NOTE(review): confirm against GDALTranslateOperator.
translate_daraa_task = GDALTranslateOperator(
task_id='landsat8_translate_daraa_task',
working_dir="/var/data/download",
blockx_size="512",
blocky_size="512",
tiled="YES",
b="1",
ot="UInt16",
of="GTiff",
dag=daraa_dag
)

# Overview step: the xk_pull_* arguments name the DAG, task and key
# ('translated_scenes_dir') of the translate step's output -- presumably
# fetched via XCom; verify against GDALAddoOperator.
addo_daraa_task = GDALAddoOperator(
task_id='landsat8_addo_daraa_task',
xk_pull_dag_id='Search_daraa_Landsat8',
xk_pull_task_id='landsat8_translate_daraa_task',
xk_pull_key_srcfile='translated_scenes_dir',
resampling_method="average",
max_overview_level=128,
compress_overview="PACKBITS",
photometric_overview="MINISBLACK",
interleave_overview="",
dag=daraa_dag
)

# Product metadata step, configured with the Daraa coverage directory and a
# metadata XML template path.
product_json_task = Landsat8MTLReaderOperator(
task_id='landsat8_product_json_task',
loc_base_dir='/efs/geoserver_data/coverages/landsat8/daraa',
metadata_xml_path='./geo-solutions-work/evo-odas/metadata-ingestion/'
'templates/metadata.xml',
dag=daraa_dag
)

# Thumbnail step; sizes are passed as strings.
product_thumbnail_task = Landsat8ThumbnailOperator(
task_id='landsat8_product_thumbnail_task',
thumb_size_x="64",
thumb_size_y="64",
dag=daraa_dag
)

# Product description step, configured with an HTML abstract template path.
product_description_task = Landsat8ProductDescriptionOperator(
description_template='./geo-solutions-work/evo-odas/metadata-ingestion/'
'templates/product_abstract.html',
task_id='landsat8_product_description_task',
dag=daraa_dag
)

# Final step: product zip file.
product_zip_task = Landsat8ProductZipFileOperator(
task_id='landsat8_product_zip_task',
dag=daraa_dag
)

# Linear dependency chain: search -> download -> translate -> addo ->
# product json -> thumbnail -> description -> zip.
search_daraa_task.set_downstream(download_daraa_task)
download_daraa_task.set_downstream(translate_daraa_task)
translate_daraa_task.set_downstream(addo_daraa_task)
addo_daraa_task.set_downstream(product_json_task)
product_json_task.set_downstream(product_thumbnail_task)
product_thumbnail_task.set_downstream(product_description_task)
product_description_task.set_downstream(product_zip_task)
})
globals()[dag.dag_id] = dag

0 comments on commit dd4f1cf

Please sign in to comment.