Add dask and spark convenience #651

Merged
merged 5 commits into from Mar 26, 2020
8 changes: 8 additions & 0 deletions docs/api/tsfresh.convenience.rst
@@ -14,4 +14,12 @@ relevant_extraction module
:undoc-members:
:show-inheritance:

bindings module
--------------------------

.. automodule:: tsfresh.convenience.bindings
:members:
:undoc-members:
:show-inheritance:


210 changes: 210 additions & 0 deletions tsfresh/convenience/bindings.py
@@ -0,0 +1,210 @@
from functools import partial

from tsfresh.feature_extraction.extraction import _do_extraction_on_chunk
from tsfresh.feature_extraction.settings import ComprehensiveFCParameters

import pandas as pd


def _feature_extraction_on_chunk_helper(df, column_id, column_kind,
column_sort, column_value,
default_fc_parameters, kind_to_fc_parameters):
"""
    Helper function wrapped around _do_extraction_on_chunk to bring the "chunk"
    into the correct format and to output a pandas dataframe.
    It is used, e.g., in the convenience functions for dask and spark.

For the definitions of the parameters, please see these convenience functions.
"""
if default_fc_parameters is None and kind_to_fc_parameters is None:
default_fc_parameters = ComprehensiveFCParameters()
elif default_fc_parameters is None and kind_to_fc_parameters is not None:
default_fc_parameters = {}

    chunk = (df[column_id].iloc[0], df[column_kind].iloc[0],
             df.sort_values(column_sort)[column_value])
features = _do_extraction_on_chunk(chunk, default_fc_parameters=default_fc_parameters,
kind_to_fc_parameters=kind_to_fc_parameters)
features = pd.DataFrame(features)
features["value"] = features["value"].astype("double")

return features[[column_id, "variable", "value"]]
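
# A minimal sketch of what a single group chunk looks like and how the helper
# consumes it (hypothetical toy data, not part of the library). With both
# parameter arguments left as None, the full ComprehensiveFCParameters set is
# used:
#
# >>> toy_chunk = pd.DataFrame({"id": [1, 1, 1], "kind": ["F_x"] * 3,
# ...                           "time": [0, 1, 2], "value": [1.0, 2.0, 3.0]})
# >>> _feature_extraction_on_chunk_helper(
# ...     toy_chunk, column_id="id", column_kind="kind", column_sort="time",
# ...     column_value="value", default_fc_parameters=None,
# ...     kind_to_fc_parameters=None)
#
# The result is a long-format pandas dataframe with the columns "id",
# "variable" (the feature name) and "value" (the feature value).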


def dask_feature_extraction_on_chunk(df, column_id, column_kind,
column_sort, column_value,
default_fc_parameters=None, kind_to_fc_parameters=None):
"""
Extract features on a grouped dask dataframe given the column names and the extraction settings.
This wrapper function should only be used if you have a dask dataframe as input.
    All format handling (input and output) needs to be done before or after calling this function.

Examples
========

    For example, if you want to extract features from the robot example dataframe (stored as csv):

Import statements:

>>> from dask import dataframe as dd
>>> from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk
>>> from tsfresh.feature_extraction.settings import MinimalFCParameters

    Read in the data:

>>> df = dd.read_csv("robot.csv")

    Bring the data into the correct format.
    The input needs to be a dataframe grouped by time series id and feature kind,
    where each group chunk consists of a dataframe with exactly four columns:
    ``column_id``, ``column_kind``, ``column_sort`` and ``column_value``.
    You can find the description of the columns in :ref:`data-formats-label`.
    Please note: for this function to work, all four columns need to be present!
    If necessary, create the columns and fill them with dummy values.

>>> df = df.melt(id_vars=["id", "time"],
... value_vars=["F_x", "F_y", "F_z", "T_x", "T_y", "T_z"],
... var_name="kind", value_name="value")
>>> df_grouped = df.groupby(["id", "kind"])

    Call the feature extraction:

>>> features = dask_feature_extraction_on_chunk(df_grouped, column_id="id", column_kind="kind",
... column_sort="time", column_value="value",
... default_fc_parameters=MinimalFCParameters())

    Write out the data in a tabular format (the ``categorize`` call is needed
    because dask can only pivot on categorical columns):

>>> features = features.categorize(columns=["variable"])
>>> features = features.reset_index(drop=True) \\
... .pivot_table(index="id", columns="variable", values="value", aggfunc="mean")
>>> features.to_csv("output")
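
    If the result fits into memory, you could alternatively materialize it as a
    plain pandas dataframe (a sketch, continuing the session above):

    >>> features_df = features.compute()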


:param df: A dask dataframe grouped by id and kind.
:type df: dask.dataframe.groupby.DataFrameGroupBy

:param default_fc_parameters: mapping from feature calculator names to parameters. Only those names
        which are keys in this dict will be calculated. See the :class:`ComprehensiveFCParameters` for
more information.
:type default_fc_parameters: dict

    :param kind_to_fc_parameters: mapping from kind names to objects of the same type as the ones for
        default_fc_parameters. If you put a kind as a key here, the fc_parameters
        object (which is the value) will be used instead of the default_fc_parameters. This means
        that kinds for which kind_to_fc_parameters does not have an entry will be ignored by
        the feature extraction.
:type kind_to_fc_parameters: dict

:param column_id: The name of the id column to group by.
:type column_id: str

:param column_sort: The name of the sort column.
:type column_sort: str

    :param column_kind: The name of the column containing the kind of the value.
:type column_kind: str

    :param column_value: The name of the column containing the value itself.
:type column_value: str

:return: A dask dataframe with the columns ``column_id``, "variable" and "value". The index is taken
from the grouped dataframe.
:rtype: dask.dataframe.DataFrame (id int64, variable object, value float64)

"""
feature_extraction = partial(_feature_extraction_on_chunk_helper,
column_id=column_id, column_kind=column_kind,
column_sort=column_sort, column_value=column_value,
default_fc_parameters=default_fc_parameters,
kind_to_fc_parameters=kind_to_fc_parameters)
    # dask needs to know the output schema in advance, hence the explicit meta argument
    return df.apply(feature_extraction,
                    meta={column_id: 'int64', 'variable': 'object', 'value': 'float64'})


def spark_feature_extraction_on_chunk(df, column_id, column_kind,
column_sort, column_value,
                                      default_fc_parameters=None, kind_to_fc_parameters=None):
"""
Extract features on a grouped spark dataframe given the column names and the extraction settings.
This wrapper function should only be used if you have a spark dataframe as input.
    All format handling (input and output) needs to be done before or after calling this function.

Examples
========

    For example, if you want to extract features from the robot example dataframe (stored as csv):

Import statements:

>>> from tsfresh.convenience.bindings import spark_feature_extraction_on_chunk
>>> from tsfresh.feature_extraction.settings import MinimalFCParameters

    Read in the data:

    >>> df = spark.read.csv(...)

    Bring the data into the correct format.
    The input needs to be a dataframe grouped by time series id and feature kind,
    where each group chunk consists of a dataframe with exactly four columns:
    ``column_id``, ``column_kind``, ``column_sort`` and ``column_value``.
    You can find the description of the columns in :ref:`data-formats-label`.
    Please note: for this function to work, all four columns need to be present!
    If necessary, create the columns and fill them with dummy values.
    One possible way to write the unpivot step is sketched below.

>>> df = ...
>>> df_grouped = df.groupby(["id", "kind"])
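
    The unpivot step (the ``df = ...`` line above) could, for example, be written
    with Spark SQL's ``stack`` function (a sketch, assuming the robot data has the
    same columns as in the dask example):

    >>> df = df.selectExpr(
    ...     "id", "time",
    ...     "stack(6, 'F_x', F_x, 'F_y', F_y, 'F_z', F_z, "
    ...     "'T_x', T_x, 'T_y', T_y, 'T_z', T_z) as (kind, value)")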

    Call the feature extraction:

>>> features = spark_feature_extraction_on_chunk(df_grouped, column_id="id", column_kind="kind",
... column_sort="time", column_value="value",
... default_fc_parameters=MinimalFCParameters())

    Write out the data in a tabular format:

>>> features = features.groupby("id").pivot("variable").sum("value")
>>> features.write.csv("output")


:param df: A spark dataframe grouped by id and kind.
:type df: pyspark.sql.group.GroupedData

:param default_fc_parameters: mapping from feature calculator names to parameters. Only those names
        which are keys in this dict will be calculated. See the :class:`ComprehensiveFCParameters` for
more information.
:type default_fc_parameters: dict

    :param kind_to_fc_parameters: mapping from kind names to objects of the same type as the ones for
        default_fc_parameters. If you put a kind as a key here, the fc_parameters
        object (which is the value) will be used instead of the default_fc_parameters.
        This means that kinds for which kind_to_fc_parameters does not have an entry
        will be ignored by the feature extraction.
:type kind_to_fc_parameters: dict

:param column_id: The name of the id column to group by.
:type column_id: str

:param column_sort: The name of the sort column.
:type column_sort: str

    :param column_kind: The name of the column containing the kind of the value.
:type column_kind: str

    :param column_value: The name of the column containing the value itself.
:type column_value: str

    :return: A spark dataframe with the columns ``column_id``, "variable" and "value".
:rtype: pyspark.sql.DataFrame[id: bigint, variable: string, value: double]

"""
    # import pyspark only when this function is actually used, keeping it an optional dependency
    from pyspark.sql.functions import pandas_udf, PandasUDFType

feature_extraction = partial(_feature_extraction_on_chunk_helper,
column_id=column_id, column_kind=column_kind,
column_sort=column_sort, column_value=column_value,
default_fc_parameters=default_fc_parameters,
kind_to_fc_parameters=kind_to_fc_parameters)

    # spark also needs to know the output schema of the UDF in advance
    feature_extraction_udf = pandas_udf(f"{column_id} long, variable string, value double",
                                        PandasUDFType.GROUPED_MAP)(feature_extraction)

return df.apply(feature_extraction_udf)
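

# A minimal sketch (hypothetical session) of the schema of the returned
# dataframe; it follows the UDF return type declared above:
#
# >>> features = spark_feature_extraction_on_chunk(
# ...     df.groupby("id", "kind"), column_id="id", column_kind="kind",
# ...     column_sort="time", column_value="value",
# ...     default_fc_parameters=MinimalFCParameters())
# >>> features.printSchema()
# root
#  |-- id: long (nullable = true)
#  |-- variable: string (nullable = true)
#  |-- value: double (nullable = true)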