Skip to content

Commit

Permalink
Merge pull request geosolutions-it#161 from ricardogsilva/127-ingest-…
Browse files Browse the repository at this point in the history
…more-bands-for-l8-products

127 ingest more bands for l8 products
  • Loading branch information
randomorder authored Oct 2, 2017
2 parents a546c7e + deb3b36 commit 6294ca3
Show file tree
Hide file tree
Showing 6 changed files with 770 additions and 620 deletions.
120 changes: 55 additions & 65 deletions airflow/dags/landsat8/landsat_update_scene_list.py
Original file line number Diff line number Diff line change
@@ -1,72 +1,62 @@
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators import BaseOperator, BashOperator, DownloadSceneList, ExtractSceneList, UpdateSceneList
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from landsat8.secrets import postgresql_credentials

##################################################
# General and shared configuration between tasks
##################################################
update_scene_list_default_args = {
'start_date': datetime(2017, 1, 1),
'owner': 'airflow',
'depends_on_past': False,
'provide_context': True,
'email': ['xyz@xyz.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'max_threads': 1,
'download_dir': '/var/data/download/'
}

######################################################
# Task specific configuration
######################################################
download_scene_list_args = {
'download_url':'http://landsat-pds.s3.amazonaws.com/c1/L8/scene_list.gz'
}
from getpass import getuser
import os

update_scene_list_args = {\
'pg_dbname' : postgresql_credentials['dbname'],
'pg_hostname' : postgresql_credentials['hostname'],
'pg_password' : postgresql_credentials['password'],
'pg_username' : postgresql_credentials['username'],
'pg_port' : postgresql_credentials['port']
}
from airflow import DAG
from airflow.operators import DownloadSceneList
from airflow.operators import ExtractSceneList
from airflow.operators import UpdateSceneList

#######################################################
# DAG definition
#######################################################
landsat8_scene_list = DAG('Landsat8_Scene_List',
description='DAG for downloading, extracting, and importing scene_list.gz into postgres db',
default_args=update_scene_list_default_args,
dagrun_timeout=timedelta(hours=1),
schedule_interval=timedelta(days=1),
catchup=False)
from landsat8.secrets import postgresql_credentials

######################################################
# Tasks difinition
######################################################
# These ought to be moved to a more central place where other settings might
# be stored
DOWNLOAD_URL = 'http://landsat-pds.s3.amazonaws.com/c1/L8/scene_list.gz'
DOWNLOAD_DIR = os.path.join(os.path.expanduser("~"), "download")

landsat8_scene_list = DAG(
'Landsat8_Scene_List',
description='DAG for downloading, extracting, and importing scene_list.gz '
'into postgres db',
default_args={
"start_date": datetime(2017, 1, 1),
"owner": getuser(),
"depends_on_past": False,
"provide_context": True,
"email": ["xyz@xyz.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 0, # TODO: change back to 1
"max_threads": 1,
"download_dir": DOWNLOAD_DIR,
"download_url": DOWNLOAD_URL,
},
dagrun_timeout=timedelta(hours=1),
schedule_interval=timedelta(days=1),
catchup=False
)

# more info on Landsat products on AWS at:
# https://aws.amazon.com/public-datasets/landsat/
download_scene_list_gz = DownloadSceneList(
task_id= 'download_scene_list_gz_task',
download_url = download_scene_list_args['download_url'],
#download_dir = download_scene_list_args['download_dir'],
dag = landsat8_scene_list)
task_id='download_scene_list_gz',
dag=landsat8_scene_list
)

extract_scene_list = ExtractSceneList(
task_id = 'extract_scene_list_task',
#download_dir = extract_scene_list_args['download_dir'] ,
dag = landsat8_scene_list)

update_scene_list_db = UpdateSceneList(\
task_id = 'update_scene_list_task',
pg_dbname = update_scene_list_args['pg_dbname'] ,
pg_hostname = update_scene_list_args['pg_hostname'],
pg_port = update_scene_list_args['pg_port'],
pg_username = update_scene_list_args['pg_username'],
pg_password = update_scene_list_args['pg_password'],
dag = landsat8_scene_list )

download_scene_list_gz >> extract_scene_list >> update_scene_list_db
task_id='extract_scene_list',
dag=landsat8_scene_list
)

update_scene_list_db = UpdateSceneList(
task_id='update_scene_list',
pg_dbname=postgresql_credentials['dbname'],
pg_hostname=postgresql_credentials['hostname'],
pg_port=postgresql_credentials['port'],
pg_username=postgresql_credentials['username'],
pg_password=postgresql_credentials['password'],
dag=landsat8_scene_list
)

download_scene_list_gz.set_downstream(extract_scene_list)
extract_scene_list.set_downstream(update_scene_list_db)
Loading

0 comments on commit 6294ca3

Please sign in to comment.