diff --git a/cli_076_surface_temperature_monthly_avg/Dockerfile b/cli_076_surface_temperature_monthly_avg/Dockerfile new file mode 100644 index 00000000..c3b59bf9 --- /dev/null +++ b/cli_076_surface_temperature_monthly_avg/Dockerfile @@ -0,0 +1,29 @@ +FROM python:3.6 +MAINTAINER Taufiq Rashid +#Note this script was originally developed by Taufiq Rashid + +# install core libraries +RUN apt-get update +RUN pip install -U pip + +# install application libraries +RUN apt-get install -y gdal-bin libgdal-dev +RUN apt-get -y install python-gdal +RUN pip install oauth2client==4.1.3 +RUN pip install -e git+https://github.com/resource-watch/eeUtil.git@fix/upgrade_version_gee#egg=eeUtil +RUN pip install netCDF4==1.5.3 + +# set name +ARG NAME=nrt-script +ENV NAME ${NAME} + +# copy the application folder inside the container +RUN mkdir -p /opt/$NAME/data +WORKDIR /opt/$NAME/ +COPY contents/ . + +RUN useradd -r $NAME +RUN chown -R $NAME:$NAME /opt/$NAME +USER $NAME + +CMD ["python", "main.py"] diff --git a/cli_076_surface_temperature_monthly_avg/README.md b/cli_076_surface_temperature_monthly_avg/README.md new file mode 100644 index 00000000..adea596e --- /dev/null +++ b/cli_076_surface_temperature_monthly_avg/README.md @@ -0,0 +1,13 @@ + +## {Resource Watch Public Title} Dataset Near Real-time Script +This file describes the near real-time script that retrieves and processes the [Berkeley Earth Surface Temperature dataset](http://berkeleyearth.lbl.gov/auto/Global/Gridded/Gridded_README.txt) for [display on Resource Watch]({link to dataset's metadata page on Resource Watch}). + +This dataset was provided by the source as a netcdf file. The data shown on Resource Watch can be found in the 'temperature' variable of the netcdf file. This variable was converted to a set of tif files so that it could be uploaded to Google Earth Engine. The netcdf file contained all the monthly temperature anomaly data since 1850, but we are only interested in data from 1950 onward, so only the temperature data since 1950 were processed. To process this data for display on Resource Watch, the monthly tif files were combined into yearly files to produce an annual average temperature anomaly for each year from 1950 to the present. + +Please see the [Python script]({link to Python script on Github}) for more details on this processing. + +**Schedule** + +This script is run monthly. The exact time that the script is run to update the dataset can be found in the `time.cron` file. This time is in Coordinated Universal Time (UTC), expressed in cron format. + +###### Note: This script was originally written by [Taufiq Rashid](https://www.wri.org/profile/taufiq-rashid), and is currently maintained by [Taufiq Rashid](https://www.wri.org/profile/taufiq-rashid). 
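To make the README's processing steps concrete for reviewers, here is a minimal illustrative sketch of the annual-averaging idea. It is not the production code in `contents/src/__init__.py` below (which converts each monthly band to a tif with gdal_translate before averaging); it reads the 'temperature' variable directly with the netCDF4 and numpy packages, and it assumes the downloaded file is named `Land_and_Ocean_LatLong1.nc` and holds one grid per month starting in January 1850.

```python
# Illustrative only: annual means of the 'temperature' variable, assuming the
# source netcdf has one (lat, lon) grid per month starting in January 1850.
import numpy as np
from netCDF4 import Dataset

START_YEAR = 1850        # first year of data in the source file
FIRST_YEAR_KEPT = 1950   # only data from 1950 onward are processed

nc = Dataset('Land_and_Ocean_LatLong1.nc')   # the file fetched from SOURCE_URL
temp = nc.variables['temperature']           # dimensions: (time, lat, lon)

# index of January 1950 on the monthly time axis
start_idx = (FIRST_YEAR_KEPT - START_YEAR) * 12

# average each run of up to 12 monthly grids into one annual grid
for year_start in range(start_idx, temp.shape[0], 12):
    months = temp[year_start:year_start + 12]     # up to 12 monthly slices
    annual_mean = np.ma.mean(months, axis=0)      # masked mean skips nodata cells
    year = START_YEAR + year_start // 12
    print(year, float(np.ma.mean(annual_mean)))   # the real script writes a tif instead
nc.close()
```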
diff --git a/cli_076_surface_temperature_monthly_avg/contents/main.py b/cli_076_surface_temperature_monthly_avg/contents/main.py new file mode 100644 index 00000000..b5fbc403 --- /dev/null +++ b/cli_076_surface_temperature_monthly_avg/contents/main.py @@ -0,0 +1,4 @@ +#!/usr/bin/env python3 +if __name__ == '__main__': + import src + src.main() \ No newline at end of file diff --git a/cli_076_surface_temperature_monthly_avg/contents/src/__init__.py b/cli_076_surface_temperature_monthly_avg/contents/src/__init__.py new file mode 100644 index 00000000..0dc0e221 --- /dev/null +++ b/cli_076_surface_temperature_monthly_avg/contents/src/__init__.py @@ -0,0 +1,485 @@ +from __future__ import unicode_literals + +import os +import sys +import urllib +import datetime +import logging +import subprocess +import eeUtil +import urllib.request +import requests +import ee +import time +from string import ascii_uppercase +from netCDF4 import Dataset + +# url for Berkeley Earth Surface Temperature data +SOURCE_URL = 'http://berkeleyearth.lbl.gov/auto/Global/Gridded/Land_and_Ocean_LatLong1.nc' + +# subdataset to be converted to tif +# should be of the format 'NETCDF:"filename.nc":variable' +SDS_NAME = 'NETCDF:"{fname}":temperature' + +# nodata value for netcdf +NODATA_VALUE = 9.969209968386869e+36 + +# name of data directory in Docker container +DATA_DIR = 'data' + +# name of collection in GEE where we will upload the final data +EE_COLLECTION = '/projects/resource-watch-gee/cli_076_surface_temperature_monthly_avg' + +# generate generic string that can be formatted to name each variable's asset name +FILENAME = 'cli_076_surface_temperature_monthly_avg_{date}' + +# specify Google Cloud Storage folder name +GS_FOLDER = EE_COLLECTION[1:] + +# do you want to delete everything currently in the GEE collection when you run this script? +CLEAR_COLLECTION_FIRST = False + +# how many assets can be stored in the GEE collection before the oldest ones are deleted? +MAX_ASSETS = 150 + +# date format to use in GEE +DATE_FORMAT = '%Y' + +# Resource Watch dataset API IDs +# Important! Before testing this script: +# Please change these IDs OR comment out the getLayerIDs(DATASET_ID) function in the script below +# Failing to do so will overwrite the last update date on different datasets on Resource Watch +DATASET_ID = '' + +''' +FUNCTIONS FOR ALL DATASETS + +The functions below must go in every near real-time script. +Their format should not need to be changed. 
+''' + +def lastUpdateDate(dataset, date): + ''' + Given a Resource Watch dataset's API ID and a datetime, + this function will update the dataset's 'last update date' on the API with the given datetime + INPUT dataset: Resource Watch API dataset ID (string) + date: date to set as the 'last update date' for the input dataset (datetime) + ''' + # generate the API url for this dataset + apiUrl = 'http://api.resourcewatch.org/v1/dataset/{0}'.format(dataset) + # create headers to send with the request to update the 'last update date' + headers = { + 'Content-Type': 'application/json', + 'Authorization': os.getenv('apiToken') + } + # create the json data to send in the request + body = { + "dataLastUpdated": date.isoformat() # date should be a string in the format 'YYYY-MM-DDTHH:MM:SS' + } + # send the request + try: + r = requests.patch(url = apiUrl, json = body, headers = headers) + logging.info('[lastUpdated]: SUCCESS, '+ date.isoformat() +' status code '+str(r.status_code)) + return 0 + except Exception as e: + logging.error('[lastUpdated]: '+str(e)) + +''' +FUNCTIONS FOR RASTER DATASETS + +The functions below must go in every near real-time script for a RASTER dataset. +Their format should not need to be changed. +''' + +def getLastUpdate(dataset): + ''' + Given a Resource Watch dataset's API ID, + this function will get the current 'last update date' from the API + and return it as a datetime + INPUT dataset: Resource Watch API dataset ID (string) + RETURN lastUpdateDT: current 'last update date' for the input dataset (datetime) + ''' + # generate the API url for this dataset + apiUrl = 'http://api.resourcewatch.org/v1/dataset/{}'.format(dataset) + # pull the dataset from the API + r = requests.get(apiUrl) + # find the 'last update date' + lastUpdateString=r.json()['data']['attributes']['dataLastUpdated'] + # split this date into two pieces at the seconds decimal so that the datetime module can read it: + # ex: '2020-03-11T00:00:00.000Z' will become '2020-03-11T00:00:00' (nofrag) and '000Z' (frag) + nofrag, frag = lastUpdateString.split('.') + # generate a datetime object + nofrag_dt = datetime.datetime.strptime(nofrag, "%Y-%m-%dT%H:%M:%S") + # add back the microseconds to the datetime + lastUpdateDT = nofrag_dt.replace(microsecond=int(frag[:-1])*1000) + return lastUpdateDT + +def getLayerIDs(dataset): + ''' + Given a Resource Watch dataset's API ID, + this function will return a list of all the layer IDs associated with it + INPUT dataset: Resource Watch API dataset ID (string) + RETURN layerIDs: Resource Watch API layer IDs for the input dataset (list of strings) + ''' + # generate the API url for this dataset - this must include the layers + apiUrl = 'http://api.resourcewatch.org/v1/dataset/{}?includes=layer'.format(dataset) + # pull the dataset from the API + r = requests.get(apiUrl) + #get a list of all the layers + layers = r.json()['data']['attributes']['layer'] + # create an empty list to store the layer IDs + layerIDs =[] + # go through each layer and add its ID to the list + for layer in layers: + # only add layers that have Resource Watch listed as its application + if layer['attributes']['application']==['rw']: + layerIDs.append(layer['id']) + return layerIDs + +def flushTileCache(layer_id): + """ + Given the API ID for a GEE layer on Resource Watch, + this function will clear the layer cache. + If the cache is not cleared, when you view the dataset on Resource Watch, old and new tiles will be mixed together. 
+ INPUT layer_id: Resource Watch API layer ID (string) + """ + # generate the API url for this layer's cache + apiUrl = 'http://api.resourcewatch.org/v1/layer/{}/expire-cache'.format(layer_id) + # create headers to send with the request to clear the cache + headers = { + 'Content-Type': 'application/json', + 'Authorization': os.getenv('apiToken') + } + + # clear the cache for the layer + # sometimes this fails, so we will try multiple times if it does + + # specify that we are on the first try + try_num=1 + tries = 4 + while try_num= start_idx_1950 and prcs_yr not in existing_dates: + # increment the month being processed after each loop + prcs_mnth += 1 + # generate a name for the monthly tif that will be produced from the netcdf + mnth_tif = os.path.join(DATA_DIR, '{}.tif'.format(prcs_yr + '_' + str(prcs_mnth))) + # translate the netcdf into a tif + # make sure the index of the band is correct and matches exactly with the source netcdf + # the band index in the gdal_translate command determines which month of data we are processing + cmd = ['gdal_translate', '-b', str(i), '-q', '-a_nodata', str(NODATA_VALUE), '-a_srs', 'EPSG:4326', sds_path, mnth_tif] + logging.debug('Converting {} to {}'.format(file, mnth_tif)) + # use subprocess to use gdal_translate in the command line from inside python + subprocess.call(cmd) + # add the new tif files to the list of monthly tifs for the year being processed + mnth_tifs.append(mnth_tif) + # if we have finished processing all 12 months or all available months in a year + if prcs_mnth == 12 or i == last_idx: + # find the average for the year from the monthly tifs created so far + yrly_avgd_tif = monthly_avg(prcs_yr, prcs_mnth, mnth_tifs) + # add the averaged tif to the list of yearly tifs + yrly_avgd_tifs.append(yrly_avgd_tif) + # reset the list for monthly tifs since we will process a new year in the next loop + mnth_tifs = [] + # reset the month number to 1 as well for next loop + prcs_mnth = 1 + + return yrly_avgd_tifs + +def fetch(): + ''' + Fetch file from source url + RETURN file: file name for the netcdf that has been downloaded (string) + ''' + # construct the filename we want to save the file under locally + file = getFilename() + logging.info('Fetching {}'.format(SOURCE_URL)) + try: + # try to download the data + urllib.request.urlretrieve(SOURCE_URL, file) + logging.info('Successfully retrieved {}'.format(file)) + except: + # if unsuccessful, log that the file was not downloaded + logging.error('Unable to retrieve data from {}'.format(SOURCE_URL)) + + return file + +def processNewData(existing_dates): + ''' + Fetch, process, and upload new data + INPUT existing_dates: list of dates we already have in GEE, in the format of the DATE_FORMAT variable (list of strings) + RETURN assets: list of GEE asset names for the tifs that have been uploaded (list of strings) + ''' + # Fetch data from source url + logging.info('Fetching Monthly Land + Ocean temperature data from Berkeley Earth') + file = fetch() + + # If we were able to successfully fetch the new data file + if file: + # Convert new file from netcdf to tif files + logging.info('Converting netcdf file to tifs') + tifs = convert(file, existing_dates) + # Get a list of the dates we have to upload from the tif file names + dates = [getDateTimeString(tif) for tif in tifs] + # Get a list of the names we want to use for the assets once we upload the files to GEE + assets = [getAssetName(date) for date in dates] + # Get a list of the datetimes we have to upload from the tif file names + new_datetimes = 
[getDateTime(date) for date in dates] + logging.info('Uploading files') + # Upload new files (tifs) to GEE + eeUtil.uploadAssets(tifs, assets, GS_FOLDER, new_datetimes, timeout=3000) + + # Delete local files + logging.info('Cleaning local files') + for tif in tifs: + os.remove(tif) + os.remove(file) + + return assets + return [] + +def checkCreateCollection(collection): + ''' + List assets in collection if it exists, else create new collection + INPUT collection: GEE collection to check or create (string) + RETURN list of assets in collection (list of strings) + ''' + # if collection exists, return list of assets in collection + if eeUtil.exists(collection): + return eeUtil.ls(collection) + # if collection does not exist, create it and return an empty list (because no assets are in the collection) + else: + logging.info('{} does not exist, creating'.format(collection)) + eeUtil.createFolder(collection, True, public=True) + return [] + +def deleteExcessAssets(dates, max_assets): + ''' + Delete oldest assets, if more than specified in max_assets variable + INPUT dates: dates for all the assets currently in the GEE collection; dates should be in the format specified + in DATE_FORMAT variable (list of strings) + max_assets: maximum number of assets allowed in the collection (int) + ''' + # sort the list of dates so that the oldest is first + dates.sort() + # if we have more dates of data than allowed, + if len(dates) > max_assets: + # go through each date, starting with the oldest, and delete until we only have the max number of assets left + for date in dates[:-max_assets]: + eeUtil.removeAsset(getAssetName(date)) + +def get_most_recent_date(collection): + ''' + Get most recent data we have assets for + INPUT collection: GEE collection to check dates for (string) + RETURN most_recent_date: most recent date in GEE collection (datetime) + ''' + # get list of assets in collection + existing_assets = checkCreateCollection(collection) + # get a list of strings of dates in the collection + existing_dates = [getDateTimeString(a) for a in existing_assets] + # sort these dates oldest to newest + existing_dates.sort() + # get the most recent date (last in the list) and turn it into a datetime + most_recent_date = datetime.datetime.strptime(existing_dates[-1], DATE_FORMAT) + + return most_recent_date + +def initialize_ee(): + ''' + Initialize ee module + ''' + # get GEE credentials from env file + GEE_JSON = os.environ.get("GEE_JSON") + _CREDENTIAL_FILE = 'credentials.json' + GEE_SERVICE_ACCOUNT = os.environ.get("GEE_SERVICE_ACCOUNT") + with open(_CREDENTIAL_FILE, 'w') as f: + f.write(GEE_JSON) + auth = ee.ServiceAccountCredentials(GEE_SERVICE_ACCOUNT, _CREDENTIAL_FILE) + ee.Initialize(auth) + +def updateResourceWatch(): + ''' + This function should update Resource Watch to reflect the new data. 
+ This may include updating the 'last update date', flushing the tile cache, and updating any dates on layers + ''' + # Get the most recent date from the data in the GEE collection + most_recent_date = get_most_recent_date(EE_COLLECTION) + # Get the current 'last update date' from the dataset on Resource Watch + current_date = getLastUpdate(DATASET_ID) + # If the most recent date from the GEE collection does not match the 'last update date' on the RW API, update it + if current_date != most_recent_date: + logging.info('Updating last update date and flushing cache.') + # Update dataset's last update date on Resource Watch + lastUpdateDate(DATASET_ID, most_recent_date) + # get layer ids and flush tile cache for each + layer_ids = getLayerIDs(DATASET_ID) + for layer_id in layer_ids: + flushTileCache(layer_id) + +def main(): + logging.basicConfig(stream=sys.stderr, level=logging.INFO) + logging.info('STARTING') + + # Initialize eeUtil and ee modules + eeUtil.initJson() + initialize_ee() + + # Clear the GEE collection, if specified above + if CLEAR_COLLECTION_FIRST: + if eeUtil.exists(EE_COLLECTION): + eeUtil.removeAsset(EE_COLLECTION, recursive=True) + + # Check if collection exists, create it if it does not + # If it exists return the list of assets currently in the collection + existing_assets = checkCreateCollection(EE_COLLECTION) + # Get a list of the dates of data we already have in the collection + existing_dates = [getDateTimeString(a) for a in existing_assets] + + # Fetch, process, and upload the new data + new_assets = processNewData(existing_dates) + # Get the dates of the new data we have added + new_dates = [getDateTimeString(a) for a in new_assets] + + logging.info('Previous assets: {}, new: {}, max: {}'.format( + len(existing_dates), len(new_dates), MAX_ASSETS)) + + # Delete excess assets + deleteExcessAssets(existing_dates+new_dates, MAX_ASSETS) + + # Update Resource Watch + updateResourceWatch() + + logging.info('SUCCESS') diff --git a/cli_076_surface_temperature_monthly_avg/start.sh b/cli_076_surface_temperature_monthly_avg/start.sh new file mode 100644 index 00000000..48ce1a7c --- /dev/null +++ b/cli_076_surface_temperature_monthly_avg/start.sh @@ -0,0 +1,17 @@ +#!/bin/sh + +#Change the NAME variable to the name of your script +NAME=cli_076 +LOG=${LOG:-udp://localhost} + +docker build -t $NAME --build-arg NAME=$NAME . +docker run \ + --log-driver=syslog \ + --log-opt syslog-address=$LOG \ + --log-opt tag=$NAME \ + --env-file .env \ + --rm $NAME \ + python main.py + + + # /bin/bash diff --git a/cli_076_surface_temperature_monthly_avg/time.cron b/cli_076_surface_temperature_monthly_avg/time.cron new file mode 100644 index 00000000..75dd622e --- /dev/null +++ b/cli_076_surface_temperature_monthly_avg/time.cron @@ -0,0 +1 @@ +00 5 1 * * \ No newline at end of file
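For reference, the `time.cron` entry above (`00 5 1 * *`) fires at 05:00 UTC on the first day of every month, which matches the monthly schedule described in the README. One quick way to double-check a cron expression locally is the third-party croniter package (purely illustrative; it is not a dependency of this script):

```python
# Illustrative check of the time.cron entry using the third-party croniter package
# (not part of this script's requirements).
from datetime import datetime
from croniter import croniter

schedule = croniter('00 5 1 * *', datetime(2021, 6, 15))
print(schedule.get_next(datetime))  # -> 2021-07-01 05:00:00
print(schedule.get_next(datetime))  # -> 2021-08-01 05:00:00
```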