Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fluid API magration : array_read, array_write #49022

Merged
merged 5 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,16 @@ class HybridParallelInferenceHelper:
while_op = layers.While(cond, is_test=True)

# init global lod_tensor_array for generation task
arr = layers.array_write(data, step_idx)
arr = paddle.tensor.array_write(data, step_idx)

with while_op.block():
with paddle.fluid.device_guard(f'{device}:all'):
# read data from global lod_tensor_array
element_in_arr = layers.array_read(array=arr, i=step_idx)
element_in_arr = paddle.tensor.array_read(array=arr, i=step_idx)
# write placehold data to global lod_tensor_array,
# it need for send_v2 of lod_tensor_array
paddle.increment(x=step_idx, value=1.0)
layers.array_write(element_in_arr, i=step_idx, array=arr)
paddle.tensor.array_write(element_in_arr, i=step_idx, array=arr)

with paddle.fluid.device_guard(f'{device}:0'):
... some code
Expand All @@ -77,7 +77,7 @@ class HybridParallelInferenceHelper:
# generate some data in while block and write to global lod_tensor_array
# that they are read in next while step.
# we will using send_v2 to send global lod_tensor_array to other pipeline and sync
layers.array_write(other_var, i=step_idx, array=arr)
paddle.tensor.array_write(other_var, i=step_idx, array=arr)

# update cond and assign to cond_int, we will sync cond_int
layers.assign(layers.cast(cond, dtype="int32"), cond_int)
Expand Down Expand Up @@ -128,17 +128,17 @@ class HybridParallelInferenceHelper:
step_idx = layers.fill_constant(
shape=[1], dtype="int64", value=0, force_cpu=False, name="i")

data = layers.array_write(X, step_idx)
data = paddle.tensor.array_write(X, step_idx)

cond_int = layers.fill_constant(shape=[1], dtype="int64", value=0, force_cpu=False, name="cond_int")
cond = paddle.less_than(x=step_idx, y=max_len)
while_op = layers.While(cond, is_test=True)

with while_op.block():
with paddle.fluid.device_guard(f'{device}:all'):
input = layers.array_read(array=data, i=step_idx)
input = paddle.tensor.array_read(array=data, i=step_idx)
paddle.increment(x=step_idx, value=1.0)
layers.array_write(input, i=step_idx, array=data)
paddle.tensor.array_write(input, i=step_idx, array=data)

with paddle.fluid.device_guard(f'{device}:0'):
param_attr = paddle.ParamAttr(initializer=paddle.nn.initializer.Constant(1.0))
Expand All @@ -152,7 +152,7 @@ class HybridParallelInferenceHelper:
shape=[5, 2], dtype='float32', attr=param_attr, is_bias=False)
hidden2 = paddle.matmul(hidden1, weight2)

layers.array_write(hidden2, i=step_idx, array=data)
paddle.tensor.array_write(hidden2, i=step_idx, array=data)

# update cond and assign to cond_int, we will sync cond_int
paddle.assign(paddle.less_than(x=step_idx, y=max_len), cond)
Expand Down
2 changes: 1 addition & 1 deletion python/paddle/fluid/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,7 @@ def run(
adam = paddle.optimizer.Adam()
adam.minimize(loss)
i = paddle.zeros(shape=[1], dtype='int64')
array = paddle.fluid.layers.array_write(x=loss, i=i)
array = paddle.tensor.array_write(x=loss, i=i)

# Run the startup program once and only once.
exe.run(paddle.static.default_startup_program())
Expand Down
192 changes: 0 additions & 192 deletions python/paddle/fluid/layers/control_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@

__all__ = [
'Switch',
'array_write',
'array_read',
'StaticRNN',
'Print',
'while_loop',
Expand Down Expand Up @@ -1362,196 +1360,6 @@ def create_var_like(o_var):
return results


def array_write(x, i, array=None):
"""
This OP writes the input ``x`` into the i-th position of the ``array``
:ref:`api_fluid_LoDTensorArray` and returns the modified array.
If ``array`` is none, a new LoDTensorArray will be created and returned.
This OP is often used together with :ref:`api_fluid_layers_array_read` OP.

Args:
x (Variable): The input data to be written into array. It's multi-dimensional
Tensor or LoDTensor. Data type: float32, float64, int32, int64.
i (Variable): 1-D Tensor with shape [1], which represents the position into which
``x`` is written. Data type: int64.
array (LoDTensorArray, optional): The LoDTensorArray into which ``x`` is written.
The default value is None, when a new LoDTensorArray will be created and returned
as a result.

Returns:
Variable: The input ``array`` after ``x`` is written into.

Examples:
.. code-block:: python

import paddle.fluid as fluid
tmp = fluid.layers.fill_constant(shape=[3, 2], dtype='int64', value=5)
i = fluid.layers.fill_constant(shape=[1], dtype='int64', value=10)
# Write tmp into the position of arr with subscript 10 and return arr.
arr = fluid.layers.array_write(tmp, i=i)

# Now, arr is a LoDTensorArray with length 11. We can use array_read OP to read
# the data at subscript 10 and print it out.
item = fluid.layers.array_read(arr, i=i)
input = fluid.layers.Print(item, message="The content of i-th LoDTensor:")
main_program = fluid.default_main_program()
exe = fluid.Executor(fluid.CPUPlace())
exe.run(main_program)

# The printed result is:
# 1570533133 The content of i-th LoDTensor: The place is:CPUPlace
# Tensor[array_read_0.tmp_0]
# shape: [3,2,]
# dtype: l
# data: 5,5,5,5,5,5,

# the output is 2-D Tensor with shape [3,2], which is tmp above.
# dtype is the corresponding C++ data type, which may vary in different environments.
# Eg: if the data type of tensor is int64, then the corresponding C++ data type is int64_t,
# so the dtype value is typeid(int64_t).Name(), which is 'x' on MacOS, 'l' on Linux,
# and '__int64' on Windows. They both represent 64-bit integer variables.

"""
if _non_static_mode():
assert isinstance(
x, Variable
), "The input data 'x' in array_write must be Variable in dygraph mode"
assert isinstance(
i, Variable
), "The index 'i' in array_write must be Variable in dygraph mode"
assert i.shape == [
1
], "The shape of index 'i' should be [1] in dygraph mode"
i = i.numpy().item(0)
if array is None:
array = paddle.tensor.create_array(x.dtype)
assert isinstance(
array, list
), "The 'array' in array_write must be a list in dygraph mode"
assert i <= len(
array
), "The index 'i' should not be greater than the length of 'array' in dygraph mode"
if i < len(array):
array[i] = x
else:
array.append(x)
return array

check_variable_and_dtype(i, 'i', ['int64'], 'array_write')
check_type(x, 'x', (Variable), 'array_write')
helper = LayerHelper('array_write', **locals())
if array is not None:
if (
not isinstance(array, Variable)
or array.type != core.VarDesc.VarType.LOD_TENSOR_ARRAY
):
raise TypeError(
"array should be tensor array vairable in array_write Op"
)
if array is None:
array = helper.create_variable(
name="{0}.out".format(helper.name),
type=core.VarDesc.VarType.LOD_TENSOR_ARRAY,
dtype=x.dtype,
)
helper.append_op(
type='write_to_array',
inputs={'X': [x], 'I': [i]},
outputs={'Out': [array]},
)
return array


def array_read(array, i):
"""
This OP is used to read data at the specified position from the input array
:ref:`api_fluid_LoDTensorArray` . ``array`` is the input array and ``i``
is the specified read position. This OP is often used together with
:ref:`api_fluid_layers_array_write` OP.

Case 1:
::
Input:
The shape of first three tensors are [1], and that of the last one is [1,2]:
array = ([0.6], [0.1], [0.3], [0.4, 0.2])
And:
i = [3]

Output:
output = [0.4, 0.2]

Args:
array (LoDTensorArray): The input LoDTensorArray.
i (Variable): 1-D Tensor, whose shape is [1] and dtype is int64. It represents the
specified read position of ``array``.

Returns:
Variable: The LoDTensor or Tensor that is read at the specified position of ``array``.

Examples:
.. code-block:: python

# First we're going to create a LoDTensorArray, then we're going to write the Tensor into
# the specified position, and finally we're going to read the Tensor at that position.
import paddle.fluid as fluid
arr = fluid.layers.create_array(dtype='float32')
tmp = fluid.layers.fill_constant(shape=[3, 2], dtype='int64', value=5)
i = fluid.layers.fill_constant(shape=[1], dtype='int64', value=10)
# tmp is the Tensor with shape [3,2], and if we write it into the position with subscript 10
# of the empty-array: arr, then the length of arr becomes 11.
arr = fluid.layers.array_write(tmp, i, array=arr)
# Read the data of the position with subscript 10.
item = fluid.layers.array_read(arr, i)

# You can print out the data via executor.
input = fluid.layers.Print(item, message="The LoDTensor of the i-th position:")
main_program = fluid.default_main_program()
exe = fluid.Executor(fluid.CPUPlace())
exe.run(main_program)

# The printed result is:

# 1569588169 The LoDTensor of the i-th position: The place is:CPUPlace
# Tensor[array_read_0.tmp_0]
# shape: [3,2,]
# dtype: l
# data: 5,5,5,5,5,5,

# the output is 2-D Tensor with shape [3,2].
# dtype is the corresponding C++ data type, which may vary in different environments.
# Eg: if the data type of tensor is int64, then the corresponding C++ data type is int64_t,
# so the dtype value is typeid(int64_t).Name(), which is 'x' on MacOS, 'l' on Linux,
# and '__int64' on Windows. They both represent 64-bit integer variables.
"""
if _non_static_mode():
assert isinstance(
array, list
), "The 'array' in array_read must be list in dygraph mode"
assert isinstance(
i, Variable
), "The index 'i' in array_read must be Variable in dygraph mode"
assert i.shape == [
1
], "The shape of index 'i' should be [1] in dygraph mode"
i = i.numpy().item(0)
return array[i]

check_variable_and_dtype(i, 'i', ['int64'], 'array_read')
helper = LayerHelper('array_read', **locals())
if (
not isinstance(array, Variable)
or array.type != core.VarDesc.VarType.LOD_TENSOR_ARRAY
):
raise TypeError("array should be tensor array vairable")
out = helper.create_variable_for_type_inference(dtype=array.dtype)
helper.append_op(
type='read_from_array',
inputs={'X': [array], 'I': [i]},
outputs={'Out': [out]},
)
return out


class ConditionalBlockGuard(BlockGuard):
"""
ConditionalBlockGuard is derived from BlockGuard. It is dedicated for
Expand Down
3 changes: 1 addition & 2 deletions python/paddle/fluid/layers/math_op_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from .. import core
from ..framework import Variable, unique_name, static_only
from .layer_function_generator import OpProtoHolder
from .control_flow import array_write
from paddle.fluid.dygraph.base import in_declarative_mode

_supported_int_dtype_ = [
Expand Down Expand Up @@ -246,7 +245,7 @@ def append(self, var):
self.type
)
)
from paddle.tensor.array import array_length
from paddle.tensor.array import array_length, array_write

array_write(x=var, i=array_length(self), array=self)

Expand Down
16 changes: 8 additions & 8 deletions python/paddle/fluid/layers/rnn.py
Original file line number Diff line number Diff line change
Expand Up @@ -973,10 +973,10 @@ def _dynamic_decode_declarative(
else:
# inputs and states of all steps must be saved for backward and training
inputs_arrays = map_structure(
lambda x: control_flow.array_write(x, step_idx), initial_inputs
lambda x: paddle.tensor.array_write(x, step_idx), initial_inputs
)
states_arrays = map_structure(
lambda x: control_flow.array_write(x, step_idx), initial_states
lambda x: paddle.tensor.array_write(x, step_idx), initial_states
)

def _maybe_copy(state, new_state, step_mask):
Expand Down Expand Up @@ -1015,11 +1015,11 @@ def _create_array_out_of_while(dtype):
with while_op.block():
if not is_test:
inputs = map_structure(
lambda array: control_flow.array_read(array, step_idx),
lambda array: paddle.tensor.array_read(array, step_idx),
inputs_arrays,
)
states = map_structure(
lambda array: control_flow.array_read(array, step_idx),
lambda array: paddle.tensor.array_read(array, step_idx),
states_arrays,
)
(outputs, next_states, next_inputs, next_finished) = decoder.step(
Expand Down Expand Up @@ -1058,7 +1058,7 @@ def _create_array_out_of_while(dtype):
)

map_structure(
lambda x, x_array: control_flow.array_write(
lambda x, x_array: paddle.tensor.array_write(
x, i=step_idx, array=x_array
),
outputs,
Expand All @@ -1075,14 +1075,14 @@ def _create_array_out_of_while(dtype):
map_structure(tensor.assign, next_states, global_states)
else:
map_structure(
lambda x, x_array: control_flow.array_write(
lambda x, x_array: paddle.tensor.array_write(
x, i=step_idx, array=x_array
),
next_inputs,
inputs_arrays,
)
map_structure(
lambda x, x_array: control_flow.array_write(
lambda x, x_array: paddle.tensor.array_write(
x, i=step_idx, array=x_array
),
next_states,
Expand All @@ -1107,7 +1107,7 @@ def _create_array_out_of_while(dtype):
final_states = global_states
else:
final_states = map_structure(
lambda array: control_flow.array_read(array, step_idx),
lambda array: paddle.tensor.array_read(array, step_idx),
states_arrays,
)

Expand Down
4 changes: 2 additions & 2 deletions python/paddle/fluid/layers/tensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ def tensor_array_to_tensor(input, axis=1, name=None, use_stack=False):
x1 = fluid.layers.assign(np.random.rand(2, 2).astype("float32"))
i = fluid.layers.fill_constant(shape=[1], dtype="int64", value=0)
array = fluid.layers.create_array(dtype='float32')
fluid.layers.array_write(x0, i, array)
fluid.layers.array_write(x1, i + 1, array)
paddle.tensor.array_write(x0, i, array)
paddle.tensor.array_write(x1, i + 1, array)
output, output_index = fluid.layers.tensor_array_to_tensor(input=array)
"""
if _non_static_mode():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def get_program():
)
pred = mlp_start(input)

input_array = fluid.layers.array_write(pred, i)
input_array = paddle.tensor.array_write(pred, i)
# TODO: check whether this annotation is needed
# auto.shard_tensor(input_array,
# dist_attr={
Expand All @@ -177,7 +177,7 @@ def get_program():
while_op = paddle.static.nn.control_flow.While(cond=cond)
with while_op.block():

pre_input = fluid.layers.array_read(array=input_array, i=i)
pre_input = paddle.tensor.array_read(array=input_array, i=i)
auto.shard_tensor(pre_input, _g_process_mesh, [None, None, None])

mlp_while = MLPLayer(
Expand All @@ -190,10 +190,10 @@ def get_program():

# 更新循环条件
i = paddle.increment(x=i, value=1)
fluid.layers.array_write(cur_pred, array=input_array, i=i)
paddle.tensor.array_write(cur_pred, array=input_array, i=i)
paddle.assign(paddle.less_than(x=i, y=loop_len), cond)

end_pred = fluid.layers.array_read(array=input_array, i=i)
end_pred = paddle.tensor.array_read(array=input_array, i=i)
auto.shard_tensor(end_pred, _g_process_mesh, [None, None, None])

mlp_end = MLPLayer(
Expand Down
Loading