
spark_feature_extraction_on_chunk works only with column_id = "id" #709

Closed
andyvanyperenAM opened this issue Jun 8, 2020 · 5 comments

@andyvanyperenAM
Hi,

While testing the spark_feature_extraction_on_chunk function, I noticed that it works when column_id is equal to "id", but it fails for any other value. Obviously, there is an easy workaround (renaming the column), but from the code it looks like everything was intended to work with an arbitrary column name.
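
A minimal sketch of a call that triggers the error (the DataFrame df_long and the column name "id2" are illustrative; df_long is assumed to already be a long-format Spark DataFrame, see the full example further down in this thread for how to build one):

from tsfresh.convenience.bindings import spark_feature_extraction_on_chunk
from tsfresh.feature_extraction.settings import MinimalFCParameters

# df_long: long-format Spark DataFrame with columns "id2", "kind", "time", "value"
df_grouped = df_long.groupby(["id2", "kind"])

features = spark_feature_extraction_on_chunk(df_grouped, column_id="id2",
                                             column_kind="kind",
                                             column_sort="time",
                                             column_value="value",
                                             default_fc_parameters=MinimalFCParameters())
features.show()  # fails with the KeyError below in tsfresh 0.16.0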

The error which is thrown:
Py4JJavaError: An error occurred while calling o284.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 124, server, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/<root_to_spark_container>/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/<root_to_spark_container>/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/<root_to_spark_container>/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
for series in iterator:
File "", line 1, in
File "/<root_to_spark_container>/pyspark.zip/pyspark/worker.py", line 113, in wrapped
result = f(pd.concat(value_series, axis=1))
File "/<root_to_spark_container>/pyspark.zip/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "/<root_to_spark_container>/tsfresh/tsfresh/lib/python3.7/site-packages/tsfresh/convenience/bindings.py", line 30, in _feature_extraction_on_chunk_helper
return features[[column_id, "variable", "value"]]
File "/<root_to_spark_container>/tsfresh/tsfresh/lib/python3.7/site-packages/pandas/core/frame.py", line 2806, in getitem
indexer = self.loc._get_listlike_indexer(key, axis=1, raise_missing=True)[1]
File "/<root_to_spark_container>/tsfresh/tsfresh/lib/python3.7/site-packages/pandas/core/indexing.py", line 1553, in _get_listlike_indexer
keyarr, indexer, o._get_axis_number(axis), raise_missing=raise_missing
File "/<root_to_spark_container>/tsfresh/tsfresh/lib/python3.7/site-packages/pandas/core/indexing.py", line 1646, in _validate_read_indexer
raise KeyError(f"{not_found} not in index")
KeyError: "['id2'] not in index"

Quick hack in the code could be:

tsfresh/convenience/bindings.py, line 30:
- return features[[column_id, "variable", "value"]]
+ return features[["id", "variable", "value"]]

tsfresh/convenience/bindings.py, line 210:
- return df.apply(feature_extraction_udf)
+ return df.apply(feature_extraction_udf).withColumnRenamed("id", column_id)

But this does not seem like the cleanest way. Other suggestions?
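
For completeness, the rename workaround mentioned above would look roughly like this on the user side (again with an illustrative DataFrame df_long and an id column called "my_id"):

# Workaround sketch: rename to the expected "id" before the call,
# then restore the original column name on the result.
df_grouped = df_long.withColumnRenamed("my_id", "id").groupby(["id", "kind"])

features = spark_feature_extraction_on_chunk(df_grouped, column_id="id",
                                             column_kind="kind",
                                             column_sort="time",
                                             column_value="value",
                                             default_fc_parameters=MinimalFCParameters())

features = features.withColumnRenamed("id", "my_id")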

  1. OS: submitting from Windows 10 to an HDFS cluster using Spark
  2. tsfresh version: 0.16.0
  3. data: verified with tsfresh.examples.robot_execution_failures
@nils-braun
Collaborator

Hi @andyvanyperenAM !
Thank you very much for the issue.

You are totally correct - that is a bug.

Actually, in tsfresh the function _do_extraction_on_chunk is only used internally, so we chose to just stick with "id" all the time. This way we do not need to check whether the user has, e.g., chosen an id column that is accidentally named "value" or "variable" (names we also need internally).
So after all, your "hack" might be the best solution (otherwise we would need additional checks).

Another possibility would be to always return a dataframe with the columns "id", "variable" and "value", and not care about the column_id for the result.
Personally, I do not think this would be very bad, as "variable" and "value" are also "artificially" chosen by us. What do you think?

If you want, you can do a PR or I can fix it quickly.

@nils-braun nils-braun self-assigned this Jun 8, 2020
@nils-braun nils-braun added the bug label Jun 8, 2020
andyvanyperenAM added a commit to andyvanyperenAM/tsfresh that referenced this issue Jun 8, 2020
…by-key in the original DataFrame. The same column_id appears in the resulting dataset, mimicking as much as possible a "groupby" operation.
@andyvanyperenAM
Author

Hi @nils-braun

Thanks for your prompt reply. As you might see, I made an attempt to fix it, following my suggestion in the original description; however, this still does not seem to work for me (I only installed this version on the cluster, not locally - that might be the issue?). Could you test it, or do you see a problem with the currently proposed fix?

@andyvanyperenAM
Author

Oh, and I forgot to answer your question: the best option is to keep column_id, since the call can be interpreted as a groupby operation, and a traditional groupby keeps the original column name. This makes much more sense to me than renaming the column to "id".
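
To illustrate with plain pandas (a toy example, not tsfresh code) - the grouping column keeps its original name in the result, and I would expect the same from the bindings:

import pandas as pd

df = pd.DataFrame({"machine": [1, 1, 2], "value": [0.1, 0.2, 0.3]})

# The result keeps the original column name "machine"; it is not renamed to "id".
print(df.groupby("machine", as_index=False)["value"].sum())
#    machine  value
# 0        1    0.3
# 1        2    0.3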

nils-braun added a commit that referenced this issue Jun 11, 2020
After the pivoting was sped up in #705, the helper function for the Spark and Dask bindings also needs to change.

Additionally, this allows introducing a fix for #709.
@nils-braun
Copy link
Collaborator

nils-braun commented Jun 11, 2020

Ok, makes sense.
In #705 there was a change to the _do_extraction_on_chunk function. Porting this change to the binding helpers allowed me to use the id column name passed by the user.

For me, the following works now (now = with #712):

# Prepare data
from tsfresh.examples import robot_execution_failures
import os

robot_execution_failures.download_robot_execution_failures()
timeseries, _ = robot_execution_failures.load_robot_execution_failures()

def store_data(data_chunk):
    data_id = data_chunk["id"].iloc[0]

    os.makedirs(f"data/{data_id}", exist_ok=True)
    data_chunk.to_parquet(f"data/{data_id}/data.parquet", index=False, compression="gzip")

timeseries.groupby("id").apply(store_data)

# Extraction
from tsfresh.convenience.bindings import spark_feature_extraction_on_chunk
from tsfresh.feature_extraction.settings import MinimalFCParameters

from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame, SparkSession
from typing import Iterable

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet("data/*/data.parquet")
df.printSchema()

def melt(
        df: DataFrame,
        id_vars: Iterable[str], value_vars: Iterable[str],
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""

    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars))

    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))

    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

df_melted = melt(df, id_vars=["id", "time"],
                 value_vars=["F_x", "F_y", "F_z", "T_x", "T_y", "T_z"],
                 var_name="kind", value_name="value")
df_melted.printSchema()
df_melted = df_melted.withColumnRenamed("id", "not_id")

df_melted.show()

df_grouped = df_melted.groupby(["not_id", "kind"])

features = spark_feature_extraction_on_chunk(df_grouped, column_id="not_id",
                                             column_kind="kind",
                                             column_sort="time",
                                             column_value="value",
                                             default_fc_parameters=MinimalFCParameters())

print(features.take(1))

and prints [Row(not_id=12, variable='F_x__sum_values', value=-6.0)] as expected.

nils-braun added a commit that referenced this issue Jun 12, 2020
* Fix bindings helper function

After the pivoting was sped up in #705, the helper function for the Spark and Dask bindings also needs to change.

Additionally, this allows introducing a fix for #709.

* Changelog
nils-braun added a commit that referenced this issue Jun 20, 2020
* Fix bindings helper function

After the pivoting was sped up in #705, the helper function for the Spark and Dask bindings also needs to change.

Additionally, this allows introducing a fix for #709.

* Changelog

* Added test for dask bindings and some small fixes

* Changelog
@nils-braun
Collaborator

For me, the problem was solved in #712 - therefore I am closing this issue now.
If the PR did not help, please feel free to comment again!
