Skip to content

Commit

Permalink
refactoring custom operators
Browse files Browse the repository at this point in the history
- Switched to using XComs on the operator return values
- Removed some unused operator parameters in order to simplify the code
  • Loading branch information
ricardogsilva committed Sep 5, 2017
1 parent dd4f1cf commit 508c97b
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 142 deletions.
131 changes: 52 additions & 79 deletions airflow/plugins/gdal_plugin.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
import json
import logging
import math
import os
import shutil
import zipfile

from airflow.operators import BashOperator
from airflow.operators import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from pgmagick import Image, Geometry

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -245,86 +240,64 @@ def execute(self, context):
class GDALTranslateOperator(BaseOperator):

@apply_defaults
def __init__(self, tiled=None, working_dir=None, blockx_size=None,
blocky_size=None, compress=None, photometric=None,
ot=None, of=None, b=None, mask=None, outsize=None,
scale=None, *args, **kwargs):

self.working_dir = working_dir
self.tiled = (
' -co "TILED=' + tiled + '" ' if tiled else ' ')
self.blockx_size = (
' -co "BLOCKXSIZE=' + blockx_size + '" ' if blockx_size else ' ')
self.blocky_size = (
' -co "BLOCKYSIZE=' + blocky_size + '" ' if blocky_size else ' ')
self.compress = (
' -co "COMPRESS=' + compress + '"' if compress else ' ')
self.photometric = (
' -co "PHOTOMETRIC=' + photometric + '" ' if photometric else ' ')
self.ot = ' -ot ' + str(ot) if ot else ''
self.of = ' -of ' + str(of) if of else ''
self.b = ' -b ' + str(b) if b else ''
self.mask = '-mask ' + str(mask) if mask else ''
self.outsize = '-outsize ' + str(outsize) if outsize else ''
self.scale = ' -scale ' + str(scale) if scale else ''

log.info(
'--------------------GDAL_PLUGIN Translate initiated------------')
def __init__(self, get_inputs_from, output_type="UInt16",
             creation_options=None, *args, **kwargs):
    """Set up the translate operator.

    Args:
        get_inputs_from: Stored unchanged on the instance; presumably the
            id of the upstream task whose XCom return value lists the
            files to translate -- confirm against ``execute``.
        output_type: Converted with ``str`` and stored as
            ``self.output_type`` (used for the ``-ot`` argument of the
            generated command).
        creation_options: Optional mapping of creation option names to
            values. It is copied with ``dict`` so the caller's object is
            not shared. When ``None``, tiled output with 512x512 blocks
            is used.
        *args: Forwarded to the parent operator.
        **kwargs: Forwarded to the parent operator.
    """
    super(GDALTranslateOperator, self).__init__(*args, **kwargs)
    self.get_inputs_from = get_inputs_from
    self.output_type = str(output_type)
    if creation_options is None:
        options = {
            "tiled": True,
            "blockxsize": 512,
            "blockysize": 512,
        }
    else:
        options = dict(creation_options)
    self.creation_options = options

def execute(self, context):
log.info(
'--------------------GDAL_PLUGIN Translate running------------')
task_instance = context['task_instance']
log.info("GDAL Translate Operator params list")
log.info('Working dir: %s', self.working_dir)
scene_fullpath = context['task_instance'].xcom_pull(
'landsat8_download_daraa_task', key='scene_fullpath')
scenes = os.listdir(scene_fullpath)
if not os.path.exists(scene_fullpath+"__translated"):
create_translated_dir = BashOperator(
task_id="bash_operator_translate",
bash_command="mkdir {}".format(scene_fullpath+"__translated")
downloaded_paths = context["task_instance"].xcom_pull(
self.get_inputs_from)
working_dir = os.path.join(
os.path.dirname(downloaded_paths[0]),
"__translated"
)
try:
os.makedirs(working_dir)
except OSError as exc:
if exc.errno == 17:
pass # directory already exists
else:
raise
translated_paths = []
for path in (p for p in downloaded_paths if p.endswith(".TIF")):
output_img_filename = 'translated_{}' + os.path.basename(path)
output_path = os.path.join(working_dir, output_img_filename)
command = self._get_command(source=path, destination=output_path)
log.info("The complete GDAL translate "
"command is: {}".format(command))
b_o = BashOperator(
task_id="bash_operator_translate_scenes",
bash_command=command
)
create_translated_dir.execute(context)
for scene in scenes:
if scene.endswith(".TIF"):
output_img_filename = 'translated_' + str(scene)
gdaltranslate_command = (
'gdal_translate ' + self.ot + self.of + self.b +
self.mask + self.outsize + self.scale + self.tiled +
self.blockx_size + self.blocky_size + self.compress +
self.photometric + os.path.join(scene_fullpath, scene) +
' ' + os.path.join(
scene_fullpath+"__translated",
output_img_filename
)
)
log.info(
'The complete GDAL translate command '
'is: %s', gdaltranslate_command
)
b_o = BashOperator(
task_id="bash_operator_translate_scenes",
bash_command=gdaltranslate_command
)
b_o.execute(context)
task_instance.xcom_push(
key='translated_scenes_dir',
value=os.path.join(
scene_fullpath+"__translated",
output_img_filename
b_o.execute(context)
translated_paths.append(output_path)
return translated_paths

def _get_command(self, source, destination):
    """Build the ``gdal_translate`` command line for a single file.

    Args:
        source: Path of the input image.
        destination: Path the translated image is written to.

    Returns:
        str: The command, made of the output type taken from
        ``self.output_type``, the text returned by
        ``self._get_gdal_creation_options()``, and the two paths.
    """
    template = (
        "gdal_translate -ot {output_type} {creation_opts} {src} {dst}"
    )
    fields = {
        "output_type": self.output_type,
        "creation_opts": self._get_gdal_creation_options(),
        "src": source,
        "dst": destination,
    }
    return template.format(**fields)
task_instance.xcom_push(key='product_dir', value=scene_fullpath)
inner_tiffs = os.path.join(scene_fullpath, "*.TIF")
# rm_cmd = "rm -r {}".format(inner_tiffs)
# delete_b_o = BashOperator(
# task_id="bash_operator_delete_scenes",
# bash_command = rm_cmd
# )
# delete_b_o.execute(context)
return True

def _get_gdal_creation_options(self):
    """Render ``self.creation_options`` as ``gdal_translate`` arguments.

    Each entry becomes one ``-co "NAME=VALUE"`` argument with the option
    name upper-cased, and the arguments are joined by single spaces.
    Boolean values are written as ``YES``/``NO``, the spelling used for
    on/off creation options in the GDAL documentation.

    Returns:
        str: The arguments ready to be inserted in the command line, or
        an empty string when there are no creation options.
    """
    # The earlier implementation threw away the result of ``" ".join``
    # and so always returned "", silently dropping every option. It also
    # omitted the ``-co`` flag that introduces a creation option.
    arguments = []
    for name, value in self.creation_options.items():
        if isinstance(value, bool):
            value = "YES" if value else "NO"
        arguments.append('-co "{}={}"'.format(name.upper(), value))
    return " ".join(arguments)


class GDALPlugin(AirflowPlugin):
Expand Down
104 changes: 41 additions & 63 deletions airflow/plugins/search_download_daraa_plugin.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from datetime import datetime
from datetime import timedelta
from itertools import chain
import logging
Expand All @@ -16,79 +15,58 @@
class Landsat8SearchOperator(BaseOperator):

@apply_defaults
def __init__(self, cloud_coverage, path, row, pgdbname, pghostname,
pgport, pgusername, pgpassword, processing_level="L1TP",
*args, **kwargs):
self.cloud_coverage = cloud_coverage
# self.acquisition_date = str(acquisition_date)
self.path = path
self.row = row
self.processing_level = processing_level
self.pgdbname = pgdbname
self.pghostname = pghostname
self.pgport = pgport
self.pgusername = pgusername
self.pgpassword = pgpassword
print("Initialization of Daraa Landsat8SearchOperator ...")
def __init__(self, area, cloud_coverage, db_credentials, *args, **kwargs):
super(Landsat8SearchOperator, self).__init__(*args, **kwargs)

def execute(self, context):
log.info(context)
log.info("#####################")
log.info("## LANDSAT8 Search ##")
log.info('Cloud Coverage <= % : %f', self.cloud_coverage)
# log.info('Acquisition Date : %s', self.acquisition_date)
log.info('Path : %d', self.path)
log.info('Row : %d', self.row)
log.info('Processing Level: %s', self.processing_level)
print("Executing Landsat8SearchOperator .. ")
self.area = area
self.cloud_coverage = cloud_coverage
self.db_credentials = dict(db_credentials)

db = psycopg2.connect(
"dbname='{}' user='{}' host='{}' password='{}'".format(
self.pgdbname, self.pgusername, self.pghostname,
self.pgpassword
)
)
cursor = db.cursor()
sql_stmt = (
'select productId, entityId, download_url '
'from scene_list '
'where cloudCover < {} and path = {} and row = {} '
'order by acquisitionDate desc '
'limit 1'.format(self.cloud_coverage, self.path, self.row)
def execute(self, context):
connection = psycopg2.connect(
dbname=self.db_credentials["dbname"],
user=self.db_credentials["username"],
password=self.db_credentials["password"],
host=self.db_credentials["hostname"],
port=self.db_credentials["port"],
)
cursor.execute(sql_stmt)
result_set = cursor.fetchall()
print(result_set)
log.info(
"Found {} product with {} scene id, available for download "
"through {} ".format(result_set[0][0],
result_set[0][1],
result_set[0][2])
cursor = connection.cursor()
query = (
"SELECT productId, entityId, download_url "
"FROM scene_list "
"WHERE cloudCover < %s AND path = %s AND row = %s "
"ORDER BY acquisitionDate DESC "
"LIMIT 1;"
)
context['task_instance'].xcom_push(
key='searched_products', value=result_set[0])
return True
data = (self.cloud_coverage, self.area.path, self.area.row)
cursor.execute(query, data)
try:
product_id, entity_id, download_url = cursor.fetchone()
log.info(
"Found {} product with {} scene id, available for download "
"through {} ".format(product_id, entity_id, download_url)
)
except TypeError:
log.error(
"Could not find any product for the {} area".format(self.area))
else:
return (product_id, entity_id, download_url)


class Landsat8DownloadOperator(BaseOperator):

@apply_defaults
def __init__(self, download_dir, bands=None,
download_timeout=timedelta(hours=1),
*args, **kwargs):
def __init__(self, download_dir, area, get_inputs_from,
download_timeout=timedelta(hours=1), *args, **kwargs):
super(Landsat8DownloadOperator, self).__init__(
execution_timeout=download_timeout, *args, **kwargs)
self.download_dir = download_dir
self.bands = [int(b) for b in bands] if bands is not None else [1]
self.area = area
self.get_inputs_from = get_inputs_from

def execute(self, context):
task_inputs = context["task_instance"].xcom_pull(
'landsat8_search_daraa_task',
key='searched_products'
)
task_inputs = context["task_instance"].xcom_pull(self.get_inputs_from)
product_id, entity_id, download_url = task_inputs
target_dir = self.download_dir + entity_id
target_dir = os.path.join(self.download_dir, entity_id)
try:
os.makedirs(target_dir)
except OSError as exc:
Expand All @@ -99,23 +77,23 @@ def execute(self, context):
("B{}.TIF".format(i) for i in self.bands)
)
# TODO: Download files in parallel instead of sequentially
downloaded = []
for ending in file_endings:
url = download_url.replace(
"index.html",
"{id}_{ending}".format(id=product_id, ending=ending)
)
target_path = os.path.join(
self.download_dir, entity_id,
target_dir,
"{}_{}".format(product_id, ending)
)
try:
urllib.urlretrieve(url, target_path)
downloaded.append(target_path)
except Exception:
log.exception(
msg="EXCEPTION: ### Error downloading {}".format(ending))
context['task_instance'].xcom_push(key='scene_fullpath',
value=target_dir)
return True
return downloaded


class SearchDownloadDaraaPlugin(AirflowPlugin):
Expand Down

0 comments on commit 508c97b

Please sign in to comment.