Support inferring batch size from tensor argument inputs #4617

Merged · 3 commits · Jan 31, 2023
43 changes: 38 additions & 5 deletions dali/pipeline/executor/executor.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -30,6 +30,32 @@

namespace dali {

inline bool HasTensorArgInputs(const ArgumentWorkspace& argument_ws) {
  return begin(argument_ws) != end(argument_ws);
}

/**
 * @brief Takes the batch size from any of the op's tensor inputs.
 *
 * If the operator has no regular inputs, the batch size is taken from
 * its first tensor argument input. If there are no tensor argument
 * inputs either, the batch size inferred from the stage queue is used
 * instead.
 *
 * This assumes that operators expect a uniform batch size across all
 * inputs and outputs. Split and merge operators are a notable exception
 * and cannot rely on this value.
 */
inline int InferBatchSizeFromInput(const Workspace &ws, int stage_batch_size) {
  if (ws.NumInput() > 0) {
    // Regular inputs take precedence.
    return ws.GetInputBatchSize(0);
  }
  // Otherwise, fall back to the first named tensor argument input, if any.
  const ArgumentWorkspace &argument_ws = ws;
  if (HasTensorArgInputs(argument_ws)) {
    auto [name, arg] = *begin(argument_ws);
    return arg.tvec->num_samples();
  }
  return stage_batch_size;
}

template <typename WorkspacePolicy, typename QueuePolicy>
void Executor<WorkspacePolicy, QueuePolicy>::PreRun() {
auto batch_size = InferBatchSize(batch_size_providers_);
@@ -97,7 +123,7 @@ void Executor<WorkspacePolicy, QueuePolicy>::RunCPUImpl(size_t iteration_id) {
return;
}

-  auto batch_size = batch_sizes_cpu_.front();
+  int stage_batch_size = batch_sizes_cpu_.front();
batch_sizes_cpu_.pop();

// Run the cpu-ops in the thread
@@ -106,6 +132,7 @@ void Executor<WorkspacePolicy, QueuePolicy>::RunCPUImpl(size_t iteration_id) {
OpNode &op_node = graph_->Node(OpType::CPU, cpu_op_id);
decltype(auto) ws = ws_policy_.template GetWorkspace<OpType::CPU>(cpu_idxs, *graph_, cpu_op_id);

+    int batch_size = InferBatchSizeFromInput(ws, stage_batch_size);
ws.SetBatchSizes(batch_size);

DomainTimeRange tr("[DALI][CPU op] " + op_node.instance_name, DomainTimeRange::kBlue1);
@@ -143,14 +170,15 @@ void Executor<WorkspacePolicy, QueuePolicy>::RunMixedImpl(size_t iteration_id) {
if (device_id_ != CPU_ONLY_DEVICE_ID)
CUDA_CALL(cudaEventSynchronize(mixed_stage_event_));

-  auto batch_size = batch_sizes_mixed_.front();
+  int stage_batch_size = batch_sizes_mixed_.front();
batch_sizes_mixed_.pop();

for (int i = 0; i < graph_->NumOp(OpType::MIXED) && !exec_error_; ++i) {
OpNode &op_node = graph_->Node(OpType::MIXED, i);
try {
decltype(auto) ws = ws_policy_.template GetWorkspace<OpType::MIXED>(mixed_idxs, *graph_, i);

+      int batch_size = InferBatchSizeFromInput(ws, stage_batch_size);
ws.SetBatchSizes(batch_size);

DomainTimeRange tr("[DALI][Mixed op] " + op_node.instance_name, DomainTimeRange::kOrange);
@@ -208,14 +236,15 @@ void Executor<WorkspacePolicy, QueuePolicy>::RunGPUImpl(size_t iteration_id) {
// iterations of a stage of the pipeline.
CUDA_CALL(cudaEventSynchronize(gpu_stage_event_));

-  auto batch_size = batch_sizes_gpu_.front();
+  int stage_batch_size = batch_sizes_gpu_.front();
batch_sizes_gpu_.pop();

for (int i = 0; i < graph_->NumOp(OpType::GPU) && !exec_error_; ++i) {
OpNode &op_node = graph_->Node(OpType::GPU, i);
try {
decltype(auto) ws = ws_policy_.template GetWorkspace<OpType::GPU>(gpu_idxs, *graph_, i);

+      int batch_size = InferBatchSizeFromInput(ws, stage_batch_size);
ws.SetBatchSizes(batch_size);

auto parent_events = ws.ParentEvents();
@@ -332,11 +361,15 @@ void Executor<WorkspacePolicy, QueuePolicy>::RunHelper(OpNode &op_node, Workspac
}

// Assuming that most operators don't expect empty input, and expect consistent input.
-  if (ws.NumInput() > 0) {
+  if (ws.NumInput() > 0 || HasTensorArgInputs(ws)) {
bool all_inputs_empty = true;
for (int i = 0; i < ws.NumInput(); i++) {
all_inputs_empty = all_inputs_empty && ws.GetInputBatchSize(i) == 0;
}
const ArgumentWorkspace &argument_ws = ws;
for (const auto &[name, arg] : argument_ws) {
all_inputs_empty = all_inputs_empty && arg.tvec->num_samples() == 0;
}
if (all_inputs_empty) {
// We skip the execution of this operator and Reset the outputs in case some state was still
// present.
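
The guard above matters most in conditional pipelines, where one partition of a split can legitimately receive zero samples. Below is a minimal, illustrative sketch of that situation (the pipeline and operator choices are mine, not part of this PR): with an all-false predicate, the `if` branch receives an empty batch, and the operator inside it is skipped instead of running on zero samples.

import numpy as np
from nvidia.dali import pipeline_def, fn

@pipeline_def(batch_size=4, num_threads=2, device_id=0, enable_conditionals=True)
def all_false_pipe():
    data = fn.random.uniform(range=[0.0, 1.0], shape=[16])
    # An all-false predicate batch: the `if` branch below gets 0 samples.
    pred = fn.external_source(source=lambda: np.array([False] * 4), batch=True)
    if pred:
        data = data + 1.0  # sees an empty input batch, so the executor skips it
    return data

pipe = all_false_pipe()
pipe.build()
pipe.run()  # succeeds; nothing is executed on the empty branch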
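For the headline change itself, here is a minimal sketch (batch sizes, shapes, and names are illustrative). fn.random.normal has no regular inputs, so InferBatchSizeFromInput takes its batch size from the `shape` tensor argument input; previously only the stage-wide batch size was used. In this standalone example the two coincide, but the distinction matters inside conditional branches, which the new tests below exercise.

import numpy as np
from nvidia.dali import pipeline_def, fn

batch_sizes = [3, 1, 4]  # varies per iteration, up to max_batch_size
shape_batches = [[np.array([2, 2], dtype=np.int32)] * bs for bs in batch_sizes]

def shape_source():
    for batch in shape_batches:
        yield batch

@pipeline_def(batch_size=max(batch_sizes), num_threads=2, device_id=0)
def normal_pipe():
    shapes = fn.external_source(source=shape_source)
    # No regular inputs here: the output batch size follows the `shape`
    # tensor argument input.
    return fn.random.normal(shape=shapes)

pipe = normal_pipe()
pipe.build()
for expected_bs in batch_sizes:
    (out,) = pipe.run()
    assert len(out) == expected_bs
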
132 changes: 132 additions & 0 deletions dali/test/python/conditionals/test_pipeline_conditionals.py
@@ -617,3 +617,135 @@ def one_return():
" (both `if` branches). The `else` branch must also have a return"
" statement.")):
one_return()


def _tensor_arg_permute_batch_params():
batch_sizes = [1, 5, 8]
inp0 = [[np.full((2, 2), i, dtype=np.float32) for i in range(batch_size)]
for batch_size in batch_sizes]
mask_batches = [
np.array([i % 2 for i in range(batch_size)], dtype=bool) for batch_size in batch_sizes
]
kwarg_batches = [np.array([pred for pred in mask], dtype=np.int32) for mask in mask_batches]
return (inp0, ), mask_batches, {'indices': kwarg_batches}


def _tensor_arg_transform_per_dim_params(arg_name):

def inner():
batch_sizes = [5, 1, 2, 8]
mask_batches = [
np.array([i % 2 for i in range(batch_size)], dtype=bool) for batch_size in batch_sizes
]
kwarg_batches = [
np.array([[pred, pred] for pred in mask], dtype=np.float32) for mask in mask_batches
]
return tuple(), mask_batches, {arg_name: kwarg_batches}

return inner


def _tensor_arg_rotate_params():
batch_sizes = [3, 1, 2, 4]
mask_batches = [
np.array([i % 2 for i in range(batch_size)], dtype=bool) for batch_size in batch_sizes
]
kwarg_batches = [
np.array([10 + 45 * pred for pred in mask], dtype=np.float32) for mask in mask_batches
]
return tuple(), mask_batches, {'angle': kwarg_batches}


def _tensor_arg_roi_random_crop_params():
batch_sizes = [1, 2, 7, 3]
crop_shape = [[
np.array([100 * i + 50, 200 * i + 50, 3], dtype=np.int32) for i in range(batch_size)
] for batch_size in batch_sizes]
roi_start = [[
np.array([sample[0] // 2, sample[1] // 2, sample[2]], dtype=np.int32) for sample in batch
] for batch in crop_shape]
mask_batches = [
np.array([i % 2 for i in range(batch_size)], dtype=bool) for batch_size in batch_sizes
]
return tuple(), mask_batches, {
'crop_shape': crop_shape,
'roi_start': roi_start,
'roi_end': crop_shape
}


def _tensor_arg_shape_kwarg():
batch_sizes = [1, 2, 3, 16, 5]
shape = [[np.array([1 + 3 * i, 2 * (i + 1) - 1], dtype=np.int32) for i in range(batch_size)]
for batch_size in batch_sizes]
mask_batches = [
np.array([i % 2 for i in range(batch_size)], dtype=bool) for batch_size in batch_sizes
]
return tuple(), mask_batches, {'shape': shape}


# Test operators that infer their batch sizes from the tensor argument inputs
@params(fn.permute_batch, fn.roi_random_crop, fn.transforms.crop, fn.transforms.scale,
fn.transforms.shear, fn.transforms.translation, fn.transforms.rotation,
fn.random.uniform, fn.random.normal, fn.random.coin_flip)
def test_named_tensor_arguments(op):

ops2params = {
fn.permute_batch: _tensor_arg_permute_batch_params,
    [Inline review thread on the line above]

    Contributor: permute_batch does have an input - although by original design the output batch size was taken from indices. I don't think there's any special case for this op - or any other means to infer the batch size - which would mean that it takes the batch_size just from the input, not the argument input.

    Member (author): I put it here on purpose: it calls GetRequestedBatchSize manually, as an extra check that GetRequestedBatchSize returns the "split" batch size.

fn.roi_random_crop: _tensor_arg_roi_random_crop_params,
fn.transforms.crop: _tensor_arg_transform_per_dim_params('from_start'),
fn.transforms.scale: _tensor_arg_transform_per_dim_params('scale'),
fn.transforms.shear: _tensor_arg_transform_per_dim_params('angles'),
fn.transforms.translation: _tensor_arg_transform_per_dim_params('offset'),
fn.transforms.rotation: _tensor_arg_rotate_params,
fn.random.uniform: _tensor_arg_shape_kwarg,
fn.random.normal: _tensor_arg_shape_kwarg,
fn.random.coin_flip: _tensor_arg_shape_kwarg,
}

def dummy_source(batches):

def cb():
for batch in batches:
yield batch

return cb

def get_pipeline(op, args_batches, mask_batches, kwargs_batches, num_threads=4, device_id=0):
max_batch_size = max(len(batch) for batch in mask_batches)

@pipeline_def(batch_size=max_batch_size, num_threads=num_threads, device_id=device_id)
def split_pipeline():
args = [fn.external_source(dummy_source(arg_batches)) for arg_batches in args_batches]
mask = fn.external_source(dummy_source(mask_batches))
kwargs = {
kwarg_name: fn.external_source(dummy_source(batches))
for kwarg_name, batches in kwargs_batches.items()
}
kwargs_split = {
kwarg_name: fn._conditional.split(batch, predicate=mask)
for kwarg_name, batch in kwargs.items()
}
split_args = [fn._conditional.split(arg, predicate=mask) for arg in args]
left_args = [left_arg for left_arg, _ in split_args]
right_args = [right_arg for _, right_arg in split_args]
left = op(
*left_args,
**{kwarg_name: left_kwarg
for kwarg_name, (left_kwarg, _) in kwargs_split.items()})
right = op(
*right_args, **{
kwarg_name: right_kwarg
for kwarg_name, (_, right_kwarg) in kwargs_split.items()
})
batch = fn._conditional.merge(left, right, predicate=mask)
return batch

return split_pipeline()

args_batches, mask_batches, kwargs_batches = ops2params[op]()
pipe = get_pipeline(op=op, args_batches=args_batches, mask_batches=mask_batches,
kwargs_batches=kwargs_batches)
pipe.build()
for _ in range(len(mask_batches)):
pipe.run()
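
As a postscript to the review thread above about fn.permute_batch: a standalone, illustrative sketch (not part of this PR) of the pattern the test exercises. Both the data input and the `indices` argument are split by the same predicate, so the operator's requested batch size is the smaller "split" batch size, and the indices address positions within that partition.

import numpy as np
from nvidia.dali import pipeline_def, fn

data_batch = [np.full((2, 2), i, dtype=np.float32) for i in range(4)]
mask = np.array([True, False, True, False])
indices = np.array([1, 0, 0, 1], dtype=np.int32)

@pipeline_def(batch_size=4, num_threads=2, device_id=0)
def split_permute_pipe():
    data = fn.external_source(source=lambda: data_batch, batch=True)
    pred = fn.external_source(source=lambda: mask, batch=True)
    idx = fn.external_source(source=lambda: indices, batch=True)
    true_data, false_data = fn._conditional.split(data, predicate=pred)
    true_idx, _ = fn._conditional.split(idx, predicate=pred)
    # permute_batch runs on the two-sample partition; its requested batch
    # size comes from the split inputs rather than the pipeline-wide one.
    permuted = fn.permute_batch(true_data, indices=true_idx)
    return fn._conditional.merge(permuted, false_data, predicate=pred)

pipe = split_permute_pipe()
pipe.build()
pipe.run()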