Skip to content

Commit

Permalink
[Typing][C-111] Add type annotations for `python/paddle/base/executor…
Browse files Browse the repository at this point in the history
….py` (#66996)
  • Loading branch information
SigureMo authored Aug 5, 2024
1 parent 2374be6 commit ba16a30
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 45 deletions.
4 changes: 3 additions & 1 deletion python/paddle/base/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# limitations under the License.
"""This is definition of dataset class, which is high performance IO."""

from __future__ import annotations

from google.protobuf import text_format

import paddle
Expand Down Expand Up @@ -41,7 +43,7 @@ def __init__(self):
"""Init."""
pass

def create_dataset(self, datafeed_class="QueueDataset"):
def create_dataset(self, datafeed_class="QueueDataset") -> DatasetBase:
"""
Create "QueueDataset" or "InMemoryDataset", or "FileInstantDataset",
the default is "QueueDataset".
Expand Down
154 changes: 110 additions & 44 deletions python/paddle/base/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import copy
import logging
import os
import sys
import warnings
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Literal, overload

import numpy as np

Expand Down Expand Up @@ -50,14 +53,25 @@
from .trainer_factory import FetchHandlerMonitor, TrainerFactory
from .wrapped_decorator import signature_safe_contextmanager

if TYPE_CHECKING:
from collections.abc import Generator

import numpy.typing as npt

from paddle import Tensor
from paddle._typing import PlaceLike
from paddle._typing.device_like import _Place
from paddle.base.dataset import DatasetBase
from paddle.static import CompiledProgram

__all__ = []

g_scope = core.Scope()
InferNativeConfig = core.NativeConfig
InferAnalysisConfig = core.AnalysisConfig


def global_scope():
def global_scope() -> core.Scope:
"""
:api_attr: Static Graph
Expand All @@ -79,15 +93,15 @@ def global_scope():
return g_scope


def _switch_scope(scope):
def _switch_scope(scope: core.Scope) -> core.Scope:
global g_scope
ex = g_scope
g_scope = scope
return ex


@signature_safe_contextmanager
def scope_guard(scope):
def scope_guard(scope: core.Scope) -> Generator[None, None, None]:
"""
This function switches scope through python `with` statement.
Expand Down Expand Up @@ -1272,7 +1286,9 @@ class Executor:
"""

def __init__(self, place=None):
place: _Place

def __init__(self, place: PlaceLike | None = None) -> None:
if place is None:
expected_place = framework._current_expected_place_()
self.place = expected_place
Expand Down Expand Up @@ -1313,7 +1329,7 @@ def _is_optimizer_op(self, op):
op.all_attrs()[self.op_role_key]
) & int(core.op_proto_and_checker_maker.OpRole.Optimize)

def __del__(self):
def __del__(self) -> None:
# NOTE(Ruibiao): The manually call of clear is required. Because in Python, executor_cache
# may not immediately destructed after Executor instance deleted (so does not the _StandaloneExecutor),
# that brings errors to mkl-dnn unit tests (see ClearMKLDNNCache in interpretercore.cc for why).
Expand Down Expand Up @@ -1628,7 +1644,7 @@ def _update_feed(cls, program, feed):
TODO(panyx0718): Why ParallelExecutor doesn't have close?
'''

def close(self):
def close(self) -> None:
"""
Close the executor. This interface is used for distributed training (PServers mode).
This executor can not be used after calling the interface, because
Expand All @@ -1655,7 +1671,7 @@ def close(self):
del trainer_instance
self._default_executor.close()

def flush(self):
def flush(self) -> None:
"""
flush all trainer param to root_scope
"""
Expand All @@ -1666,6 +1682,51 @@ def flush(self):
del trainer_instance
self.trainer_caches.clear()

@overload
def run(
self,
program: Program | CompiledProgram | None = ...,
feed: dict[str, npt.NDArray[Any]] | list[npt.NDArray[Any]] | None = ...,
fetch_list: list[str | Tensor] | None = ...,
feed_var_name: str = ...,
fetch_var_name: str = ...,
scope: core.Scope | None = ...,
return_numpy: Literal[True] = ...,
use_program_cache: bool = ...,
use_prune: bool = ...,
) -> list[npt.NDArray[Any]]:
...

@overload
def run(
self,
program: Program | CompiledProgram | None = ...,
feed: dict[str, npt.NDArray[Any]] | list[npt.NDArray[Any]] | None = ...,
fetch_list: list[str | Tensor] | None = ...,
feed_var_name: str = ...,
fetch_var_name: str = ...,
scope: core.Scope | None = ...,
return_numpy: Literal[False] = ...,
use_program_cache: bool = ...,
use_prune: bool = ...,
) -> list[Tensor]:
...

@overload
def run(
self,
program: Program | CompiledProgram | None = ...,
feed: dict[str, npt.NDArray[Any]] | list[npt.NDArray[Any]] | None = ...,
fetch_list: list[str | Tensor] | None = ...,
feed_var_name: str = ...,
fetch_var_name: str = ...,
scope: core.Scope | None = ...,
return_numpy: bool = ...,
use_program_cache: bool = ...,
use_prune: bool = ...,
) -> list[Tensor] | list[npt.NDArray[Any]]:
...

def run(
self,
program=None,
Expand Down Expand Up @@ -1753,8 +1814,10 @@ def run(
>>> exe.run(paddle.static.default_startup_program())
>>> x = numpy.random.random(size=(10, 1)).astype('float32')
>>> loss_val, array_val = exe.run(feed={'X': x},
... fetch_list=[loss.name, array.name])
>>> loss_val, array_val = exe.run(
... feed={'X': x},
... fetch_list=[loss.name, array.name] # type: ignore[union-attr]
... )
>>> print(array_val)
>>> # doctest: +SKIP("Random output")
[array(0.16870381, dtype=float32)]
Expand Down Expand Up @@ -1783,17 +1846,20 @@ def run(
>>> exe.run(paddle.static.default_startup_program())
>>> build_strategy = paddle.static.BuildStrategy()
>>> binary = paddle.static.CompiledProgram(
... paddle.static.default_main_program(), build_strategy=build_strategy)
... paddle.static.default_main_program(),
... build_strategy=build_strategy
... )
>>> batch_size = 6
>>> x = np.random.random(size=(batch_size, 1)).astype('float32')
>>> prediction, = exe.run(binary,
... feed={'X': x},
... fetch_list=[prediction.name])
>>> prediction, = exe.run(
... binary,
... feed={'X': x},
... fetch_list=[prediction.name]
... )
>>> # If the user uses two GPU cards to run this python code, the printed result will be
>>> # (6, class_dim). The first dimension value of the printed result is the batch_size.
>>> print("The prediction shape: {}".format(
... np.array(prediction).shape))
>>> print("The prediction shape: {}".format(np.array(prediction).shape))
The prediction shape: (6, 2)
>>> print(prediction)
Expand Down Expand Up @@ -2231,7 +2297,7 @@ def _adjust_pipeline_resource(self, pipeline_opt, dataset, pipeline_num):
dataset.set_thread(pipeline_opt["concurrency_list"][0] * pipeline_num)
return pipeline_num

def split_program_by_device(self, program):
def split_program_by_device(self, program: Program) -> list[int] | None:
ops_list = []
type_list = []
pre = None
Expand Down Expand Up @@ -3110,16 +3176,16 @@ def _run_pipeline(

def infer_from_dataset(
self,
program=None,
dataset=None,
scope=None,
thread=0,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None,
):
program: Program | CompiledProgram | None = None,
dataset: DatasetBase | None = None,
scope: core.Scope | None = None,
thread: int = 0,
debug: bool = False,
fetch_list: list[Tensor] | None = None,
fetch_info: list[str] | None = None,
print_period: int = 100,
fetch_handler: FetchHandler | None = None,
) -> None:
"""
Infer from a pre-defined Dataset. Dataset is defined in paddle.base.dataset.
Given a program, either a program or compiled program, infer_from_dataset will
Expand Down Expand Up @@ -3189,14 +3255,14 @@ def infer_from_dataset(

def start_heter_trainer(
self,
program=None,
scope=None,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None,
):
program: Program | None = None,
scope: core.Scope | None = None,
debug: bool = False,
fetch_list: list[Tensor] | None = None,
fetch_info: list[str] | None = None,
print_period: int = 100,
fetch_handler: FetchHandler | None = None,
) -> core.TrainerBase:
scope, trainer = self._prepare_trainer(
program=program,
dataset=None,
Expand Down Expand Up @@ -3233,16 +3299,16 @@ def start_heter_trainer(

def train_from_dataset(
self,
program=None,
dataset=None,
scope=None,
thread=0,
debug=False,
fetch_list=None,
fetch_info=None,
print_period=100,
fetch_handler=None,
):
program: Program | CompiledProgram | None = None,
dataset: DatasetBase | None = None,
scope: core.Scope | None = None,
thread: int = 0,
debug: bool = False,
fetch_list: list[Tensor] | None = None,
fetch_info: list[str] | None = None,
print_period: int = 100,
fetch_handler: FetchHandler | None = None,
) -> None:
"""
Train from a pre-defined Dataset. Dataset is defined in paddle.base.dataset.
Given a program, either a program or compiled program, train_from_dataset will
Expand Down

0 comments on commit ba16a30

Please sign in to comment.