
Conversation

@EnricoMi (Contributor) commented Oct 12, 2022

### What changes were proposed in this pull request?

Improve the error messages when a Python method provided to `DataFrame.groupby(...).applyInPandas` / `DataFrame.groupby(...).cogroup(...).applyInPandas` returns a Pandas DataFrame that does not match the expected schema.

With

```Python
gdf = spark.range(2).join(spark.range(3).withColumnRenamed("id", "val")).groupby("id")
```
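For contrast, a minimal sketch of a function whose returned DataFrame matches the declared schema (the helper name `mean_val` is illustrative, not from the PR):

```Python
import pandas as pd

# The returned pandas.DataFrame carries exactly the declared columns, so no error:
def mean_val(pdf: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({"id": [pdf["id"].iloc[0]], "val": [pdf["val"].mean()]})

gdf.applyInPandas(mean_val, "id long, val double").show()
```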

**Mismatching column names, matching number of columns:**

```Python
gdf.applyInPandas(lambda pdf: pdf.rename(columns={"val": "v"}), "id long, val long").show()
# was: KeyError: 'val'
# now: RuntimeError: Column names of the returned pandas.DataFrame do not match specified schema.
#      Missing: val  Unexpected: v
```

**Mismatching column names, different number of columns:**

```Python
gdf.applyInPandas(lambda pdf: pdf.assign(foo=[3, 3, 3]).rename(columns={"val": "v"}), "id long, val long").show()
# was: RuntimeError: Number of columns of the returned pandas.DataFrame doesn't match specified schema.
#      Expected: 2 Actual: 3
# now: RuntimeError: Column names of the returned pandas.DataFrame do not match specified schema.
#      Missing: val  Unexpected: foo, v
```

**Expected schema matches but has duplicates (`id`), so that the number of columns matches:**

```Python
gdf.applyInPandas(lambda pdf: pdf.rename(columns={"val": "v"}), "id long, id long").show()
# was: java.lang.IllegalArgumentException: not all nodes and buffers were consumed.
#      nodes: [ArrowFieldNode [length=3, nullCount=0]]
#      buffers: [ArrowBuf[304], address:139860828549160, length:0, ArrowBuf[305], address:139860828549160, length:24]
# now: RuntimeError: Column names of the returned pandas.DataFrame do not match specified schema.
#      Unexpected: v
```

**In case the returned Pandas DataFrame contains no column names (none of the column labels is a string):**

```Python
gdf.applyInPandas(lambda pdf: pdf.assign(foo=[3, 3, 3]).rename(columns={"id": 0, "val": 1, "foo": 2}), "id long, val long").show()
# was: RuntimeError: Number of columns of the returned pandas.DataFrame doesn't match specified schema.
#      Expected: 2 Actual: 3
# now: RuntimeError: Number of columns of the returned pandas.DataFrame doesn't match specified schema.
#      Expected: 2  Actual: 3
```
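A minimal sketch of the kind of name check behind these messages (an assumed helper, not Spark's actual worker code); it reproduces the outputs shown above:

```Python
import pandas as pd

def check_column_names(pdf: pd.DataFrame, expected_names: list) -> None:
    # Only string labels count as column names; purely positional labels fall
    # back to the number-of-columns check shown in the last example.
    actual = [c for c in pdf.columns if isinstance(c, str)]
    if actual and set(actual) != set(expected_names):
        missing = sorted(set(expected_names) - set(actual))
        unexpected = sorted(set(actual) - set(expected_names))
        raise RuntimeError(
            "Column names of the returned pandas.DataFrame do not match specified schema. "
            + ("Missing: %s  " % ", ".join(missing) if missing else "")
            + ("Unexpected: %s" % ", ".join(unexpected) if unexpected else "")
        )

# check_column_names(pd.DataFrame({"id": [1], "v": [2]}), ["id", "val"])
# -> RuntimeError: ... Missing: val  Unexpected: v
```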

**Mismatching types (ValueError and TypeError):**

```Python
gdf.applyInPandas(lambda pdf: pdf, "id int, val string").show()
# was: pyarrow.lib.ArrowTypeError: Expected a string or bytes dtype, got int64
# now: pyarrow.lib.ArrowTypeError: Expected a string or bytes dtype, got int64
#      The above exception was the direct cause of the following exception:
#      TypeError: Exception thrown when converting pandas.Series (int64) with name 'val' to Arrow Array (string).

gdf.applyInPandas(lambda pdf: pdf.assign(val=pdf["val"].apply(str)), "id int, val double").show()
# was: pyarrow.lib.ArrowInvalid: Could not convert '0' with type str: tried to convert to double
# now: pyarrow.lib.ArrowInvalid: Could not convert '0' with type str: tried to convert to double
#      The above exception was the direct cause of the following exception:
#      ValueError: Exception thrown when converting pandas.Series (object) with name 'val' to Arrow Array (double).

with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": True}):
  gdf.applyInPandas(lambda pdf: pdf.assign(val=pdf["val"].apply(str)), "id int, val double").show()
# was: ValueError: Exception thrown when converting pandas.Series (object) to Arrow Array (double).
#      It can be caused by overflows or other unsafe conversions warned by Arrow. Arrow safe type check can be disabled
#      by using SQL config `spark.sql.execution.pandas.convertToArrowArraySafely`.
# now: ValueError: Exception thrown when converting pandas.Series (object) with name 'val' to Arrow Array (double).
#      It can be caused by overflows or other unsafe conversions warned by Arrow. Arrow safe type check can be disabled
#      by using SQL config `spark.sql.execution.pandas.convertToArrowArraySafely`.
```
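The "direct cause" chaining above comes from Python's `raise ... from ...`. A minimal sketch of such wrapping (assumed function name, not Spark's actual worker code):

```Python
import pandas as pd
import pyarrow as pa

def series_to_arrow(series: pd.Series, arrow_type: pa.DataType) -> pa.Array:
    try:
        return pa.Array.from_pandas(series, type=arrow_type)
    except (pa.ArrowTypeError, pa.ArrowInvalid) as e:
        # `from e` keeps the pyarrow error as the direct cause in the traceback.
        raise TypeError(
            "Exception thrown when converting pandas.Series (%s) with name '%s' "
            "to Arrow Array (%s)." % (series.dtype, series.name, arrow_type)
        ) from e

# series_to_arrow(pd.Series([1, 2], name="val"), pa.string())
# -> TypeError: Exception thrown when converting pandas.Series (int64) with name 'val' to Arrow Array (string).
```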

### Why are the changes needed?

Existing errors are generic (`KeyError`) or meaningless (`not all nodes and buffers were consumed`). The errors should help users spot the mismatching columns by naming them.

The schema of the returned Pandas DataFrames can only be checked while the DataFrame is being processed, so such errors are very expensive. Therefore, they should be expressive.

### Does this PR introduce _any_ user-facing change?

This only changes error messages, not behaviour.

### How was this patch tested?

Tests all cases of schema mismatch for `GroupedData.applyInPandas` and `PandasCogroupedOps.applyInPandas`.
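A hypothetical sketch of what one of those tests might look like (the test name and setup are illustrative, not the PR's actual test code):

```Python
def test_apply_in_pandas_mismatched_column_names(self):
    gdf = (
        self.spark.range(2)
        .join(self.spark.range(3).withColumnRenamed("id", "val"))
        .groupby("id")
    )
    # The new error message names the missing and unexpected columns:
    with self.assertRaisesRegex(
        Exception,
        r"Column names of the returned pandas\.DataFrame do not match "
        r"specified schema\.\s+Missing: val\s+Unexpected: v",
    ):
        gdf.applyInPandas(
            lambda pdf: pdf.rename(columns={"val": "v"}), "id long, val long"
        ).collect()
```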

@EnricoMi (Contributor, Author)

The `... Schema: id, val` bit can grow arbitrarily large; maybe this should be limited to the first X columns or characters.

If there are multiple calls to `applyInPandas` in a DataFrame's plan, this should help to spot which one caused the error.

@EnricoMi (Contributor, Author)

@zhengruifeng (Contributor)

also cc @itholic since it's to improve the pyspark error messages

@AmplabJenkins

Can one of the admins verify this patch?

@EnricoMi force-pushed the branch-pyspark-apply-in-pandas-schema-mismatch branch from 4af2a61 to 3d2ebbd on October 17, 2022 14:16
@cloud-fan (Contributor)

Yea I think we should provide a better error message for this user error. @HyukjinKwon can you take a look?

Member

I think you can get the name from the Series, e.g. `s.name`, instead of passing the extra `n`?

@EnricoMi (Contributor, Author)

I know, this parameter is ugly as it is only used for the exception, but `s.name` is `1` instead of `mean` (see test `test_apply_in_pandas_returning_incompatible_type`) when I apply this change.

Looks like `s[field.name]` and `s[s.columns[i]]` return a Series that thinks its name is `1`. Sad.

@EnricoMi (Contributor, Author)

But `s[s.columns[i]].rename(field.name)` fixes that. Thanks for the idea!
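The pandas behaviour in question is easy to demonstrate in isolation (a minimal sketch, independent of Spark):

```Python
import pandas as pd

# A DataFrame whose column labels are integers, as in the failing test:
df = pd.DataFrame({1: [1.0, 2.0]})
s = df[df.columns[0]]
print(s.name)                 # 1 -- the Series inherits the integer label as its name
print(s.rename("mean").name)  # mean -- rename() attaches the schema field name
```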

@EnricoMi (Contributor, Author)

Done.

@HyukjinKwon (Member)

Looks pretty fine from a cursory look. @zhengruifeng @xinrong-meng @itholic would be great if you guys find some time for a second look.

@itholic (Contributor) left a comment

Looks good except #38223 (comment).

Nice work!

@EnricoMi force-pushed the branch-pyspark-apply-in-pandas-schema-mismatch branch 3 times, most recently from 7c1da83 to 964d891 on October 21, 2022 09:57
@EnricoMi (Contributor, Author)

I have also limited the schema string in the error message to 1024 characters, and the missing and unexpected columns to 5.
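An illustrative sketch of such limiting (assumed helper names; note this logic was removed again later in the review, see below):

```Python
def abbreviate(schema_str: str, max_chars: int = 1024) -> str:
    # Truncate overly long schema strings before embedding them in the message.
    return schema_str if len(schema_str) <= max_chars else schema_str[:max_chars] + "..."

def first_columns(names: list, max_cols: int = 5) -> list:
    # Show at most the first few missing/unexpected column names.
    return names if len(names) <= max_cols else names[:max_cols] + ["..."]

print(abbreviate("id long, val long"))               # unchanged: short enough
print(first_columns(["c%d" % i for i in range(8)]))  # ['c0', 'c1', 'c2', 'c3', 'c4', '...']
```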

@EnricoMi force-pushed the branch-pyspark-apply-in-pandas-schema-mismatch branch 2 times, most recently from 3c761ec to 6465ca5 on October 21, 2022 13:38
@EnricoMi (Contributor, Author)

@HyukjinKwon I have addressed your comment.

@HyukjinKwon (Member)

Since this touches the core Python worker side, let's make sure to get this reviewed by other people like @BryanCutler @viirya @ueshin too.

Member

I think a regex match is fine. This is too long, e.g. just `column_with_long_column_name_.*` would be good enough.

@EnricoMi (Contributor, Author)

This is gone.

Member

Maybe you could leverage `r"ValueError: ..."` to avoid backslashes.
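For example (illustrative only), a raw string lets the regex escape its metacharacters once instead of doubling every backslash:

```Python
import re

pattern = r"ValueError: Exception thrown when converting pandas\.Series \(object\)"
msg = "ValueError: Exception thrown when converting pandas.Series (object) with name 'val' to Arrow Array (double)."
assert re.search(pattern, msg)  # the raw string needs no doubled backslashes
```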

@EnricoMi (Contributor, Author)

Done.

Member

Hm, I wonder if this abbreviation is too much (and the character limit) ...

Member

E.g., we would have to consider having a config to control this too ...

@EnricoMi (Contributor, Author)

> Hm, I wonder if this abbreviation is too much

Too much logic (better to remove it), or is the limit too high?

@EnricoMi (Contributor, Author)

@HyukjinKwon I need your input here. Should I remove the abbreviation logic, or even stop printing the schema at all, to simplify things?

@EnricoMi (Contributor, Author)

@HyukjinKwon I have removed the schema and columns limit.

Member

I guess we can call `assign_cols_by_name(runner_conf)` outside of `wrapped`?

@EnricoMi (Contributor, Author)

Yes we could, but what is the benefit of moving that out of `wrapped`?

Member

`wrapped` will be called many times. We want to reduce as much overhead as possible.
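A minimal sketch of the hoisting pattern being suggested (the names are illustrative, not Spark's actual worker code):

```Python
def assign_cols_by_name(runner_conf: dict) -> bool:
    # Reads a boolean flag from the runner configuration (assumed key).
    return runner_conf.get("assignColumnsByName", "true").lower() == "true"

def wrap_udf(f, runner_conf: dict):
    by_name = assign_cols_by_name(runner_conf)  # hoisted: evaluated once per UDF

    def wrapped(*args):
        # `wrapped` runs for every group/batch, so it stays cheap by closing
        # over `by_name` instead of re-reading the configuration each time.
        return f(*args, by_name=by_name)

    return wrapped

f = wrap_udf(lambda x, by_name: (x, by_name), {"assignColumnsByName": "false"})
print(f(42))  # (42, False)
```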

@EnricoMi (Contributor, Author)

Very good point, thanks for the clarification.

@EnricoMi (Contributor, Author)

Done.

Member

Ditto.

@EnricoMi (Contributor, Author)

Done.

Comment on lines 183 to 187

Member

Suggested change:

```diff
 "Number of columns of the returned pandas.DataFrame "
 "doesn't match specified schema. "
-"Expected: {} Actual: {}".format(
+"Expected: {}. Actual: {}".format(
     len(return_type),
     len(result.columns)
```
@EnricoMi (Contributor, Author)

Reverted this error message to its pre-PR form, since that message is always short and easy to read without extra spaces or periods.

@viirya (Member) left a comment

Looks good from a cursory look.

@EnricoMi force-pushed the branch-pyspark-apply-in-pandas-schema-mismatch branch from b2c3fde to 5e57eb3 on November 24, 2022 10:26
@HyukjinKwon (Member) commented Feb 9, 2023

Merged to master.

HyukjinKwon pushed a commit that referenced this pull request Feb 9, 2023
…chema mismatch

[Commit message body identical to the PR description above.]

Closes #38223 from EnricoMi/branch-pyspark-apply-in-pandas-schema-mismatch.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit bdf56c4)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@EnricoMi (Contributor, Author) commented Mar 2, 2023

@HyukjinKwon why has this been reverted in branch-3.4? a86324c

CI looked pretty green: https://github.com/apache/spark/actions/runs/4129838944/jobs/7135888364

This contribution had been finished in November, see the above comments. Though branch-3.4 had been cut, this should still have gone into the 3.4.0 release.

xinrong-meng pushed a commit that referenced this pull request Jul 18, 2023
…das for schema mismatch

### What changes were proposed in this pull request?
Similar to #38223, improve the error messages when a Python method provided to `DataFrame.mapInPandas` returns a Pandas DataFrame that does not match the expected schema.

With
```Python
df = spark.range(2).withColumn("v", col("id"))
```

**Mismatching column names:**
```Python
df.mapInPandas(lambda it: it, "id long, val long").show()
# was: KeyError: 'val'
# now: RuntimeError: Column names of the returned pandas.DataFrame do not match specified schema.
#      Missing: val  Unexpected: v
```

**Python function not returning iterator:**
```Python
df.mapInPandas(lambda it: 1, "id long").show()
# was: TypeError: 'int' object is not iterable
# now: TypeError: Return type of the user-defined function should be iterator of pandas.DataFrame, but is <class 'int'>
```

**Python function not returning iterator of pandas.DataFrame:**
```Python
df.mapInPandas(lambda it: [1], "id long").show()
# was: TypeError: Return type of the user-defined function should be Pandas.DataFrame, but is <class 'int'>
# now: TypeError: Return type of the user-defined function should be iterator of pandas.DataFrame, but is iterator of <class 'int'>
# sometimes: ValueError: A field of type StructType expects a pandas.DataFrame, but got: <class 'list'>
# now: TypeError: Return type of the user-defined function should be iterator of pandas.DataFrame, but is iterator of <class 'list'>
```
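A minimal sketch of the kind of iterator check that yields these messages (an assumed helper, not Spark's actual worker code):

```Python
import pandas as pd

def verify_result(result):
    # The user function must return an iterable of pandas.DataFrame.
    if not hasattr(result, "__iter__"):
        raise TypeError(
            "Return type of the user-defined function should be iterator "
            "of pandas.DataFrame, but is %s" % type(result)
        )
    for item in result:
        if not isinstance(item, pd.DataFrame):
            raise TypeError(
                "Return type of the user-defined function should be iterator "
                "of pandas.DataFrame, but is iterator of %s" % type(item)
            )
        yield item

# list(verify_result(1))    -> TypeError: ... but is <class 'int'>
# list(verify_result([1]))  -> TypeError: ... but is iterator of <class 'int'>
```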

**Mismatching types (ValueError and TypeError):**
```Python
df.mapInPandas(lambda it: it, "id int, v string").show()
# was: pyarrow.lib.ArrowTypeError: Expected a string or bytes dtype, got int64
# now: pyarrow.lib.ArrowTypeError: Expected a string or bytes dtype, got int64
#      The above exception was the direct cause of the following exception:
#      TypeError: Exception thrown when converting pandas.Series (int64) with name 'v' to Arrow Array (string).

df.mapInPandas(lambda it: [pdf.assign(v=pdf["v"].apply(str)) for pdf in it], "id int, v double").show()
# was: pyarrow.lib.ArrowInvalid: Could not convert '0' with type str: tried to convert to double
# now: pyarrow.lib.ArrowInvalid: Could not convert '0' with type str: tried to convert to double
#      The above exception was the direct cause of the following exception:
#      ValueError: Exception thrown when converting pandas.Series (object) with name 'v' to Arrow Array (double).

with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": True}):
  df.mapInPandas(lambda it: [pdf.assign(v=pdf["v"].apply(str)) for pdf in it], "id int, v double").show()
# was: ValueError: Exception thrown when converting pandas.Series (object) to Arrow Array (double).
#      It can be caused by overflows or other unsafe conversions warned by Arrow. Arrow safe type check can be disabled
#      by using SQL config `spark.sql.execution.pandas.convertToArrowArraySafely`.
# now: ValueError: Exception thrown when converting pandas.Series (object) with name 'v' to Arrow Array (double).
#      It can be caused by overflows or other unsafe conversions warned by Arrow. Arrow safe type check can be disabled
#      by using SQL config `spark.sql.execution.pandas.convertToArrowArraySafely`.
```

### Why are the changes needed?
Existing errors are generic (`KeyError`) or meaningless (`'int' object is not iterable`). The errors should help users spot the mismatching columns by naming them.

The schema of the returned Pandas DataFrames can only be checked while the DataFrame is being processed, so such errors are very expensive. Therefore, they should be expressive.

### Does this PR introduce _any_ user-facing change?
This only changes error messages, not behaviour.

### How was this patch tested?
Tests all cases of schema mismatch for `DataFrame.mapInPandas`.

Closes #39952 from EnricoMi/branch-pyspark-map-in-pandas-schema-mismatch.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Xinrong Meng <xinrong@apache.org>
HyukjinKwon pushed a commit that referenced this pull request Aug 4, 2023
…InPandas for schema mismatch

### What changes were proposed in this pull request?
This merges #39952 into 3.5 branch.

[Remainder of the commit message identical to the #39952 commit message above.]

Closes #42316 from EnricoMi/branch-pyspark-map-in-pandas-schema-mismatch-3.5.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>