Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

127 ingest more bands for l8 products #161

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# Custom
.idea
gdal*
airflow.db
airflow.cfg
unittests.cfg
Expand Down
120 changes: 55 additions & 65 deletions airflow/dags/landsat8/landsat_update_scene_list.py
Original file line number Diff line number Diff line change
@@ -1,72 +1,62 @@
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators import BaseOperator, BashOperator, DownloadSceneList, ExtractSceneList, UpdateSceneList
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from landsat8.secrets import postgresql_credentials

##################################################
# General and shared configuration between tasks
##################################################
update_scene_list_default_args = {
'start_date': datetime(2017, 1, 1),
'owner': 'airflow',
'depends_on_past': False,
'provide_context': True,
'email': ['xyz@xyz.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'max_threads': 1,
'download_dir': '/var/data/download/'
}

######################################################
# Task specific configuration
######################################################
download_scene_list_args = {
'download_url':'http://landsat-pds.s3.amazonaws.com/c1/L8/scene_list.gz'
}
from getpass import getuser
import os

update_scene_list_args = {\
'pg_dbname' : postgresql_credentials['dbname'],
'pg_hostname' : postgresql_credentials['hostname'],
'pg_password' : postgresql_credentials['password'],
'pg_username' : postgresql_credentials['username'],
'pg_port' : postgresql_credentials['port']
}
from airflow import DAG
from airflow.operators import DownloadSceneList
from airflow.operators import ExtractSceneList
from airflow.operators import UpdateSceneList

#######################################################
# DAG definition
#######################################################
landsat8_scene_list = DAG('Landsat8_Scene_List',
description='DAG for downloading, extracting, and importing scene_list.gz into postgres db',
default_args=update_scene_list_default_args,
dagrun_timeout=timedelta(hours=1),
schedule_interval=timedelta(days=1),
catchup=False)
from landsat8.secrets import postgresql_credentials

######################################################
# Task definitions
######################################################
# These ought to be moved to a more central place where other settings might
# be stored
DOWNLOAD_URL = 'http://landsat-pds.s3.amazonaws.com/c1/L8/scene_list.gz'
DOWNLOAD_DIR = os.path.join(os.path.expanduser("~"), "download")

landsat8_scene_list = DAG(
'Landsat8_Scene_List',
description='DAG for downloading, extracting, and importing scene_list.gz '
'into postgres db',
default_args={
"start_date": datetime(2017, 1, 1),
"owner": getuser(),
"depends_on_past": False,
"provide_context": True,
"email": ["xyz@xyz.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 0, # TODO: change back to 1
"max_threads": 1,
"download_dir": DOWNLOAD_DIR,
"download_url": DOWNLOAD_URL,
},
dagrun_timeout=timedelta(hours=1),
schedule_interval=timedelta(days=1),
catchup=False
)

# more info on Landsat products on AWS at:
# https://aws.amazon.com/public-datasets/landsat/
download_scene_list_gz = DownloadSceneList(
task_id= 'download_scene_list_gz_task',
download_url = download_scene_list_args['download_url'],
#download_dir = download_scene_list_args['download_dir'],
dag = landsat8_scene_list)
task_id='download_scene_list_gz',
dag=landsat8_scene_list
)

extract_scene_list = ExtractSceneList(
task_id = 'extract_scene_list_task',
#download_dir = extract_scene_list_args['download_dir'] ,
dag = landsat8_scene_list)

update_scene_list_db = UpdateSceneList(\
task_id = 'update_scene_list_task',
pg_dbname = update_scene_list_args['pg_dbname'] ,
pg_hostname = update_scene_list_args['pg_hostname'],
pg_port = update_scene_list_args['pg_port'],
pg_username = update_scene_list_args['pg_username'],
pg_password = update_scene_list_args['pg_password'],
dag = landsat8_scene_list )

download_scene_list_gz >> extract_scene_list >> update_scene_list_db
task_id='extract_scene_list',
dag=landsat8_scene_list
)

update_scene_list_db = UpdateSceneList(
task_id='update_scene_list',
pg_dbname=postgresql_credentials['dbname'],
pg_hostname=postgresql_credentials['hostname'],
pg_port=postgresql_credentials['port'],
pg_username=postgresql_credentials['username'],
pg_password=postgresql_credentials['password'],
dag=landsat8_scene_list
)

download_scene_list_gz.set_downstream(extract_scene_list)
extract_scene_list.set_downstream(update_scene_list_db)
Loading