Skip to content

Commit

Permalink
Merge pull request geosolutions-it#163 from maabdelghaffar/feature_11
Browse files Browse the repository at this point in the history
issues 154, 160, 152
  • Loading branch information
randomorder authored Sep 4, 2017
2 parents 6b16b47 + 4fb302f commit d27b171
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 35 deletions.
106 changes: 106 additions & 0 deletions airflow/dags/sentinel2/Sentinel2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import logging, os
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators import DHUSSearchOperator, DHUSDownloadOperator, Sentinel2ThumbnailOperator, Sentinel2MetadataOperator, Sentinel2ProductZipOperator, RSYNCOperator
from sentinel1.secrets import dhus_credentials
from sentinel2.config import sentinel2_config

# Module-level logger for this DAG-definition file.
log = logging.getLogger(__name__)

# Settings
# Default arguments applied to every task created in this DAG.
default_args = {
    ##################################################
    # General configuration
    #
    # NOTE(review): a dynamic start_date (today - 1 day) is a known Airflow
    # anti-pattern -- confirm a fixed date is not preferred here.
    'start_date': datetime.today() - timedelta(days=1),
    'owner': 'airflow',
    'depends_on_past': False,
    'provide_context': True,
    'email': ['airflow@evoodas.dlr.de'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'max_threads': 1,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    #
}

# DAG definition
# DAG definition: hourly search / download / processing pipeline for
# Sentinel-2 products pulled from the DHUS server.
dag = DAG(
    'Sentinel2',
    description='DAG for searching, filtering and downloading Sentinel-2 data from DHUS server',
    default_args=default_args,
    schedule_interval='0 * * * *',       # top of every hour
    dagrun_timeout=timedelta(hours=10),
    catchup=False,
)

# Sentinel2- Search Task Operator
# Sentinel-2: query DHUS for products matching the configured bbox,
# date window, platform and filename filter.
search_task = DHUSSearchOperator(
    task_id='dhus_search_task',
    dhus_url='https://scihub.copernicus.eu/dhus',
    dhus_user=dhus_credentials['username'],
    dhus_pass=dhus_credentials['password'],
    geojson_bbox=sentinel2_config['geojson_bbox'],
    startdate=sentinel2_config['startdate'],
    enddate=sentinel2_config['enddate'],
    platformname=sentinel2_config['platformname'],
    filename=sentinel2_config['filename'],
    dag=dag,
)

# Sentinel-2 Download Task Operator
# Sentinel-2: download the products found by the search task
# (up to download_max) into download_dir.
download_task = DHUSDownloadOperator(
    task_id='dhus_download_task',
    dhus_url='https://scihub.copernicus.eu/dhus',
    dhus_user=dhus_credentials['username'],
    dhus_pass=dhus_credentials['password'],
    download_max=sentinel2_config['download_max'],
    download_dir=sentinel2_config['download_dir'],
    dag=dag,
)

# Archive Sentinel-2 RSYNC Task Operator
# Archive the downloaded Sentinel-2 granules via rsync.
# Fix: use the shared rsync settings defined in sentinel2/config.py
# (rsync_hostname / rsync_username / rsync_ssh_key) instead of the
# hard-coded "localhost" host and the personal user "moataz", keeping
# this task consistent with the project-wide rsync configuration.
archive_task = RSYNCOperator(
    task_id="sentinel2_upload_granules",
    host=sentinel2_config['rsync_hostname'],
    remote_usr=sentinel2_config['rsync_username'],
    ssh_key_file=sentinel2_config['rsync_ssh_key'],
    remote_dir=sentinel2_config['granules_upload_dir'],
    # Pull the downloaded product paths from the download task's XCom.
    xk_pull_dag_id='Sentinel2',
    xk_pull_task_id='dhus_download_task',
    xk_pull_key='downloaded_products_paths',
    dag=dag,
)


# Sentinel-2 Create thumbnail Operator
# Sentinel-2: generate a 64x64 thumbnail for each downloaded product.
# NOTE(review): sizes are passed as strings -- presumably the operator
# expects that; verify against Sentinel2ThumbnailOperator.
thumbnail_task = Sentinel2ThumbnailOperator(
    task_id='dhus_thumbnail_task',
    thumb_size_x='64',
    thumb_size_y='64',
    dag=dag,
)

# Sentinel-2 Metadata Operator
# Sentinel-2: build product metadata (product.json / granules.json and the
# .wld/.prj sidecar files) for the configured band resolutions.
metadata_task = Sentinel2MetadataOperator(
    task_id='dhus_metadata_task',
    bands_res=sentinel2_config['bands_res'],
    remote_dir=sentinel2_config['granules_upload_dir'],
    dag=dag,
)

# Archive Sentinel-2 RSYNC with .prj and .wld files Task Operator
# Archive the granules again, this time including the generated
# .wld and .prj files, via rsync.
# Fix: use the shared rsync settings defined in sentinel2/config.py
# (rsync_hostname / rsync_username / rsync_ssh_key) instead of the
# hard-coded "localhost" host and the personal user "moataz", keeping
# this task consistent with the project-wide rsync configuration.
archive_wldprj_task = RSYNCOperator(
    task_id="sentinel2_upload_granules_with_wldprj",
    host=sentinel2_config['rsync_hostname'],
    remote_usr=sentinel2_config['rsync_username'],
    ssh_key_file=sentinel2_config['rsync_ssh_key'],
    remote_dir=sentinel2_config['granules_upload_dir'],
    # Pull the paths produced by the metadata task from its XCom.
    xk_pull_dag_id='Sentinel2',
    xk_pull_task_id='dhus_metadata_task',
    xk_pull_key='downloaded_products_with_wldprj',
    dag=dag,
)

# Sentinel-2 Product.zip Operator.
# The following variables are just pointing to placeholders until we implement the real files.
base_dir = "/usr/local/airflow/metadata-ingestion/templates"
# Template placeholders packed into product.zip until real files exist.
placeholders_list = [
    os.path.join(base_dir, name)
    for name in ("metadata.xml", "owsLinks.json", "product_abstract.html")
]
# Files produced by the upstream metadata/thumbnail tasks.
generated_files_list = ['product/product.json', 'product/granules.json', 'product/thumbnail.jpeg']

# Sentinel-2: assemble the final product.zip from the generated files
# plus the placeholder templates.
product_zip_task = Sentinel2ProductZipOperator(
    task_id='product_zip_task',
    target_dir=sentinel2_config["product_zip_target_dir"],
    generated_files=generated_files_list,
    placeholders=placeholders_list,
    dag=dag,
)

# Linear pipeline: search -> download -> archive granules -> thumbnail ->
# metadata (.wld/.prj + json) -> archive with .wld/.prj -> product.zip.
search_task >> download_task >> archive_task >> thumbnail_task >> metadata_task >> archive_wldprj_task >> product_zip_task
6 changes: 5 additions & 1 deletion airflow/dags/sentinel2/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@
'platformname': 'Sentinel-2',
'filename': 'S2A_MSIL1C*',
'download_dir': os.path.join(download_base_dir, "Sentinel-2"),
'granules_upload_dir': "/var/data/download/uploads",
'rsync_hostname': 'geoserver.cloudsdi.geo-solutions.it',
'rsync_username': 'ec2-user',
'rsync_ssh_key' : '/usr/local/airflow/id_rsa'
'granules_upload_dir': "/var/data/sentinel2/uploads",
'bands_res':{'10':("B02","B03","B04","B08"),'20':("B05","B06","B07","B8A","B11","B12"),'60':("B01","B09","B10")},
'product_zip_target_dir':"/var/data/download/Sentinel-2",
'rsync_hostname': 'geoserver.cloudsdi.geo-solutions.it',
'rsync_username': 'ec2-user',
'rsync_ssh_key' : '/usr/local/airflow/id_rsa'
Expand Down
2 changes: 1 addition & 1 deletion airflow/plugins/dhus_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def __init__(self,
dhus_user,
dhus_pass,
download_dir,
download_timeout=timedelta(hours=1),
download_timeout=timedelta(hours=5),
download_max=100,
product_ids=None,
*args, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion airflow/plugins/evo-odas_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from zipfile import ZipFile
import config.xcom_keys as xk
import xcom_keys as xk
from sentinel1.utils.ssat1_metadata import create_procuct_zip

log = logging.getLogger(__name__)
Expand Down
75 changes: 43 additions & 32 deletions airflow/plugins/sentinel2_metadata_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,15 @@ def execute(self, context):
context['task_instance'].xcom_push(key='ids', value=ids)
#return os.path.join(self.downloaded_products,"thumbnail.jpeg")


'''
This class is creating the product.zip contents and passing the absolute path per every file so that the Sentinel2ProductZipOperator can generate the product.zip file.
Also, this class creates the .wld and .prj files which are required by GeoServer in order to publish the granules successfully.
'''
class Sentinel2MetadataOperator(BaseOperator):
@apply_defaults
def __init__(self, bands_res, *args, **kwargs):
def __init__(self, bands_res, remote_dir, *args, **kwargs):
self.bands_res = bands_res
self.remote_dir = remote_dir
super(Sentinel2MetadataOperator, self).__init__(*args, **kwargs)

def execute(self, context):
Expand Down Expand Up @@ -95,22 +99,26 @@ def execute(self, context):
"eop:illuminationAzimuthAngle": None,
"eop:illuminationZenithAngle": None,
"eop:illuminationElevationAngle": None, "eop:resolution": None}}
features_list = []
granule_counter = 1
for i in range(1,13):
for i in self.bands_res.values():
features_list = []
granule_counter = 1
for granule in s2_product.granules:
coords = []
x = [[[m.replace(" ", ",")] for m in str(granule.footprint).replace(", ", ",").partition('((')[-1].rpartition('))')[0].split(",")]]
for item in x[0]:
[x, y] = item[0].split(",")
coords.append([float(x), float(y)])
features_list.append({"type": "Feature", "geometry": { "type": "Polygon", "coordinates": [coords]},
"properties": {
"location":granule.band_path(granule_counter)
},
"id": "GRANULE.{}".format(granule_counter)
})
granule_counter+=1
zipped_product = zipfile.ZipFile(product)
for file_name in zipped_product.namelist():
if file_name.endswith('.jp2') and not file_name.endswith('PVI.jp2'):
log.info("file_name")
log.info(file_name)
features_list.append({"type": "Feature", "geometry": { "type": "Polygon", "coordinates": [coords]},\
"properties": {\
"location":os.path.join(self.remote_dir, granule.granule_path.rsplit("/")[-1], "IMG_DATA", file_name.rsplit("/")[-1])\
},\
"id": "GRANULE.{}".format(granule_counter)})
granule_counter+=1
final_granules_dict = {"type": "FeatureCollection", "features": features_list}
with open('product.json', 'w') as product_outfile:
json.dump(final_metadata_dict, product_outfile)
Expand Down Expand Up @@ -140,35 +148,38 @@ def execute(self, context):
root = tree.getroot()
geometric_info = root.find(root.tag.split('}', 1)[0]+"}Geometric_Info")
tile_geocoding = geometric_info.find("Tile_Geocoding")
tfw_files = []
wld_files = []
prj_files = []
for jp2_file in jp2_files_paths:
tfw_name = os.path.splitext(jp2_file)[0]
gdalinfo_cmd = "gdalinfo {} > {}".format(jp2_file, tfw_name+".prj")
gdalinfo_BO = BashOperator(task_id="bash_operator_gdalinfo_{}".format(tfw_name[-3:]), bash_command = gdalinfo_cmd)
wld_name = os.path.splitext(jp2_file)[0]
gdalinfo_cmd = "gdalinfo {} > {}".format(jp2_file, wld_name+".prj")
gdalinfo_BO = BashOperator(task_id="bash_operator_gdalinfo_{}".format(wld_name[-3:]), bash_command = gdalinfo_cmd)
gdalinfo_BO.execute(context)
sed_cmd = "sed -i -e '1,4d;29,37d' {}".format(tfw_name+".prj")
sed_BO = BashOperator(task_id="bash_operator_sed_{}".format(tfw_name[-3:]), bash_command = sed_cmd)
sed_cmd = "sed -i -e '1,4d;29,37d' {}".format(wld_name+".prj")
sed_BO = BashOperator(task_id="bash_operator_sed_{}".format(wld_name[-3:]), bash_command = sed_cmd)
sed_BO.execute(context)
prj_files.append(tfw_name+".prj")
tfw_file = open(tfw_name+".tfw","w")
tfw_files.append(tfw_name+".tfw")
prj_files.append(wld_name+".prj")
wld_file = open(wld_name+".wld","w")
wld_files.append(wld_name+".wld")
for key,value in self.bands_res.items():
if tfw_name[-3:] in value:
if wld_name[-3:] in value:
element = key
geo_position = tile_geocoding.find('.//Geoposition[@resolution="{}"]'.format(element))
tfw_file.write(geo_position.find("XDIM").text + "\n" + "0" + "\n" + "0" +"\n")
tfw_file.write(geo_position.find("YDIM").text + "\n")
tfw_file.write(geo_position.find("ULX").text + "\n")
tfw_file.write(geo_position.find("ULY").text + "\n")
files_to_archive.extend(prj_files + tfw_files + jp2_files_paths)
zip_with_prj_tfw = zipfile.ZipFile(archive_line.rsplit('.',1)[0]+"__.zip","a")
for item in files_to_archive:
zip_with_prj_tfw.write(item, item.rsplit("/",1)[1])
self.custom_archived.append(archive_line.rsplit('.',1)[0]+"__.zip")
wld_file.write(geo_position.find("XDIM").text + "\n" + "0" + "\n" + "0" +"\n")
wld_file.write(geo_position.find("YDIM").text + "\n")
wld_file.write(geo_position.find("ULX").text + "\n")
wld_file.write(geo_position.find("ULY").text + "\n")
files_to_archive.extend(prj_files + wld_files + jp2_files_paths)
parent_dir = os.path.dirname(jp2_files_paths[0])
self.custom_archived.append(os.path.dirname(parent_dir))
context['task_instance'].xcom_push(key='downloaded_products', value=self.downloaded_products)
context['task_instance'].xcom_push(key='downloaded_products_with_tfwprj', value=' '.join(self.custom_archived))
context['task_instance'].xcom_push(key='downloaded_products_with_wldprj', value=' '.join(self.custom_archived))


'''
This class is receiving the meta data files paths from the Sentinel2MetadataOperator then creates the product.zip
Later, this class will pass the path of the created product.zip to the next task to publish on Geoserver.
'''
class Sentinel2ProductZipOperator(BaseOperator):

@apply_defaults
Expand Down

0 comments on commit d27b171

Please sign in to comment.