[Typing][C-111] Add type annotations for python/paddle/base/executor.py #66996

Merged (3 commits) on Aug 5, 2024
python/paddle/base/dataset.py (4 changes: 3 additions & 1 deletion)
@@ -13,6 +13,8 @@
# limitations under the License.
"""This is definition of dataset class, which is high performance IO."""

from __future__ import annotations

from google.protobuf import text_format

import paddle
@@ -41,7 +43,7 @@ def __init__(self):
"""Init."""
pass

def create_dataset(self, datafeed_class="QueueDataset"):
def create_dataset(self, datafeed_class="QueueDataset") -> DatasetBase:
Member: `datafeed_class` is missing its annotation.

Member (Author): The return type was just added in passing; this file hasn't been annotated yet.
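
For context, a fully annotated version of this signature could look like the sketch below; `str` for `datafeed_class` is an assumption based on the docstring's class-name values, and the stub classes stand in for the real ones. The `DatasetBase` forward reference is safe because the file now has `from __future__ import annotations`:

```python
from __future__ import annotations


class DatasetBase:  # stand-in for the real dataset base class
    ...


class DatasetFactory:  # stand-in for the real factory
    def create_dataset(self, datafeed_class: str = "QueueDataset") -> DatasetBase:
        # "QueueDataset", "InMemoryDataset", or "FileInstantDataset"
        ...
```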

"""
Create "QueueDataset" or "InMemoryDataset", or "FileInstantDataset",
the default is "QueueDataset".
python/paddle/base/executor.py (154 changes: 110 additions & 44 deletions)
@@ -12,12 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import copy
import logging
import os
import sys
import warnings
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Literal, overload

import numpy as np

@@ -50,14 +53,25 @@
from .trainer_factory import FetchHandlerMonitor, TrainerFactory
from .wrapped_decorator import signature_safe_contextmanager

if TYPE_CHECKING:
from collections.abc import Generator

import numpy.typing as npt

from paddle import Tensor
from paddle._typing import PlaceLike
from paddle._typing.device_like import _Place
from paddle.base.dataset import DatasetBase
from paddle.static import CompiledProgram
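
A side note on the `if TYPE_CHECKING:` block: these imports exist only for the type checker, so they add no runtime import cost and cannot create import cycles; with `from __future__ import annotations`, annotations are strings at runtime and the names never need to resolve. A minimal self-contained sketch of the pattern:

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only evaluated by static type checkers, never at runtime.
    from collections.abc import Generator


def countdown(n: int) -> Generator[int, None, None]:
    # The annotation is a plain string at runtime (PEP 563),
    # so `Generator` need not be importable when this runs.
    while n > 0:
        yield n
        n -= 1


print(list(countdown(3)))  # [3, 2, 1]
```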

__all__ = []

g_scope = core.Scope()
InferNativeConfig = core.NativeConfig
InferAnalysisConfig = core.AnalysisConfig


def global_scope():
def global_scope() -> core.Scope:
"""
:api_attr: Static Graph

@@ -79,15 +93,15 @@ def global_scope():
return g_scope


def _switch_scope(scope):
def _switch_scope(scope: core.Scope) -> core.Scope:
global g_scope
ex = g_scope
g_scope = scope
return ex


@signature_safe_contextmanager
def scope_guard(scope):
def scope_guard(scope: core.Scope) -> Generator[None, None, None]:
"""

This function switches scope through python `with` statement.
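
The `Generator[None, None, None]` return type is the standard way to annotate a generator-based context manager (yield type, send type, return type). A minimal sketch using `contextlib.contextmanager` in place of Paddle's `signature_safe_contextmanager`:

```python
from __future__ import annotations

from collections.abc import Generator
from contextlib import contextmanager


@contextmanager
def demo_guard(name: str) -> Generator[None, None, None]:
    # Yields nothing, accepts nothing via send(), returns nothing.
    print(f"enter {name}")
    try:
        yield
    finally:
        print(f"exit {name}")


with demo_guard("scope"):
    pass
```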
@@ -1260,7 +1274,9 @@ class Executor:

"""

def __init__(self, place=None):
place: _Place

def __init__(self, place: PlaceLike | None = None) -> None:
if place is None:
expected_place = framework._current_expected_place_()
self.place = expected_place
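
Illustrative call sites the new `PlaceLike | None` annotation is meant to admit; the exact membership of `PlaceLike` (concrete place objects plus device strings) is treated as an assumption here:

```python
import paddle

exe_default = paddle.static.Executor()               # place=None, uses current expected place
exe_cpu = paddle.static.Executor(paddle.CPUPlace())  # a concrete place object
exe_str = paddle.static.Executor("cpu")              # a device string, if PlaceLike allows it
```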
@@ -1301,7 +1317,7 @@ def _is_optimizer_op(self, op):
op.all_attrs()[self.op_role_key]
) & int(core.op_proto_and_checker_maker.OpRole.Optimize)

def __del__(self):
def __del__(self) -> None:
# NOTE(Ruibiao): The manually call of clear is required. Because in Python, executor_cache
# may not immediately destructed after Executor instance deleted (so does not the _StandaloneExecutor),
# that brings errors to mkl-dnn unit tests (see ClearMKLDNNCache in interpretercore.cc for why).
@@ -1616,7 +1632,7 @@ def _update_feed(cls, program, feed):
TODO(panyx0718): Why ParallelExecutor doesn't have close?
'''

def close(self):
def close(self) -> None:
"""
Close the executor. This interface is used for distributed training (PServers mode).
This executor can not be used after calling the interface, because
@@ -1643,7 +1659,7 @@ def close(self):
del trainer_instance
self._default_executor.close()

def flush(self):
def flush(self) -> None:
"""
flush all trainer param to root_scope
"""
@@ -1654,6 +1670,51 @@
del trainer_instance
self.trainer_caches.clear()

@overload
def run(
self,
program: Program | CompiledProgram | None = ...,
feed: dict[str, npt.NDArray[Any]] | list[npt.NDArray[Any]] | None = ...,
fetch_list: list[str | Tensor] | None = ...,
feed_var_name: str = ...,
fetch_var_name: str = ...,
scope: core.Scope | None = ...,
return_numpy: Literal[True] = ...,
use_program_cache: bool = ...,
use_prune: bool = ...,
) -> list[npt.NDArray[Any]]:
...

@overload
def run(
self,
program: Program | CompiledProgram | None = ...,
feed: dict[str, npt.NDArray[Any]] | list[npt.NDArray[Any]] | None = ...,
fetch_list: list[str | Tensor] | None = ...,
feed_var_name: str = ...,
fetch_var_name: str = ...,
scope: core.Scope | None = ...,
return_numpy: Literal[False] = ...,
use_program_cache: bool = ...,
use_prune: bool = ...,
) -> list[Tensor]:
...

@overload
def run(
self,
program: Program | CompiledProgram | None = ...,
feed: dict[str, npt.NDArray[Any]] | list[npt.NDArray[Any]] | None = ...,
fetch_list: list[str | Tensor] | None = ...,
feed_var_name: str = ...,
fetch_var_name: str = ...,
scope: core.Scope | None = ...,
return_numpy: bool = ...,
use_program_cache: bool = ...,
use_prune: bool = ...,
) -> list[Tensor] | list[npt.NDArray[Any]]:
...

def run(
self,
program=None,
@@ -1741,8 +1802,10 @@ def run(
>>> exe.run(paddle.static.default_startup_program())

>>> x = numpy.random.random(size=(10, 1)).astype('float32')
>>> loss_val, array_val = exe.run(feed={'X': x},
... fetch_list=[loss.name, array.name])
>>> loss_val, array_val = exe.run(
... feed={'X': x},
... fetch_list=[loss.name, array.name] # type: ignore[union-attr]
... )
>>> print(array_val)
>>> # doctest: +SKIP("Random output")
[array(0.16870381, dtype=float32)]
@@ -1771,17 +1834,20 @@ def run(
>>> exe.run(paddle.static.default_startup_program())
>>> build_strategy = paddle.static.BuildStrategy()
>>> binary = paddle.static.CompiledProgram(
... paddle.static.default_main_program(), build_strategy=build_strategy)
... paddle.static.default_main_program(),
... build_strategy=build_strategy
... )
>>> batch_size = 6
>>> x = np.random.random(size=(batch_size, 1)).astype('float32')

>>> prediction, = exe.run(binary,
... feed={'X': x},
... fetch_list=[prediction.name])
>>> prediction, = exe.run(
... binary,
... feed={'X': x},
... fetch_list=[prediction.name]
... )
>>> # If the user uses two GPU cards to run this python code, the printed result will be
>>> # (6, class_dim). The first dimension value of the printed result is the batch_size.
>>> print("The prediction shape: {}".format(
... np.array(prediction).shape))
>>> print("The prediction shape: {}".format(np.array(prediction).shape))
The prediction shape: (6, 2)

>>> print(prediction)
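
As an aside on the `@overload` stack above: a type checker picks the `run` overload from the literal value of `return_numpy`, so call sites get precise return types. A hypothetical sketch, assuming the setup from the docstring examples:

```python
# Assuming `exe`, `prog`, `x`, and `loss` are set up as in the docstring examples above.
arrays = exe.run(prog, feed={'X': x}, fetch_list=[loss])
# default return_numpy=True -> inferred as list[npt.NDArray[Any]]

tensors = exe.run(prog, feed={'X': x}, fetch_list=[loss], return_numpy=False)
# Literal[False] overload -> inferred as list[Tensor]

flag: bool = True
mixed = exe.run(prog, feed={'X': x}, fetch_list=[loss], return_numpy=flag)
# plain bool -> fallback overload: list[Tensor] | list[npt.NDArray[Any]]
```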
@@ -2219,7 +2285,7 @@ def _adjust_pipeline_resource(self, pipeline_opt, dataset, pipeline_num):
dataset.set_thread(pipeline_opt["concurrency_list"][0] * pipeline_num)
return pipeline_num

def split_program_by_device(self, program):
def split_program_by_device(self, program: Program) -> list[int] | None:
ops_list = []
type_list = []
pre = None
@@ -3098,16 +3164,16 @@ def _run_pipeline(

def infer_from_dataset(
self,
program=None,
dataset=None,
scope=None,
thread=0,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None,
):
program: Program | CompiledProgram | None = None,
dataset: DatasetBase | None = None,
scope: core.Scope | None = None,
thread: int = 0,
debug: bool = False,
fetch_list: list[Tensor] | None = None,
fetch_info: list[str] | None = None,
print_period: int = 100,
fetch_handler: FetchHandler | None = None,
) -> None:
"""
Infer from a pre-defined Dataset. Dataset is defined in paddle.base.dataset.
Given a program, either a program or compiled program, infer_from_dataset will
@@ -3177,14 +3243,14 @@

def start_heter_trainer(
self,
program=None,
scope=None,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None,
):
program: Program | None = None,
scope: core.Scope | None = None,
debug: bool = False,
fetch_list: list[Tensor] | None = None,
fetch_info: list[str] | None = None,
print_period: int = 100,
fetch_handler: FetchHandler | None = None,
) -> core.TrainerBase:
scope, trainer = self._prepare_trainer(
program=program,
dataset=None,
@@ -3221,16 +3287,16 @@

def train_from_dataset(
self,
program=None,
dataset=None,
scope=None,
thread=0,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None,
):
program: Program | CompiledProgram | None = None,
dataset: DatasetBase | None = None,
scope: core.Scope | None = None,
thread: int = 0,
debug: bool = False,
fetch_list: list[Tensor] | None = None,
fetch_info: list[str] | None = None,
print_period: int = 100,
fetch_handler: FetchHandler | None = None,
) -> None:
"""
Train from a pre-defined Dataset. Dataset is defined in paddle.base.dataset.
Given a program, either a program or compiled program, train_from_dataset will
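
One last pattern note: the dataset-driven methods above all annotate optional arguments as `T | None = None` (PEP 604), which stays runtime-safe on older interpreters because of `from __future__ import annotations`. A standalone sketch with a hypothetical `train_step` stand-in:

```python
from __future__ import annotations


class FetchHandler:  # stand-in for paddle.base.executor's FetchHandler
    ...


def train_step(
    thread: int = 0,
    debug: bool = False,
    fetch_info: list[str] | None = None,
    fetch_handler: FetchHandler | None = None,
) -> None:
    # Normalize the None defaults instead of using mutable defaults.
    fetch_info = [] if fetch_info is None else fetch_info
    print(thread, debug, fetch_info, fetch_handler)


train_step(debug=True, fetch_info=["loss"])
```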