From 508c97be43ece7ca77acec4476fa491aa4473ba9 Mon Sep 17 00:00:00 2001 From: Ricardo Garcia Silva Date: Tue, 5 Sep 2017 01:09:39 +0100 Subject: [PATCH] refactoring custom operators - Switched to using XComs on the operator return values - Removed some unused operator parameters in order to simplify the code --- airflow/plugins/gdal_plugin.py | 131 +++++++----------- .../plugins/search_download_daraa_plugin.py | 104 ++++++-------- 2 files changed, 93 insertions(+), 142 deletions(-) diff --git a/airflow/plugins/gdal_plugin.py b/airflow/plugins/gdal_plugin.py index be0e13e..a5f500d 100644 --- a/airflow/plugins/gdal_plugin.py +++ b/airflow/plugins/gdal_plugin.py @@ -1,15 +1,10 @@ -import json import logging -import math import os -import shutil -import zipfile from airflow.operators import BashOperator from airflow.operators import BaseOperator from airflow.plugins_manager import AirflowPlugin from airflow.utils.decorators import apply_defaults -from pgmagick import Image, Geometry log = logging.getLogger(__name__) @@ -245,86 +240,64 @@ def execute(self, context): class GDALTranslateOperator(BaseOperator): @apply_defaults - def __init__(self, tiled=None, working_dir=None, blockx_size=None, - blocky_size=None, compress=None, photometric=None, - ot=None, of=None, b=None, mask=None, outsize=None, - scale=None, *args, **kwargs): - - self.working_dir = working_dir - self.tiled = ( - ' -co "TILED=' + tiled + '" ' if tiled else ' ') - self.blockx_size = ( - ' -co "BLOCKXSIZE=' + blockx_size + '" ' if blockx_size else ' ') - self.blocky_size = ( - ' -co "BLOCKYSIZE=' + blocky_size + '" ' if blocky_size else ' ') - self.compress = ( - ' -co "COMPRESS=' + compress + '"' if compress else ' ') - self.photometric = ( - ' -co "PHOTOMETRIC=' + photometric + '" ' if photometric else ' ') - self.ot = ' -ot ' + str(ot) if ot else '' - self.of = ' -of ' + str(of) if of else '' - self.b = ' -b ' + str(b) if b else '' - self.mask = '-mask ' + str(mask) if mask else '' - self.outsize = '-outsize ' + str(outsize) if outsize else '' - self.scale = ' -scale ' + str(scale) if scale else '' - - log.info( - '--------------------GDAL_PLUGIN Translate initiated------------') + def __init__(self, get_inputs_from, output_type="UInt16", + creation_options=None, *args, **kwargs): super(GDALTranslateOperator, self).__init__(*args, **kwargs) + self.get_inputs_from = get_inputs_from + self.output_type = str(output_type) + self.creation_options = dict( + creation_options) if creation_options is not None else { + "tiled": True, + "blockxsize": 512, + "blockysize": 512, + } def execute(self, context): - log.info( - '--------------------GDAL_PLUGIN Translate running------------') - task_instance = context['task_instance'] - log.info("GDAL Translate Operator params list") - log.info('Working dir: %s', self.working_dir) - scene_fullpath = context['task_instance'].xcom_pull( - 'landsat8_download_daraa_task', key='scene_fullpath') - scenes = os.listdir(scene_fullpath) - if not os.path.exists(scene_fullpath+"__translated"): - create_translated_dir = BashOperator( - task_id="bash_operator_translate", - bash_command="mkdir {}".format(scene_fullpath+"__translated") + downloaded_paths = context["task_instance"].xcom_pull( + self.get_inputs_from) + working_dir = os.path.join( + os.path.dirname(downloaded_paths[0]), + "__translated" + ) + try: + os.makedirs(working_dir) + except OSError as exc: + if exc.errno == 17: + pass # directory already exists + else: + raise + translated_paths = [] + for path in (p for p in downloaded_paths if 
+                     p.endswith(".TIF")):
+            output_img_filename = "translated_{}".format(os.path.basename(path))
+            output_path = os.path.join(working_dir, output_img_filename)
+            command = self._get_command(source=path, destination=output_path)
+            log.info("The complete GDAL translate "
+                     "command is: {}".format(command))
+            b_o = BashOperator(
+                task_id="bash_operator_translate_scenes",
+                bash_command=command
             )
-            create_translated_dir.execute(context)
-        for scene in scenes:
-            if scene.endswith(".TIF"):
-                output_img_filename = 'translated_' + str(scene)
-                gdaltranslate_command = (
-                    'gdal_translate ' + self.ot + self.of + self.b +
-                    self.mask + self.outsize + self.scale + self.tiled +
-                    self.blockx_size + self.blocky_size + self.compress +
-                    self.photometric + os.path.join(scene_fullpath, scene) +
-                    ' ' + os.path.join(
-                        scene_fullpath+"__translated",
-                        output_img_filename
-                    )
-                )
-                log.info(
-                    'The complete GDAL translate command '
-                    'is: %s', gdaltranslate_command
-                )
-                b_o = BashOperator(
-                    task_id="bash_operator_translate_scenes",
-                    bash_command=gdaltranslate_command
-                )
-                b_o.execute(context)
-                task_instance.xcom_push(
-                    key='translated_scenes_dir',
-                    value=os.path.join(
-                        scene_fullpath+"__translated",
-                        output_img_filename
+            b_o.execute(context)
+            translated_paths.append(output_path)
+        return translated_paths
+
+    def _get_command(self, source, destination):
+        return (
+            "gdal_translate -ot {output_type} {creation_opts} "
+            "{src} {dst}".format(
+                output_type=self.output_type,
+                creation_opts=self._get_gdal_creation_options(),
+                src=source,
+                dst=destination
                 )
             )
-        task_instance.xcom_push(key='product_dir', value=scene_fullpath)
-        inner_tiffs = os.path.join(scene_fullpath, "*.TIF")
-        # rm_cmd = "rm -r {}".format(inner_tiffs)
-        # delete_b_o = BashOperator(
-        #     task_id="bash_operator_delete_scenes",
-        #     bash_command = rm_cmd
-        # )
-        # delete_b_o.execute(context)
-        return True
+
+    def _get_gdal_creation_options(self):
+        # build "-co NAME=VALUE" pairs, as gdal_translate expects for
+        # creation options
+        result = []
+        for name, value in self.creation_options.items():
+            opt = '-co "{}={}"'.format(name.upper(), value)
+            result.append(opt)
+        return " ".join(result)
 
 
 class GDALPlugin(AirflowPlugin):
diff --git a/airflow/plugins/search_download_daraa_plugin.py b/airflow/plugins/search_download_daraa_plugin.py
index f69fa4f..3e473be 100644
--- a/airflow/plugins/search_download_daraa_plugin.py
+++ b/airflow/plugins/search_download_daraa_plugin.py
@@ -1,4 +1,3 @@
-from datetime import datetime
 from datetime import timedelta
 from itertools import chain
 import logging
@@ -16,79 +15,59 @@
 class Landsat8SearchOperator(BaseOperator):
 
     @apply_defaults
-    def __init__(self, cloud_coverage, path, row, pgdbname, pghostname,
-                 pgport, pgusername, pgpassword, processing_level="L1TP",
-                 *args, **kwargs):
-        self.cloud_coverage = cloud_coverage
-        # self.acquisition_date = str(acquisition_date)
-        self.path = path
-        self.row = row
-        self.processing_level = processing_level
-        self.pgdbname = pgdbname
-        self.pghostname = pghostname
-        self.pgport = pgport
-        self.pgusername = pgusername
-        self.pgpassword = pgpassword
-        print("Initialization of Daraa Landsat8SearchOperator ...")
+    def __init__(self, area, cloud_coverage, db_credentials, *args, **kwargs):
         super(Landsat8SearchOperator, self).__init__(*args, **kwargs)
-
-    def execute(self, context):
-        log.info(context)
-        log.info("#####################")
-        log.info("## LANDSAT8 Search ##")
-        log.info('Cloud Coverage <= % : %f', self.cloud_coverage)
-        # log.info('Acquisition Date : %s', self.acquisition_date)
-        log.info('Path : %d', self.path)
-        log.info('Row : %d', self.row)
-        log.info('Processing Level: %s', self.processing_level)
-        print("Executing Landsat8SearchOperator .. ")
+        self.area = area
+        self.cloud_coverage = cloud_coverage
+        self.db_credentials = dict(db_credentials)
 
-        db = psycopg2.connect(
-            "dbname='{}' user='{}' host='{}' password='{}'".format(
-                self.pgdbname, self.pgusername, self.pghostname,
-                self.pgpassword
-            )
-        )
-        cursor = db.cursor()
-        sql_stmt = (
-            'select productId, entityId, download_url '
-            'from scene_list '
-            'where cloudCover < {} and path = {} and row = {} '
-            'order by acquisitionDate desc '
-            'limit 1'.format(self.cloud_coverage, self.path, self.row)
+    def execute(self, context):
+        connection = psycopg2.connect(
+            dbname=self.db_credentials["dbname"],
+            user=self.db_credentials["username"],
+            password=self.db_credentials["password"],
+            host=self.db_credentials["hostname"],
+            port=self.db_credentials["port"],
         )
-        cursor.execute(sql_stmt)
-        result_set = cursor.fetchall()
-        print(result_set)
-        log.info(
-            "Found {} product with {} scene id, available for download "
-            "through {} ".format(result_set[0][0],
-                                 result_set[0][1],
-                                 result_set[0][2])
+        cursor = connection.cursor()
+        query = (
+            "SELECT productId, entityId, download_url "
+            "FROM scene_list "
+            "WHERE cloudCover < %s AND path = %s AND row = %s "
+            "ORDER BY acquisitionDate DESC "
+            "LIMIT 1;"
         )
-        context['task_instance'].xcom_push(
-            key='searched_products', value=result_set[0])
-        return True
+        data = (self.cloud_coverage, self.area.path, self.area.row)
+        cursor.execute(query, data)
+        try:
+            product_id, entity_id, download_url = cursor.fetchone()
+            log.info(
+                "Found {} product with {} scene id, available for download "
+                "through {} ".format(product_id, entity_id, download_url)
+            )
+        except TypeError:
+            log.error(
+                "Could not find any product for the {} area".format(self.area))
+        else:
+            return (product_id, entity_id, download_url)
 
 
 class Landsat8DownloadOperator(BaseOperator):
 
     @apply_defaults
-    def __init__(self, download_dir, bands=None,
-                 download_timeout=timedelta(hours=1),
-                 *args, **kwargs):
+    def __init__(self, download_dir, area, get_inputs_from, bands=None,
+                 download_timeout=timedelta(hours=1), *args, **kwargs):
         super(Landsat8DownloadOperator, self).__init__(
             execution_timeout=download_timeout, *args, **kwargs)
         self.download_dir = download_dir
         self.bands = [int(b) for b in bands] if bands is not None else [1]
+        self.area = area
+        self.get_inputs_from = get_inputs_from
 
     def execute(self, context):
-        task_inputs = context["task_instance"].xcom_pull(
-            'landsat8_search_daraa_task',
-            key='searched_products'
-        )
+        task_inputs = context["task_instance"].xcom_pull(self.get_inputs_from)
         product_id, entity_id, download_url = task_inputs
-        target_dir = self.download_dir + entity_id
+        target_dir = os.path.join(self.download_dir, entity_id)
         try:
             os.makedirs(target_dir)
         except OSError as exc:
@@ -99,23 +78,23 @@ def execute(self, context):
             ("B{}.TIF".format(i) for i in self.bands)
         )
         # TODO: Download files in parallel instead of sequentially
+        downloaded = []
         for ending in file_endings:
             url = download_url.replace(
                 "index.html",
                 "{id}_{ending}".format(id=product_id, ending=ending)
             )
             target_path = os.path.join(
-                self.download_dir, entity_id,
+                target_dir,
                 "{}_{}".format(product_id, ending)
             )
             try:
                 urllib.urlretrieve(url, target_path)
+                downloaded.append(target_path)
             except Exception:
                 log.exception(
                     msg="EXCEPTION: ### Error downloading {}".format(ending))
-        context['task_instance'].xcom_push(key='scene_fullpath',
-                                           value=target_dir)
-        return True
+        return downloaded
 
 
 class SearchDownloadDaraaPlugin(AirflowPlugin):