diff --git a/config.ini.sample b/config.ini.sample index 321bd35..f0c6f21 100644 --- a/config.ini.sample +++ b/config.ini.sample @@ -5,19 +5,49 @@ host = localhost port = 5432 # you can write it if necessary password +# table names +stations = stations +raw_stations = raw_stations +timeseries = timeseries +daily_transaction = daily_transaction +clustering = cluster +centroids = centroid [lyon] schema = lyon table = timeserie -daily_transaction = daily_transaction -clustering = clustered_stations -centroids = centroids +srid = 4326 +typename = pvo_patrimoine_voirie.pvostationvelov +# station geoloc variable names +feature_id = idstation +feature_name = nom +feature_address = adresse1 +feature_city = commune +feature_nb_stations = nbbornette +# station availability variable names +feature_avl_id = number +feature_timestamp = last_update +feature_avl_stands = available_bike_stands +feature_avl_bikes = available_bikes +feature_status = status [bordeaux] schema = bordeaux table = timeserie -daily_transaction = daily_transaction -clustering = clustered_stations -centroids = centroids +srid = 2154 +typename = TB_STVEL_P +# station geoloc variable names +feature_id = numstat +feature_name = nom +feature_address = adresse +feature_city = commune +feature_nb_stations = nbsuppor +# station availability variable names +feature_avl_id = ident +feature_timestamp = heure +feature_avl_stands = nbplaces +feature_avl_bikes = nbvelos +feature_status = etat + # API Key -key = QHUHHRI7HD +key = CHANGEME diff --git a/jitenshea/tasks/bordeaux.py b/jitenshea/tasks/bordeaux.py deleted file mode 100644 index 5798fa1..0000000 --- a/jitenshea/tasks/bordeaux.py +++ /dev/null @@ -1,553 +0,0 @@ -# coding: utf-8 - -"""Luigi tasks to retrieve and process data for Bordeaux. - -Note: the 'ident' field which should be used for an unique id for each station -is different when you load the layer TB_STVEL_P and CI_VCUB_P. 
- - - TB_STVEL_P: bicycle-station geoloc - - CI_VCUB_P: bicycle-station real-time occupation data - -So, if you want to merge these data, use the 'numstat' from TB_STVEL_P and -'ident' from CI_VCUB_P. - -See also http://data.bordeaux-metropole.fr/dicopub/#/dico#CI_VCUB_P -""" - - -import os -import zipfile -from datetime import datetime as dt -from datetime import date, timedelta - -from lxml import etree - -import sh - -import requests - -import numpy as np -import pandas as pd -from sklearn.cluster import KMeans - -import luigi -from luigi.contrib.postgres import CopyToTable, PostgresQuery -from luigi.format import UTF8, MixedUnicodeBytes - -from jitenshea import config -from jitenshea.iodb import db, psql_args, shp2pgsql_args -from jitenshea.stats import compute_clusters, train_prediction_model - -# To get shapefile (in a zip). -BORDEAUX_STATION_URL = 'https://data.bordeaux-metropole.fr/files.php?gid=43&format=2' -# Same data as the shapefile but in XML -BORDEAUX_STATION_URL_XML = 'https://data.bordeaux-metropole.fr/wfs?service=wfs&request=GetFeature&version=2.0.0&key={key}&typename=TB_STVEL_P&SRSNAME=EPSG:3945' -BORDEAUX_WFS = 'https://data.bordeaux-metropole.fr/wfs?service=wfs&request=GetFeature&version=2.0.0&key={key}&typename=CI_VCUB_P' -DATADIR = 'datarepo/bordeaux' - - -def yesterday(): - """Return the day before today - """ - return date.today() - timedelta(1) - -def extract_xml_feature(node, namespace='{http://data.bordeaux-metropole.fr/wfs}'): - """Return some attributes from XML/GML file for one specific station - """ - get = lambda x: node.findtext(namespace + x) - return [("gid", int(get("GID"))), - ("ident", int(get("IDENT"))), - ("type", get("TYPE")), - ("nom", get("NOM")), - ("etat", get("ETAT")), - ("nbplaces", int(get('NBPLACES'))), - ("nbvelos", int(get("NBVELOS"))), - ("heure", pd.Timestamp(get("HEURE")))] - -def collect_xml_station(fobj): - """Get bicycle stations from XML before inserted them into a Postgres table - - Also get the Geometry 
Point(3945) - """ - data = [] - tree = etree.parse(fobj) - wfs_ns = '{http://www.opengis.net/wfs/2.0}' - bm_ns = '{http://data.bordeaux-metropole.fr/wfs}' - elements = (node.find(bm_ns + 'TB_STVEL_P') for node in tree.findall(wfs_ns + 'member')) - for element in elements: - # TODO Get the Geom Point - data.append((element.findtext(bm_ns + "GID"), - element.findtext(bm_ns + "NUMSTAT"), - element.findtext(bm_ns + "IDENT"), - element.findtext(bm_ns + "ADRESSE"), - element.findtext(bm_ns + "COMMUNE"), - # element.findtext(bm_ns + "DATESERV"), - element.findtext(bm_ns + "LIGNCORR"), - element.findtext(bm_ns + "NBSUPPOR"), - element.findtext(bm_ns + "NOM"), - element.findtext(bm_ns + "TARIF"), - element.findtext(bm_ns + "TERMBANC"), - element.findtext(bm_ns + "TYPEA"), - element.findtext(bm_ns + "GEOM"), - element.findtext(bm_ns + "CDATE"), - element.findtext(bm_ns + "MDATE"))) - return data - - -class ShapefilesTask(luigi.Task): - """Task to download a zip files which includes the shapefile - """ - path = os.path.join(DATADIR , 'vcub.zip') - srid = 4326 - - def output(self): - return luigi.LocalTarget(self.path, format=MixedUnicodeBytes) - - def run(self): - with self.output().open('w') as fobj: - resp = requests.get(BORDEAUX_STATION_URL) - resp.raise_for_status() - fobj.write(resp.content) - - -class UnzipTask(luigi.Task): - """Task dedicated to unzip file - - To get trace that the task has be done, the task creates a text file with - the same same of the input zip file with the '.done' suffix. This generated - file contains the path of the zipfile and all extracted files. 
- """ - path = os.path.join(DATADIR , 'vcub.zip') - - def requires(self): - return ShapefilesTask() - - def output(self): - filepath = os.path.join(DATADIR, "unzip-" + "vcub" + '.done') - return luigi.LocalTarget(filepath) - - def run(self): - with self.output().open('w') as fobj: - fobj.write("unzip {} at {}\n".format("vcub", dt.now())) - zip_ref = zipfile.ZipFile(self.path) - fobj.write("\n".join(elt.filename for elt in zip_ref.filelist)) - fobj.write("\n") - zip_ref.extractall(DATADIR) - zip_ref.close() - - -class CreateSchema(PostgresQuery): - host = config['database']['host'] - database = config['database']['dbname'] - user = config['database']['user'] - password = config['database'].get('password') - schema = luigi.Parameter() - table = luigi.Parameter(default='create_schema') - query = "CREATE SCHEMA IF NOT EXISTS {schema};" - - def run(self): - connection = self.output().connect() - cursor = connection.cursor() - sql = self.query.format(schema=self.schema) - cursor.execute(sql) - # Update marker table - self.output().touch(connection) - # commit and close connection - connection.commit() - connection.close() - - -class ShapefileIntoDB(luigi.Task): - """Dump a shapefile into a table - """ - schema = luigi.Parameter(default=config['bordeaux']["schema"]) - projection = luigi.Parameter(default='2154') - fname = "TB_STVEL_P" - table = "vcub_station" - - def requires(self): - return {"zip": UnzipTask(), - "schema": CreateSchema(schema=self.schema)} - - def output(self): - filepath = '_'.join(['task', 'shp2pgsql', "vcub", "to", - self.schema, self.table, 'proj', self.projection]) - return luigi.LocalTarget(os.path.join(DATADIR, filepath + '.txt')) - - def run(self): - table = self.schema + '.' + self.table - dirname = os.path.abspath(os.path.dirname(self.input()['zip'].path)) - shpfile = os.path.join(dirname, self.fname + '.shp') - shp2args = shp2pgsql_args(self.projection, shpfile, table) - psqlargs = psql_args() - # check if the schema exist. 
raise if this is not the case - with self.output().open('w') as fobj: - sh.psql(sh.shp2pgsql(shp2args), psqlargs) - fobj.write("shp2pgsql {} at {}\n".format(shpfile, dt.now())) - fobj.write("Create {schema}.{table}\n" - .format(schema=self.schema, table=self.table)) - -class BicycleStationGeoXML(luigi.Task): - """The shapefile from the file.php service seems outdated. - - Download the XML file before to dumpt it into the Database - """ - filename = "vcub.xml" - - def output(self): - return luigi.LocalTarget(os.path.join(DATADIR, self.filename), format=UTF8) - - def run(self): - resp = requests.get(BORDEAUX_STATION_URL_XML.format(key=config['bordeaux']['key'])) - with self.output().open('w') as fobj: - # Note: I hate ISO-8859-1!! - fobj.write(resp.content.decode('latin1') - .encode('utf-8') - .decode('utf-8') - .replace("ISO-8859-1", "UTF-8")) - - -class BicycleStationAvailability(luigi.Task): - """Get in real-time the shared cycle stations avaibility in a XML format. - - Get data every 5 minutes - """ - timestamp = luigi.DateMinuteParameter(default=dt.now(), interval=5) - path = os.path.join(DATADIR, '{year}', '{month:02d}', '{day:02d}', '{ts}.xml') - - def requires(self): - return ShapefileIntoDB() - - def output(self): - triple = lambda x: (x.year, x.month, x.day) - year, month, day = triple(self.timestamp) - ts = self.timestamp.strftime("%HH%M") # 16H35 - return luigi.LocalTarget(self.path.format(year=year, month=month, day=day, ts=ts), format=UTF8) - - def run(self): - with self.output().open('w') as fobj: - resp = requests.get(BORDEAUX_WFS.format(key=config['bordeaux']['key'])) - fobj.write(resp.content.decode('ISO-8859-1').encode('utf-8').decode('utf-8')) - # data = pd.read_csv(BORDEAUX_WFS.format(key=config['bordeaux']['key'])) - # data.columns = [x.lower() for x in data.columns] - # data['heure'] = data['heure'].apply(lambda x: pd.Timestamp(str(x))) - # data.to_csv(fobj, index=False) - - -class BicycleStationXMLtoCSV(luigi.Task): - """Turn real-time bicycle 
station XML/WFS data file to a CSV. - """ - timestamp = luigi.DateMinuteParameter(default=dt.now(), interval=5) - path = os.path.join(DATADIR, '{year}', '{month:02d}', '{day:02d}', '{ts}.csv') - keepcols = ["gid", "ident", "type", "nom", "etat", "nbplaces", "nbvelos", "heure"] - - def output(self): - triple = lambda x: (x.year, x.month, x.day) - year, month, day = triple(self.timestamp) - ts = self.timestamp.strftime("%HH%M") # 16H35 - return luigi.LocalTarget(self.path.format(year=year, month=month, day=day, ts=ts), format=UTF8) - - def requires(self): - return BicycleStationAvailability(self.timestamp) - - def run(self): - with self.input().open() as fobj: - tree = etree.parse(fobj) - # Two XML namespaces - wfs_ns = '{http://www.opengis.net/wfs/2.0}' - bm_ns = '{http://data.bordeaux-metropole.fr/wfs}' - elements = (node.find(bm_ns + 'CI_VCUB_P') for node in tree.findall(wfs_ns + 'member')) - data = [] - for node in elements: - data.append(extract_xml_feature(node)) - df = pd.DataFrame([dict(x) for x in data]) - df = df.sort_values(by="ident") - with self.output().open('w') as fobj: - df[self.keepcols].to_csv(fobj, index=False) - - -class BicycleStationDatabase(CopyToTable): - """Insert VCUB stations data into a PostgreSQL table - """ - timestamp = luigi.DateMinuteParameter(default=dt.now(), interval=5) - - host = 'localhost' - database = config['database']['dbname'] - user = config['database']['user'] - password = None - table = '{schema}.{tablename}'.format(schema=config['bordeaux']['schema'], - tablename=config['bordeaux']['table']) - columns = [('gid', 'INT'), - ('ident', 'INT'), - ('type', 'VARCHAR(5)'), - ('name', 'VARCHAR(200)'), - ('state', 'VARCHAR(12)'), - ('available_stand', 'INT'), - ('available_bike', 'INT'), - ('ts', 'TIMESTAMP')] - - def rows(self): - """overload the rows method to skip the first line (header) - """ - with self.input().open('r') as fobj: - df = pd.read_csv(fobj) - for idx, row in df.iterrows(): - yield row.values - - def 
requires(self): - return BicycleStationXMLtoCSV(self.timestamp) - - -class AggregateTransaction(luigi.Task): - """Aggregate bicycle-share transactions data into a CSV file. - """ - date = luigi.DateParameter(default=yesterday()) - path = os.path.join(DATADIR, '{year}', '{month:02d}', '{day:02d}', 'transactions.csv') - - def output(self): - triple = lambda x: (x.year, x.month, x.day) - year, month, day = triple(self.date) - return luigi.LocalTarget(self.path.format(year=year, month=month, day=day), format=UTF8) - - def run(self): - query = """SELECT DISTINCT ident, type, state, available_bike, ts - FROM {schema}.{tablename} - WHERE ts >= %(start)s AND ts < %(stop)s - ORDER BY ident,ts;""".format(schema=config["bordeaux"]["schema"], - tablename=config['bordeaux']['table']) - eng = db() - df = pd.io.sql.read_sql_query(query, eng, params={"start": self.date, - "stop": self.date + timedelta(1)}) - transactions = (df.query("state == 'CONNECTEE'") - .groupby("ident")['available_bike'] - .apply(lambda s: s.diff().abs().sum()) - .dropna() - .to_frame() - .reset_index()) - transactions = transactions.rename_axis({"available_bike": "transactions"}, axis=1) - with self.output().open('w') as fobj: - transactions.to_csv(fobj, index=False) - - -class AggregateVCUBTransactionIntoDB(CopyToTable): - """Aggregate bicycle-share transactions data into the database. 
- """ - date = luigi.DateParameter(default=yesterday()) - - host = 'localhost' - database = config['database']['dbname'] - user = config['database']['user'] - password = None - table = '{schema}.{tablename}'.format(schema=config['bordeaux']['schema'], - tablename=config['bordeaux']['daily_transaction']) - columns = [('id', 'INT'), - ('number', 'FLOAT'), - ('date', 'DATE')] - - def rows(self): - """overload the rows method to skip the first line (header) and add date value - """ - with self.input().open('r') as fobj: - next(fobj) - for line in fobj: - yield line.strip('\n').split(',') + [self.date] - - def requires(self): - return AggregateTransaction(self.date) - -class BordeauxComputeClusters(luigi.Task): - """Compute clusters corresponding to bike availability in bordeaux stations - between a `start` and an `end` date - - """ - start = luigi.DateParameter(default=yesterday()) - stop = luigi.DateParameter(default=date.today()) - - def outputpath(self): - fname = "kmeans-{}-to-{}.h5".format(self.start, self.stop) - return os.path.join(DATADIR, 'clustering', fname) - - def output(self): - return luigi.LocalTarget(self.outputpath(), format=MixedUnicodeBytes) - - def run(self): - query = ("SELECT gid, ts, available_bike " - "FROM {}.{} " - "WHERE ts >= %(start)s " - "AND ts < %(stop)s;" - "").format(config['bordeaux']['schema'], config['bordeaux']['table']) - eng = db() - df = pd.io.sql.read_sql_query(query, eng, - params={"start": self.start, - "stop": self.stop}) - df.columns = ["station_id", "ts", "nb_bikes"] - clusters = compute_clusters(df) - self.output().makedirs() - path = self.output().path - clusters['labels'].to_hdf(path, '/clusters') - clusters['centroids'].to_hdf(path, '/centroids') - -class BordeauxStoreClustersToDatabase(CopyToTable): - """Read the cluster labels from `DATADIR/bordeaux-clustering.h5` file and store - them into `clustered_stations` - - """ - start = luigi.DateParameter(default=yesterday()) - stop = 
luigi.DateParameter(default=date.today()) - - host = 'localhost' - database = config['database']['dbname'] - user = config['database']['user'] - password = None - table = '{schema}.{tablename}'.format(schema=config['bordeaux']['schema'], - tablename=config['bordeaux']['clustering']) - columns = [('station_id', 'INT'), - ('start', 'DATE'), - ('stop', 'DATE'), - ('cluster_id', 'INT')] - - def rows(self): - inputpath = self.input().path - clusters = pd.read_hdf(inputpath, 'clusters') - for _, row in clusters.iterrows(): - modified_row = list(row.values) - modified_row.insert(1, self.stop) - modified_row.insert(1, self.start) - yield modified_row - - def requires(self): - return BordeauxComputeClusters(self.start, self.stop) - - def create_table(self, connection): - if len(self.columns[0]) == 1: - # only names of columns specified, no types - raise NotImplementedError(("create_table() not implemented for %r " - "and columns types not specified") - % self.table) - elif len(self.columns[0]) == 2: - # if columns is specified as (name, type) tuples - coldefs = ','.join('{name} {type}'.format(name=name, type=type) - for name, type in self.columns) - query = ("CREATE TABLE {table} ({coldefs}, " - "PRIMARY KEY (station_id, start, stop));" - "").format(table=self.table, coldefs=coldefs) - connection.cursor().execute(query) - -class BordeauxStoreCentroidsToDatabase(CopyToTable): - """Read the cluster centroids from `DATADIR/bordeaux-clustering.h5` file and - store them into `centroids` - - """ - start = luigi.DateParameter(default=yesterday()) - stop = luigi.DateParameter(default=date.today()) - - host = 'localhost' - database = config['database']['dbname'] - user = config['database']['user'] - password = None - table = '{schema}.{tablename}'.format(schema=config['bordeaux']['schema'], - tablename=config['bordeaux']['centroids']) - first_columns = [('cluster_id', 'INT'), ('start', 'DATE'), ('stop', 'DATE')] - - @property - def columns(self): - if len(self.first_columns) == 3: - 
self.first_columns.extend([('h'+str(i), 'DOUBLE PRECISION') - for i in range(24)]) - return self.first_columns - - def rows(self): - inputpath = self.input().path - clusters = pd.read_hdf(inputpath, 'centroids') - for _, row in clusters.iterrows(): - modified_row = list(row.values) - modified_row[0] = int(modified_row[0]) - modified_row.insert(1, self.stop) - modified_row.insert(1, self.start) - yield modified_row - - def requires(self): - return BordeauxComputeClusters(self.start, self.stop) - - def create_table(self, connection): - if len(self.columns[0]) == 1: - # only names of columns specified, no types - raise NotImplementedError(("create_table() not implemented for %r " - "and columns types not specified") - % self.table) - elif len(self.columns[0]) == 2: - # if columns is specified as (name, type) tuples - coldefs = ','.join('{name} {type}'.format(name=name, type=type) - for name, type in self.columns) - query = ("CREATE TABLE {table} ({coldefs}, " - "PRIMARY KEY (cluster_id, start, stop));" - "").format(table=self.table, coldefs=coldefs) - connection.cursor().execute(query) - -class BordeauxClustering(luigi.Task): - """Clustering master task - - """ - start = luigi.DateParameter(default=yesterday()) - stop = luigi.DateParameter(default=date.today()) - - def requires(self): - yield BordeauxStoreClustersToDatabase(self.start, self.stop) - yield BordeauxStoreCentroidsToDatabase(self.start, self.stop) - - -class BordeauxTrainXGBoost(luigi.Task): - """Train a XGBoost model between `start` and `stop` dates to predict bike - availability at each station - - Attributes - ---------- - start : luigi.DateParameter - Training start date - stop : luigi.DataParameter - Training stop date upper bound (actually the end date is computed with - `validation`) - validation : luigi.DateMinuteParameter - Date that bounds the training set and the validation set during the - XGBoost model training - frequency : DateOffset, timedelta or str - Indicates the prediction frequency - 
""" - start = luigi.DateParameter(default=yesterday()) - stop = luigi.DateParameter(default=date.today()) - validation = luigi.DateMinuteParameter(default=dt.now() - timedelta(hours=1)) - frequency = luigi.Parameter(default="30T") - - def outputpath(self): - fname = "{}-to-{}-at-{}-freq-{}.model".format(self.start, self.stop, - self.validation.isoformat(), - self.frequency) - return os.path.join(DATADIR, 'xgboost-model', fname) - - def output(self): - return luigi.LocalTarget(self.outputpath(), format=MixedUnicodeBytes) - - def run(self): - query = ("SELECT DISTINCT ident AS station_id, ts, " - "available_bike AS nb_bikes, " - "available_stand AS nb_stands, " - "available_bike::float / (available_bike::float " - "+ available_stand::float) AS probability " - "FROM {}.{} " - "WHERE ts >= %(start)s " - "AND ts < %(stop)s " - "AND (available_bike > 0 OR available_stand > 0) " - "AND state = 'CONNECTEE'" - "ORDER BY station_id, ts" - ";").format(config['bordeaux']['schema'], - config['bordeaux']['table']) - eng = db() - df = pd.io.sql.read_sql_query(query, eng, - params={"start": self.start, - "stop": self.stop}) - if df.empty: - raise Exception("There is not any data to process in the DataFrame. 
" - + "Please check the dates.") - prediction_model = train_prediction_model(df, self.validation, self.frequency) - self.output().makedirs() - prediction_model.save_model(self.output().path) diff --git a/jitenshea/tasks/city.py b/jitenshea/tasks/city.py new file mode 100644 index 0000000..96873d1 --- /dev/null +++ b/jitenshea/tasks/city.py @@ -0,0 +1,676 @@ +"""Luigi tasks to retrieve and process bike data + +Supported cities: + +* Bordeaux + - stations URL: https://data.bordeaux-metropole.fr/files.php?gid=43&format=2 + - real-time bike availability URL: https://data.bordeaux-metropole.fr/wfs?service=wfs&request=GetFeature&version=2.0.0&typename=CI_VCUB_P + +* Lyon + - stations URL: https://download.data.grandlyon.com/wfs/grandlyon?service=wfs&request=GetFeature&version=2.0.0&SRSNAME=EPSG:4326&outputFormat=SHAPEZIP&typename=pvo_patrimoine_voirie.pvostationvelov + - real-time bike availability URL: https://download.data.grandlyon.com/ws/rdata/jcd_jcdecaux.jcdvelov/all.json +""" + +import os +import json +import zipfile +from datetime import datetime as dt +from datetime import date, timedelta + +from lxml import etree + +import pandas as pd + +import sh + +import requests + +import luigi +from luigi.contrib.postgres import CopyToTable, PostgresQuery +from luigi.format import UTF8, MixedUnicodeBytes + +from jitenshea import config +from jitenshea.iodb import db, psql_args, shp2pgsql_args +from jitenshea.stats import compute_clusters, train_prediction_model + + +_HERE = os.path.abspath(os.path.dirname(__file__)) +DATADIR = 'datarepo' + +BORDEAUX_STATION_URL = 'https://data.bordeaux-metropole.fr/files.php?gid=43&format=2' +BORDEAUX_BIKEAVAILABILITY_URL = 'https://data.bordeaux-metropole.fr/wfs?service=wfs&request=GetFeature&version=2.0.0&key={key}&typename=CI_VCUB_P' + +LYON_STATION_URL = 'https://download.data.grandlyon.com/wfs/grandlyon?service=wfs&request=GetFeature&version=2.0.0&SRSNAME=EPSG:4326&outputFormat=SHAPEZIP&typename=pvo_patrimoine_voirie.pvostationvelov' 
+LYON_BIKEAVAILABILITY_URL = 'https://download.data.grandlyon.com/ws/rdata/jcd_jcdecaux.jcdvelov/all.json' + + +def yesterday(): + """Return the day before today + """ + return date.today() - timedelta(1) + + +def extract_xml_feature(node, namespace='{http://data.bordeaux-metropole.fr/wfs}'): + """Return some attributes from XML/GML file for one specific station + """ + get = lambda x: node.findtext(namespace + x) + return [("gid", int(get("GID"))), + ("ident", int(get("IDENT"))), + ("type", get("TYPE")), + ("nom", get("NOM")), + ("etat", get("ETAT")), + ("nbplaces", int(get('NBPLACES'))), + ("nbvelos", int(get("NBVELOS"))), + ("heure", pd.Timestamp(get("HEURE")))] + + +class CreateSchema(PostgresQuery): + host = config['database']['host'] + database = config['database']['dbname'] + user = config['database']['user'] + password = config['database'].get('password') + schema = luigi.Parameter() + table = luigi.Parameter(default='create_schema') + query = "CREATE SCHEMA IF NOT EXISTS {schema};" + + def run(self): + connection = self.output().connect() + cursor = connection.cursor() + sql = self.query.format(schema=self.schema) + cursor.execute(sql) + # Update marker table + self.output().touch(connection) + # commit and close connection + connection.commit() + connection.close() + + +class ShapefilesTask(luigi.Task): + """Task to download a zip files which includes the shapefile + + Need the source: rdata or grandlyon and the layer name (i.e. typename). 
+ """ + city = luigi.Parameter() + + @property + def path(self): + return os.path.join(DATADIR, self.city, + '{}-stations.zip'.format(self.city)) + + @property + def url(self): + if self.city == 'bordeaux': + return BORDEAUX_STATION_URL + elif self.city == 'lyon': + return LYON_STATION_URL + else: + raise ValueError(("{} is an unknown city.".format(self.city))) + + def output(self): + return luigi.LocalTarget(self.path, format=MixedUnicodeBytes) + + def run(self): + with self.output().open('w') as fobj: + resp = requests.get(self.url) + resp.raise_for_status() + fobj.write(resp.content) + + +class UnzipTask(luigi.Task): + """Task dedicated to unzip file + + To get trace that the task has be done, the task creates a text file with + the same same of the input zip file with the '.done' suffix. This generated + file contains the path of the zipfile and all extracted files. + """ + city = luigi.Parameter() + + @property + def path(self): + return os.path.join(DATADIR, self.city, + '{}-stations.zip'.format(self.city)) + + def requires(self): + return ShapefilesTask(self.city) + + def output(self): + filepath = os.path.join(DATADIR, self.city, "unzip.done") + return luigi.LocalTarget(filepath) + + def run(self): + with self.output().open('w') as fobj: + fobj.write("unzip {} stations at {}\n".format(self.city, dt.now())) + zip_ref = zipfile.ZipFile(self.path) + fobj.write("\n".join(elt.filename for elt in zip_ref.filelist)) + fobj.write("\n") + zip_ref.extractall(os.path.dirname(self.input().path)) + zip_ref.close() + + +class ShapefileIntoDB(luigi.Task): + """Dump a shapefile into a table + """ + city = luigi.Parameter() + table = config['database']['raw_stations'] + + @property + def projection(self): + return config[self.city]['srid'] + + @property + def typename(self): + return config[self.city]['typename'] + + def requires(self): + return {"zip": UnzipTask(city=self.city), + "schema": CreateSchema(schema=self.city)} + + def output(self): + filepath = 
'_'.join(['task', 'shp2pgsql', "to", + self.city, self.table, 'proj', self.projection]) + return luigi.LocalTarget(os.path.join(DATADIR, self.city, + filepath + '.txt')) + + def run(self): + table = self.city + '.' + self.table + dirname = os.path.abspath(os.path.dirname(self.input()['zip'].path)) + shpfile = os.path.join(dirname, self.typename + '.shp') + shp2args = shp2pgsql_args(self.projection, shpfile, table) + psqlargs = psql_args() + with self.output().open('w') as fobj: + sh.psql(sh.shp2pgsql(shp2args), psqlargs) + fobj.write("shp2pgsql {} at {}\n".format(shpfile, dt.now())) + fobj.write("Create {schema}.{table}\n" + .format(schema=self.city, table=self.table)) + + +class NormalizeStationTable(PostgresQuery): + """ + """ + city = luigi.Parameter() + + host = config['database']['host'] + database = config['database']['dbname'] + user = config['database']['user'] + password = None + + query = ("DROP TABLE IF EXISTS {schema}.{tablename}; " + "CREATE TABLE {schema}.{tablename} (" + "id varchar," + "name varchar(250)," + "address varchar(500)," + "city varchar(100)," + "nb_stations int," + "geom geometry(POINT, 4326)" + "); " + "INSERT INTO {schema}.{tablename} " + "SELECT {id} AS id, {name} AS name, " + "{address} AS address, {city} AS city, " + "{nb_stations}::int AS nb_stations, " + "st_transform(st_force2D(geom), 4326) as geom " + "FROM {schema}.{raw_tablename}" + ";") + + @property + def table(self): + return '{schema}.{tablename}'.format( + schema=self.city, + tablename=config['database']['stations']) + + def requires(self): + return ShapefileIntoDB(self.city) + + def run(self): + connection = self.output().connect() + cursor = connection.cursor() + sql = self.query.format(schema=self.city, + tablename=self.table, + raw_tablename=config['database']['raw_stations'], + id=config[self.city]['feature_id'], + name=config[self.city]['feature_name'], + address=config[self.city]['feature_address'], + city=config[self.city]['feature_city'], + 
nb_stations=config[self.city]['feature_nb_stations']) + cursor.execute(sql) + # Update marker table + self.output().touch(connection) + # commit and close connection + connection.commit() + connection.close() + + +class BikeAvailability(luigi.Task): + """Download a real-time bike availability snapshot for `city` (XML for Bordeaux, JSON for Lyon). + """ + city = luigi.Parameter() + timestamp = luigi.DateMinuteParameter(default=dt.now(), interval=5) + + @property + def path(self): + if self.city == 'bordeaux': + return os.path.join(DATADIR, self.city, '{year}', + '{month:02d}', '{day:02d}', '{ts}.xml') + elif self.city == 'lyon': + return os.path.join(DATADIR, self.city, '{year}', + '{month:02d}', '{day:02d}', '{ts}.json') + else: + raise ValueError(("{} is an unknown city.".format(self.city))) + + @property + def url(self): + if self.city == 'bordeaux': + return BORDEAUX_BIKEAVAILABILITY_URL.format(key=config['bordeaux']['key']) + elif self.city == 'lyon': + return LYON_BIKEAVAILABILITY_URL + else: + raise ValueError(("{} is an unknown city.".format(self.city))) + + def requires(self): + return NormalizeStationTable(self.city) + + def output(self): + triple = lambda x: (x.year, x.month, x.day) + year, month, day = triple(self.timestamp) + ts = self.timestamp.strftime("%HH%M") # 16H35 + return luigi.LocalTarget(self.path.format(year=year, month=month, day=day, ts=ts), format=UTF8) + + def run(self): + resp = requests.get(self.url) + with self.output().open('w') as fobj: + if self.city == 'bordeaux': + fobj.write(resp.content.decode('ISO-8859-1').encode('utf-8').decode('utf-8')) + elif self.city == 'lyon': + data = resp.json() + json.dump(data, fobj, ensure_ascii=False) + else: + raise ValueError(("{} is an unknown city.".format(self.city))) + + +class AvailabilityToCSV(luigi.Task): + """Turn real-time bike availability to CSV files + """ + city = luigi.Parameter() + timestamp = luigi.DateMinuteParameter(default=dt.now(), interval=5) + + @property + def path(self): + return os.path.join(DATADIR, self.city, '{year}', + '{month:02d}', '{day:02d}', 
'{ts}.csv') + + def requires(self): + return BikeAvailability(self.city, self.timestamp) + + def output(self): + triple = lambda x: (x.year, x.month, x.day) + year, month, day = triple(self.timestamp) + ts = self.timestamp.strftime('%HH%M') # 16H35 + return luigi.LocalTarget(self.path.format(year=year, month=month, + day=day, ts=ts), format=UTF8) + + def run(self): + with self.input().open() as fobj: + if self.city == 'bordeaux': + tree = etree.parse(fobj) + wfs_ns = '{http://www.opengis.net/wfs/2.0}' + bm_ns = '{http://data.bordeaux-metropole.fr/wfs}' + elements = (node.find(bm_ns + 'CI_VCUB_P') for node in tree.findall(wfs_ns + 'member')) + data = [] + for node in elements: + data.append(extract_xml_feature(node)) + df = pd.DataFrame([dict(x) for x in data]) + status_key = config[self.city]['feature_status'] + df[status_key] = df[status_key].apply( + lambda x: 'open' if x == 'CONNECTEE' else 'close') + elif self.city == 'lyon': + data = json.load(fobj) + df = pd.DataFrame(data['values'], columns=data['fields']) + status_key = config[self.city]['feature_status'] + df[status_key] = df[status_key].apply( + lambda x: 'open' if x == 'OPEN' else 'close') + else: + raise ValueError(("{} is an unknown city.".format(self.city))) + df = df[[config[self.city]['feature_avl_id'], + config[self.city]['feature_timestamp'], + config[self.city]['feature_avl_stands'], + config[self.city]['feature_avl_bikes'], + config[self.city]['feature_status']]] + df.columns = ["id", "timestamp", "available_stands", + "available_bikes", "status"] + df = df.sort_values(by="id") + with self.output().open('w') as fobj: + df.to_csv(fobj, index=False) + + +class AvailabilityToDB(CopyToTable): + """Insert bike availability data into a PostgreSQL table + """ + city = luigi.Parameter() + timestamp = luigi.DateMinuteParameter(default=dt.now(), interval=5) + + host = config['database']['host'] + database = config['database']['dbname'] + user = config['database']['user'] + password = None + + columns = [('id', 
'VARCHAR'), + ('timestamp', 'TIMESTAMP'), + ('available_stands', 'INT'), + ('available_bikes', 'INT'), + ('status', 'VARCHAR(12)')] + + @property + def table(self): + return '{schema}.{tablename}'.format( + schema=self.city, + tablename=config['database']['timeseries']) + + def rows(self): + """overload the rows method to skip the first line (header) + """ + with self.input().open('r') as fobj: + df = pd.read_csv(fobj) + for idx, row in df.iterrows(): + yield row.values + + def requires(self): + return AvailabilityToCSV(self.city, self.timestamp) + + def rows(self): + """overload the rows method to skip the first line (header) + """ + with self.input().open('r') as fobj: + df = pd.read_csv(fobj) + for idx, row in df.iterrows(): + if row.status == 'None' or row.available_stands == 'None': + continue + yield row.values + + +class AggregateTransaction(luigi.Task): + """Aggregate shared-bike transactions data into a CSV file (one transaction + = one bike taken, or one bike dropped off). + """ + city = luigi.Parameter() + date = luigi.DateParameter(default=yesterday()) + + @property + def path(self): + return os.path.join(DATADIR, self.city, '{year}', + '{month:02d}', '{day:02d}', 'transactions.csv') + + def output(self): + triple = lambda x: (x.year, x.month, x.day) + year, month, day = triple(self.date) + return luigi.LocalTarget(self.path.format(year=year, month=month, day=day), format=UTF8) + + def run(self): + query = ("SELECT DISTINCT * FROM {schema}.{tablename} " + "WHERE timestamp >= %(start)s AND timestamp < %(stop)s " + "ORDER BY timestamp, id" + ";").format(schema=self.city, + tablename=config['database']['timeseries']) + eng = db() + query_params = {"start": self.date, + "stop": self.date + timedelta(1)} + df = pd.io.sql.read_sql_query(query, eng, params=query_params) + transactions = (df.query("status == 'open'") + .groupby("id")['available_bikes'] + .apply(lambda s: s.diff().abs().sum()) + .dropna() + .to_frame() + .reset_index()) + transactions = 
transactions.rename_axis({"available_bikes": "transactions"}, axis=1) + with self.output().open('w') as fobj: + transactions.to_csv(fobj, index=False) + + +class TransactionsIntoDB(CopyToTable): + """Copy shared-bike transaction data into the database + """ + city = luigi.Parameter() + date = luigi.DateParameter(default=yesterday()) + + host = config['database']['host'] + database = config['database']['dbname'] + user = config['database']['user'] + password = None + + columns = [('id', 'VARCHAR'), + ('number', 'FLOAT'), + ('date', 'DATE')] + + @property + def table(self): + return '{schema}.{tablename}'.format( + schema=self.city, + tablename=config['database']['daily_transaction']) + + def rows(self): + """overload the rows method to skip the first line (header) and add date value + """ + with self.input().open('r') as fobj: + next(fobj) + for line in fobj: + yield line.strip('\n').split(',') + [self.date] + + def requires(self): + return AggregateTransaction(self.city, self.date) + + +class ComputeClusters(luigi.Task): + """Compute clusters corresponding to bike availability in `city` stations + between a `start` and an `end` date + """ + city = luigi.Parameter() + start = luigi.DateParameter(default=yesterday()) + stop = luigi.DateParameter(default=date.today()) + + def outputpath(self): + fname = "kmeans-{}-to-{}.h5".format(self.start, self.stop) + return os.path.join(DATADIR, self.city, 'clustering', fname) + + def output(self): + return luigi.LocalTarget(self.outputpath(), format=MixedUnicodeBytes) + + def run(self): + query = ("SELECT id, timestamp, available_bikes " + "FROM {}.timeseries " + "WHERE timestamp >= %(start)s " + "AND timestamp < %(stop)s " + "AND status = 'open';" + "").format(self.city) + eng = db() + df = pd.io.sql.read_sql_query(query, eng, + params={"start": self.start, + "stop": self.stop}) + df.columns = ["station_id", "ts", "nb_bikes"] + print(df) + clusters = compute_clusters(df) + self.output().makedirs() + path = self.output().path + 
clusters['labels'].to_hdf(path, '/clusters') + clusters['centroids'].to_hdf(path, '/centroids') + + +class StoreClustersToDatabase(CopyToTable): + """Read the cluster labels from `DATADIR//clustering.h5` file and store + them into `clustered_stations` + + """ + city = luigi.Parameter() + start = luigi.DateParameter(default=yesterday()) + stop = luigi.DateParameter(default=date.today()) + + host = config['database']['host'] + database = config['database']['dbname'] + user = config['database']['user'] + password = None + + columns = [('station_id', 'VARCHAR'), + ('start', 'DATE'), + ('stop', 'DATE'), + ('cluster_id', 'INT')] + + @property + def table(self): + return '{schema}.{tablename}'.format( + schema=self.city, + tablename=config['database']['clustering']) + + def rows(self): + inputpath = self.input().path + clusters = pd.read_hdf(inputpath, 'clusters') + for _, row in clusters.iterrows(): + modified_row = list(row.values) + modified_row.insert(1, self.stop) + modified_row.insert(1, self.start) + yield modified_row + + def requires(self): + return ComputeClusters(self.city, self.start, self.stop) + + def create_table(self, connection): + if len(self.columns[0]) == 1: + # only names of columns specified, no types + raise NotImplementedError(("create_table() not implemented for %r " + "and columns types not specified") + % self.table) + elif len(self.columns[0]) == 2: + # if columns is specified as (name, type) tuples + coldefs = ','.join('{name} {type}'.format(name=name, type=type) + for name, type in self.columns) + query = ("CREATE TABLE {table} ({coldefs}, " + "PRIMARY KEY (station_id, start, stop));" + "").format(table=self.table, coldefs=coldefs) + connection.cursor().execute(query) + + +class StoreCentroidsToDatabase(CopyToTable): + """Read the cluster centroids from `DATADIR//clustering.h5` file and + store them into `centroids` + + """ + city = luigi.Parameter() + start = luigi.DateParameter(default=yesterday()) + stop = 
luigi.DateParameter(default=date.today()) + + host = config['database']['host'] + database = config['database']['dbname'] + user = config['database']['user'] + password = None + first_columns = [('cluster_id', 'INT'), + ('start', 'DATE'), + ('stop', 'DATE')] + + @property + def columns(self): + if len(self.first_columns) == 3: + self.first_columns.extend([('h{:02d}'.format(i), 'DOUBLE PRECISION') + for i in range(24)]) + return self.first_columns + + @property + def table(self): + return '{schema}.{tablename}'.format( + schema=self.city, + tablename=config['database']['centroids']) + + def rows(self): + inputpath = self.input().path + clusters = pd.read_hdf(inputpath, 'centroids') + for _, row in clusters.iterrows(): + modified_row = list(row.values) + modified_row[0] = int(modified_row[0]) + modified_row.insert(1, self.stop) + modified_row.insert(1, self.start) + yield modified_row + + def requires(self): + return ComputeClusters(self.city, self.start, self.stop) + + def create_table(self, connection): + if len(self.columns[0]) == 1: + # only names of columns specified, no types + raise NotImplementedError(("create_table() not implemented for %r " + "and columns types not specified") + % self.table) + elif len(self.columns[0]) == 2: + # if columns is specified as (name, type) tuples + coldefs = ','.join('{name} {type}'.format(name=name, type=type) + for name, type in self.columns) + query = ("CREATE TABLE {table} ({coldefs}, " + "PRIMARY KEY (cluster_id, start, stop));" + "").format(table=self.table, coldefs=coldefs) + connection.cursor().execute(query) + + +class Clustering(luigi.Task): + """Clustering master task + + """ + city = luigi.Parameter() + start = luigi.DateParameter(default=yesterday()) + stop = luigi.DateParameter(default=date.today()) + + def requires(self): + yield StoreClustersToDatabase(self.city, self.start, self.stop) + yield StoreCentroidsToDatabase(self.city, self.start, self.stop) + + +class TrainXGBoost(luigi.Task): + """Train a XGBoost 
model between `start` and `stop` dates to predict bike + availability at each station in `city` + + Attributes + ---------- + city : luigi.Parameter + City of interest, *e.g.* Bordeaux or Lyon + start : luigi.DateParameter + Training start date + stop : luigi.DataParameter + Training stop date upper bound (actually the end date is computed with + `validation`) + validation : luigi.DateMinuteParameter + Date that bounds the training set and the validation set during the + XGBoost model training + frequency : DateOffset, timedelta or str + Indicates the prediction frequency + """ + city = luigi.Parameter() + start = luigi.DateParameter(default=yesterday()) + stop = luigi.DateParameter(default=date.today()) + validation = luigi.DateMinuteParameter(default=dt.now() - timedelta(hours=1)) + frequency = luigi.Parameter(default="30T") + + def outputpath(self): + fname = "{}-to-{}-at-{}-freq-{}.model".format(self.start, self.stop, + self.validation.isoformat(), + self.frequency) + return os.path.join(DATADIR, self.city, 'xgboost-model', fname) + + def output(self): + return luigi.LocalTarget(self.outputpath(), format=MixedUnicodeBytes) + + def run(self): + query = ("SELECT DISTINCT id AS station_id, timestamp AS ts, " + "available_bikes AS nb_bikes, available_stands AS nb_stands, " + "available_bikes::float / (available_bikes::float " + "+ available_stands::float) AS probability " + "FROM {schema}.{tablename} " + "WHERE timestamp >= %(start)s " + "AND timestamp < %(stop)s " + "AND (available_bikes > 0 OR available_stands > 0) " + "AND (status = 'open')" + "ORDER BY id, timestamp" + ";").format(schema=self.city, + tablename=config['database']['timeseries']) + eng = db() + df = pd.io.sql.read_sql_query(query, eng, + params={"start": self.start, + "stop": self.stop}) + if df.empty: + raise Exception("There is not any data to process in the DataFrame. 
" + + "Please check the dates.") + prediction_model = train_prediction_model(df, self.validation, self.frequency) + self.output().makedirs() + prediction_model.save_model(self.output().path) diff --git a/jitenshea/tasks/lyon.py b/jitenshea/tasks/lyon.py deleted file mode 100644 index 874680e..0000000 --- a/jitenshea/tasks/lyon.py +++ /dev/null @@ -1,515 +0,0 @@ -# coding: utf-8 - -"""Luigi tasks to retrieve and process data for Lyon - -Higly inspired from the Luigi tasks of the Tempus demo with the GrandLyon open datasets. -https://gitlab.com/Oslandia/tempus_demos -but dedicated to the bicycle-sharing data -""" - -import os -import json -import zipfile -from datetime import datetime as dt -from datetime import date, timedelta - -import numpy as np -from sklearn.cluster import KMeans - -import sh - -import requests - -import luigi -from luigi.contrib.postgres import CopyToTable, PostgresQuery -from luigi.format import UTF8, MixedUnicodeBytes - -import pandas as pd - -from jitenshea import config -from jitenshea.iodb import db, psql_args, shp2pgsql_args -from jitenshea.stats import compute_clusters, train_prediction_model - -_HERE = os.path.abspath(os.path.dirname(__file__)) -WFS_RDATA_URL = "https://download.data.grandlyon.com/wfs/rdata" -WFS_GRANDLYON_URL = "https://download.data.grandlyon.com/wfs/grandlyon" -DEFAULT_PARAMS = {'SERVICE': 'WFS', - 'VERSION': '2.0.0', - 'request': 'GetFeature'} -DATADIR = 'datarepo/lyon' - - -def params_factory(projection, output_format, dataname): - """return a new dict for HTTP query params - - Used for the wfs http query to get some data. 
- """ - res = {"SRSNAME": 'EPSG:' + projection, - "outputFormat": output_format, - "typename": dataname} - res.update(DEFAULT_PARAMS) - return res - -def yesterday(): - """Return the day before today - """ - return date.today() - timedelta(1) - - -class ShapefilesTask(luigi.Task): - """Task to download a zip files which includes the shapefile - - Need the source: rdata or grandlyon and the layer name (i.e. typename). - """ - source = luigi.Parameter() - typename = luigi.Parameter() - path = os.path.join(DATADIR , '{typename}.zip') - srid = 4326 - - def output(self): - return luigi.LocalTarget(self.path.format(typename=self.typename), - format=MixedUnicodeBytes) - - def run(self): - if self.source == 'rdata': - url = WFS_RDATA_URL - elif self.source == 'grandlyon': - url = WFS_GRANDLYON_URL - else: - raise Exception("source {} not supported".format(self.source)) - params = params_factory(str(self.srid), 'SHAPEZIP', self.typename) - with self.output().open('w') as fobj: - resp = requests.get(url, params=params) - resp.raise_for_status() - fobj.write(resp.content) - - -class UnzipTask(luigi.Task): - """Task dedicated to unzip file - - To get trace that the task has be done, the task creates a text file with - the same same of the input zip file with the '.done' suffix. This generated - file contains the path of the zipfile and all extracted files. 
- """ - source = luigi.Parameter(default='grandlyon') - typename = luigi.Parameter() - path = os.path.join(DATADIR , '{typename}.zip') - - def requires(self): - return ShapefilesTask(self.source, self.typename) - - def output(self): - filepath = os.path.join(DATADIR, "unzip-" + self.typename + '.done') - return luigi.LocalTarget(filepath) - - def run(self): - dirname = os.path.dirname(self.input().path) - with self.output().open('w') as fobj: - fobj.write("unzip {} at {}\n".format(self.typename, dt.now())) - zip_ref = zipfile.ZipFile(os.path.join(dirname, self.typename + ".zip"), 'r') - fobj.write("\n".join(elt.filename for elt in zip_ref.filelist)) - fobj.write("\n") - zip_ref.extractall(dirname) - zip_ref.close() - - -class CreateSchema(PostgresQuery): - host = config['database']['host'] - database = config['database']['dbname'] - user = config['database']['user'] - password = config['database'].get('password') - schema = luigi.Parameter() - table = luigi.Parameter(default='create_schema') - query = "CREATE SCHEMA IF NOT EXISTS {schema};" - - def run(self): - connection = self.output().connect() - cursor = connection.cursor() - sql = self.query.format(schema=self.schema) - cursor.execute(sql) - # Update marker table - self.output().touch(connection) - # commit and close connection - connection.commit() - connection.close() - - -class ShapefileIntoDB(luigi.Task): - """Dump a shapefile into a table - """ - source = luigi.Parameter(default="grandlyon") - typename = luigi.Parameter() - # table = luigi.Parameter() - schema = luigi.Parameter(default=config['lyon']["schema"]) - projection = luigi.Parameter(default='4326') - - @property - def table(self): - if '.' 
in self.typename: - return self.typename.split('.')[-1] - return self.typename - - def requires(self): - return {"zip": UnzipTask(source=self.source, typename=self.typename), - "schema": CreateSchema(schema=self.schema)} - - def output(self): - filepath = '_'.join(['task', 'shp2pgsql', self.typename, "to", - self.schema, self.table, 'proj', self.projection]) - return luigi.LocalTarget(os.path.join(DATADIR, filepath + '.txt')) - - def run(self): - table = self.schema + '.' + self.table - dirname = os.path.abspath(os.path.dirname(self.input()['zip'].path)) - shpfile = os.path.join(dirname, self.typename + '.shp') - shp2args = shp2pgsql_args(self.projection, shpfile, table) - psqlargs = psql_args() - # check if the schema exist. raise if this is not the case - with self.output().open('w') as fobj: - sh.psql(sh.shp2pgsql(shp2args), psqlargs) - fobj.write("shp2pgsql {} at {}\n".format(shpfile, dt.now())) - fobj.write("Create {schema}.{table}\n" - .format(schema=self.schema, table=self.table)) - - -class VelovStationAvailability(luigi.Task): - """Get in real-time the shared cycle stations avaibility in a JSON format. 
- - Get data every 5 minutes - """ - timestamp = luigi.DateMinuteParameter(default=dt.now(), interval=5) - path = os.path.join(DATADIR, '{year}', '{month:02d}', '{day:02d}', '{ts}.json') - - def requires(self): - return ShapefileIntoDB(typename='pvo_patrimoine_voirie.pvostationvelov') - - def output(self): - triple = lambda x: (x.year, x.month, x.day) - year, month, day = triple(self.timestamp) - ts = self.timestamp.strftime("%HH%M") # 16H35 - return luigi.LocalTarget(self.path.format(year=year, month=month, day=day, ts=ts), format=UTF8) - - def run(self): - url = 'https://download.data.grandlyon.com/ws/rdata/jcd_jcdecaux.jcdvelov/all.json' - with self.output().open('w') as fobj: - resp = requests.get(url) - resp.raise_for_status - data = resp.json() - json.dump(resp.json(), fobj, ensure_ascii=False) - - -class VelovStationJSONtoCSV(luigi.Task): - """Turn real-time velov station data JSON file to a CSV. - """ - timestamp = luigi.DateMinuteParameter(default=dt.now(), interval=5) - path = os.path.join(DATADIR, '{year}', '{month:02d}', '{day:02d}', '{ts}.csv') - keepcols = ['number', 'last_update', 'bike_stands', 'available_bike_stands', - 'available_bikes', 'availabilitycode', 'availability', 'bonus', - 'status'] - - def output(self): - triple = lambda x: (x.year, x.month, x.day) - year, month, day = triple(self.timestamp) - ts = self.timestamp.strftime("%HH%M") # 16H35 - return luigi.LocalTarget(self.path.format(year=year, month=month, day=day, ts=ts), format=UTF8) - - def requires(self): - return VelovStationAvailability(self.timestamp) - - def run(self): - with self.input().open() as fobj: - data = json.load(fobj) - df = pd.DataFrame(data['values'], columns=data['fields']) - with self.output().open('w') as fobj: - df[self.keepcols].to_csv(fobj, index=False) - - -class VelovStationDatabase(CopyToTable): - """Insert Velov stations data into a PostgreSQL table - """ - timestamp = luigi.DateMinuteParameter(default=dt.now(), interval=5) - - host = 'localhost' - 
database = config['database']['dbname'] - user = config['database']['user'] - password = None - table = '{schema}.{tablename}'.format(schema=config['lyon']['schema'], - tablename=config['lyon']['table']) - - columns = [('number', 'INT'), - ('last_update', 'TIMESTAMP'), - ('bike_stands', 'INT'), - ('available_bike_stands', 'INT'), - ('available_bikes', 'INT'), - ('availabilitycode', 'INT'), - ('availability', 'VARCHAR(20)'), - ('bonus', 'VARCHAR(12)'), - ('status', 'VARCHAR(12)')] - - def rows(self): - """overload the rows method to skip the first line (header) - """ - with self.input().open('r') as fobj: - df = pd.read_csv(fobj) - for idx, row in df.iterrows(): - if row.status == 'None' or row.available_bike_stands == 'None': - continue - yield row.values - - def requires(self): - return VelovStationJSONtoCSV(self.timestamp) - - -class AggregateTransaction(luigi.Task): - """Aggregate bicycle-share transactions data into a CSV file. - """ - date = luigi.DateParameter(default=yesterday()) - path = os.path.join(DATADIR, '{year}', '{month:02d}', '{day:02d}', 'transactions.csv') - - def output(self): - triple = lambda x: (x.year, x.month, x.day) - year, month, day = triple(self.date) - return luigi.LocalTarget(self.path.format(year=year, month=month, day=day), format=UTF8) - - def run(self): - query = """SELECT DISTINCT * FROM {schema}.{tablename} - WHERE last_update >= %(start)s AND last_update < %(stop)s - ORDER BY last_update,number""".format(schema=config["lyon"]["schema"], - tablename=config['lyon']['table']) - eng = db() - df = pd.io.sql.read_sql_query(query, eng, params={"start": self.date, - "stop": self.date + timedelta(1)}) - transactions = (df.query("status == 'OPEN'") - .groupby("number")['available_bikes'] - .apply(lambda s: s.diff().abs().sum()) - .dropna() - .to_frame() - .reset_index()) - transactions = transactions.rename_axis({"available_bikes": "transactions"}, axis=1) - with self.output().open('w') as fobj: - transactions.to_csv(fobj, index=False) - 
- -class AggregateLyonTransactionIntoDB(CopyToTable): - """Aggregate bicycle-share transactions data into the database. - """ - date = luigi.DateParameter(default=yesterday()) - - host = 'localhost' - database = config['database']['dbname'] - user = config['database']['user'] - password = None - table = '{schema}.{tablename}'.format(schema=config['lyon']['schema'], - tablename=config['lyon']['daily_transaction']) - columns = [('id', 'INT'), - ('number', 'FLOAT'), - ('date', 'DATE')] - - def rows(self): - """overload the rows method to skip the first line (header) and add date value - """ - with self.input().open('r') as fobj: - next(fobj) - for line in fobj: - yield line.strip('\n').split(',') + [self.date] - - def requires(self): - return AggregateTransaction(self.date) - -class LyonComputeClusters(luigi.Task): - """Compute clusters corresponding to bike availability in lyon stations - between a `start` and an `end` date - """ - start = luigi.DateParameter(default=yesterday()) - stop = luigi.DateParameter(default=date.today()) - - def outputpath(self): - fname = "kmeans-{}-to-{}.h5".format(self.start, self.stop) - return os.path.join(DATADIR, 'clustering', fname) - - def output(self): - return luigi.LocalTarget(self.outputpath(), format=MixedUnicodeBytes) - - def run(self): - query = ("SELECT number, last_update, available_bikes " - "FROM {}.{} " - "WHERE last_update >= %(start)s " - "AND last_update < %(stop)s;" - "").format(config['lyon']['schema'], config['lyon']['table']) - eng = db() - df = pd.io.sql.read_sql_query(query, eng, - params={"start": self.start, - "stop": self.stop}) - df.columns = ["station_id", "ts", "nb_bikes"] - clusters = compute_clusters(df) - self.output().makedirs() - path = self.output().path - clusters['labels'].to_hdf(path, '/clusters') - clusters['centroids'].to_hdf(path, '/centroids') - -class LyonStoreClustersToDatabase(CopyToTable): - """Read the cluster labels from `DATADIR/lyon-clustering.h5` file and store - them into 
`clustered_stations` - - """ - start = luigi.DateParameter(default=yesterday()) - stop = luigi.DateParameter(default=date.today()) - - host = 'localhost' - database = config['database']['dbname'] - user = config['database']['user'] - password = None - table = '{schema}.{tablename}'.format(schema=config['lyon']['schema'], - tablename=config['lyon']['clustering']) - columns = [('station_id', 'INT'), - ('start', 'DATE'), - ('stop', 'DATE'), - ('cluster_id', 'INT')] - - def rows(self): - inputpath = self.input().path - clusters = pd.read_hdf(inputpath, 'clusters') - for _, row in clusters.iterrows(): - modified_row = list(row.values) - modified_row.insert(1, self.stop) - modified_row.insert(1, self.start) - yield modified_row - - def requires(self): - return LyonComputeClusters(self.start, self.stop) - - def create_table(self, connection): - if len(self.columns[0]) == 1: - # only names of columns specified, no types - raise NotImplementedError(("create_table() not implemented for %r " - "and columns types not specified") - % self.table) - elif len(self.columns[0]) == 2: - # if columns is specified as (name, type) tuples - coldefs = ','.join('{name} {type}'.format(name=name, type=type) - for name, type in self.columns) - query = ("CREATE TABLE {table} ({coldefs}, " - "PRIMARY KEY (station_id, start, stop));" - "").format(table=self.table, coldefs=coldefs) - connection.cursor().execute(query) - -class LyonStoreCentroidsToDatabase(CopyToTable): - """Read the cluster centroids from `DATADIR/lyon-clustering.h5` file and store - them into `centroids` - - """ - start = luigi.DateParameter(default=yesterday()) - stop = luigi.DateParameter(default=date.today()) - - host = 'localhost' - database = config['database']['dbname'] - user = config['database']['user'] - password = None - table = '{schema}.{tablename}'.format(schema=config['lyon']['schema'], - tablename=config['lyon']['centroids']) - first_columns = [('cluster_id', 'INT'), - ('start', 'DATE'), - ('stop', 'DATE')] - - 
@property - def columns(self): - if len(self.first_columns) == 3: - self.first_columns.extend([('h'+str(i), 'DOUBLE PRECISION') - for i in range(24)]) - return self.first_columns - - def rows(self): - inputpath = self.input().path - clusters = pd.read_hdf(inputpath, 'centroids') - for _, row in clusters.iterrows(): - modified_row = list(row.values) - modified_row[0] = int(modified_row[0]) - modified_row.insert(1, self.stop) - modified_row.insert(1, self.start) - yield modified_row - - def requires(self): - return LyonComputeClusters(self.start, self.stop) - - def create_table(self, connection): - if len(self.columns[0]) == 1: - # only names of columns specified, no types - raise NotImplementedError(("create_table() not implemented for %r " - "and columns types not specified") - % self.table) - elif len(self.columns[0]) == 2: - # if columns is specified as (name, type) tuples - coldefs = ','.join('{name} {type}'.format(name=name, type=type) - for name, type in self.columns) - query = ("CREATE TABLE {table} ({coldefs}, " - "PRIMARY KEY (cluster_id, start, stop));" - "").format(table=self.table, coldefs=coldefs) - connection.cursor().execute(query) - - -class LyonClustering(luigi.Task): - """Clustering master task - - """ - start = luigi.DateParameter(default=yesterday()) - stop = luigi.DateParameter(default=date.today()) - - def requires(self): - yield LyonStoreClustersToDatabase(self.start, self.stop) - yield LyonStoreCentroidsToDatabase(self.start, self.stop) - - -class LyonTrainXGBoost(luigi.Task): - """Train a XGBoost model between `start` and `stop` dates to predict bike - availability at each station - - Attributes - ---------- - start : luigi.DateParameter - Training start date - stop : luigi.DataParameter - Training stop date upper bound (actually the end date is computed with - `validation`) - validation : luigi.DateMinuteParameter - Date that bounds the training set and the validation set during the - XGBoost model training - frequency : DateOffset, 
timedelta or str - Indicates the prediction frequency - """ - start = luigi.DateParameter(default=yesterday()) - stop = luigi.DateParameter(default=date.today()) - validation = luigi.DateMinuteParameter(default=date.today()-timedelta(hours=1)) - frequency = luigi.Parameter(default="30T") - - def outputpath(self): - fname = "{}-to-{}-at-{}-freq-{}.model".format(self.start, self.stop, - self.validation.isoformat(), - self.frequency) - return os.path.join(DATADIR, 'xgboost-model', fname) - - def output(self): - return luigi.LocalTarget(self.outputpath(), format=MixedUnicodeBytes) - - def run(self): - query = ("SELECT DISTINCT number AS station_id, last_update AS ts, " - "available_bikes AS nb_bikes, " - "available_bike_stands AS nb_stands, " - "available_bikes::float / (available_bikes::float " - "+ available_bike_stands::float) AS probability " - "FROM {}.{} " - "WHERE last_update >= %(start)s " - "AND last_update < %(stop)s " - "AND (available_bikes > 0 OR available_bike_stands > 0) " - "AND status = 'OPEN'" - "ORDER BY station_id, ts" - ";").format(config['lyon']['schema'], config['lyon']['table']) - eng = db() - df = pd.io.sql.read_sql_query(query, eng, - params={"start": self.start, - "stop": self.stop}) - if df.empty: - raise Exception("There is not any data to process in the DataFrame. " - + "Please check the dates.") - prediction_model = train_prediction_model(df, self.validation, self.frequency) - self.output().makedirs() - prediction_model.save_model(self.output().path)