11 changes: 3 additions & 8 deletions .github/workflows/build_and_test.yml
@@ -343,18 +343,13 @@ jobs:
key: docs-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
docs-maven-
- - name: Install Python 3.6
- uses: actions/setup-python@v2
- with:
- python-version: 3.6
- architecture: x64
- name: Install Python linter dependencies
run: |
# TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes.
# See also https://github.com/sphinx-doc/sphinx/issues/7551.
# Jinja2 3.0.0+ causes error when building with Sphinx.
# See also https://issues.apache.org/jira/browse/SPARK-35375.
- python3.6 -m pip install flake8 pydata_sphinx_theme 'mypy==0.910' numpydoc 'jinja2<3.0.0' 'black==21.5b2'
+ python3.9 -m pip install flake8 pydata_sphinx_theme 'mypy==0.910' numpydoc 'jinja2<3.0.0' 'black==21.5b2'
- name: Install R linter dependencies and SparkR
run: |
apt-get install -y libcurl4-openssl-dev libgit2-dev libssl-dev libxml2-dev
@@ -373,8 +368,8 @@ jobs:
# See also https://github.com/sphinx-doc/sphinx/issues/7551.
# Jinja2 3.0.0+ causes error when building with Sphinx.
# See also https://issues.apache.org/jira/browse/SPARK-35375.
- python3.6 -m pip install 'sphinx<3.1.0' mkdocs numpy pydata_sphinx_theme ipython nbsphinx numpydoc 'jinja2<3.0.0'
- python3.6 -m pip install sphinx_plotly_directive 'pyarrow<5.0.0' pandas 'plotly>=4.8'
+ python3.9 -m pip install 'sphinx<3.1.0' mkdocs numpy pydata_sphinx_theme ipython nbsphinx numpydoc 'jinja2<3.0.0'
+ python3.9 -m pip install sphinx_plotly_directive 'pyarrow<5.0.0' pandas 'plotly>=4.8'
apt-get update -y
apt-get install -y ruby ruby-dev
Rscript -e "install.packages(c('devtools', 'testthat', 'knitr', 'rmarkdown', 'roxygen2'), repos='https://cloud.r-project.org/')"
2 changes: 1 addition & 1 deletion python/pyspark/ml/linalg/__init__.pyi
@@ -164,7 +164,7 @@ class Matrix:
def __init__(
self, numRows: int, numCols: int, isTransposed: bool = ...
) -> None: ...
- def toArray(self) -> NoReturn: ...
+ def toArray(self) -> ndarray: ...

class DenseMatrix(Matrix):
values: Any
2 changes: 1 addition & 1 deletion python/pyspark/mllib/_typing.pyi
@@ -20,4 +20,4 @@ from typing import List, Tuple, Union
from pyspark.mllib.linalg import Vector
from numpy import ndarray # noqa: F401

- VectorLike = Union[Vector, List[float], Tuple[float, ...]]
+ VectorLike = Union[ndarray, Vector, List[float], Tuple[float, ...]]
2 changes: 1 addition & 1 deletion python/pyspark/mllib/classification.pyi
@@ -90,7 +90,7 @@ class LogisticRegressionWithLBFGS:

class SVMModel(LinearClassificationModel):
def __init__(self, weights: Vector, intercept: float) -> None: ...
- @overload
+ @overload  # type: ignore
def predict(self, x: VectorLike) -> float64: ...
Review comment (Member Author): I can't follow why it complains:

python/pyspark/mllib/classification.pyi:93: error: Signature of "predict" incompatible with supertype "LinearClassificationModel"

Presumably because of the different variable name? We can't change it for compatibility reasons.

@overload
def predict(self, x: RDD[VectorLike]) -> RDD[float64]: ...
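For context on the `# type: ignore` added above: mypy reports "Signature of ... incompatible with supertype" whenever an overloaded override, taken as a whole, is not compatible with the base class method, and placing the ignore on the first `@overload` silences the whole overload group. A minimal, self-contained sketch of the same pattern (Base and Child are hypothetical classes, not the actual pyspark stubs):

from typing import Union, overload


class Base:
    def predict(self, x: Union[float, list]) -> float:
        return 0.0


class Child(Base):
    # mypy flags this override because the second variant returns list, which is not
    # compatible with Base.predict's float return type; the ignore on the first
    # @overload suppresses the error for the whole group, mirroring the stub change above.
    @overload  # type: ignore
    def predict(self, x: float) -> float: ...
    @overload
    def predict(self, x: list) -> list: ...
    def predict(self, x):
        return [0.0] if isinstance(x, list) else 0.0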
6 changes: 3 additions & 3 deletions python/pyspark/mllib/clustering.pyi
@@ -35,7 +35,7 @@ from pyspark.streaming.dstream import DStream
T = TypeVar("T")

class BisectingKMeansModel(JavaModelWrapper):
- centers: List[ndarray]
+ centers: List[VectorLike]
def __init__(self, java_model: JavaObject) -> None: ...
@property
def clusterCenters(self) -> List[ndarray]: ...
@@ -62,7 +62,7 @@ class BisectingKMeans:
) -> BisectingKMeansModel: ...

class KMeansModel(Saveable, Loader[KMeansModel]):
- centers: List[ndarray]
+ centers: List[VectorLike]
def __init__(self, centers: List[VectorLike]) -> None: ...
@property
def clusterCenters(self) -> List[ndarray]: ...
@@ -149,7 +149,7 @@ class StreamingKMeansModel(KMeansModel):
) -> None: ...
@property
def clusterWeights(self) -> List[float64]: ...
- centers: ndarray
+ centers: List[VectorLike]
def update(
self, data: RDD[VectorLike], decayFactor: float, timeUnit: str
) -> StreamingKMeansModel: ...
6 changes: 3 additions & 3 deletions python/pyspark/pandas/categorical.py
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
- from typing import TYPE_CHECKING
+ from typing import TYPE_CHECKING, cast

import pandas as pd
from pandas.api.types import CategoricalDtype
@@ -79,7 +79,7 @@ def categories(self) -> pd.Index:
>>> s.cat.categories
Index(['a', 'b', 'c'], dtype='object')
"""
- return self._data.dtype.categories
+ return cast(CategoricalDtype, self._data.dtype).categories

@categories.setter
def categories(self, categories: pd.Index) -> None:
@@ -106,7 +106,7 @@ def ordered(self) -> bool:
>>> s.cat.ordered
False
"""
- return self._data.dtype.ordered
+ return cast(CategoricalDtype, self._data.dtype).ordered

@property
def codes(self) -> "ps.Series":
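The `cast(CategoricalDtype, ...)` calls above exist only for the type checker: `typing.cast` does nothing at runtime and simply tells mypy to treat the broader dtype annotation as a CategoricalDtype so that `.categories` and `.ordered` type-check. A rough standalone illustration (this `Dtype` alias is a simplified stand-in, not the real pyspark.pandas._typing.Dtype):

from typing import Union, cast

import numpy as np
import pandas as pd
from pandas.api.types import CategoricalDtype

Dtype = Union[np.dtype, CategoricalDtype]  # simplified stand-in for illustration


def categories_of(dtype: Dtype) -> pd.Index:
    # Without the cast, mypy only sees the union and rejects .categories;
    # cast() is a no-op at runtime.
    return cast(CategoricalDtype, dtype).categories


print(categories_of(CategoricalDtype(["a", "b", "c"])))  # Index(['a', 'b', 'c'], dtype='object')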
16 changes: 10 additions & 6 deletions python/pyspark/pandas/data_type_ops/categorical_ops.py
@@ -16,7 +16,7 @@
#

from itertools import chain
- from typing import Any, Union
+ from typing import Any, Union, cast

import pandas as pd
from pandas.api.types import CategoricalDtype
@@ -41,8 +41,12 @@ def pretty_name(self) -> str:

def restore(self, col: pd.Series) -> pd.Series:
"""Restore column when to_pandas."""
- return pd.Categorical.from_codes(
- col, categories=self.dtype.categories, ordered=self.dtype.ordered
+ return pd.Series(
+ pd.Categorical.from_codes(
+ col,
+ categories=cast(CategoricalDtype, self.dtype).categories,
+ ordered=cast(CategoricalDtype, self.dtype).ordered,
+ )
)

def prepare(self, col: pd.Series) -> pd.Series:
@@ -52,10 +56,10 @@ def prepare(self, col: pd.Series) -> pd.Series:
def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
dtype, _ = pandas_on_spark_type(dtype)

- if isinstance(dtype, CategoricalDtype) and dtype.categories is None:
+ if isinstance(dtype, CategoricalDtype) and cast(CategoricalDtype, dtype).categories is None:
return index_ops.copy()

- categories = index_ops.dtype.categories
+ categories = cast(CategoricalDtype, index_ops.dtype).categories
if len(categories) == 0:
scol = SF.lit(None)
else:
@@ -84,7 +88,7 @@ def ge(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:


def _non_equality_comparison_input_check(left: IndexOpsLike, right: Any) -> None:
- if not left.dtype.ordered:
+ if not cast(CategoricalDtype, left.dtype).ordered:
raise TypeError("Unordered Categoricals can only compare equality or not.")
if isinstance(right, IndexOpsMixin) and isinstance(right.dtype, CategoricalDtype):
if hash(left.dtype) != hash(right.dtype):
10 changes: 7 additions & 3 deletions python/pyspark/pandas/frame.py
@@ -4966,8 +4966,8 @@ def _assign(self, kwargs: Any) -> "DataFrame":

@staticmethod
def from_records(
- data: Union[np.array, List[tuple], dict, pd.DataFrame],
- index: Union[str, list, np.array] = None,
+ data: Union[np.ndarray, List[tuple], dict, pd.DataFrame],
+ index: Union[str, list, np.ndarray] = None,
exclude: list = None,
columns: list = None,
coerce_float: bool = False,
@@ -7399,7 +7399,11 @@ def isin(self, values: Union[List, Dict]) -> "DataFrame":
SF.lit(False).alias(self._internal.data_spark_column_names[i])
)
elif is_list_like(values):
- values = values.tolist() if isinstance(values, np.ndarray) else list(values)
+ values = (
+ cast(np.ndarray, values).tolist()
+ if isinstance(values, np.ndarray)
+ else list(values)
+ )
data_spark_columns += [
self._internal.spark_column_for(label)
.isin(values)
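The `np.array` to `np.ndarray` switch in the `from_records` annotations matters because `np.array` is a factory function rather than a class, so it is not a meaningful type annotation; `np.ndarray` is the actual array type. A quick check:

import numpy as np

arr = np.array([1, 2, 3])
print(type(arr) is np.ndarray)      # True: instances are ndarray
print(callable(np.array))           # True: np.array is just a constructor function
print(isinstance(arr, np.ndarray))  # True; isinstance(arr, np.array) would raise TypeError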
8 changes: 4 additions & 4 deletions python/pyspark/pandas/indexes/category.py
@@ -15,10 +15,10 @@
# limitations under the License.
#
from functools import partial
- from typing import Any, no_type_check
+ from typing import Any, no_type_check, cast

import pandas as pd
- from pandas.api.types import is_hashable
+ from pandas.api.types import is_hashable, CategoricalDtype

from pyspark import pandas as ps
from pyspark.pandas.indexes.base import Index
@@ -156,7 +156,7 @@ def categories(self) -> pd.Index:
>>> idx.categories
Index(['a', 'b', 'c'], dtype='object')
"""
- return self.dtype.categories
+ return cast(CategoricalDtype, self.dtype).categories

@categories.setter
def categories(self, categories: pd.Index) -> None:
@@ -177,7 +177,7 @@ def ordered(self) -> bool:
>>> idx.ordered
False
"""
- return self.dtype.ordered
+ return cast(CategoricalDtype, self.dtype).ordered

def __getattr__(self, item: str) -> Any:
if hasattr(MissingPandasLikeCategoricalIndex, item):
4 changes: 2 additions & 2 deletions python/pyspark/pandas/indexing.py
@@ -1646,14 +1646,14 @@ def _select_rows_by_iterable(
) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
sdf = self._internal.spark_frame

- if any(isinstance(key, (int, np.int, np.int64, np.int32)) and key < 0 for key in rows_sel):
Review comment (Member Author): np.int is now the same as the Python built-in int.

Review comment (Member Author): np.float == float, etc., too.

+ if any(isinstance(key, (int, np.int64, np.int32)) and key < 0 for key in rows_sel):
offset = sdf.count()
else:
offset = 0

new_rows_sel = []
for key in list(rows_sel):
- if not isinstance(key, (int, np.int, np.int64, np.int32)):
+ if not isinstance(key, (int, np.int64, np.int32)):
raise TypeError(
"cannot do positional indexing with these indexers [{}] of {}".format(
key, type(key)
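As the comments above note, `np.int` and `np.float` were only aliases for the Python built-ins, deprecated in NumPy 1.20 and removed in 1.24, so dropping them from the isinstance checks loses nothing. A small check whose output depends on the installed NumPy version:

import warnings

import numpy as np

with warnings.catch_warnings():
    warnings.simplefilter("ignore", DeprecationWarning)
    try:
        print(np.int is int, np.float is float)  # True True wherever the aliases still exist
    except AttributeError:
        print("np.int / np.float no longer exist in this NumPy version")

print(np.int64 is int)  # False: the sized types are real classes, distinct from the built-ins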
2 changes: 1 addition & 1 deletion python/pyspark/pandas/ml.py
@@ -78,7 +78,7 @@ def to_numeric_df(psdf: "ps.DataFrame") -> Tuple[pyspark.sql.DataFrame, List[Lab
"""
# TODO, it should be more robust.
accepted_types = {
- np.dtype(dt)
+ np.dtype(dt)  # type: ignore
Review comment (Member Author): I couldn't figure out how to fix:

python/pyspark/pandas/ml.py:81: error: Value of type variable "_DTypeScalar_co" of "dtype" cannot be "object"

for dt in [np.int8, np.int16, np.int32, np.int64, np.float32, np.float64, np.bool_]
}
numeric_column_labels = [
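Regarding the error quoted above: the numpy stubs constrain the argument of `np.dtype(...)`, and iterating over a list that mixes integer, float and bool scalar types makes mypy infer an element type too broad for that constraint, even though every individual call is fine at runtime. A sketch of the pattern with the same inline ignore (an illustration, not the exact pyspark code):

from typing import Set

import numpy as np

numeric_types = [np.int8, np.int16, np.int32, np.int64, np.float32, np.float64, np.bool_]

accepted_types: Set[np.dtype] = {
    np.dtype(dt)  # type: ignore  # silences the stub complaint; no runtime effect
    for dt in numeric_types
}

print(np.dtype(np.float64) in accepted_types)  # True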
8 changes: 5 additions & 3 deletions python/pyspark/pandas/mlflow.py
@@ -25,7 +25,7 @@
import numpy as np
from typing import Any

- from pyspark.pandas._typing import Label # noqa: F401 (SPARK-34943)
+ from pyspark.pandas._typing import Label, Dtype # noqa: F401 (SPARK-34943)
from pyspark.pandas.utils import lazy_property, default_session
from pyspark.pandas.frame import DataFrame
from pyspark.pandas.series import Series, first_series
@@ -42,7 +42,7 @@ class PythonModelWrapper(object):

"""

- def __init__(self, model_uri: str, return_type_hint: str):
+ def __init__(self, model_uri: str, return_type_hint: Union[str, type, Dtype]):
self._model_uri = model_uri
self._return_type_hint = return_type_hint

@@ -109,7 +109,9 @@ def predict(self, data: Union[DataFrame, pd.DataFrame]) -> Union[Series, pd.Seri
raise ValueError("unknown data type: {}".format(type(data).__name__))


- def load_model(model_uri: str, predict_type: str = "infer") -> PythonModelWrapper:
+ def load_model(
+ model_uri: str, predict_type: Union[str, type, Dtype] = "infer"
+ ) -> PythonModelWrapper:
"""
Loads an MLflow model into an wrapper that can be used both for pandas and pandas-on-Spark
DataFrame.
6 changes: 3 additions & 3 deletions python/pyspark/pandas/namespace.py
@@ -2020,11 +2020,11 @@ def get_dummies(
if drop_first:
values = values[1:]

- def column_name(value: str) -> str:
+ def column_name(v: Any) -> Name:
if prefix is None or cast(List[str], prefix)[i] == "":
- return value
+ return v
else:
- return "{}{}{}".format(cast(List[str], prefix)[i], prefix_sep, value)
+ return "{}{}{}".format(cast(List[str], prefix)[i], prefix_sep, v)

for value in values:
remaining_columns.append(
5 changes: 3 additions & 2 deletions python/pyspark/pandas/series.py
@@ -1027,8 +1027,9 @@ def map(self, arg: Union[Dict, Callable]) -> "Series":
current = current.when(self.spark.column == SF.lit(to_replace), value)

if hasattr(arg, "__missing__"):
- tmp_val = arg[np._NoValue]
- del arg[np._NoValue] # Remove in case it's set in defaultdict.
+ tmp_val = arg[np._NoValue] # type: ignore
+ # Remove in case it's set in defaultdict.
+ del arg[np._NoValue] # type: ignore
current = current.otherwise(SF.lit(tmp_val))
else:
current = current.otherwise(SF.lit(None).cast(self.spark.data_type))
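For context on the two ignores above: `Series.map` only honors a fallback when the mapping defines `__missing__` (e.g. a defaultdict), and it probes for that fallback by indexing with NumPy's private `np._NoValue` sentinel, which the numpy stubs do not expose — presumably why mypy needs the ignores. A small sketch of the mechanism:

from collections import defaultdict

import numpy as np

mapping = defaultdict(lambda: "unknown", {1: "a", 2: "b"})
print(hasattr(mapping, "__missing__"))   # True  -> a fallback exists and should be honored
print(hasattr({1: "a"}, "__missing__"))  # False -> unmatched keys become null instead
print(mapping[np._NoValue])              # "unknown": the sentinel lookup triggers __missing__
del mapping[np._NoValue]                 # remove the entry the defaultdict just created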
22 changes: 11 additions & 11 deletions python/pyspark/pandas/tests/test_typedef.py
@@ -67,22 +67,22 @@ def func() -> pd.Series[int]:
self.assertEqual(inferred.dtype, np.int64)
self.assertEqual(inferred.spark_type, LongType())

- def func() -> pd.Series[np.float]:
+ def func() -> pd.Series[float]:
pass

inferred = infer_return_type(func)
self.assertEqual(inferred.dtype, np.float64)
self.assertEqual(inferred.spark_type, DoubleType())

- def func() -> "pd.DataFrame[np.float, str]":
+ def func() -> "pd.DataFrame[np.float_, str]":
pass

expected = StructType([StructField("c0", DoubleType()), StructField("c1", StringType())])
inferred = infer_return_type(func)
self.assertEqual(inferred.dtypes, [np.float64, np.unicode_])
self.assertEqual(inferred.spark_type, expected)

- def func() -> "pandas.DataFrame[np.float]":
+ def func() -> "pandas.DataFrame[float]":
pass

expected = StructType([StructField("c0", DoubleType())])
@@ -97,15 +97,15 @@ def func() -> "pd.Series[int]":
self.assertEqual(inferred.dtype, np.int64)
self.assertEqual(inferred.spark_type, LongType())

- def func() -> pd.DataFrame[np.float, str]:
+ def func() -> pd.DataFrame[np.float64, str]:
pass

expected = StructType([StructField("c0", DoubleType()), StructField("c1", StringType())])
inferred = infer_return_type(func)
self.assertEqual(inferred.dtypes, [np.float64, np.unicode_])
self.assertEqual(inferred.spark_type, expected)

- def func() -> pd.DataFrame[np.float]:
+ def func() -> pd.DataFrame[np.float_]:
pass

expected = StructType([StructField("c0", DoubleType())])
@@ -152,15 +152,15 @@ def test_if_pandas_implements_class_getitem(self):
"Type inference from pandas instances is supported with Python 3.7+",
)
def test_infer_schema_with_names_pandas_instances(self):
- def func() -> 'pd.DataFrame["a" : np.float, "b":str]': # noqa: F405
+ def func() -> 'pd.DataFrame["a" : np.float_, "b":str]': # noqa: F405
pass

expected = StructType([StructField("a", DoubleType()), StructField("b", StringType())])
inferred = infer_return_type(func)
self.assertEqual(inferred.dtypes, [np.float64, np.unicode_])
self.assertEqual(inferred.spark_type, expected)

- def func() -> "pd.DataFrame['a': np.float, 'b': int]": # noqa: F405
+ def func() -> "pd.DataFrame['a': float, 'b': int]": # noqa: F405
pass

expected = StructType([StructField("a", DoubleType()), StructField("b", LongType())])
@@ -206,7 +206,7 @@ def func() -> pd.DataFrame[zip(pdf.columns, pdf.dtypes)]:
)
def test_infer_schema_with_names_pandas_instances_negative(self):
def try_infer_return_type():
- def f() -> 'pd.DataFrame["a" : np.float : 1, "b":str:2]': # noqa: F405
+ def f() -> 'pd.DataFrame["a" : np.float_ : 1, "b":str:2]': # noqa: F405
pass

infer_return_type(f)
@@ -225,7 +225,7 @@ def f() -> pd.DataFrame[A]:
self.assertRaisesRegex(TypeError, "not understood", try_infer_return_type)

def try_infer_return_type():
- def f() -> 'pd.DataFrame["a" : np.float : 1, "b":str:2]': # noqa: F405
+ def f() -> 'pd.DataFrame["a" : float : 1, "b":str:2]': # noqa: F405
pass

infer_return_type(f)
@@ -253,7 +253,7 @@ def f() -> pd.Series[pdf.a.dtype]: # type: ignore

def test_infer_schema_with_names_negative(self):
def try_infer_return_type():
- def f() -> 'ps.DataFrame["a" : np.float : 1, "b":str:2]': # noqa: F405
+ def f() -> 'ps.DataFrame["a" : float : 1, "b":str:2]': # noqa: F405
pass

infer_return_type(f)
@@ -272,7 +272,7 @@ def f() -> ps.DataFrame[A]:
self.assertRaisesRegex(TypeError, "not understood", try_infer_return_type)

def try_infer_return_type():
- def f() -> 'ps.DataFrame["a" : np.float : 1, "b":str:2]': # noqa: F405
+ def f() -> 'ps.DataFrame["a" : np.float_ : 1, "b":str:2]': # noqa: F405
pass

infer_return_type(f)
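These tests exercise the return-type-hint syntax that pandas-on-Spark turns into a Spark schema; the change merely swaps the deprecated `np.float` alias for `float`, `np.float_` or `np.float64`, all of which still infer to DoubleType. A rough usage sketch of the syntax under test (assumes pyspark is installed and a Spark session can be started):

import numpy as np
import pyspark.pandas as ps


def sqrt(col) -> ps.Series[float]:  # the hint, not the data, determines the result schema
    return np.sqrt(col)


psdf = ps.DataFrame({"a": [1.0, 4.0, 9.0]})
print(psdf.apply(sqrt, axis=0))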