[SPARK-32714][PYTHON] Initial pyspark-stubs port. #29591

Closed
wants to merge 44 commits into from

Changes from all commits (44 commits)
49174d7
Initial import
zero323 Aug 29, 2020
affe0e6
Add py.typed
zero323 Aug 31, 2020
b094d4d
Ignore errors in cloudpickle
zero323 Aug 31, 2020
fa946b6
Type ignore xmlrunner imports
zero323 Aug 31, 2020
3103b5e
Ignore numpy / pandas / scipy imports
zero323 Aug 31, 2020
7ee3a81
Ignore test modules imports
zero323 Aug 31, 2020
683c7d2
Type ignore SkipIf with Optional[str] message
zero323 Aug 31, 2020
c31274c
Type ignore kinesis_requirement_message
zero323 Aug 31, 2020
94ae3d8
Exclude py.typed from rat check
zero323 Aug 31, 2020
8a2a31e
Add license to mypy.ini
zero323 Aug 31, 2020
f12a37c
Use more precise imports for UserDefinedFunction
zero323 Aug 31, 2020
2e346fd
Type ignore test __UDT__
zero323 Aug 31, 2020
6e3c872
Resync with pyspark-stubs
zero323 Aug 31, 2020
f76f4a9
Ignore internal member imports from pyspark.sql.types
zero323 Aug 31, 2020
e918e72
Ignore internal member imports from pyspark.ml.wrapper
zero323 Aug 31, 2020
bc26621
Ignore internal member imports from pyspark.mllib.common
zero323 Aug 31, 2020
87aa2b5
Ignore internal member imports from pyspark.mllib.linalg
zero323 Aug 31, 2020
c5ff0dd
Type ignore Params._dummy
zero323 Aug 31, 2020
1ad399e
Drop cloudpickle.pyi
zero323 Aug 31, 2020
ac93d18
Address style failures
zero323 Sep 1, 2020
203b2a9
Exclude shared.pyi from flake tests
zero323 Sep 2, 2020
13be932
Resync with pyspark-stubs
zero323 Sep 2, 2020
e96414e
Resync with pyspark-stubs
zero323 Sep 3, 2020
3518bd3
Set per-file excludes
zero323 Sep 6, 2020
4ce7ae3
Adjust exclude pattern
zero323 Sep 6, 2020
c4fdda8
Patch examples
zero323 Sep 6, 2020
3a20820
Restore cloudpickle
zero323 Sep 6, 2020
fccc6dd
Explicitly exclude selected pyi files from flake8 tests
zero323 Sep 7, 2020
bdc8e84
Use less specific ignores in pyspark.testing
zero323 Sep 7, 2020
a587107
Use less specific ignores in pyspark.sql
zero323 Sep 7, 2020
7d6732e
Use less specific ignores in examples
zero323 Sep 7, 2020
c6ddfd7
Drop per-file-ignores
zero323 Sep 7, 2020
ded1dae
Drop explicit type hints from examples
zero323 Sep 7, 2020
6f48556
Package py.typed and *pyi
zero323 Sep 7, 2020
af4459f
Resync with pyspark-stubs
zero323 Sep 8, 2020
a8990ca
Resync with pyspark-stubs
zero323 Sep 8, 2020
afe5222
Resync with pyspark-stubs (drop long alias)
zero323 Sep 10, 2020
6aaef20
Resync with pyspark-stubs (drop 'stubs for' comments)
zero323 Sep 10, 2020
19bf189
Resync with pyspark-stubs (add allowMissingColumns to DataFrame.union…
zero323 Sep 14, 2020
601b99a
Resync with pyspark-stubs (drop unused hasSummary and add leafCol)
zero323 Sep 15, 2020
7d359a9
Resync with pyspark-stubs (drop __metaclass__ fields)
zero323 Sep 16, 2020
53107a1
Resync with pyspark-stubs (add Column.withField)
zero323 Sep 16, 2020
b9ac4f8
Drop unued typing.Type imports
zero323 Sep 16, 2020
fab00f1
Resync with pyspark-stubs (revert blockify gmm)
zero323 Sep 23, 2020
1 change: 1 addition & 0 deletions dev/.rat-excludes
@@ -124,3 +124,4 @@ GangliaReporter.java
application_1578436911597_0052
config.properties
app-20200706201101-0003
py.typed
2 changes: 1 addition & 1 deletion dev/tox.ini
@@ -20,5 +20,5 @@ exclude=python/pyspark/cloudpickle/*.py,shared.py,python/docs/source/conf.py,wor

[flake8]
select = E901,E999,F821,F822,F823,F401,F405
exclude = python/pyspark/cloudpickle/*.py,shared.py,python/docs/source/conf.py,work/*/*.py,python/.eggs/*,dist/*,.git/*
exclude = python/pyspark/cloudpickle/*.py,shared.py*,python/docs/source/conf.py,work/*/*.py,python/.eggs/*,dist/*,.git/*,python/out,python/pyspark/sql/pandas/functions.pyi,python/pyspark/sql/column.pyi,python/pyspark/worker.pyi,python/pyspark/java_gateway.pyi
Contributor:
I'm fine with ignoring these for now, maybe we can re-enable them later on.

Member Author (zero323):
> Personally, I would require mypy to be added in this PR as well. Otherwise we run the risk that the annotations might go out of sync quickly.

In general I see your point; however, I wanted to avoid duplicating your work from #29180, and we cannot set up the environment anyway.

max-line-length = 100
@@ -56,12 +56,14 @@
# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30 # Specify 1 Param, overwriting the original maxIter.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55}) # Specify multiple Params.
# Specify multiple Params.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55}) # type: ignore

# You can combine paramMaps, which are python dictionaries.
paramMap2 = {lr.probabilityCol: "myProbability"} # Change output column name
# Change output column name
paramMap2 = {lr.probabilityCol: "myProbability"} # type: ignore
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)
paramMapCombined.update(paramMap2) # type: ignore

# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
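An aside on why the ignores above are needed (my reading; the diff itself does not say): mypy infers a dictionary's type from its initial entries, so a paramMap seeded with an int-valued Param can no longer be updated with float- or string-valued entries without an ignore. A minimal, PySpark-free sketch of the same inference behaviour:

    d = {20: 20}           # mypy infers Dict[int, int] from the literal
    d.update({0.5: 0.1})   # runs fine, but mypy flags the incompatible argument type
    print(d)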
6 changes: 3 additions & 3 deletions examples/src/main/python/ml/fm_classifier_example.py
@@ -67,9 +67,9 @@
print("Test set accuracy = %g" % accuracy)

fmModel = model.stages[2]
print("Factors: " + str(fmModel.factors))
print("Linear: " + str(fmModel.linear))
print("Intercept: " + str(fmModel.intercept))
print("Factors: " + str(fmModel.factors)) # type: ignore
print("Linear: " + str(fmModel.linear)) # type: ignore
print("Intercept: " + str(fmModel.intercept)) # type: ignore
# $example off$

spark.stop()
6 changes: 3 additions & 3 deletions examples/src/main/python/ml/fm_regressor_example.py
@@ -64,9 +64,9 @@
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

fmModel = model.stages[1]
print("Factors: " + str(fmModel.factors))
print("Linear: " + str(fmModel.linear))
print("Intercept: " + str(fmModel.intercept))
print("Factors: " + str(fmModel.factors)) # type: ignore
print("Linear: " + str(fmModel.linear)) # type: ignore
print("Intercept: " + str(fmModel.intercept)) # type: ignore
# $example off$

spark.stop()
8 changes: 6 additions & 2 deletions examples/src/main/python/ml/pipeline_example.py
@@ -62,8 +62,12 @@
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))
    rid, text, prob, prediction = row  # type: ignore
    print(
        "(%d, %s) --> prob=%s, prediction=%f" % (
            rid, text, str(prob), prediction  # type: ignore
        )
    )
# $example off$

spark.stop()
4 changes: 2 additions & 2 deletions examples/src/main/python/sql/arrow.py
@@ -32,8 +32,8 @@


def dataframe_with_arrow_example(spark):
    import numpy as np
    import pandas as pd
    import numpy as np  # type: ignore[import]
Contributor:
NumPy has type hints as well, so why ignore it? I got this working in zero323/pyspark-stubs#464.

Member Author (zero323), Sep 10, 2020:
Because, AFAIK, released versions are not typed ‒ there don't seem to be any released versions with annotations (nothing for 1.19.1 ‒ https://github.com/numpy/numpy/tree/v1.19.1/numpy) since numpy/numpy@11b95d1.

    import pandas as pd  # type: ignore[import]

    # Enable Arrow-based columnar data transfers
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
1 change: 1 addition & 0 deletions python/MANIFEST.in
@@ -22,4 +22,5 @@ recursive-include deps/data *.data *.txt
recursive-include deps/licenses *.txt
recursive-include deps/examples *.py
recursive-include lib *.zip
recursive-include pyspark *.pyi py.typed
Member Author (zero323):
Note:

Is it possible to do it through package_data, i.e.

        package_data={
            '': ['*.pyi', 'py.typed'],
            ...

Doesn't seem to work here...
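For reference, a hedged sketch of the package_data route being discussed (hypothetical values; the PR ships the MANIFEST.in rule above instead, since package_data reportedly did not pick the files up here):

    # Hypothetical setup.py excerpt -- illustrates the package_data approach only;
    # this PR relies on MANIFEST.in instead.
    from setuptools import find_packages, setup

    setup(
        name="pyspark",
        packages=find_packages(),
        package_data={
            # ship the stubs and the PEP 561 marker with every package
            "": ["*.pyi", "py.typed"],
        },
        zip_safe=False,  # PEP 561 recommends non-zipped installs for typed packages
    )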

include README.md
36 changes: 36 additions & 0 deletions python/mypy.ini
@@ -0,0 +1,36 @@
;
; Licensed to the Apache Software Foundation (ASF) under one or more
; contributor license agreements. See the NOTICE file distributed with
; this work for additional information regarding copyright ownership.
; The ASF licenses this file to You under the Apache License, Version 2.0
; (the "License"); you may not use this file except in compliance with
; the License. You may obtain a copy of the License at
;
; http://www.apache.org/licenses/LICENSE-2.0
;
; Unless required by applicable law or agreed to in writing, software
; distributed under the License is distributed on an "AS IS" BASIS,
; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
; See the License for the specific language governing permissions and
; limitations under the License.
;

[mypy]

[mypy-pyspark.cloudpickle.*]
ignore_errors = True

[mypy-py4j.*]
ignore_missing_imports = True

[mypy-numpy]
ignore_missing_imports = True

[mypy-scipy.*]
ignore_missing_imports = True

[mypy-pandas.*]
ignore_missing_imports = True

[mypy-pyarrow]
ignore_missing_imports = True
73 changes: 73 additions & 0 deletions python/pyspark/__init__.pyi
@@ -0,0 +1,73 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Callable, Optional, TypeVar

from pyspark.accumulators import ( # noqa: F401
    Accumulator as Accumulator,
    AccumulatorParam as AccumulatorParam,
Member:
Quick question: why do we alias each import to the same name (the "Accumulator as Accumulator" form)?

Member Author (zero323):
With MyPy one can disable implicit re-export ‒ that's the default configuration for typeshed.

If we replaced the above with

from pyspark.accumulators import Accumulator

we couldn't do

from pyspark import Accumulator

Technically speaking we could also provide __all__ in the stubs ‒ I just wasn't aware of that back when I started working on stubs.
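For illustration, a hedged sketch of that __all__ alternative (not what this PR does): with an explicit __all__, mypy treats the listed names as re-exported even when --no-implicit-reexport is in effect, so the "X as X" aliases would not be needed.

    # Hypothetical stub style; the PR keeps the "X as X" re-export form instead.
    from pyspark.accumulators import Accumulator, AccumulatorParam
    from pyspark.broadcast import Broadcast

    __all__ = ["Accumulator", "AccumulatorParam", "Broadcast"]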

)
from pyspark.broadcast import Broadcast as Broadcast # noqa: F401
from pyspark.conf import SparkConf as SparkConf # noqa: F401
from pyspark.context import SparkContext as SparkContext # noqa: F401
from pyspark.files import SparkFiles as SparkFiles # noqa: F401
from pyspark.status import (
    StatusTracker as StatusTracker,
    SparkJobInfo as SparkJobInfo,
    SparkStageInfo as SparkStageInfo,
) # noqa: F401
from pyspark.profiler import ( # noqa: F401
    BasicProfiler as BasicProfiler,
    Profiler as Profiler,
)
from pyspark.rdd import RDD as RDD, RDDBarrier as RDDBarrier # noqa: F401
from pyspark.serializers import ( # noqa: F401
    MarshalSerializer as MarshalSerializer,
    PickleSerializer as PickleSerializer,
)
from pyspark.status import ( # noqa: F401
    SparkJobInfo as SparkJobInfo,
    SparkStageInfo as SparkStageInfo,
    StatusTracker as StatusTracker,
)
from pyspark.storagelevel import StorageLevel as StorageLevel # noqa: F401
from pyspark.taskcontext import ( # noqa: F401
    BarrierTaskContext as BarrierTaskContext,
    BarrierTaskInfo as BarrierTaskInfo,
    TaskContext as TaskContext,
)
from pyspark.util import InheritableThread as InheritableThread # noqa: F401

# Compatibility imports
from pyspark.sql import ( # noqa: F401
    SQLContext as SQLContext,
    HiveContext as HiveContext,
    Row as Row,
)

T = TypeVar("T")
F = TypeVar("F", bound=Callable)

def since(version: str) -> Callable[[T], T]: ...
def copy_func(
    f: F,
    name: Optional[str] = ...,
    sinceversion: Optional[str] = ...,
    doc: Optional[str] = ...,
) -> F: ...
def keyword_only(func: F) -> F: ...
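A small usage sketch (not part of the diff) of what these signatures promise: since and keyword_only are typed as identity-preserving decorators (Callable[[T], T] and F -> F respectively), so decorated callables keep their original signatures for type checkers.

    from pyspark import since

    @since("3.1.0")
    def add_one(x: int) -> int:
        """Add one to x."""
        return x + 1

    # Because since() is annotated as Callable[[T], T], mypy still sees
    # add_one as (int) -> int after decoration.
    result: int = add_one(41)
    print(result)  # 42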
27 changes: 27 additions & 0 deletions python/pyspark/_globals.pyi
@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# NOTE: This dynamically typed stub was automatically generated by stubgen.

from typing import Any

__ALL__: Any

class _NoValueType:
    def __new__(cls): ...
    def __reduce__(self): ...
33 changes: 33 additions & 0 deletions python/pyspark/_typing.pyi
@@ -0,0 +1,33 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Callable, Iterable, Sized, TypeVar, Union
from typing_extensions import Protocol

F = TypeVar("F", bound=Callable)
T = TypeVar("T", covariant=True)

PrimitiveType = Union[bool, float, int, str]

class SupportsIAdd(Protocol):
    def __iadd__(self, other: SupportsIAdd) -> SupportsIAdd: ...

class SupportsOrdering(Protocol):
    def __le__(self, other: SupportsOrdering) -> bool: ...

class SizedIterable(Protocol, Sized, Iterable[T]): ...
71 changes: 71 additions & 0 deletions python/pyspark/accumulators.pyi
@@ -0,0 +1,71 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Callable, Generic, Tuple, Type, TypeVar

import socketserver.BaseRequestHandler # type: ignore

from pyspark._typing import SupportsIAdd

T = TypeVar("T")
U = TypeVar("U", bound=SupportsIAdd)

import socketserver as SocketServer

class Accumulator(Generic[T]):
    aid: int
    accum_param: AccumulatorParam[T]
    def __init__(
        self, aid: int, value: T, accum_param: AccumulatorParam[T]
    ) -> None: ...
    def __reduce__(
        self,
    ) -> Tuple[
        Callable[[int, int, AccumulatorParam[T]], Accumulator[T]],
        Tuple[int, int, AccumulatorParam[T]],
    ]: ...
    @property
    def value(self) -> T: ...
    @value.setter
    def value(self, value: T) -> None: ...
    def add(self, term: T) -> None: ...
    def __iadd__(self, term: T) -> Accumulator[T]: ...

class AccumulatorParam(Generic[T]):
    def zero(self, value: T) -> T: ...
    def addInPlace(self, value1: T, value2: T) -> T: ...

class AddingAccumulatorParam(AccumulatorParam[U]):
    zero_value: U
    def __init__(self, zero_value: U) -> None: ...
    def zero(self, value: U) -> U: ...
    def addInPlace(self, value1: U, value2: U) -> U: ...

class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
    def handle(self) -> None: ...

class AccumulatorServer(SocketServer.TCPServer):
    auth_token: str
    def __init__(
        self,
        server_address: Tuple[str, int],
        RequestHandlerClass: Type[socketserver.BaseRequestHandler],
        auth_token: str,
    ) -> None: ...
    server_shutdown: bool
    def shutdown(self) -> None: ...
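A hedged usage sketch (assumes a local SparkContext; not part of this diff) of the API the stub above describes: the default accumulator handles ints and floats, while AccumulatorParam's zero()/addInPlace() back custom value types.

    from pyspark import SparkContext
    from pyspark.accumulators import AccumulatorParam

    class ListParam(AccumulatorParam):
        """Accumulate Python lists by concatenation."""
        def zero(self, value):
            return []
        def addInPlace(self, value1, value2):
            value1.extend(value2)
            return value1

    sc = SparkContext("local[2]", "accumulator-sketch")

    counter = sc.accumulator(0)                  # Accumulator[int] with a built-in param
    collected = sc.accumulator([], ListParam())  # Accumulator[list] with a custom param

    def visit(x):
        counter.add(1)
        collected.add([x])

    sc.parallelize([1, 2, 3, 4]).foreach(visit)
    print(counter.value)            # 4
    print(sorted(collected.value))  # [1, 2, 3, 4]
    sc.stop()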
46 changes: 46 additions & 0 deletions python/pyspark/broadcast.pyi
@@ -0,0 +1,46 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import threading
from typing import Any, Generic, Optional, TypeVar

T = TypeVar("T")

class Broadcast(Generic[T]):
    def __init__(
        self,
        sc: Optional[Any] = ...,
        value: Optional[T] = ...,
        pickle_registry: Optional[Any] = ...,
        path: Optional[Any] = ...,
        sock_file: Optional[Any] = ...,
    ) -> None: ...
    def dump(self, value: Any, f: Any) -> None: ...
    def load_from_path(self, path: Any): ...
    def load(self, file: Any): ...
    @property
    def value(self) -> T: ...
    def unpersist(self, blocking: bool = ...) -> None: ...
    def destroy(self, blocking: bool = ...) -> None: ...
    def __reduce__(self): ...

class BroadcastPickleRegistry(threading.local):
    def __init__(self) -> None: ...
    def __iter__(self) -> None: ...
    def add(self, bcast: Any) -> None: ...
    def clear(self) -> None: ...
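And a matching sketch (again assuming a local SparkContext; not part of this diff) for broadcast.pyi above: Broadcast is generic in the wrapped value, so bc.value comes back typed as whatever was broadcast.

    from pyspark import SparkContext

    sc = SparkContext("local[2]", "broadcast-sketch")

    bc = sc.broadcast({"a": 1, "b": 2})   # Broadcast of a Dict[str, int]

    rdd = sc.parallelize(["a", "b", "c"])
    print(rdd.map(lambda k: bc.value.get(k, 0)).collect())  # [1, 2, 0]

    bc.unpersist()  # drop cached copies on the executors
    sc.stop()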