[SPARK-44289][SPARK-43874][SPARK-43869][SPARK-43607][PS] Support `indexer_between_time` for pandas 2.0.0 & enable more tests

### What changes were proposed in this pull request?

This PR proposes to update `DatetimeIndex.indexer_between_time` to support pandas 2.0.0 and above. See pandas-dev/pandas#43248 for more detail.

This PR also enables a number of tests for `Series`, `Index` and `GroupBy`.
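For context, a minimal pandas-only sketch (not part of this patch) of the API change being adapted to: `between_time` dropped `include_start`/`include_end` in pandas 2.0.0 in favor of a single `inclusive` argument.

```python
import pandas as pd

idx = pd.date_range("2000-01-01", periods=3, freq="T")
df = pd.DataFrame({"x": [0, 1, 2]}, index=idx)

# pandas < 2.0 (removed in 2.0.0):
#   df.between_time("00:01", "00:02", include_start=True, include_end=False)
# pandas >= 2.0:
df.between_time("00:01", "00:02", inclusive="left")
```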

### Why are the changes needed?

To match the behavior of the latest pandas.

### Does this PR introduce _any_ user-facing change?

`DatetimeIndex.indexer_between_time` now has the same behavior as the latest pandas.

### How was this patch tested?

Enabling & updating the existing UTs and doctests.

Closes apache#42533 from itholic/enable-many-tests.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
itholic authored and vpolet committed Aug 24, 2023
1 parent a1c5f58 commit 14fadcd
Showing 11 changed files with 58 additions and 142 deletions.
7 changes: 4 additions & 3 deletions python/pyspark/pandas/groupby.py
@@ -4165,6 +4165,7 @@ def value_counts(
Examples
--------
>>> import numpy as np
>>> df = ps.DataFrame({'A': [1, 2, 2, 3, 3, 3],
... 'B': [1, 1, 2, 3, 3, np.nan]},
... columns=['A', 'B'])
@@ -4183,7 +4184,7 @@ def value_counts(
2 1.0 1
2.0 1
3 3.0 2
Name: B, dtype: int64
Name: count, dtype: int64
Don't include counts of NaN when dropna is False.
Expand All @@ -4195,7 +4196,7 @@ def value_counts(
2.0 1
3 3.0 2
NaN 1
Name: B, dtype: int64
Name: count, dtype: int64
"""
warnings.warn(
"The resulting Series will have a fixed name of 'count' from 4.0.0.",
@@ -4232,7 +4233,7 @@ def value_counts(
psser._internal.data_fields[0].copy(name=name)
for psser, name in zip(groupkeys, groupkey_names)
],
column_labels=[self._agg_columns[0]._column_label],
column_labels=[("count",)],
data_spark_columns=[scol_for(sdf, agg_column)],
)
return first_series(DataFrame(internal))
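For reference, a small pandas-only sketch (not part of the diff) of the behavior the updated doctest reflects: from pandas 2.0.0 the Series returned by `value_counts` is named `count` (or `proportion` when `normalize=True`) instead of inheriting the column name, which is why the expected output and the `column_labels` change to `("count",)`.

```python
import numpy as np
import pandas as pd

pdf = pd.DataFrame({"A": [1, 2, 2, 3, 3, 3],
                    "B": [1, 1, 2, 3, 3, np.nan]})

counts = pdf.groupby("A")["B"].value_counts()
print(counts.name)  # "B" on pandas < 2.0.0, "count" on pandas >= 2.0.0
```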
20 changes: 14 additions & 6 deletions python/pyspark/pandas/indexes/datetimes.py
@@ -730,24 +730,32 @@ def indexer_between_time(
Examples
--------
>>> psidx = ps.date_range("2000-01-01", periods=3, freq="T") # doctest: +SKIP
>>> psidx # doctest: +SKIP
>>> psidx = ps.date_range("2000-01-01", periods=3, freq="T")
>>> psidx
DatetimeIndex(['2000-01-01 00:00:00', '2000-01-01 00:01:00',
'2000-01-01 00:02:00'],
dtype='datetime64[ns]', freq=None)
>>> psidx.indexer_between_time("00:01", "00:02").sort_values() # doctest: +SKIP
>>> psidx.indexer_between_time("00:01", "00:02").sort_values()
Index([1, 2], dtype='int64')
>>> psidx.indexer_between_time("00:01", "00:02", include_end=False) # doctest: +SKIP
>>> psidx.indexer_between_time("00:01", "00:02", include_end=False)
Index([1], dtype='int64')
>>> psidx.indexer_between_time("00:01", "00:02", include_start=False) # doctest: +SKIP
>>> psidx.indexer_between_time("00:01", "00:02", include_start=False)
Index([2], dtype='int64')
"""

def pandas_between_time(pdf) -> ps.DataFrame[int]: # type: ignore[no-untyped-def]
return pdf.between_time(start_time, end_time, include_start, include_end)
if include_start and include_end:
inclusive = "both"
elif not include_start and not include_end:
inclusive = "neither"
elif include_start and not include_end:
inclusive = "left"
elif not include_start and include_end:
inclusive = "right"
return pdf.between_time(start_time, end_time, inclusive=inclusive)

psdf = self.to_frame()[[]]
id_column_name = verify_temp_column_name(psdf, "__id_column__")
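As a design note, the same flag-to-`inclusive` mapping could also be written as a lookup table; a hypothetical sketch (not what the patch does):

```python
# Hypothetical helper, not part of the patch: maps the legacy
# include_start/include_end flags onto the `inclusive` value accepted by
# pandas >= 1.4 and required from pandas 2.0.0.
def to_inclusive(include_start: bool, include_end: bool) -> str:
    return {
        (True, True): "both",
        (True, False): "left",
        (False, True): "right",
        (False, False): "neither",
    }[(include_start, include_end)]
```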
51 changes: 9 additions & 42 deletions python/pyspark/pandas/tests/computation/test_cov.py
@@ -28,10 +28,6 @@


class FrameCovMixin:
@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43809): Enable DataFrameSlowTests.test_cov for pandas 2.0.0.",
)
def test_cov(self):
# SPARK-36396: Implement DataFrame.cov

@@ -66,12 +62,8 @@ def test_cov(self):
self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))

# extension dtype
if LooseVersion(pd.__version__) >= LooseVersion("1.2"):
numeric_dtypes = ["Int8", "Int16", "Int32", "Int64", "Float32", "Float64", "float"]
boolean_dtypes = ["boolean", "bool"]
else:
numeric_dtypes = ["Int8", "Int16", "Int32", "Int64", "float"]
boolean_dtypes = ["boolean", "bool"]
numeric_dtypes = ["Int8", "Int16", "Int32", "Int64", "Float32", "Float64", "float"]
boolean_dtypes = ["boolean", "bool"]

sers = [pd.Series([1, 2, 3, None], dtype=dtype) for dtype in numeric_dtypes]
sers += [pd.Series([True, False, True, None], dtype=dtype) for dtype in boolean_dtypes]
@@ -81,44 +73,19 @@ def test_cov(self):
pdf.columns = [dtype for dtype in numeric_dtypes + boolean_dtypes] + ["decimal"]
psdf = ps.from_pandas(pdf)

if LooseVersion(pd.__version__) >= LooseVersion("1.2"):
self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
self.assert_eq(pdf.cov(min_periods=3), psdf.cov(min_periods=3), almost=True)
self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4))
else:
test_types = [
"Int8",
"Int16",
"Int32",
"Int64",
"float",
"boolean",
"bool",
]
expected = pd.DataFrame(
data=[
[1.0, 1.0, 1.0, 1.0, 1.0, 0.0000000, 0.0000000],
[1.0, 1.0, 1.0, 1.0, 1.0, 0.0000000, 0.0000000],
[1.0, 1.0, 1.0, 1.0, 1.0, 0.0000000, 0.0000000],
[1.0, 1.0, 1.0, 1.0, 1.0, 0.0000000, 0.0000000],
[1.0, 1.0, 1.0, 1.0, 1.0, 0.0000000, 0.0000000],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.3333333, 0.3333333],
[0.0, 0.0, 0.0, 0.0, 0.0, 0.3333333, 0.3333333],
],
index=test_types,
columns=test_types,
)
self.assert_eq(expected, psdf.cov(), almost=True)
self.assert_eq(pdf.cov(numeric_only=True), psdf.cov(), almost=True)

# string column
pdf = pd.DataFrame(
[(1, 2, "a", 1), (0, 3, "b", 1), (2, 0, "c", 9), (1, 1, "d", 1)],
columns=["a", "b", "c", "d"],
)
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.cov(), psdf.cov(), almost=True)
self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))
self.assert_eq(pdf.cov(numeric_only=True), psdf.cov(), almost=True)
self.assert_eq(
pdf.cov(numeric_only=True, min_periods=4), psdf.cov(min_periods=4), almost=True
)
self.assert_eq(pdf.cov(numeric_only=True, min_periods=5), psdf.cov(min_periods=5))

# nan
np.random.seed(42)
Expand All @@ -132,7 +99,7 @@ def test_cov(self):
# return empty DataFrame
pdf = pd.DataFrame([("1", "2"), ("0", "3"), ("2", "0"), ("1", "1")], columns=["a", "b"])
psdf = ps.from_pandas(pdf)
self.assert_eq(pdf.cov(), psdf.cov())
self.assert_eq(pdf.cov(numeric_only=True), psdf.cov())


class FrameCovTests(FrameCovMixin, ComparisonTestBase, SQLTestUtils):
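For context, a pandas-only sketch (an aside, not part of the test) of why the expected values are now computed with `numeric_only=True`: pandas 2.0 no longer silently drops non-numeric columns in `DataFrame.cov`, so the pandas result has to be restricted to numeric columns explicitly to stay comparable with the pandas-on-Spark result.

```python
import pandas as pd

pdf = pd.DataFrame(
    [(1, 2, "a", 1), (0, 3, "b", 1), (2, 0, "c", 9), (1, 1, "d", 1)],
    columns=["a", "b", "c", "d"],
)

# pdf.cov()  # fails on pandas >= 2.0 because the string column "c" is no
#            # longer dropped automatically
expected = pdf.cov(numeric_only=True)  # numeric columns only, as the test now does
print(expected)
```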
20 changes: 8 additions & 12 deletions python/pyspark/pandas/tests/data_type_ops/test_date_ops.py
@@ -63,25 +63,25 @@ def test_add(self):
for psser in self.pssers:
self.assertRaises(TypeError, lambda: self.psser + psser)

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43571): Enable DateOpsTests.test_sub for pandas 2.0.0.",
)
def test_sub(self):
self.assertRaises(TypeError, lambda: self.psser - "x")
self.assertRaises(TypeError, lambda: self.psser - 1)
self.assert_eq(
(self.pser - self.some_date).dt.days,
(self.pser - self.some_date).apply(lambda x: x.days),
self.psser - self.some_date,
)
pdf, psdf = self.pdf, self.psdf
for col in self.df_cols:
if col == "date":
self.assert_eq((pdf["date"] - pdf[col]).dt.days, psdf["date"] - psdf[col])
self.assert_eq(
(pdf["date"] - pdf[col]).apply(lambda x: x.days), psdf["date"] - psdf[col]
)
else:
self.assertRaises(TypeError, lambda: psdf["date"] - psdf[col])
pdf, psdf = self.date_pdf, self.date_psdf
self.assert_eq((pdf["this"] - pdf["that"]).dt.days, psdf["this"] - psdf["that"])
self.assert_eq(
(pdf["this"] - pdf["that"]).apply(lambda x: x.days), psdf["this"] - psdf["that"]
)

def test_mul(self):
self.assertRaises(TypeError, lambda: self.psser * "x")
@@ -128,15 +128,11 @@ def test_radd(self):
self.assertRaises(TypeError, lambda: 1 + self.psser)
self.assertRaises(TypeError, lambda: self.some_date + self.psser)

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43570): Enable DateOpsTests.test_rsub for pandas 2.0.0.",
)
def test_rsub(self):
self.assertRaises(TypeError, lambda: "x" - self.psser)
self.assertRaises(TypeError, lambda: 1 - self.psser)
self.assert_eq(
(self.some_date - self.pser).dt.days,
(self.some_date - self.pser).apply(lambda x: x.days),
self.some_date - self.psser,
)

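A brief sketch of the assumption behind switching the expected value to `apply` (my reading, not stated in the patch): extracting `.days` per element works whether the date subtraction yields a `timedelta64` Series or an object-dtype Series of `datetime.timedelta`, as it does on pandas 2.0.

```python
import datetime
import pandas as pd

pser = pd.Series([datetime.date(2022, 1, d) for d in (1, 5, 9)])
some_date = datetime.date(2022, 1, 1)

# Element-wise extraction of .days does not depend on the dtype of the
# subtraction result, unlike the previous `.dt.days` accessor.
expected = (pser - some_date).apply(lambda x: x.days)
print(expected.tolist())  # [0, 4, 8]
```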
12 changes: 2 additions & 10 deletions python/pyspark/pandas/tests/groupby/test_aggregate.py
@@ -40,10 +40,6 @@ def pdf(self):
def psdf(self):
return ps.from_pandas(self.pdf)

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-44289): Enable GroupbyAggregateTests.test_aggregate for pandas 2.0.0.",
)
def test_aggregate(self):
pdf = pd.DataFrame(
{"A": [1, 1, 2, 2], "B": [1, 2, 3, 4], "C": [0.362, 0.227, 1.267, -0.562]}
@@ -173,12 +169,8 @@ def sort(df):
stats_psdf = psdf.groupby(10).agg({20: ["min", "max"], 30: "sum"})
stats_pdf = pdf.groupby(10).agg({20: ["min", "max"], 30: "sum"})
self.assert_eq(
stats_psdf.sort_values(by=[(20, "min"), (20, "max"), (30, "sum")]).reset_index(
drop=True
),
stats_pdf.sort_values(by=[(20, "min"), (20, "max"), (30, "sum")]).reset_index(
drop=True
),
stats_psdf.reset_index(drop=True),
stats_pdf.reset_index(drop=True),
)

def test_aggregate_func_str_list(self):
11 changes: 3 additions & 8 deletions python/pyspark/pandas/tests/groupby/test_apply_func.py
@@ -42,10 +42,6 @@ def pdf(self):
def psdf(self):
return ps.from_pandas(self.pdf)

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43708): Enable GroupByTests.test_apply " "for pandas 2.0.0.",
)
def test_apply(self):
pdf = pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6], "b": [1, 1, 2, 3, 5, 8], "c": [1, 4, 9, 16, 25, 36]},
@@ -87,14 +83,17 @@ def test_apply(self):
self.assert_eq(
psdf.groupby(psdf.b // 5).apply(lambda x: x + x.min()).sort_index(),
pdf.groupby(pdf.b // 5).apply(lambda x: x + x.min()).sort_index(),
almost=True,
)
self.assert_eq(
psdf.groupby(psdf.b // 5)["a"].apply(lambda x: x + x.min()).sort_index(),
pdf.groupby(pdf.b // 5)["a"].apply(lambda x: x + x.min()).sort_index(),
almost=True,
)
self.assert_eq(
psdf.groupby(psdf.b // 5)[["a"]].apply(lambda x: x + x.min()).sort_index(),
pdf.groupby(pdf.b // 5)[["a"]].apply(lambda x: x + x.min()).sort_index(),
almost=True,
)
self.assert_eq(
psdf.groupby(psdf.b // 5)[["a"]].apply(len).sort_index(),
@@ -139,10 +138,6 @@ def test_apply(self):
pdf.groupby([("x", "a"), ("x", "b")]).apply(len).sort_index(),
)

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43706): Enable GroupByTests.test_apply_without_shortcut " "for pandas 2.0.0.",
)
def test_apply_without_shortcut(self):
with option_context("compute.shortcut_limit", 0):
self.test_apply()
7 changes: 3 additions & 4 deletions python/pyspark/pandas/tests/groupby/test_groupby.py
@@ -769,10 +769,6 @@ def test_unique(self):
for act, exp in zip(actual, expect):
self.assertTrue(sorted(act) == sorted(exp))

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43444): Enable GroupBySlowTests.test_value_counts for pandas 2.0.0.",
)
def test_value_counts(self):
pdf = pd.DataFrame(
{"A": [np.nan, 2, 2, 3, 3, 3], "B": [1, 1, 2, 3, 3, np.nan]}, columns=["A", "B"]
@@ -785,6 +781,7 @@ def test_value_counts(self):
self.assert_eq(
psdf.groupby("A")["B"].value_counts(dropna=False).sort_index(),
pdf.groupby("A")["B"].value_counts(dropna=False).sort_index(),
almost=True,
)
self.assert_eq(
psdf.groupby("A", dropna=False)["B"].value_counts(dropna=False).sort_index(),
@@ -804,6 +801,7 @@
pdf.groupby("A")["B"]
.value_counts(sort=True, ascending=False, dropna=False)
.sort_index(),
almost=True,
)
self.assert_eq(
psdf.groupby("A")["B"]
@@ -812,6 +810,7 @@
pdf.groupby("A")["B"]
.value_counts(sort=True, ascending=True, dropna=False)
.sort_index(),
almost=True,
)
self.assert_eq(
psdf.B.rename().groupby(psdf.A).value_counts().sort_index(),
24 changes: 0 additions & 24 deletions python/pyspark/pandas/tests/indexes/test_base.py
@@ -1577,10 +1577,6 @@ def test_asof(self):
psmidx = ps.MultiIndex.from_tuples([("a", "a"), ("a", "b"), ("a", "c")])
self.assertRaises(NotImplementedError, lambda: psmidx.asof(("a", "b")))

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43608): Enable IndexesTests.test_union for pandas 2.0.0.",
)
def test_union(self):
# Index
pidx1 = pd.Index([1, 2, 3, 4])
@@ -1593,13 +1589,6 @@ def test_union(self):
self.assert_eq(psidx1.union(psidx2), pidx1.union(pidx2))
self.assert_eq(psidx2.union(psidx1), pidx2.union(pidx1))
self.assert_eq(psidx1.union(psidx3), pidx1.union(pidx3))
# Deprecated case, but adding to track if pandas stop supporting union
# as a set operation. It should work fine until stop supporting anyway.
# No longer supported from pandas 2.0.0.
if LooseVersion(pd.__version__) >= LooseVersion("2.0.0"):
self.assert_eq(psidx1 | psidx2, ps.Index([3, 4], dtype="int64"))
else:
self.assert_eq(pidx1 | pidx2, psidx1 | psidx2)

self.assert_eq(psidx1.union([3, 4, 5, 6]), pidx1.union([3, 4, 5, 6]), almost=True)
self.assert_eq(psidx2.union([1, 2, 3, 4]), pidx2.union([1, 2, 3, 4]), almost=True)
@@ -1904,10 +1893,6 @@ def test_hasnans(self):
psmidx = ps.Index([("a", 1), ("b", 2)])
self.assertRaises(NotImplementedError, lambda: psmidx.hasnans())

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43607): Enable IndexesTests.test_intersection for pandas 2.0.0.",
)
def test_intersection(self):
pidx = pd.Index([1, 2, 3, 4], name="Koalas")
psidx = ps.from_pandas(pidx)
@@ -1919,15 +1904,6 @@ def test_intersection(self):
self.assert_eq(
(pidx + 1).intersection(pidx_other), (psidx + 1).intersection(psidx_other).sort_values()
)
# Deprecated case, but adding to track if pandas stop supporting intersection
# as a set operation. It should work fine until stop supporting anyway.
# No longer supported from pandas 2.0.0.
if LooseVersion(pd.__version__) >= LooseVersion("2.0.0"):
self.assert_eq(
(psidx & psidx_other).sort_values(), ps.Index([3, 1, 7, 1], dtype="int64")
)
else:
self.assert_eq(pidx & pidx_other, (psidx & psidx_other).sort_values())

pidx_other_different_name = pd.Index([3, 4, 5, 6], name="Databricks")
psidx_other_different_name = ps.from_pandas(pidx_other_different_name)
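For background, a pandas-only sketch (not part of the change): `Index.union` and `Index.intersection` remain the supported set operations, while the `|`/`&` operators stopped acting as set operations in pandas 2.0.0, which is why the operator-based assertions were dropped.

```python
import pandas as pd

idx1 = pd.Index([1, 2, 3, 4])
idx2 = pd.Index([3, 4, 5, 6])

print(idx1.union(idx2))         # Index([1, 2, 3, 4, 5, 6], dtype='int64')
print(idx1.intersection(idx2))  # Index([3, 4], dtype='int64')
# On pandas >= 2.0, `idx1 | idx2` is evaluated element-wise rather than as a
# set union, so tests relying on the old set-operation behavior were removed.
```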
5 changes: 0 additions & 5 deletions python/pyspark/pandas/tests/indexes/test_datetime.py
@@ -161,11 +161,6 @@ def test_strftime(self):
psidx.strftime(date_format="%B %d, %Y"), pidx.strftime(date_format="%B %d, %Y")
)

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43644): Enable DatetimeIndexTests.test_indexer_between_time "
"for pandas 2.0.0.",
)
def test_indexer_between_time(self):
for psidx, pidx in self.idx_pairs:
self.assert_eq(
9 changes: 4 additions & 5 deletions python/pyspark/pandas/tests/indexes/test_reindex.py
@@ -39,10 +39,6 @@ def df_pair(self):
psdf = ps.from_pandas(pdf)
return pdf, psdf

@unittest.skipIf(
LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
"TODO(SPARK-43811): Enable DataFrameTests.test_reindex for pandas 2.0.0.",
)
def test_reindex(self):
index = pd.Index(["A", "B", "C", "D", "E"])
columns = pd.Index(["numbers"])
@@ -64,9 +60,12 @@ def test_reindex(self):
psdf.reindex(["A", "B", "C"], columns=["numbers", "2", "3"]).sort_index(),
)

# We manually test this due to the bug in pandas.
expected_result = ps.DataFrame([1.0, 2.0, 3.0], index=ps.Index(["A", "B", "C"]))
expected_result.columns = pd.Index(["numbers"], name="cols")
self.assert_eq(
pdf.reindex(["A", "B", "C"], index=["numbers", "2", "3"]).sort_index(),
psdf.reindex(["A", "B", "C"], index=["numbers", "2", "3"]).sort_index(),
expected_result,
)

self.assert_eq(
