diff --git a/.gitignore b/.gitignore
index cf06597..ce2d4cb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,5 @@
 # Custom
 .idea
-gdal*
 airflow.db
 airflow.cfg
 unittests.cfg
diff --git a/airflow/dags/landsat8/landsat_update_scene_list.py b/airflow/dags/landsat8/landsat_update_scene_list.py
index d537ae5..50cb783 100644
--- a/airflow/dags/landsat8/landsat_update_scene_list.py
+++ b/airflow/dags/landsat8/landsat_update_scene_list.py
@@ -1,72 +1,62 @@
-from airflow import DAG
 from datetime import datetime, timedelta
-from airflow.operators import BaseOperator, BashOperator, DownloadSceneList, ExtractSceneList, UpdateSceneList
-from airflow.plugins_manager import AirflowPlugin
-from airflow.utils.decorators import apply_defaults
-from landsat8.secrets import postgresql_credentials
-
-##################################################
-# General and shared configuration between tasks
-##################################################
-update_scene_list_default_args = {
-    'start_date': datetime(2017, 1, 1),
-    'owner': 'airflow',
-    'depends_on_past': False,
-    'provide_context': True,
-    'email': ['xyz@xyz.com'],
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 1,
-    'max_threads': 1,
-    'download_dir': '/var/data/download/'
-}
-
-######################################################
-# Task specific configuration
-######################################################
-download_scene_list_args = {
-    'download_url':'http://landsat-pds.s3.amazonaws.com/c1/L8/scene_list.gz'
-}
+from getpass import getuser
+import os
 
-update_scene_list_args = {\
-    'pg_dbname' : postgresql_credentials['dbname'],
-    'pg_hostname' : postgresql_credentials['hostname'],
-    'pg_password' : postgresql_credentials['password'],
-    'pg_username' : postgresql_credentials['username'],
-    'pg_port' : postgresql_credentials['port']
-}
+from airflow import DAG
+from airflow.operators import DownloadSceneList
+from airflow.operators import ExtractSceneList
+from airflow.operators import UpdateSceneList
 
-#######################################################
-# DAG definition
-#######################################################
-landsat8_scene_list = DAG('Landsat8_Scene_List',
-                          description='DAG for downloading, extracting, and importing scene_list.gz into postgres db',
-                          default_args=update_scene_list_default_args,
-                          dagrun_timeout=timedelta(hours=1),
-                          schedule_interval=timedelta(days=1),
-                          catchup=False)
+from landsat8.secrets import postgresql_credentials
 
-######################################################
-# Tasks difinition
-######################################################
+# These ought to be moved to a more central place where other settings might
+# be stored
+DOWNLOAD_URL = 'http://landsat-pds.s3.amazonaws.com/c1/L8/scene_list.gz'
+DOWNLOAD_DIR = os.path.join(os.path.expanduser("~"), "download")
+
+landsat8_scene_list = DAG(
+    'Landsat8_Scene_List',
+    description='DAG for downloading, extracting, and importing scene_list.gz '
+                'into postgres db',
+    default_args={
+        "start_date": datetime(2017, 1, 1),
+        "owner": getuser(),
+        "depends_on_past": False,
+        "provide_context": True,
+        "email": ["xyz@xyz.com"],
+        "email_on_failure": False,
+        "email_on_retry": False,
+        "retries": 0,  # TODO: change back to 1
+        "max_threads": 1,
+        "download_dir": DOWNLOAD_DIR,
+        "download_url": DOWNLOAD_URL,
+    },
+    dagrun_timeout=timedelta(hours=1),
+    schedule_interval=timedelta(days=1),
+    catchup=False
+)
+
+# more info on Landsat products on AWS at:
+# https://aws.amazon.com/public-datasets/landsat/
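+# Note: DownloadSceneList and ExtractSceneList take ``download_dir`` and
+# ``download_url`` as constructor arguments, yet neither is passed below.
+# This works because both keys are present in this DAG's ``default_args``
+# and (relying on the usual behaviour of Airflow's ``apply_defaults``
+# decorator) get injected when each operator is instantiated, i.e. the
+# first task behaves as if we had written (illustrative sketch only, not
+# part of this patch):
+#
+#     DownloadSceneList(task_id='download_scene_list_gz',
+#                       download_dir=DOWNLOAD_DIR,
+#                       download_url=DOWNLOAD_URL,
+#                       dag=landsat8_scene_list)
+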
 download_scene_list_gz = DownloadSceneList(
-    task_id= 'download_scene_list_gz_task',
-    download_url = download_scene_list_args['download_url'],
-    #download_dir = download_scene_list_args['download_dir'],
-    dag = landsat8_scene_list)
+    task_id='download_scene_list_gz',
+    dag=landsat8_scene_list
+)
 
 extract_scene_list = ExtractSceneList(
-    task_id = 'extract_scene_list_task',
-    #download_dir = extract_scene_list_args['download_dir'] ,
-    dag = landsat8_scene_list)
-
-update_scene_list_db = UpdateSceneList(\
-    task_id = 'update_scene_list_task',
-    pg_dbname = update_scene_list_args['pg_dbname'] ,
-    pg_hostname = update_scene_list_args['pg_hostname'],
-    pg_port = update_scene_list_args['pg_port'],
-    pg_username = update_scene_list_args['pg_username'],
-    pg_password = update_scene_list_args['pg_password'],
-    dag = landsat8_scene_list )
-
-download_scene_list_gz >> extract_scene_list >> update_scene_list_db
+    task_id='extract_scene_list',
+    dag=landsat8_scene_list
+)
+
+update_scene_list_db = UpdateSceneList(
+    task_id='update_scene_list',
+    pg_dbname=postgresql_credentials['dbname'],
+    pg_hostname=postgresql_credentials['hostname'],
+    pg_port=postgresql_credentials['port'],
+    pg_username=postgresql_credentials['username'],
+    pg_password=postgresql_credentials['password'],
+    dag=landsat8_scene_list
+)
+
+download_scene_list_gz.set_downstream(extract_scene_list)
+extract_scene_list.set_downstream(update_scene_list_db)
diff --git a/airflow/dags/landsat8/search_download_daraa.py b/airflow/dags/landsat8/search_download_daraa.py
index a7d7f95..55ad625 100644
--- a/airflow/dags/landsat8/search_download_daraa.py
+++ b/airflow/dags/landsat8/search_download_daraa.py
@@ -1,147 +1,174 @@
-from airflow.models import DAG
-from airflow.operators import BaseOperator, BashOperator, Landsat8SearchOperator, Landsat8DownloadOperator, GDALTranslateOperator, GDALAddoOperator, Landsat8MTLReaderOperator, Landsat8ThumbnailOperator, Landsat8ProductDescriptionOperator, Landsat8ProductZipFileOperator
-from landsat8.secrets import postgresql_credentials
-import logging
+from collections import namedtuple
 from datetime import datetime
 from datetime import timedelta
+import os
 
+from airflow.models import DAG
+from airflow.operators import GDALAddoOperator
+from airflow.operators import GDALTranslateOperator
+from airflow.operators import Landsat8DownloadOperator
+from airflow.operators import Landsat8MTLReaderOperator
+from airflow.operators import Landsat8ProductDescriptionOperator
+from airflow.operators import Landsat8ProductZipFileOperator
+from airflow.operators import Landsat8SearchOperator
+from airflow.operators import Landsat8ThumbnailOperator
 
-##################################################
-# General and shared configuration between tasks
-##################################################
-daraa_default_args = {
-    'start_date': datetime(2017, 1, 1),
-    'owner': 'airflow',
-    'depends_on_past': False,
-    'provide_context': True,
-    'email': ['xyz@xyz.com'],
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 1,
-    'max_threads': 1,
-}
-######################################################
-# Task specific configuration
-######################################################
-daraa_search_args = {\
-    #'acquisition_date': '2017-04-11 05:36:29.349932',
-    'cloud_coverage': 90.9,
-    'path': 174,
-    'row' : 37,
-    'pgdbname':postgresql_credentials['dbname'],
-    'pghostname':postgresql_credentials['hostname'],
-    'pgport':postgresql_credentials['port'],
-    'pgusername':postgresql_credentials['username'],
-    'pgpassword':postgresql_credentials['password'],
-}
-
-
-daraa_download_args = {\
-    'download_dir': '/var/data/download/',
-    'number_of_bands' : 1
-}
-
-daraa_translate_args = {\
-    'working_dir' : '/var/data/download/',
-    'blockx_size' : '512',
-    'blocky_size' : '512',
-    'tiled' : 'YES',
-    'b' : '1',
-    'ot' : 'UInt16',
-    'of' : 'GTiff',
-    'outsize' : '',
-    'scale' : ''
-}
-
-daraa_addo_args = {\
-    'resampling_method' : 'average',
-    'max_overview_level' : 128,
-    'compress_overview' : 'PACKBITS',
-    'photometric_overview' : 'MINISBLACK',
-    'interleave_overview' : ''
-}
-
-product_thumbnail_args = {\
-    'thumb_size_x': '64',
-    'thumb_size_y': '64'
-}
-
-metadata_args = {\
-    'metadata_xml_path':'./geo-solutions-work/evo-odas/metadata-ingestion/templates/metadata.xml',
-    'description_template':'./geo-solutions-work/evo-odas/metadata-ingestion/templates/product_abstract.html',
-    'loc_base_dir':'/efs/geoserver_data/coverages/landsat8/daraa'
-}
-# DAG definition
-daraa_dag = DAG('Search_daraa_Landsat8',
-                description='DAG for searching Daraa AOI in Landsat8 data from scenes_list',
-                default_args=daraa_default_args,
-                dagrun_timeout=timedelta(hours=1),
-                schedule_interval=timedelta(days=1),
-                catchup=False)
-
-# Landsat Search Task Operator
-search_daraa_task = Landsat8SearchOperator(\
-    task_id = 'landsat8_search_daraa_task',
-    cloud_coverage = daraa_search_args['cloud_coverage'],
-    path = daraa_search_args['path'],
-    row = daraa_search_args['row'],
-    pgdbname = daraa_search_args['pgdbname'],
-    pghostname = daraa_search_args['pghostname'],
-    pgport = daraa_search_args['pgport'],
-    pgusername = daraa_search_args['pgusername'],
-    pgpassword = daraa_search_args['pgpassword'],
-    dag = daraa_dag
-)
+from landsat8.secrets import postgresql_credentials
 
-# Landsat Download Task Operator
-download_daraa_task = Landsat8DownloadOperator(\
-    task_id= 'landsat8_download_daraa_task',
-    download_dir = daraa_download_args['download_dir'],
-    number_of_bands = daraa_download_args['number_of_bands'],
-    dag = daraa_dag
+# These ought to be moved to a more central place where other settings might
+# be stored
+PROJECT_ROOT = os.path.dirname(
+    os.path.dirname(
+        os.path.dirname(
+            os.path.dirname(__file__)
+        )
+    )
 )
-
-# Landsat gdal_translate Task Operator
-translate_daraa_task = GDALTranslateOperator(\
-    task_id= 'landsat8_translate_daraa_task',
-    working_dir = daraa_translate_args["working_dir"],
-    blockx_size = daraa_translate_args['blockx_size'],
-    blocky_size = daraa_translate_args['blocky_size'],
-    tiled = daraa_translate_args['tiled'],
-    b = daraa_translate_args['b'],
-    ot = daraa_translate_args['ot'],
-    of = daraa_translate_args['of'],
-    dag = daraa_dag)
-
-addo_daraa_task = GDALAddoOperator(\
-    task_id = 'landsat8_addo_daraa_task',
-    xk_pull_dag_id = 'Search_daraa_Landsat8',
-    xk_pull_task_id = 'landsat8_translate_daraa_task',
-    xk_pull_key_srcfile = 'translated_scenes_dir',
-    resampling_method = daraa_addo_args['resampling_method'],
-    max_overview_level = daraa_addo_args['max_overview_level'],
-    compress_overview = daraa_addo_args['compress_overview'],
-    photometric_overview = daraa_addo_args['photometric_overview'],
-    interleave_overview = daraa_addo_args['interleave_overview'],
-    dag = daraa_dag)
-
-product_json_task = Landsat8MTLReaderOperator(\
-    task_id = 'landsat8_product_json_task', loc_base_dir = metadata_args["loc_base_dir"], metadata_xml_path = metadata_args["metadata_xml_path"] , dag = daraa_dag)
-
-product_thumbnail_task = Landsat8ThumbnailOperator(\
-    task_id = 'landsat8_product_thumbnail_task',
-    thumb_size_x = product_thumbnail_args['thumb_size_x'],
-    thumb_size_y = product_thumbnail_args['thumb_size_y'],
-    dag = daraa_dag)
-
-product_description_task = Landsat8ProductDescriptionOperator(\
-    description_template = metadata_args["description_template"],
-    task_id = 'landsat8_product_description_task',
-    dag = daraa_dag)
-
-product_zip_task = Landsat8ProductZipFileOperator(\
-    task_id = 'landsat8_product_zip_task',
-    dag = daraa_dag)
-
-
-search_daraa_task >> download_daraa_task >> translate_daraa_task >> addo_daraa_task >> product_json_task >> product_thumbnail_task >> product_description_task >> product_zip_task
+DOWNLOAD_DIR = os.path.join(os.path.expanduser("~"), "download")
+TEMPLATES_PATH = os.path.join(PROJECT_ROOT, "metadata-ingestion", "templates")
+
+Landsat8Area = namedtuple("Landsat8Area", [
+    "name",
+    "path",
+    "row",
+    "bands"
+])
+
+
+AREAS = [
+    Landsat8Area(name="daraa", path=174, row=37, bands=[1, 2, 3]),
+    # These are just some dummy areas in order to test generation of
+    # multiple DAGs
+    Landsat8Area(name="neighbour", path=175, row=37, bands=[1, 2, 3, 7]),
+    Landsat8Area(name="other", path=176, row=37, bands=range(1, 12)),
+]
+
+
+def generate_dag(area, download_dir, default_args):
+    """Generate Landsat8 ingestion DAGs.
+
+    Parameters
+    ----------
+    area: Landsat8Area
+        Configuration parameters for the Landsat8 area to be downloaded
+    download_dir: str
+        Base directory where downloaded files are stored
+    default_args: dict
+        Default arguments for all tasks in the DAG.
+
+    """
+
+    dag = DAG(
+        "Search_{}_Landsat8".format(area.name),
+        description="DAG for searching and ingesting {} AOI in Landsat8 data "
+                    "from scene_list".format(area.name),
+        default_args=default_args,
+        dagrun_timeout=timedelta(hours=1),
+        schedule_interval=timedelta(days=1),
+        catchup=False,
+        params={
+            "area": area,
+        }
+    )
+    search_task = Landsat8SearchOperator(
+        task_id='search_{}'.format(area.name),
+        area=area,
+        cloud_coverage=90.9,
+        db_credentials=postgresql_credentials,
+        dag=dag
+    )
+    generate_html_description = Landsat8ProductDescriptionOperator(
+        task_id='generate_html_description',
+        description_template=os.path.join(
+            TEMPLATES_PATH, "product_abstract.html"),
+        download_dir=download_dir,
+        dag=dag
+    )
+    download_thumbnail = Landsat8DownloadOperator(
+        task_id="download_thumbnail",
+        download_dir=download_dir,
+        get_inputs_from=search_task.task_id,
+        url_fragment="thumb_small.jpg",
+        dag=dag
+    )
+    generate_thumbnail = Landsat8ThumbnailOperator(
+        task_id='generate_thumbnail',
+        get_inputs_from=download_thumbnail.task_id,
+        thumb_size_x="64",
+        thumb_size_y="64",
+        dag=dag
+    )
+    download_metadata = Landsat8DownloadOperator(
+        task_id="download_metadata",
+        download_dir=download_dir,
+        get_inputs_from=search_task.task_id,
+        url_fragment="MTL.txt",
+        dag=dag
+    )
+    generate_metadata = Landsat8MTLReaderOperator(
+        task_id='generate_metadata',
+        get_inputs_from=download_metadata.task_id,
+        loc_base_dir='/efs/geoserver_data/coverages/landsat8/{}'.format(
+            area.name),
+        metadata_xml_path=os.path.join(TEMPLATES_PATH, "metadata.xml"),
+        dag=dag
+    )
+
+    product_zip_task = Landsat8ProductZipFileOperator(
+        task_id='landsat8_product_zip',
+        get_inputs_from=[
+            generate_html_description.task_id,
+            generate_metadata.task_id,
+            generate_thumbnail.task_id,
+        ],
+        output_dir=download_dir,
+        dag=dag
+    )
+    for band in area.bands:
+        download_band = Landsat8DownloadOperator(
+            task_id="download_band{}".format(band),
+            download_dir=download_dir,
+            get_inputs_from=search_task.task_id,
+            url_fragment="B{}.TIF".format(band),
+            dag=dag
+        )
+        translate = GDALTranslateOperator(
+            task_id="translate_band{}".format(band),
+            get_inputs_from=download_band.task_id,
+            dag=dag
+        )
+        addo = GDALAddoOperator(
+            task_id="add_overviews_band{}".format(band),
+            get_inputs_from=translate.task_id,
+            resampling_method="average",
+            max_overview_level=128,
+            compress_overview="PACKBITS",
+            dag=dag
+        )
+        download_band.set_upstream(search_task)
+        translate.set_upstream(download_band)
+        addo.set_upstream(translate)
+
+    download_thumbnail.set_upstream(search_task)
+    download_metadata.set_upstream(search_task)
+    generate_metadata.set_upstream(download_metadata)
+    generate_thumbnail.set_upstream(download_thumbnail)
+    generate_html_description.set_upstream(search_task)
+    product_zip_task.set_upstream(generate_html_description)
+    product_zip_task.set_upstream(generate_metadata)
+    product_zip_task.set_upstream(generate_thumbnail)
+    return dag
+
+
+for area in AREAS:
+    dag = generate_dag(area, download_dir=DOWNLOAD_DIR, default_args={
+        'start_date': datetime(2017, 1, 1),
+        'owner': 'airflow',
+        'depends_on_past': False,
+        'provide_context': True,
+        'email': ['xyz@xyz.com'],
+        'email_on_failure': False,
+        'email_on_retry': False,
+        'retries': 1,
+        'max_threads': 1,
+    })
+    globals()[dag.dag_id] = dag
diff --git a/airflow/plugins/gdal_plugin.py b/airflow/plugins/gdal_plugin.py
index 0f5ddf9..9df9dd7 100644
--- a/airflow/plugins/gdal_plugin.py
+++ b/airflow/plugins/gdal_plugin.py
@@ -1,21 +1,58 @@
+from itertools import count
+import logging
+import os
+
 from airflow.operators import BashOperator
 from airflow.operators import BaseOperator
 from airflow.plugins_manager import AirflowPlugin
 from airflow.utils.decorators import apply_defaults
-import os
-import logging
-import math
-import json
-import shutil
-from pgmagick import Image, Geometry
-import zipfile
 
 log = logging.getLogger(__name__)
 
+
+def get_overview_levels(max_level):
+    levels = []
+    current = 2
+    while current <= max_level:
+        levels.append(current)
+        current *= 2
+    return levels
+
+
+def get_gdaladdo_command(source, overview_levels, resampling_method,
+                         compress_overview=None):
+    compress_token = (
+        "--config COMPRESS_OVERVIEW {}".format(compress_overview) if
+        compress_overview is not None else ""
+    )
+    return "gdaladdo {compress} -r {method} {src} {levels}".format(
+        method=resampling_method,
+        compress=compress_token,
+        src=source,
+        levels=" ".join(str(level) for level in overview_levels),
+    )
+
+
+def get_gdal_translate_command(source, destination, output_type,
+                               creation_options):
+    return (
+        "gdal_translate -ot {output_type} {creation_opts} "
+        "{src} {dst}".format(
+            output_type=output_type,
+            creation_opts=_get_gdal_creation_options(**creation_options),
+            src=source,
+            dst=destination
+        )
+    )
+
+
 class GDALWarpOperator(BaseOperator):
 
     @apply_defaults
-    def __init__(self, target_srs, tile_size, overwrite, srcfile=None, dstdir=None, xk_pull_dag_id=None, xk_pull_task_id=None, xk_pull_key_srcfile=None, xk_pull_key_dstdir=None, xk_push_key=None, *args, **kwargs):
+    def __init__(self, target_srs, tile_size, overwrite, srcfile=None,
+                 dstdir=None, xk_pull_dag_id=None, xk_pull_task_id=None,
+                 xk_pull_key_srcfile=None, xk_pull_key_dstdir=None,
+                 xk_push_key=None, *args, **kwargs):
         self.target_srs = target_srs
         self.tile_size = str(tile_size)
         self.overwrite = overwrite
@@ -26,13 +63,11 @@ def __init__(self, target_srs, tile_size, overwrite, srcfile=None, dstdir=None,
         self.xk_pull_key_srcfile = xk_pull_key_srcfile
         self.xk_pull_key_dstdir = xk_pull_key_dstdir
         self.xk_push_key = xk_push_key
-
         super(GDALWarpOperator, self).__init__(*args, **kwargs)
 
     def execute(self, context):
         log.info('--------------------GDAL_PLUGIN Warp running------------')
         task_instance = context['task_instance']
-
         log.info("""
             target_srs: {}
             tile_size: {}
@@ -67,7 +102,11 @@ def execute(self, context):
             if self.xk_pull_key_srcfile is None:
                 self.xk_pull_key_srcfile = 'srcdir'
             log.debug('Fetching srcfile from XCom')
-            srcfile = task_instance.xcom_pull(task_ids=self.xk_pull_task_id, key=self.xk_pull_key_srcfile, dag_id=self.xk_pull_dag_id)
+            srcfile = task_instance.xcom_pull(
+                task_ids=self.xk_pull_task_id,
+                key=self.xk_pull_key_srcfile,
+                dag_id=self.xk_pull_dag_id
+            )
             if srcfile is None:
                 log.warn('No srcfile fetched from XCom. Nothing to do')
                 return False
@@ -85,7 +124,11 @@ def execute(self, context):
             if self.xk_pull_key_dstdir is None:
                 self.xk_pull_key_dstdir = 'dstdir'
             log.debug('Fetching dstdir from XCom')
-            dstdir = task_instance.xcom_pull(task_ids=self.xk_pull_task_id, key=self.xk_pull_key_dstdir, dag_id=context['dag'].dag_id)
+            dstdir = task_instance.xcom_pull(
+                task_ids=self.xk_pull_task_id,
+                key=self.xk_pull_key_dstdir,
+                dag_id=context['dag'].dag_id
+            )
             log.info('No dstdir fetched from XCom')
         # not found in XCom? use source directory
         if dstdir is None:
@@ -98,7 +141,12 @@ def execute(self, context):
 
         # build gdalwarp command
         self.overwrite = '-overwrite' if self.overwrite else ''
-        gdalwarp_command = 'gdalwarp ' + self.overwrite + ' -t_srs ' + self.target_srs + ' -co TILED=YES -co BLOCKXSIZE=' + self.tile_size + ' -co BLOCKYSIZE=' + self.tile_size + ' ' + srcfile + ' ' + dstfile
+        gdalwarp_command = (
+            'gdalwarp ' + self.overwrite + ' -t_srs ' + self.target_srs +
+            ' -co TILED=YES -co BLOCKXSIZE=' + self.tile_size +
+            ' -co BLOCKYSIZE=' + self.tile_size + ' ' + srcfile + ' ' +
+            dstfile
+        )
         log.info('The complete GDAL warp command is: %s', gdalwarp_command)
 
         # push output path to XCom
@@ -107,160 +155,98 @@ def execute(self, context):
         task_instance.xcom_push(key='dstfile', value=dstfile)
 
         # run the command
-        bo = BashOperator(task_id="bash_operator_warp", bash_command=gdalwarp_command)
+        bo = BashOperator(
+            task_id="bash_operator_warp", bash_command=gdalwarp_command)
         bo.execute(context)
 
 
 class GDALAddoOperator(BaseOperator):
 
     @apply_defaults
-    def __init__(self, resampling_method, max_overview_level, compress_overview = None, photometric_overview = None, interleave_overview = None, srcfile = None, xk_pull_dag_id=None, xk_pull_task_id=None, xk_pull_key_srcfile=None, xk_push_key=None, *args, **kwargs):
-        self.resampling_method = resampling_method
-        self.max_overview_level = max_overview_level
-        self.compress_overview = ' --config COMPRESS_OVERVIEW ' + compress_overview +' ' if compress_overview else ' '
-        self.photometric_overview = ' --config PHOTOMETRIC_OVERVIEW ' + photometric_overview +' ' if photometric_overview else ' '
-        self.interleave_overview = ' --config INTERLEAVE_OVERVIEW ' + interleave_overview +' ' if interleave_overview else ' '
-        self.srcfile = srcfile
-        self.xk_pull_dag_id = xk_pull_dag_id
-        self.xk_pull_task_id = xk_pull_task_id
-        self.xk_pull_key_srcfile = xk_pull_key_srcfile
-        self.xk_push_key = xk_push_key
-
-        """
-        levels = ''
-        for i in range(1, int(math.log(max_overview_level, 2)) + 1):
-            levels += str(2**i) + ' '
-        self.levels = levels
-        """
-        level = 2
-        levels = ''
-        while(level <= int(self.max_overview_level)):
-            level = level*2
-            levels += str(level)
-            levels += ' '
-        self.levels = levels
-        super(GDALAddoOperator, self).__init__(*args, **kwargs)
+    def __init__(self, get_inputs_from, resampling_method,
+                 max_overview_level, compress_overview=None,
+                 *args, **kwargs):
+        super(GDALAddoOperator, self).__init__(*args, **kwargs)
+        self.get_inputs_from = get_inputs_from
+        self.resampling_method = resampling_method
+        self.max_overview_level = int(max_overview_level)
+        self.compress_overview = compress_overview
 
     def execute(self, context):
-        log.info('-------------------- GDAL_PLUGIN Addo ------------')
-        task_instance = context['task_instance']
-
-        log.info("""
-            resampling_method: {}
-            max_overview_level: {}
-            compress_overview: {}
-            photometric_overview: {}
-            interleave_overview: {}
-            srcfile: {}
-            xk_pull_dag_id: {}
-            xk_pull_task_id: {}
-            xk_pull_key_srcfile: {}
-            xk_push_key: {}
-            """.format(
-            self.resampling_method,
-            self.max_overview_level,
-            self.compress_overview,
-            self.photometric_overview,
-            self.interleave_overview,
-            self.srcfile,
-            self.xk_pull_dag_id,
-            self.xk_pull_task_id,
-            self.xk_pull_key_srcfile,
-            self.xk_push_key
+        input_path = context["task_instance"].xcom_pull(self.get_inputs_from)
+        levels = get_overview_levels(self.max_overview_level)
+        log.info("Generating overviews for {!r}...".format(input_path))
+        command = get_gdaladdo_command(
+            input_path, overview_levels=levels,
+            resampling_method=self.resampling_method,
+            compress_overview=self.compress_overview
+        )
+        bo = BashOperator(
+            task_id='bash_operator_addo_{}'.format(
+                os.path.basename(input_path)),
+            bash_command=command
         )
-
-        # init XCom parameters
-        if self.xk_pull_dag_id is None:
-            self.xk_pull_dag_id = context['dag'].dag_id
-        # check input file path passed otherwise look for it in XCom
-        if self.srcfile is not None:
-            srcfile = self.srcfile
-        else:
-            if self.xk_pull_key_srcfile is None:
-                self.xk_pull_key_srcfile = 'srcdir'
-            log.debug('Fetching srcdir from XCom')
-            srcfile = task_instance.xcom_pull(task_ids=self.xk_pull_task_id, key=self.xk_pull_key_srcfile, dag_id=self.xk_pull_dag_id)
-            if srcfile is None:
-                log.warn('No srcdir fetched from XCom. Nothing to do')
-                return False
-        assert srcfile is not None
-        log.info('srcfile: %s', srcfile)
-        log.info('levels %s', self.levels)
-
-        gdaladdo_command = 'gdaladdo -r ' + self.resampling_method + ' ' + self.compress_overview + self.photometric_overview+ self.interleave_overview + srcfile + ' ' + self.levels
-        log.info('The complete GDAL addo command is: %s', gdaladdo_command)
-
-        # push output path to XCom
-        if self.xk_push_key is None:
-            self.xk_push_key = 'dstfile'
-        task_instance.xcom_push(key='dstfile', value=srcfile)
-
-        # run the command
-        bo = BashOperator(task_id='bash_operator_addo_', bash_command=gdaladdo_command)
         bo.execute(context)
 
+
 class GDALTranslateOperator(BaseOperator):
 
     @apply_defaults
-    def __init__(self,
-                 tiled = None,
-                 working_dir = None,
-                 blockx_size = None,
-                 blocky_size = None,
-                 compress = None,
-                 photometric = None,
-                 ot = None, of = None,
-                 b = None, mask = None,
-                 outsize = None,
-                 scale = None,
-                 *args, **kwargs):
-
-        self.working_dir = working_dir
-
-        self.tiled = ' -co "TILED=' + tiled +'" ' if tiled else ' '
-        self.blockx_size = ' -co "BLOCKXSIZE=' + blockx_size + '" ' if blockx_size else ' '
-        self.blocky_size = ' -co "BLOCKYSIZE=' + blocky_size + '" ' if blocky_size else ' '
-        self.compress = ' -co "COMPRESS=' + compress + '"' if compress else ' '
-        self.photometric = ' -co "PHOTOMETRIC=' + photometric + '" ' if photometric else ' '
-
-        self.ot = ' -ot ' + str(ot) if ot else ''
-        self.of = ' -of ' + str(of) if of else ''
-        self.b = ' -b ' + str(b) if b else ''
-        self.mask = '-mask ' + str(mask) if mask else ''
-        self.outsize = '-outsize ' + str(outsize) if outsize else ''
-        self.scale = ' -scale ' + str(scale) if scale else ''
-
-        log.info('--------------------GDAL_PLUGIN Translate initiated------------')
+    def __init__(self, get_inputs_from, output_type="UInt16",
+                 creation_options=None, *args, **kwargs):
         super(GDALTranslateOperator, self).__init__(*args, **kwargs)
+        self.get_inputs_from = get_inputs_from
+        self.output_type = str(output_type)
+        self.creation_options = dict(
+            creation_options) if creation_options is not None else {
+                "tiled": True,
+                "blockxsize": 512,
+                "blockysize": 512,
+            }
 
     def execute(self, context):
-        log.info('--------------------GDAL_PLUGIN Translate running------------')
-        task_instance = context['task_instance']
-        log.info("GDAL Translate Operator params list")
-        log.info('Working dir: %s', self.working_dir)
-        scene_fullpath = context['task_instance'].xcom_pull('landsat8_download_daraa_task', key='scene_fullpath')
-        scenes = os.listdir(scene_fullpath)
-        if not os.path.exists(scene_fullpath+"__translated"):
-            create_translated_dir = BashOperator(task_id="bash_operator_translate", bash_command="mkdir {}".format(scene_fullpath+"__translated"))
-            create_translated_dir.execute(context)
-        for scene in scenes:
-            if scene.endswith(".TIF"):
-                output_img_filename = 'translated_' +str(scene)
-                gdaltranslate_command = 'gdal_translate ' + self.ot + self.of + self.b + self.mask + self.outsize + self.scale + self.tiled + self.blockx_size + self.blocky_size + self.compress + self.photometric + os.path.join(scene_fullpath ,scene) +' ' + os.path.join(scene_fullpath+"__translated" , output_img_filename)
-                log.info('The complete GDAL translate command is: %s', gdaltranslate_command)
-                b_o = BashOperator(task_id="bash_operator_translate_scenes", bash_command = gdaltranslate_command)
-                b_o.execute(context)
-                task_instance.xcom_push(key = 'translated_scenes_dir', value = os.path.join(scene_fullpath+"__translated",output_img_filename))
-        task_instance.xcom_push(key = 'product_dir', value = scene_fullpath)
-        inner_tiffs = os.path.join(scene_fullpath,"*.TIF")
-        #rm_cmd = "rm -r {}".format(inner_tiffs)
-        #delete_b_o = BashOperator(task_id="bash_operator_delete_scenes", bash_command = rm_cmd)
-        #delete_b_o.execute(context)
-        return True
+        downloaded_path = context["task_instance"].xcom_pull(
+            self.get_inputs_from)
+        working_dir = os.path.join(os.path.dirname(downloaded_path),
+                                   "__translated")
+        try:
+            os.makedirs(working_dir)
+        except OSError as exc:
+            if exc.errno == 17:
+                pass  # directory already exists
+            else:
+                raise
+
+        output_img_filename = 'translated_{}'.format(
+            os.path.basename(downloaded_path))
+        output_path = os.path.join(working_dir, output_img_filename)
+        command = get_gdal_translate_command(
+            source=downloaded_path, destination=output_path,
+            output_type=self.output_type,
+            creation_options=self.creation_options
+        )
+        log.info("The complete GDAL translate command is: {}".format(command))
+        b_o = BashOperator(
+            task_id="bash_operator_translate",
+            bash_command=command
+        )
+        b_o.execute(context)
+        return output_path
 
 
 class GDALPlugin(AirflowPlugin):
     name = "GDAL_plugin"
-    operators = [GDALWarpOperator, GDALAddoOperator, GDALTranslateOperator]
+    operators = [
+        GDALWarpOperator,
+        GDALAddoOperator,
+        GDALTranslateOperator
+    ]
+
+
+def _get_gdal_creation_options(**creation_options):
+    result = ""
+    for name, value in creation_options.items():
+        opt = '-co "{}={}"'.format(name.upper(), value)
+        result = " ".join((result, opt))
+    return result.strip()
diff --git a/airflow/plugins/landsat8_db_plugin.py b/airflow/plugins/landsat8_db_plugin.py
index 20b4941..e341838 100644
--- a/airflow/plugins/landsat8_db_plugin.py
+++ b/airflow/plugins/landsat8_db_plugin.py
@@ -1,84 +1,113 @@
+import gzip
 import logging
+import os
 import pprint
-import urllib
-import gzip
-import psycopg2
-from datetime import datetime, timedelta
-from airflow.operators import BaseOperator, BashOperator
+
+from airflow.operators import BaseOperator
 from airflow.plugins_manager import AirflowPlugin
 from airflow.utils.decorators import apply_defaults
+import psycopg2
+import requests
 
-
-log = logging.getLogger(__name__)
+logger = logging.getLogger(__name__)
 pp = pprint.PrettyPrinter(indent=2)
 
+
+def download_file(url, destination_directory):
+    response = requests.get(url, stream=True)
+    full_path = os.path.join(destination_directory, url.rpartition("/")[-1])
+    with open(full_path, "wb") as fh:
+        for chunk in response.iter_content(chunk_size=1024):
+            fh.write(chunk)
+    return full_path
+
+
 class DownloadSceneList(BaseOperator):
 
     @apply_defaults
-    def __init__(self, download_url, download_dir, download_timeout=timedelta(hours=1),*args, **kwargs):
+    def __init__(self, download_dir, download_url, *args, **kwargs):
+        super(DownloadSceneList, self).__init__(*args, **kwargs)
         self.download_url = download_url
         self.download_dir = download_dir
-        log.info('-------------------- DownloadSceneList ------------')
-        super(DownloadSceneList, self).__init__(*args, **kwargs)
 
     def execute(self, context):
-        log.info('-------------------- DownloadSceneList Execute------------')
-        urllib.urlretrieve(self.download_url,self.download_dir+"scene_list.gz")
-        context['task_instance'].xcom_push(key='scene_list_gz_path', value=str(self.download_dir)+"scene_list.gz")
-        return True
+        try:
+            os.makedirs(self.download_dir)
+        except OSError as exc:
+            if exc.errno == 17:
+                pass  # directory already exists
+            else:
+                raise
+
+        logger.info("Downloading {!r}...".format(self.download_url))
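+        # download_file() (defined above) streams the response to disk in
+        # 1 KB chunks; downstream tasks recompute the target path from
+        # download_dir and download_url, so the returned path is unused here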
+        download_file(self.download_url, self.download_dir)
+        logger.info("Done!")
+
 
 class ExtractSceneList(BaseOperator):
 
     @apply_defaults
-    def __init__(self, download_dir,*args, **kwargs):
-        log.info('-------------------- ExtractSceneList ------------')
-        self.download_dir = download_dir
+    def __init__(self, download_dir, download_url, *args, **kwargs):
         super(ExtractSceneList, self).__init__(*args, **kwargs)
+        self.download_dir = download_dir
+        self.download_url = download_url
 
     def execute(self, context):
-        log.info('-------------------- ExtractSceneList Execute------------')
-        scene_list_gz_path = context['task_instance'].xcom_pull('download_scene_list_gz_task', key='scene_list_gz_path')
-        with gzip.open(scene_list_gz_path, 'rb') as f:
-            file_content = f.read()
-        outF = file(self.download_dir+"scene_list.csv", 'wb')
-        outF.write(file_content)
-        outF.close()
-        context['task_instance'].xcom_push(key='scene_list_csv_path', value=str(self.download_dir)+"scene_list.csv")
-        return True
+        path_to_extract = os.path.join(
+            self.download_dir,
+            self.download_url.rpartition("/")[-1]
+        )
+        target_path = "{}.csv".format(
+            os.path.splitext(path_to_extract)[0])
+        logger.info("Extracting {!r} to {!r}...".format(
+            path_to_extract, target_path))
+        with gzip.open(path_to_extract, 'rb') as zipped_fh, \
+                open(target_path, "wb") as extracted_fh:
+            extracted_fh.write(zipped_fh.read())
+
 
 class UpdateSceneList(BaseOperator):
 
     @apply_defaults
-    def __init__(self, pg_dbname, pg_hostname, pg_port, pg_username, pg_password,*args, **kwargs):
-        log.info('-------------------- UpdateSceneList ------------')
+    def __init__(self, download_dir, download_url, pg_dbname, pg_hostname,
+                 pg_port, pg_username, pg_password, *args, **kwargs):
+        super(UpdateSceneList, self).__init__(*args, **kwargs)
+        self.download_dir = download_dir
+        self.download_url = download_url
         self.pg_dbname = pg_dbname
         self.pg_hostname = pg_hostname
         self.pg_port = pg_port
         self.pg_username = pg_username
         self.pg_password = pg_password
-        super(UpdateSceneList, self).__init__(*args, **kwargs)
 
     def execute(self, context):
-        log.info('-------------------- UpdateSceneList Execute------------')
-        scene_list_csv_path = context['task_instance'].xcom_pull('extract_scene_list_task', key='scene_list_csv_path')
-
-        delete_first_line_cmd = "tail -n +2 " + scene_list_csv_path + "> " + scene_list_csv_path+".tmp && mv "+ scene_list_csv_path +".tmp " + scene_list_csv_path
-        delete_line_operator = BashOperator(task_id='Delete_first_line_OP', bash_command = delete_first_line_cmd)
-        delete_line_operator.execute(context)
-
-        db = psycopg2.connect("dbname='{}' user='{}' host='{}' password='{}'".format(self.pg_dbname, self.pg_username, self.pg_hostname, self.pg_password))
-        cursor = db.cursor()
-        cursor.execute("delete from scene_list")
-        db.commit()
-
-        fo = open(scene_list_csv_path, 'r')
-        cursor.copy_from(fo, 'scene_list',sep=',')
-        db.commit()
-
-        fo.close()
+        db_connection = psycopg2.connect(
+            "dbname='{}' user='{}' host='{}' password='{}'".format(
+                self.pg_dbname, self.pg_username, self.pg_hostname,
+                self.pg_password
+            )
+        )
+        logger.info("Deleting previous data from db...")
+        with db_connection as conn:
+            with conn.cursor() as cursor:
+                cursor.execute("DELETE FROM scene_list;")
+        filename = os.path.splitext(self.download_url.rpartition("/")[-1])[0]
+        scene_list_path = os.path.join(
+            self.download_dir,
+            "{}.csv".format(filename))
+        logger.info("Loading data from {!r} into db...".format(
+            scene_list_path))
+        with db_connection as conn, open(scene_list_path) as fh:
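+            # consume the csv header row so copy_from() only receives data
+            # lines (this replaces the old "tail -n +2" BashOperator step)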
+            fh.readline()
+            with conn.cursor() as cursor:
+                cursor.copy_from(fh, "scene_list", sep=",")
         return True
 
+
 class LANDSAT8DBPlugin(AirflowPlugin):
     name = "landsat8db_plugin"
-    operators = [DownloadSceneList, ExtractSceneList, UpdateSceneList]
-
+    operators = [
+        DownloadSceneList,
+        ExtractSceneList,
+        UpdateSceneList
+    ]
diff --git a/airflow/plugins/landsat8_metadata_plugin.py b/airflow/plugins/landsat8_metadata_plugin.py
index 9b7b6d1..88f12e6 100644
--- a/airflow/plugins/landsat8_metadata_plugin.py
+++ b/airflow/plugins/landsat8_metadata_plugin.py
@@ -1,162 +1,287 @@
+from collections import namedtuple
+import json
 import logging
+import os
 import pprint
-from datetime import datetime, timedelta
-from airflow.operators import BaseOperator, BashOperator
+import shutil
+import zipfile
+
+from airflow.operators import BaseOperator
 from airflow.plugins_manager import AirflowPlugin
 from airflow.utils.decorators import apply_defaults
-import os, math, json, shutil, zipfile
-from pgmagick import Image, Geometry
-from jinja2 import Environment, FileSystemLoader, Template
-
+from pgmagick import Image
 
 log = logging.getLogger(__name__)
 pp = pprint.PrettyPrinter(indent=2)
 
-''' This class will read the .MTL file which is attached with the scene/product directory.The returned value from "final_metadata_dict" will be a python dictionary holding all the available keys from the .MTL file. this dictionary will be saved as json file to be added later
-to the product.zip. Also, the execute method will "xcom.push" the absolute path of the generated json file inside the context of the task
-'''
+
+BoundingBox = namedtuple("BoundingBox", [
+    "ullon",
+    "ullat",
+    "urlon",
+    "urlat",
+    "lllon",
+    "lllat",
+    "lrlon",
+    "lrlat"
+])
+
+
+def parse_mtl_data(buffer):
+    """Parse input file-like object that contains metadata in MTL format."""
+    metadata = {}
+    current = metadata
+    previous = metadata
+    for line in buffer:
+        key, value = (i.strip() for i in line.partition("=")[::2])
+        if value == "L1_METADATA_FILE":
+            pass
+        elif key == "END":
+            pass
+        elif key == "GROUP":
+            current[value] = {}
+            previous = current
+            current = current[value]
+        elif key == "END_GROUP":
+            current = previous
+        elif key == "":
+            pass
+        else:
+            try:
+                parsed_value = int(value)
+            except ValueError:
+                try:
+                    parsed_value = float(value)
+                except ValueError:
+                    parsed_value = str(value.replace('"', ""))
+            current[key] = parsed_value
+    return metadata
+
+
+def get_bounding_box(product_metadata):
+    return BoundingBox(
+        ullon=float(product_metadata["CORNER_UL_LON_PRODUCT"]),
+        ullat=float(product_metadata["CORNER_UL_LAT_PRODUCT"]),
+        urlon=float(product_metadata["CORNER_UR_LON_PRODUCT"]),
+        urlat=float(product_metadata["CORNER_UR_LAT_PRODUCT"]),
+        lllon=float(product_metadata["CORNER_LL_LON_PRODUCT"]),
+        lllat=float(product_metadata["CORNER_LL_LAT_PRODUCT"]),
+        lrlon=float(product_metadata["CORNER_LR_LON_PRODUCT"]),
+        lrlat=float(product_metadata["CORNER_LR_LAT_PRODUCT"]),
+    )
+
+
+def prepare_metadata(metadata, bounding_box):
+    return {
+        "type": "Feature",
+        "geometry": {
+            "type": "Polygon",
+            "coordinates": [[
+                [bounding_box.ullat, bounding_box.ullon],
+                [bounding_box.urlat, bounding_box.urlon],
+                [bounding_box.lllat, bounding_box.lllon],
+                [bounding_box.lrlat, bounding_box.lrlon],
+                [bounding_box.ullat, bounding_box.ullon],
+            ]],
+        },
+        "properties": {
+            "eop:identifier": metadata[
+                "METADATA_FILE_INFO"]["LANDSAT_PRODUCT_ID"],
+            "timeStart": metadata["PRODUCT_METADATA"]["SCENE_CENTER_TIME"],
+            "timeEnd": metadata["PRODUCT_METADATA"]["SCENE_CENTER_TIME"],
+            "originalPackageLocation": None,
+            "thumbnailURL": None,
+            "quicklookURL": None,
+            "eop:parentIdentifier": "LANDSAT8",
+            "eop:productionStatus": None,
+            "eop:acquisitionType": None,
+            "eop:orbitNumber": None,
+            "eop:orbitDirection": None,
+            "eop:track": None,
+            "eop:frame": None,
+            "eop:swathIdentifier": None,
+            "opt:cloudCover": metadata["IMAGE_ATTRIBUTES"]["CLOUD_COVER"],
+            "opt:snowCover": None,
+            "eop:productQualityStatus": None,
+            "eop:productQualityDegradationStatus": None,
+            "eop:processorName": metadata[
+                "METADATA_FILE_INFO"]["PROCESSING_SOFTWARE_VERSION"],
+            "eop:processingCenter": None,
+            "eop:creationDate": None,
+            "eop:modificationDate": metadata[
+                "METADATA_FILE_INFO"]["FILE_DATE"],
+            "eop:processingDate": None,
+            "eop:sensorMode": None,
+            "eop:archivingCenter": None,
+            "eop:processingMode": None,
+            "eop:availabilityTime": None,
+            "eop:acquisitionStation": metadata[
+                "METADATA_FILE_INFO"]["STATION_ID"],
+            "eop:acquisitionSubtype": None,
+            "eop:startTimeFromAscendingNode": None,
+            "eop:completionTimeFromAscendingNode": None,
+            "eop:illuminationAzimuthAngle": metadata[
+                "IMAGE_ATTRIBUTES"]["SUN_AZIMUTH"],
+            "eop:illuminationZenithAngle": None,
+            "eop:illuminationElevationAngle": metadata[
+                "IMAGE_ATTRIBUTES"]["SUN_ELEVATION"],
+            "eop:resolution": metadata[
+                "PROJECTION_PARAMETERS"]["GRID_CELL_SIZE_REFLECTIVE"]
+        }
+    }
+
+
+def prepare_granules(bounding_box, location):
+    return {
+        "type": "FeatureCollection",
+        "features": [
+            {
+                "type": "Feature",
+                "geometry": {
+                    "type": "Polygon",
+                    "coordinates": [[
+                        [bounding_box.ullat, bounding_box.ullon],
+                        [bounding_box.urlat, bounding_box.urlon],
+                        [bounding_box.lllat, bounding_box.lllon],
+                        [bounding_box.lrlat, bounding_box.lrlon],
+                        [bounding_box.ullat, bounding_box.ullon],
+                    ]],
+                },
+                "properties": {"location": location},
+                "id": "GRANULE.1"
+            }
+        ]
+    }
+
+
 class Landsat8MTLReaderOperator(BaseOperator):
-    @apply_defaults
-    def __init__(self, loc_base_dir, metadata_xml_path, *args, **kwargs):
-        self.metadata_xml_path = metadata_xml_path
-        self.loc_base_dir = loc_base_dir
-        super(Landsat8MTLReaderOperator, self).__init__(*args, **kwargs)
-
-    def execute(self, context):
-        product_directory = context['task_instance'].xcom_pull('landsat8_translate_daraa_task', key='product_dir')
-        log.info("PRODUCT DIRECTORY")
-        log.info(product_directory)
-        scene_files = os.listdir(product_directory)
-        tiffs = []
-        for item in scene_files:
-            if item.endswith("MTL.txt"):
-                lines = open(os.path.join(product_directory,item)).readlines()
-            if item.endswith("thumb_small.jpg"):
-                product_jpeg = os.path.join(product_directory,item)
-            if item.endswith(".TIF"):
-                tiffs.append(item)
-        metadata_dictionary = {}
-        for line in lines:
-            line_list = line.split("=")
-            metadata_dictionary[line_list[0].strip()] = line_list[1].strip() if len(line_list)>1 else "XXXXXXXXX"
-        final_metadata_dict = {"type": "Feature", "geometry": {"type":"Polygon","coordinates":[[[float(metadata_dictionary["CORNER_UL_LAT_PRODUCT"]), float(metadata_dictionary["CORNER_UL_LON_PRODUCT"])],[float(metadata_dictionary["CORNER_UR_LAT_PRODUCT"]),float(metadata_dictionary["CORNER_UR_LON_PRODUCT"])],[float(metadata_dictionary["CORNER_LL_LAT_PRODUCT"]),float(metadata_dictionary["CORNER_LL_LON_PRODUCT"])],[float(metadata_dictionary["CORNER_LR_LAT_PRODUCT"]),float(metadata_dictionary["CORNER_LR_LON_PRODUCT"])],[float(metadata_dictionary["CORNER_UL_LAT_PRODUCT"]), float(metadata_dictionary["CORNER_UL_LON_PRODUCT"])]]]},
-                               "properties": {"eop:identifier" : metadata_dictionary["LANDSAT_PRODUCT_ID"][1:-1],
-                               "timeStart" : metadata_dictionary["SCENE_CENTER_TIME"], "timeEnd" : metadata_dictionary["SCENE_CENTER_TIME"], "originalPackageLocation" : None, "thumbnailURL" : None, "quicklookURL" : None,
-                               "eop:parentIdentifier" : "LANDSAT8", "eop:productionStatus" : None, "eop:acquisitionType" : None, "eop:orbitNumber" : None,
-                               "eop:orbitDirection" : None, "eop:track" : None, "eop:frame" : None, "eop:swathIdentifier" : None,
-                               "opt:cloudCover" : metadata_dictionary["CLOUD_COVER"],
-                               "opt:snowCover" : None, "eop:productQualityStatus" : None, "eop:productQualityDegradationStatus" : None,
-                               "eop:processorName" : metadata_dictionary["PROCESSING_SOFTWARE_VERSION"][1:-1],
-                               "eop:processingCenter" : None, "eop:creationDate" : None,
-                               "eop:modificationDate" : metadata_dictionary["FILE_DATE"],
-                               "eop:processingDate" : None, "eop:sensorMode" : None, "eop:archivingCenter" : None, "eop:processingMode" : None,
-                               "eop:availabilityTime" : None,
-                               "eop:acquisitionStation" : metadata_dictionary["STATION_ID"][1:-1],
-                               "eop:acquisitionSubtype" : None, "eop:startTimeFromAscendingNode" : None, "eop:completionTimeFromAscendingNode" : None,
-                               "eop:illuminationAzimuthAngle" : metadata_dictionary["SUN_AZIMUTH"],
-                               "eop:illuminationZenithAngle" : None,
-                               "eop:illuminationElevationAngle" : metadata_dictionary["SUN_ELEVATION"],
-                               "eop:resolution" : metadata_dictionary["GRID_CELL_SIZE_REFLECTIVE"]}}
-
-        with open(os.path.join(product_directory,'product.json'), 'w') as outfile:
-            json.dump(final_metadata_dict, outfile)
-        log.info("######### JSON FILE PATH")
-        log.info(os.path.abspath(os.path.join(product_directory,'product.json')))
-        context['task_instance'].xcom_push(key='scene_time', value=metadata_dictionary["SCENE_CENTER_TIME"])
-        context['task_instance'].xcom_push(key='product_json_abs_path', value=os.path.abspath(os.path.join(product_directory,'product.json')))
-        context['task_instance'].xcom_push(key='product_jpeg_abs_path', value=product_jpeg)
-
-        granules_dict = {"type": "FeatureCollection","features": [{"type": "Feature","geometry": {"type":"Polygon","coordinates":[[[float(metadata_dictionary["CORNER_UL_LAT_PRODUCT"]), float(metadata_dictionary["CORNER_UL_LON_PRODUCT"])],[float(metadata_dictionary["CORNER_UR_LAT_PRODUCT"]),float(metadata_dictionary["CORNER_UR_LON_PRODUCT"])],[float(metadata_dictionary["CORNER_LL_LAT_PRODUCT"]),float(metadata_dictionary["CORNER_LL_LON_PRODUCT"])],[float(metadata_dictionary["CORNER_LR_LAT_PRODUCT"]),float(metadata_dictionary["CORNER_LR_LON_PRODUCT"])],[float(metadata_dictionary["CORNER_UL_LAT_PRODUCT"]), float(metadata_dictionary["CORNER_UL_LON_PRODUCT"])]]]},"properties": {"location": os.path.join(self.loc_base_dir,product_directory,tiffs[0])},"id": "GRANULE.1"}]}
-
-        with open(os.path.join(product_directory,'granules.json'), 'w') as outfile:
-            json.dump(granules_dict, outfile)
-        log.info("######### JSON FILE PATH")
-        log.info(os.path.abspath(os.path.join(product_directory,'granules.json')))
-        context['task_instance'].xcom_push(key='granules_json_abs_path', value=os.path.abspath(os.path.join(product_directory,'granules.json')))
-        log.info("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXxx")
-        log.info(os.getcwd())
-
-        shutil.copyfile(self.metadata_xml_path ,os.path.join(product_directory,"metadata.xml"))
-        context['task_instance'].xcom_push(key='metadata_xml_abs_path', value=os.path.join(product_directory,"metadata.xml"))
-        return True
-# regarding class granules.json need to discuss about it, done and it will be having the 11 bands
-
-
-''' This class will create a compressed, low resolution, square shaped thumbnail for the
-original scene then return the absolute path of the generated thumbnail
-'''
-class Landsat8ThumbnailOperator(BaseOperator):
+    """
+    This class reads the .MTL file that comes with the scene/product
+    directory. The parsed result is a python dictionary holding all the
+    available keys from the .MTL file, which is saved as a json file to be
+    added later to the product.zip. The execute method also returns the
+    absolute paths of the generated files so that downstream tasks can pick
+    them up via XCom.
+    """
 
-    @apply_defaults
-    def __init__(self, thumb_size_x, thumb_size_y, *args, **kwargs):
-        self.thumb_size_x = thumb_size_x
-        self.thumb_size_y = thumb_size_y
-        super(Landsat8ThumbnailOperator, self).__init__(*args, **kwargs)
+    @apply_defaults
+    def __init__(self, get_inputs_from, loc_base_dir, metadata_xml_path,
+                 *args, **kwargs):
+        super(Landsat8MTLReaderOperator, self).__init__(*args, **kwargs)
+        self.get_inputs_from = get_inputs_from
+        self.metadata_xml_path = metadata_xml_path
+        self.loc_base_dir = loc_base_dir
 
-    def execute(self, context):
-        product_directory = context['task_instance'].xcom_pull('landsat8_translate_daraa_task', key='product_dir')
-        jpeg_abs_path = context['task_instance'].xcom_pull('landsat8_product_json_task', key='product_jpeg_abs_path')
-        img = Image(jpeg_abs_path)
-        least_dim = min(int(img.columns()),int(img.rows()))
-        img.crop(str(least_dim)+'x'+str(least_dim))
-        img.scale(self.thumb_size_x+'x'+self.thumb_size_y)
-        img.write(os.path.join(product_directory,"thumbnail.jpeg"))
-        log.info(os.path.join(product_directory,"thumbnail.jpeg"))
-        context['task_instance'].xcom_push(key='thumbnail_jpeg_abs_path', value=os.path.join(product_directory,"thumbnail.jpeg"))
-        return True
+    def execute(self, context):
+        mtl_path = context["task_instance"].xcom_pull(self.get_inputs_from)
+        with open(mtl_path) as mtl_fh:
+            parsed_metadata = parse_mtl_data(mtl_fh)
+        bounding_box = get_bounding_box(parsed_metadata["PRODUCT_METADATA"])
+        prepared_metadata = prepare_metadata(parsed_metadata, bounding_box)
+        product_directory, mtl_name = os.path.split(mtl_path)
+        location = os.path.join(self.loc_base_dir, product_directory, mtl_name)
+        granules_dict = prepare_granules(bounding_box, location)
+        json_path = os.path.join(product_directory, "product.json")
+        granules_path = os.path.join(product_directory, "granules.json")
+        xml_template_path = os.path.join(product_directory, "metadata.xml")
+        with open(json_path, 'w') as out_json_fh:
+            json.dump(prepared_metadata, out_json_fh)
+        with open(granules_path, 'w') as out_granules_fh:
+            json.dump(granules_dict, out_granules_fh)
+        shutil.copyfile(self.metadata_xml_path, xml_template_path)
+        return json_path, granules_path, xml_template_path
 
-''' This class will create a .html description file by copying it from its config path'''
+
+class Landsat8ThumbnailOperator(BaseOperator):
+    """This class will create a compressed, low resolution, square shaped
+    thumbnail for the original scene and then return the absolute path of
+    the generated thumbnail
+    """
+
+    @apply_defaults
+    def __init__(self, get_inputs_from, thumb_size_x, thumb_size_y,
+                 *args, **kwargs):
+        super(Landsat8ThumbnailOperator, self).__init__(*args, **kwargs)
+        self.get_inputs_from = get_inputs_from
+        self.thumb_size_x = thumb_size_x
+        self.thumb_size_y = thumb_size_y
+
+    def execute(self, context):
+        downloaded_thumbnail = context["task_instance"].xcom_pull(
+            self.get_inputs_from)
+        log.info("downloaded_thumbnail: {}".format(downloaded_thumbnail))
+        img = Image(downloaded_thumbnail)
+        least_dim = min(int(img.columns()), int(img.rows()))
+        img.crop("{dim}x{dim}".format(dim=least_dim))
+        img.scale("{}x{}".format(self.thumb_size_x, self.thumb_size_y))
+        output_path = os.path.join(
+            os.path.dirname(downloaded_thumbnail), "thumbnail.jpeg")
+        img.write(output_path)
+        return output_path
+
+
 class Landsat8ProductDescriptionOperator(BaseOperator):
-    @apply_defaults
-    def __init__(self, description_template, *args, **kwargs):
-        self.description_template = description_template
-        super(Landsat8ProductDescriptionOperator, self).__init__(*args, **kwargs)
-
-    def execute(self, context):
-        product_desc_dict = {}
-        product_directory = context['task_instance'].xcom_pull('landsat8_translate_daraa_task', key='product_dir')
-        try:
-            shutil.copyfile(self.description_template, os.path.join(product_directory,"description.html"))
-        except:
-            print "Couldn't find description.html"
-        context['task_instance'].xcom_push(key='product_desc_abs_path', value=os.path.join(product_directory,"description.html"))
-        return True
+    """This class will create a .html description file by copying it from
+    its config path
+    """
 
+    @apply_defaults
+    def __init__(self, description_template, download_dir, *args, **kwargs):
+        super(Landsat8ProductDescriptionOperator, self).__init__(
+            *args, **kwargs)
+        self.description_template = description_template
+        self.download_dir = download_dir
+
+    def execute(self, context):
+        output_path = os.path.join(self.download_dir, "description.html")
+        shutil.copyfile(self.description_template, output_path)
+        return output_path
+
 
-''' This class will create product.zip file utilizing from the previous tasks '''
 class Landsat8ProductZipFileOperator(BaseOperator):
-    @apply_defaults
-    def __init__(self, *args, **kwargs):
-        #self.zip_location = zip_location
-        super(Landsat8ProductZipFileOperator, self).__init__(*args, **kwargs)
-
-    def execute(self, context):
-        product_directory = context['task_instance'].xcom_pull('landsat8_translate_daraa_task', key='product_dir')
-
-        product_json_abs_path = context['task_instance'].xcom_pull('landsat8_product_json_task', key='product_json_abs_path')
-        thumbnail_jpeg_abs_path = context['task_instance'].xcom_pull('landsat8_product_thumbnail_task', key='thumbnail_jpeg_abs_path')
-        product_desc_abs_path = context['task_instance'].xcom_pull('landsat8_product_description_task', key='product_desc_abs_path')
-        granules_json_abs_path = context['task_instance'].xcom_pull('landsat8_product_json_task', key='granules_json_abs_path')
-        metadata_xml_abs_path = context['task_instance'].xcom_pull('landsat8_product_json_task', key='metadata_xml_abs_path')
-        list_of_files = [product_json_abs_path, granules_json_abs_path, thumbnail_jpeg_abs_path, product_desc_abs_path, metadata_xml_abs_path]
-        log.info(list_of_files)
-        product = zipfile.ZipFile(os.path.join(product_directory,"product.zip"), 'w')
-        for item in list_of_files:
-            product.write(item,item.rsplit('/', 1)[-1])
-        product.close()
-        return True
+    """This class will create the product.zip file from the outputs of the
+    previous tasks
+    """
+
+    @apply_defaults
+    def __init__(self, get_inputs_from, output_dir, *args, **kwargs):
+        super(Landsat8ProductZipFileOperator, self).__init__(*args, **kwargs)
+        self.get_inputs_from = get_inputs_from
+        self.output_dir = output_dir
+
+    def execute(self, context):
+        paths_to_zip = []
+        for input_provider in self.get_inputs_from:
+            inputs = context["task_instance"].xcom_pull(input_provider)
+            if isinstance(inputs, basestring):
+                paths_to_zip.append(inputs)
+            else:  # the Landsat8MTLReaderOperator returns a tuple of strings
+                paths_to_zip.extend(inputs)
+        log.info("paths_to_zip: {}".format(paths_to_zip))
+        output_path = os.path.join(self.output_dir, "product.zip")
+        with zipfile.ZipFile(output_path, "w") as zip_handler:
+            for path in paths_to_zip:
+                zip_handler.write(path, os.path.basename(path))
+        return output_path
+
 
 class Landsat8GranuleJsonFileOperator(BaseOperator):
-    @apply_defaults
-    def __init__(self, location_prop, *args, **kwargs):
-        self.location_prop = location_prop
-        super(Landsat8GranuleJsonFileOperator, self).__init__(*args, **kwargs)
-    def execute(self, context):
-
-        return True
+
+    @apply_defaults
+    def __init__(self, location_prop, *args, **kwargs):
+        self.location_prop = location_prop
+        super(Landsat8GranuleJsonFileOperator, self).__init__(*args, **kwargs)
+
+    def execute(self, context):
+        return True
+
 
 class LANDSAT8METADATAPlugin(AirflowPlugin):
-    name = "landsat8_metadata_plugin"
-    operators = [Landsat8MTLReaderOperator, Landsat8ThumbnailOperator, Landsat8ProductDescriptionOperator, Landsat8ProductZipFileOperator]
+    name = "landsat8_metadata_plugin"
+    operators = [
+        Landsat8MTLReaderOperator,
+        Landsat8ThumbnailOperator,
+        Landsat8ProductDescriptionOperator,
+        Landsat8ProductZipFileOperator
+    ]
diff --git a/airflow/plugins/search_download_daraa_plugin.py b/airflow/plugins/search_download_daraa_plugin.py
index 92bbd0e..dcc19cd 100644
--- a/airflow/plugins/search_download_daraa_plugin.py
+++ b/airflow/plugins/search_download_daraa_plugin.py
@@ -1,104 +1,97 @@
-from airflow import DAG
+from datetime import timedelta
+from itertools import chain
 import logging
+import os
+
 import psycopg2
 import urllib
-from datetime import datetime, timedelta
-from airflow.operators import BaseOperator, BashOperator
+
+from airflow.operators import BaseOperator
 from airflow.plugins_manager import AirflowPlugin
 from airflow.utils.decorators import apply_defaults
-import os
 
 log = logging.getLogger(__name__)
 
+
 class Landsat8SearchOperator(BaseOperator):
 
     @apply_defaults
-    def __init__(self,
-                 cloud_coverage,
-                 path,
-                 row,
-                 pgdbname,
-                 pghostname,
-                 pgport,
-                 pgusername,
-                 pgpassword,
-                 processing_level="L1TP",
-                 *args, **kwargs):
-        self.cloud_coverage = cloud_coverage
-        #self.acquisition_date = str(acquisition_date)
-        self.path = path
-        self.row = row
-        self.processing_level = processing_level
-        self.pgdbname = pgdbname
-        self.pghostname = pghostname
-        self.pgport = pgport
-        self.pgusername = pgusername
-        self.pgpassword = pgpassword
-        print("Initialization of Daraa Landsat8SearchOperator ...")
+    def __init__(self, area, cloud_coverage, db_credentials, *args, **kwargs):
         super(Landsat8SearchOperator, self).__init__(*args, **kwargs)
+        self.area = area
+        self.cloud_coverage = cloud_coverage
+        self.db_credentials = dict(db_credentials)
 
     def execute(self, context):
-        log.info(context)
-        log.info("#####################")
-        log.info("## LANDSAT8 Search ##")
-        log.info('Cloud Coverage <= % : %f', self.cloud_coverage)
-        #log.info('Acquisition Date : %s', self.acquisition_date)
-        log.info('Path : %d', self.path)
-        log.info('Row : %d', self.row)
-        log.info('Processing Level: %s', self.processing_level)
-        print("Executing Landsat8SearchOperator .. ")
+        connection = psycopg2.connect(
+            dbname=self.db_credentials["dbname"],
+            user=self.db_credentials["username"],
+            password=self.db_credentials["password"],
+            host=self.db_credentials["hostname"],
+            port=self.db_credentials["port"],
+        )
+        cursor = connection.cursor()
+        query = (
+            "SELECT productId, entityId, download_url "
+            "FROM scene_list "
+            "WHERE cloudCover < %s AND path = %s AND row = %s "
+            "ORDER BY acquisitionDate DESC "
+            "LIMIT 1;"
+        )
+        data = (self.cloud_coverage, self.area.path, self.area.row)
+        cursor.execute(query, data)
+        try:
+            product_id, entity_id, download_url = cursor.fetchone()
+            log.info(
+                "Found {} product with {} scene id, available for download "
+                "through {} ".format(product_id, entity_id, download_url)
+            )
+        except TypeError:
+            log.error(
+                "Could not find any product for the {} area".format(self.area))
+        else:
+            return (product_id, entity_id, download_url)
 
-        db = psycopg2.connect("dbname='{}' user='{}' host='{}' password='{}'".format(self.pgdbname, self.pgusername, self.pghostname, self.pgpassword))
-        cursor = db.cursor()
-        sql_stmt = 'select productId, entityId, download_url from scene_list where cloudCover < {} and path = {} and row = {} order by acquisitionDate desc limit 1'.format(self.cloud_coverage,self.path,self.row)
-        cursor.execute(sql_stmt)
-        result_set = cursor.fetchall()
-        print result_set
-        log.info("Found {} product with {} scene id, available for download through {} ".format(result_set[0][0],result_set[0][1],result_set[0][2]))
-        context['task_instance'].xcom_push(key='searched_products', value=result_set[0])
-        return True
 
 class Landsat8DownloadOperator(BaseOperator):
+    """Download a single Landsat8 file."""
 
     @apply_defaults
-    def __init__(self,
-                 download_dir,
-                 number_of_bands = None,
-                 download_timeout=timedelta(hours=1),
-                 *args, **kwargs):
+    def __init__(self, download_dir, get_inputs_from, url_fragment,
+                 download_timeout=timedelta(hours=1), *args, **kwargs):
+        super(Landsat8DownloadOperator, self).__init__(
+            execution_timeout=download_timeout, *args, **kwargs)
         self.download_dir = download_dir
-        self.number_of_bands = number_of_bands
-        log.info("----------------------------------------------------")
-        print("Initialization of Landsat8 Download ... ")
-        log.info('Download Directory: %s', self.download_dir)
-        super(Landsat8DownloadOperator, self).__init__(execution_timeout=download_timeout,*args, **kwargs)
+        self.get_inputs_from = get_inputs_from
+        self.url_fragment = url_fragment
 
     def execute(self, context):
-        log.info("#######################")
-        log.info("## Landsat8 Download ##")
-        log.info('Download Directory: %s', self.download_dir)
-        print("Execute Landsat8 Download ... ")
-        scene_url = context['task_instance'].xcom_pull('landsat8_search_daraa_task', key='searched_products')
-        log.info("#######################")
-        log.info(self.download_dir+scene_url[1])
-        if os.path.isdir(self.download_dir+scene_url[1]):
-            pass
+        task_inputs = context["task_instance"].xcom_pull(self.get_inputs_from)
+        product_id, entity_id, download_url = task_inputs
+        target_dir = os.path.join(self.download_dir, entity_id)
+        try:
+            os.makedirs(target_dir)
+        except OSError as exc:
+            if exc.errno == 17:  # directory already exists
+                pass
+            else:
+                raise
+        url = download_url.replace(
+            "index.html", "{}_{}".format(product_id, self.url_fragment))
+        target_path = os.path.join(
+            target_dir,
+            "{}_{}".format(product_id, self.url_fragment)
+        )
+        try:
+            urllib.urlretrieve(url, target_path)
+        except Exception:
+            log.exception(
+                msg="Error downloading {}".format(self.url_fragment))
+            raise
         else:
-            create_dir = BashOperator(task_id="bash_operator_translate_daraa", bash_command="mkdir {}".format(self.download_dir+scene_url[1]))
-            create_dir.execute(context)
-        counter = 1
-        try:
-            urllib.urlretrieve(os.path.join(scene_url[2].replace("index.html",scene_url[0]+"_MTL.txt")),os.path.join(self.download_dir+scene_url[1],scene_url[0]+'_MTL.txt'))
-            urllib.urlretrieve(os.path.join(scene_url[2].replace("index.html",scene_url[0]+"_thumb_small.jpg")),os.path.join(self.download_dir+scene_url[1],scene_url[0]+'_thumb_small.jpg'))
-            while counter <= self.number_of_bands:
-                urllib.urlretrieve(scene_url[2].replace("index.html",scene_url[0]+"_B"+str(counter)+".TIF"),os.path.join(self.download_dir+scene_url[1],scene_url[0]+'_B'+str(counter)+'.TIF'))
-                counter+=1
-        except:
-            log.info("EXCEPTION: ### Download not completed successfully, please check all the scenes, mtl and small jpg ###")
-        context['task_instance'].xcom_push(key='scene_fullpath', value=self.download_dir+scene_url[1])
-        return True
+            return target_path
 
 
 class SearchDownloadDaraaPlugin(AirflowPlugin):
     name = "search_download_daraa_plugin"
-    operators = [Landsat8SearchOperator, Landsat8DownloadOperator]
+    operators = [
+        Landsat8SearchOperator,
+        Landsat8DownloadOperator
+    ]