Skip to content

Commit

Permalink
fixed bugs in the Landsat8 DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
ricardogsilva committed Sep 12, 2017
1 parent dd9d083 commit deb3b36
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 31 deletions.
41 changes: 27 additions & 14 deletions airflow/dags/landsat8/search_download_daraa.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from collections import namedtuple
from datetime import datetime
from datetime import timedelta
import logging
import os

from airflow.models import DAG
from airflow.operators import GDALAddoOperator
Expand All @@ -15,6 +15,18 @@

from landsat8.secrets import postgresql_credentials

# These ought to be moved to a more central place where other settings might
# be stored
PROJECT_ROOT = os.path.dirname(
os.path.dirname(
os.path.dirname(
os.path.dirname(__file__)
)
)
)
DOWNLOAD_DIR = os.path.join(os.path.expanduser("~"), "download")
TEMPLATES_PATH = os.path.join(PROJECT_ROOT, "metadata-ingestion", "templates")

Landsat8Area = namedtuple("Landsat8Area", [
"name",
"path",
Expand All @@ -23,6 +35,15 @@
])


AREAS = [
Landsat8Area(name="daraa", path=174, row=37, bands=[1, 2, 3]),
# These are just some dummy areas in order to test generation of
# multiple DAGs
Landsat8Area(name="neighbour", path=175, row=37, bands=[1, 2, 3, 7]),
Landsat8Area(name="other", path=176, row=37, bands=range(1, 12)),
]


def generate_dag(area, download_dir, default_args):
"""Generate Landsat8 ingestion DAGs.
Expand Down Expand Up @@ -56,17 +77,16 @@ def generate_dag(area, download_dir, default_args):
)
generate_html_description = Landsat8ProductDescriptionOperator(
task_id='generate_html_description',
description_template="./geo-solutions-work/evo-odas/"
"metadata-ingestion/templates/"
"product_abstract.html",
description_template=os.path.join(
TEMPLATES_PATH, "product_abstract.html"),
download_dir=download_dir,
dag=dag
)
download_thumbnail = Landsat8DownloadOperator(
task_id="download_thumbnail",
download_dir=download_dir,
get_inputs_from=search_task.task_id,
url_fragment="thumb_smal.jpg",
url_fragment="thumb_small.jpg",
dag=dag
)
generate_thumbnail = Landsat8ThumbnailOperator(
Expand All @@ -88,8 +108,7 @@ def generate_dag(area, download_dir, default_args):
get_inputs_from=download_metadata.task_id,
loc_base_dir='/efs/geoserver_data/coverages/landsat8/{}'.format(
area.name),
metadata_xml_path='./geo-solutions-work/evo-odas/metadata-ingestion/'
'templates/metadata.xml',
metadata_xml_path=os.path.join(TEMPLATES_PATH, "metadata.xml"),
dag=dag
)

Expand Down Expand Up @@ -140,14 +159,8 @@ def generate_dag(area, download_dir, default_args):
return dag


AREAS = [
Landsat8Area(name="daraa", path=174, row=37, bands=[1, 2, 3]),
Landsat8Area(name="neighbour", path=175, row=37, bands=[1, 2, 3, 7]),
Landsat8Area(name="other", path=176, row=37, bands=range(1, 12)),
]

for area in AREAS:
dag = generate_dag(area, download_dir="/var/data/download", default_args={
dag = generate_dag(area, download_dir=DOWNLOAD_DIR, default_args={
'start_date': datetime(2017, 1, 1),
'owner': 'airflow',
'depends_on_past': False,
Expand Down
27 changes: 10 additions & 17 deletions airflow/plugins/landsat8_metadata_plugin.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
from collections import namedtuple
from collections import Iterable
import json
import logging
import os
import pprint
import shutil
import zipfile

from airflow.operators import BaseOperator, BashOperator
from airflow.operators import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from pgmagick import Image, Geometry
from jinja2 import Environment, FileSystemLoader, Template
from pgmagick import Image

log = logging.getLogger(__name__)
pp = pprint.PrettyPrinter(indent=2)
Expand All @@ -29,8 +27,8 @@
])



def parse_mtl_data(buffer):
"""Parse input file-like object that contains metadata in MTL format."""
metadata = {}
current = metadata
previous = metadata
Expand Down Expand Up @@ -75,7 +73,6 @@ def get_bounding_box(product_metadata):


def prepare_metadata(metadata, bounding_box):
#bbox = get_bounding_box(metadata["PRODUCT_METADATA"])
return {
"type": "Feature",
"geometry": {
Expand Down Expand Up @@ -183,7 +180,6 @@ def execute(self, context):
bounding_box = get_bounding_box(parsed_metadata["PRODUCT_METADATA"])
prepared_metadata = prepare_metadata(parsed_metadata, bounding_box)
product_directory, mtl_name = os.path.split(mtl_path)
# FIXME: location is probably wrong
location = os.path.join(self.loc_base_dir, product_directory, mtl_name)
granules_dict = prepare_granules(bounding_box, location)
json_path = os.path.join(product_directory, "product.json")
Expand All @@ -193,7 +189,6 @@ def execute(self, context):
json.dump(prepared_metadata, out_json_fh)
with open(granules_path, 'w') as out_granules_fh:
json.dump(granules_dict, out_granules_fh)
# FIXME: This line below seems to be out of place here
shutil.copyfile(self.metadata_xml_path, xml_template_path)
return json_path, granules_path, xml_template_path

Expand All @@ -215,6 +210,7 @@ def __init__(self, get_inputs_from, thumb_size_x, thumb_size_y,
def execute(self, context):
downloaded_thumbnail = context["task_instance"].xcom_pull(
self.get_inputs_from)
log.info("downloaded_thumbnail: {}".format(downloaded_thumbnail))
img = Image(downloaded_thumbnail)
least_dim = min(int(img.columns()), int(img.rows()))
img.crop("{dim}x{dim}".format(dim=least_dim))
Expand All @@ -239,10 +235,7 @@ def __init__(self, description_template, download_dir, *args, **kwargs):

def execute(self, context):
output_path = os.path.join(self.download_dir, "description.html")
try:
shutil.copyfile(self.description_template, output_path)
except Exception:
print("Couldn't find description.html")
shutil.copyfile(self.description_template, output_path)
return output_path


Expand All @@ -261,15 +254,15 @@ def execute(self, context):
paths_to_zip = []
for input_provider in self.get_inputs_from:
inputs = context["task_instance"].xcom_pull(input_provider)
if isinstance(inputs, Iterable):
paths_to_zip.extend(inputs)
else:
if isinstance(inputs, basestring):
paths_to_zip.append(inputs)
log.info(paths_to_zip)
else: # the Landsat8MTLReaderOperator returns a tuple of strings
paths_to_zip.extend(inputs)
log.info("paths_to_zip: {}".format(paths_to_zip))
output_path = os.path.join(self.output_dir, "product.zip")
with zipfile.ZipFile(output_path, "w") as zip_handler:
for path in paths_to_zip:
zip_handler.write(os.path.basename(path))
zip_handler.write(path, os.path.basename(path))
return output_path


Expand Down

0 comments on commit deb3b36

Please sign in to comment.