From deb3b3612265ee4245304cdb7eddef321181b0c4 Mon Sep 17 00:00:00 2001 From: Ricardo Garcia Silva Date: Tue, 12 Sep 2017 17:55:34 +0100 Subject: [PATCH] fixed bugs in the Landsat8 DAG --- .../dags/landsat8/search_download_daraa.py | 41 ++++++++++++------- airflow/plugins/landsat8_metadata_plugin.py | 27 +++++------- 2 files changed, 37 insertions(+), 31 deletions(-) diff --git a/airflow/dags/landsat8/search_download_daraa.py b/airflow/dags/landsat8/search_download_daraa.py index 0da23c2..55ad625 100644 --- a/airflow/dags/landsat8/search_download_daraa.py +++ b/airflow/dags/landsat8/search_download_daraa.py @@ -1,7 +1,7 @@ from collections import namedtuple from datetime import datetime from datetime import timedelta -import logging +import os from airflow.models import DAG from airflow.operators import GDALAddoOperator @@ -15,6 +15,18 @@ from landsat8.secrets import postgresql_credentials +# These ought to be moved to a more central place where other settings might +# be stored +PROJECT_ROOT = os.path.dirname( + os.path.dirname( + os.path.dirname( + os.path.dirname(__file__) + ) + ) +) +DOWNLOAD_DIR = os.path.join(os.path.expanduser("~"), "download") +TEMPLATES_PATH = os.path.join(PROJECT_ROOT, "metadata-ingestion", "templates") + Landsat8Area = namedtuple("Landsat8Area", [ "name", "path", @@ -23,6 +35,15 @@ ]) +AREAS = [ + Landsat8Area(name="daraa", path=174, row=37, bands=[1, 2, 3]), + # These are just some dummy areas in order to test generation of + # multiple DAGs + Landsat8Area(name="neighbour", path=175, row=37, bands=[1, 2, 3, 7]), + Landsat8Area(name="other", path=176, row=37, bands=range(1, 12)), +] + + def generate_dag(area, download_dir, default_args): """Generate Landsat8 ingestion DAGs. @@ -56,9 +77,8 @@ def generate_dag(area, download_dir, default_args): ) generate_html_description = Landsat8ProductDescriptionOperator( task_id='generate_html_description', - description_template="./geo-solutions-work/evo-odas/" - "metadata-ingestion/templates/" - "product_abstract.html", + description_template=os.path.join( + TEMPLATES_PATH, "product_abstract.html"), download_dir=download_dir, dag=dag ) @@ -66,7 +86,7 @@ def generate_dag(area, download_dir, default_args): task_id="download_thumbnail", download_dir=download_dir, get_inputs_from=search_task.task_id, - url_fragment="thumb_smal.jpg", + url_fragment="thumb_small.jpg", dag=dag ) generate_thumbnail = Landsat8ThumbnailOperator( @@ -88,8 +108,7 @@ def generate_dag(area, download_dir, default_args): get_inputs_from=download_metadata.task_id, loc_base_dir='/efs/geoserver_data/coverages/landsat8/{}'.format( area.name), - metadata_xml_path='./geo-solutions-work/evo-odas/metadata-ingestion/' - 'templates/metadata.xml', + metadata_xml_path=os.path.join(TEMPLATES_PATH, "metadata.xml"), dag=dag ) @@ -140,14 +159,8 @@ def generate_dag(area, download_dir, default_args): return dag -AREAS = [ - Landsat8Area(name="daraa", path=174, row=37, bands=[1, 2, 3]), - Landsat8Area(name="neighbour", path=175, row=37, bands=[1, 2, 3, 7]), - Landsat8Area(name="other", path=176, row=37, bands=range(1, 12)), -] - for area in AREAS: - dag = generate_dag(area, download_dir="/var/data/download", default_args={ + dag = generate_dag(area, download_dir=DOWNLOAD_DIR, default_args={ 'start_date': datetime(2017, 1, 1), 'owner': 'airflow', 'depends_on_past': False, diff --git a/airflow/plugins/landsat8_metadata_plugin.py b/airflow/plugins/landsat8_metadata_plugin.py index 349a2f0..88f12e6 100644 --- a/airflow/plugins/landsat8_metadata_plugin.py +++ b/airflow/plugins/landsat8_metadata_plugin.py @@ -1,5 +1,4 @@ from collections import namedtuple -from collections import Iterable import json import logging import os @@ -7,11 +6,10 @@ import shutil import zipfile -from airflow.operators import BaseOperator, BashOperator +from airflow.operators import BaseOperator from airflow.plugins_manager import AirflowPlugin from airflow.utils.decorators import apply_defaults -from pgmagick import Image, Geometry -from jinja2 import Environment, FileSystemLoader, Template +from pgmagick import Image log = logging.getLogger(__name__) pp = pprint.PrettyPrinter(indent=2) @@ -29,8 +27,8 @@ ]) - def parse_mtl_data(buffer): + """Parse input file-like object that contains metadata in MTL format.""" metadata = {} current = metadata previous = metadata @@ -75,7 +73,6 @@ def get_bounding_box(product_metadata): def prepare_metadata(metadata, bounding_box): - #bbox = get_bounding_box(metadata["PRODUCT_METADATA"]) return { "type": "Feature", "geometry": { @@ -183,7 +180,6 @@ def execute(self, context): bounding_box = get_bounding_box(parsed_metadata["PRODUCT_METADATA"]) prepared_metadata = prepare_metadata(parsed_metadata, bounding_box) product_directory, mtl_name = os.path.split(mtl_path) - # FIXME: location is probably wrong location = os.path.join(self.loc_base_dir, product_directory, mtl_name) granules_dict = prepare_granules(bounding_box, location) json_path = os.path.join(product_directory, "product.json") @@ -193,7 +189,6 @@ def execute(self, context): json.dump(prepared_metadata, out_json_fh) with open(granules_path, 'w') as out_granules_fh: json.dump(granules_dict, out_granules_fh) - # FIXME: This line below seems to be out of place here shutil.copyfile(self.metadata_xml_path, xml_template_path) return json_path, granules_path, xml_template_path @@ -215,6 +210,7 @@ def __init__(self, get_inputs_from, thumb_size_x, thumb_size_y, def execute(self, context): downloaded_thumbnail = context["task_instance"].xcom_pull( self.get_inputs_from) + log.info("downloaded_thumbnail: {}".format(downloaded_thumbnail)) img = Image(downloaded_thumbnail) least_dim = min(int(img.columns()), int(img.rows())) img.crop("{dim}x{dim}".format(dim=least_dim)) @@ -239,10 +235,7 @@ def __init__(self, description_template, download_dir, *args, **kwargs): def execute(self, context): output_path = os.path.join(self.download_dir, "description.html") - try: - shutil.copyfile(self.description_template, output_path) - except Exception: - print("Couldn't find description.html") + shutil.copyfile(self.description_template, output_path) return output_path @@ -261,15 +254,15 @@ def execute(self, context): paths_to_zip = [] for input_provider in self.get_inputs_from: inputs = context["task_instance"].xcom_pull(input_provider) - if isinstance(inputs, Iterable): - paths_to_zip.extend(inputs) - else: + if isinstance(inputs, basestring): paths_to_zip.append(inputs) - log.info(paths_to_zip) + else: # the Landsat8MTLReaderOperator returns a tuple of strings + paths_to_zip.extend(inputs) + log.info("paths_to_zip: {}".format(paths_to_zip)) output_path = os.path.join(self.output_dir, "product.zip") with zipfile.ZipFile(output_path, "w") as zip_handler: for path in paths_to_zip: - zip_handler.write(os.path.basename(path)) + zip_handler.write(path, os.path.basename(path)) return output_path