Commit
Remove default_cuda_stream_priority from native code and deprecate it in Python.

Signed-off-by: Michał Zientkiewicz <mzient@gmail.com>
mzient committed Nov 20, 2024
1 parent 7fd3876 commit c56b613
Showing 14 changed files with 35 additions and 193 deletions.
5 changes: 2 additions & 3 deletions dali/pipeline/executor/async_pipelined_executor.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2017-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -35,10 +35,9 @@ class DLL_PUBLIC AsyncPipelinedExecutor : public PipelinedExecutor {
   DLL_PUBLIC inline AsyncPipelinedExecutor(int batch_size, int num_thread, int device_id,
                                            size_t bytes_per_sample_hint, bool set_affinity = false,
                                            int max_num_stream = -1,
-                                           int default_cuda_stream_priority = 0,
                                            QueueSizes prefetch_queue_depth = QueueSizes{2, 2})
       : PipelinedExecutor(batch_size, num_thread, device_id, bytes_per_sample_hint, set_affinity,
-                          max_num_stream, default_cuda_stream_priority, prefetch_queue_depth),
+                          max_num_stream, prefetch_queue_depth),
         cpu_thread_(device_id, set_affinity, "CPU executor"),
         mixed_thread_(device_id, set_affinity, "Mixed executor"),
         gpu_thread_(device_id, set_affinity, "GPU executor") {}
5 changes: 2 additions & 3 deletions dali/pipeline/executor/async_separated_pipelined_executor.h
@@ -34,11 +34,10 @@ class DLL_PUBLIC AsyncSeparatedPipelinedExecutor : public SeparatedPipelinedExec
  public:
   DLL_PUBLIC inline AsyncSeparatedPipelinedExecutor(
       int batch_size, int num_thread, int device_id, size_t bytes_per_sample_hint,
-      bool set_affinity = false, int max_num_stream = -1, int default_cuda_stream_priority = 0,
+      bool set_affinity = false, int max_num_stream = -1,
       QueueSizes prefetch_queue_depth = QueueSizes{2, 2})
       : SeparatedPipelinedExecutor(batch_size, num_thread, device_id, bytes_per_sample_hint,
-                                   set_affinity, max_num_stream, default_cuda_stream_priority,
-                                   prefetch_queue_depth),
+                                   set_affinity, max_num_stream, prefetch_queue_depth),
         cpu_thread_(device_id, set_affinity, "CPU executor"),
         mixed_thread_(device_id, set_affinity, "Mixed executor"),
         gpu_thread_(device_id, set_affinity, "GPU executor") {}
4 changes: 1 addition & 3 deletions dali/pipeline/executor/executor_factory.cc
@@ -29,7 +29,6 @@ namespace {
 auto MakeExec2Config(int batch_size, int num_thread, int device_id,
                      size_t bytes_per_sample_hint, bool set_affinity,
                      int max_num_stream,
-                     int default_cuda_stream_priority,
                      QueueSizes prefetch_queue_depth) {
   exec2::Executor2::Config cfg{};
   cfg.async_output = false;
@@ -92,12 +91,11 @@ std::unique_ptr<ExecutorBase> GetExecutor(bool pipelined, bool separated, bool a
                                           int batch_size, int num_thread, int device_id,
                                           size_t bytes_per_sample_hint, bool set_affinity,
                                           int max_num_stream,
-                                          int default_cuda_stream_priority,
                                           QueueSizes prefetch_queue_depth) {
   return GetExecutorImpl(
       pipelined, separated, async, dynamic,
       batch_size, num_thread, device_id, bytes_per_sample_hint, set_affinity,
-      max_num_stream, default_cuda_stream_priority, prefetch_queue_depth);
+      max_num_stream, prefetch_queue_depth);
 }

 }  // namespace dali
1 change: 0 additions & 1 deletion dali/pipeline/executor/executor_factory.h
@@ -27,7 +27,6 @@ std::unique_ptr<ExecutorBase> GetExecutor(bool pipelined, bool separated, bool a
                                           int batch_size, int num_thread, int device_id,
                                           size_t bytes_per_sample_hint, bool set_affinity = false,
                                           int max_num_stream = -1,
-                                          int default_cuda_stream_priority = 0,
                                           QueueSizes prefetch_queue_depth = QueueSizes{2, 2});

 }  // namespace dali
3 changes: 1 addition & 2 deletions dali/pipeline/executor/executor_impl.h
@@ -45,7 +45,6 @@
 #include "dali/pipeline/operator/name_utils.h"
 #include "dali/pipeline/util/batch_utils.h"
 #include "dali/pipeline/util/event_pool.h"
-#include "dali/pipeline/util/stream_pool.h"
 #include "dali/pipeline/util/thread_pool.h"
 #include "dali/pipeline/workspace/iteration_data.h"
 #include "dali/pipeline/workspace/workspace_data_factory.h"
@@ -75,7 +74,7 @@ class DLL_PUBLIC Executor : public ExecutorBase, public QueuePolicy {
  public:
   DLL_PUBLIC inline Executor(int max_batch_size, int num_thread, int device_id,
                              size_t bytes_per_sample_hint, bool set_affinity = false,
-                             int max_num_stream = -1, int default_cuda_stream_priority = 0,
+                             int max_num_stream = -1,
                              QueueSizes prefetch_queue_depth = QueueSizes{2, 2})
       : max_batch_size_(max_batch_size),
         device_id_(device_id),
3 changes: 1 addition & 2 deletions dali/pipeline/executor/pipelined_executor.h
@@ -41,11 +41,10 @@ class DLL_PUBLIC PipelinedExecutorImpl : public Executor<WorkspacePolicy, QueueP
   DLL_PUBLIC inline PipelinedExecutorImpl(int batch_size, int num_thread, int device_id,
                                           size_t bytes_per_sample_hint, bool set_affinity = false,
                                           int max_num_stream = -1,
-                                          int default_cuda_stream_priority = 0,
                                           QueueSizes prefetch_queue_depth = {2, 2})
       : Executor<WorkspacePolicy, QueuePolicy>(batch_size, num_thread, device_id,
                                                bytes_per_sample_hint, set_affinity, max_num_stream,
-                                               default_cuda_stream_priority, prefetch_queue_depth) {
+                                               prefetch_queue_depth) {
   }

   DLL_PUBLIC ~PipelinedExecutorImpl() override = default;
2 changes: 1 addition & 1 deletion dali/pipeline/operator/op_schema.cc
@@ -67,7 +67,7 @@ OpSchema::OpSchema(const std::string &name) : name_(name) {
   AddInternalArg("max_batch_size", "Max batch size", -1);
   AddInternalArg("device", "Device on which the Op is run", std::string("cpu"));
   AddInternalArg("inplace", "Whether Op can be run in place", false);
-  AddInternalArg("default_cuda_stream_priority", "Default cuda stream priority", 0);
+  AddInternalArg("default_cuda_stream_priority", "Default cuda stream priority", 0);  // deprecated
   AddInternalArg("checkpointing", "Setting to `true` enables checkpointing", false);

   AddOptionalArg("seed", R"code(Random seed.
7 changes: 2 additions & 5 deletions dali/pipeline/operator/operator.h
@@ -75,8 +75,7 @@ class DLL_PUBLIC OperatorBase {
   inline explicit OperatorBase(const OpSpec &spec)
       : spec_(spec),
         num_threads_(spec.GetArgument<int>("num_threads")),
-        max_batch_size_(spec.GetArgument<int>("max_batch_size")),
-        default_cuda_stream_priority_(spec.GetArgument<int>("default_cuda_stream_priority")) {
+        max_batch_size_(spec.GetArgument<int>("max_batch_size")) {
     DALI_ENFORCE(num_threads_ > 0, "Invalid value for argument num_threads.");
     DALI_ENFORCE(max_batch_size_ > 0, "Invalid value for argument max_batch_size.");
   }
@@ -239,16 +238,14 @@ class DLL_PUBLIC OperatorBase {
   const OpSpec spec_;
   int num_threads_;
   int max_batch_size_;
-  int default_cuda_stream_priority_;

   std::unordered_map<std::string, std::any> diagnostics_;
 };

 #define USE_OPERATOR_MEMBERS()          \
   using OperatorBase::spec_;            \
   using OperatorBase::num_threads_;     \
-  using OperatorBase::max_batch_size_;  \
-  using OperatorBase::default_cuda_stream_priority_
+  using OperatorBase::max_batch_size_;

 /**
  * @brief Class defining an operator using specific backend.
20 changes: 5 additions & 15 deletions dali/pipeline/pipeline.cc
@@ -97,19 +97,19 @@ void InitializeMemoryResources() {
 Pipeline::Pipeline(int max_batch_size, int num_threads, int device_id, int64_t seed,
                    bool pipelined_execution, int prefetch_queue_depth,
                    bool async_execution, bool dynamic_execution, size_t bytes_per_sample_hint,
-                   bool set_affinity, int max_num_stream, int default_cuda_stream_priority) {
+                   bool set_affinity, int max_num_stream) {
   InitializeMemoryResources();
   Init(max_batch_size, num_threads, device_id, seed, pipelined_execution, separated_execution_,
        async_execution, dynamic_execution, bytes_per_sample_hint, set_affinity, max_num_stream,
-       default_cuda_stream_priority, QueueSizes{prefetch_queue_depth});
+       QueueSizes{prefetch_queue_depth});
 }

 Pipeline::Pipeline(const string &serialized_pipe,
                    int batch_size, int num_threads, int device_id,
                    bool pipelined_execution, int prefetch_queue_depth,
                    bool async_execution, bool dynamic_execution,
                    size_t bytes_per_sample_hint, bool set_affinity, int max_num_stream,
-                   int default_cuda_stream_priority, int64_t seed) {
+                   int64_t seed) {
   InitializeMemoryResources();
   dali_proto::PipelineDef def;
   DALI_ENFORCE(DeserializePipeline(serialized_pipe, def), "Error parsing serialized pipeline.");
@@ -142,7 +142,6 @@ Pipeline::Pipeline(const string &serialized_pipe,
               bytes_per_sample_hint,
               set_affinity,
               max_num_stream,
-              default_cuda_stream_priority,
               QueueSizes{prefetch_queue_depth});

   // from serialized pipeline, construct new pipeline
@@ -181,7 +180,7 @@ void Pipeline::Init(int max_batch_size, int num_threads, int device_id, int64_t
                     bool pipelined_execution, bool separated_execution,
                     bool async_execution, bool dynamic_execution,
                     size_t bytes_per_sample_hint, bool set_affinity, int max_num_stream,
-                    int default_cuda_stream_priority, QueueSizes prefetch_queue_depth) {
+                    QueueSizes prefetch_queue_depth) {
   DALI_ENFORCE(device_id == CPU_ONLY_DEVICE_ID || cuInitChecked(),
                "You are trying to create a GPU DALI pipeline, while CUDA is not available. "
                "Please install CUDA or set `device_id = None` in Pipeline constructor. "
@@ -200,7 +199,6 @@ void Pipeline::Init(int max_batch_size, int num_threads, int device_id, int64_t
   this->bytes_per_sample_hint_ = bytes_per_sample_hint;
   this->set_affinity_ = set_affinity;
   this->max_num_stream_ = max_num_stream;
-  this->default_cuda_stream_priority_ = default_cuda_stream_priority;
   this->prefetch_queue_depth_ = prefetch_queue_depth;
   this->separated_execution_ = (prefetch_queue_depth.cpu_size != prefetch_queue_depth.gpu_size);
   DALI_ENFORCE(max_batch_size_ > 0, "Max batch size must be greater than 0");
@@ -215,14 +213,6 @@ void Pipeline::Init(int max_batch_size, int num_threads, int device_id, int64_t
         std::min(lowest_cuda_stream_priority, highest_cuda_stream_priority);
     const auto max_priority_value =
         std::max(lowest_cuda_stream_priority, highest_cuda_stream_priority);
-    DALI_ENFORCE(
-        default_cuda_stream_priority >= min_priority_value &&
-            default_cuda_stream_priority <= max_priority_value,
-        "Provided default cuda stream priority `" + std::to_string(default_cuda_stream_priority) +
-            "` is outside the priority range [" + std::to_string(min_priority_value) + ", " +
-            std::to_string(max_priority_value) + "], with lowest priority being `" +
-            std::to_string(lowest_cuda_stream_priority) + "` and highest priority being `" +
-            std::to_string(highest_cuda_stream_priority) + "`");

     seed_.resize(MAX_SEEDS);
     current_seed_ = 0;
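For context on the range check deleted above: CUDA stream priorities are numerically inverted (a lower value means a higher priority), which is why the surrounding code normalizes the bounds with std::min/std::max, and the CUDA runtime clamps out-of-range priorities rather than rejecting them, so the hard DALI_ENFORCE was stricter than the driver itself. A minimal, self-contained sketch of the underlying CUDA runtime calls (illustrative only, not part of this commit):

// Illustrative sketch only -- not DALI code. Shows the CUDA runtime API
// behind the removed priority-range check.
#include <algorithm>
#include <cstdio>

#include <cuda_runtime.h>

int main() {
  // "least" is the lowest priority (numerically largest), "greatest" the
  // highest priority (numerically smallest) -- priorities are inverted.
  int least = 0, greatest = 0;
  cudaDeviceGetStreamPriorityRange(&least, &greatest);
  int lo = std::min(least, greatest);
  int hi = std::max(least, greatest);
  std::printf("valid priority values: [%d, %d] (lowest=%d, highest=%d)\n",
              lo, hi, least, greatest);

  // Values outside this range are clamped by the runtime rather than
  // rejected, so a hard range check adds little safety.
  cudaStream_t stream;
  cudaStreamCreateWithPriority(&stream, cudaStreamNonBlocking, greatest);
  cudaStreamDestroy(stream);
  return 0;
}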
@@ -473,7 +463,7 @@ void Pipeline::Build(std::vector<PipelineOutputDesc> output_descs) {
   executor_ =
       GetExecutor(pipelined_execution_, separated_execution_, async_execution_, dynamic_execution_,
                   max_batch_size_, num_threads_, device_id_, bytes_per_sample_hint_, set_affinity_,
-                  max_num_stream_, default_cuda_stream_priority_, prefetch_queue_depth_);
+                  max_num_stream_, prefetch_queue_depth_);
   executor_->EnableMemoryStats(enable_memory_stats_);
   executor_->EnableCheckpointing(checkpointing_);
   executor_->Init();
9 changes: 3 additions & 6 deletions dali/pipeline/pipeline.h
@@ -85,21 +85,19 @@ class DLL_PUBLIC Pipeline {
    * configured in the thread pool. Defaults to 'false'.
    * @param max_num_stream set an upper limit on the number of cudaStreams
    * that can be allocated by the pipeline.
-   * @param default_cuda_stream_priority CUDA stream priority used by DALI.
-   * See `cudaStreamCreateWithPriority` in CUDA documentation
    */
   DLL_PUBLIC Pipeline(int max_batch_size, int num_threads, int device_id, int64_t seed = -1,
                       bool pipelined_execution = true, int prefetch_queue_depth = 2,
                       bool async_execution = true, bool dynamic_execution = false,
                       size_t bytes_per_sample_hint = 0, bool set_affinity = false,
-                      int max_num_stream = -1, int default_cuda_stream_priority = 0);
+                      int max_num_stream = -1);

   DLL_PUBLIC Pipeline(const string &serialized_pipe,
                       int max_batch_size = -1, int num_threads = -1, int device_id = -1,
                       bool pipelined_execution = true, int prefetch_queue_depth = 2,
                       bool async_execution = true, bool dynamic_execution = false,
                       size_t bytes_per_sample_hint = 0, bool set_affinity = false,
-                      int max_num_stream = -1, int default_cuda_stream_priority = 0,
+                      int max_num_stream = -1,
                       int64_t seed = -1);

   virtual DLL_PUBLIC ~Pipeline();
@@ -587,7 +585,7 @@ class DLL_PUBLIC Pipeline {
   void Init(int batch_size, int num_threads, int device_id, int64_t seed, bool pipelined_execution,
             bool separated_execution, bool async_execution, bool dynamic_execution,
             size_t bytes_per_sample_hint,
-            bool set_affinity, int max_num_stream, int default_cuda_stream_priority,
+            bool set_affinity, int max_num_stream,
             QueueSizes prefetch_queue_depth = QueueSizes{2});

   struct EdgeMeta {
@@ -716,7 +714,6 @@ class DLL_PUBLIC Pipeline {
   size_t bytes_per_sample_hint_ = 0;
   int set_affinity_ = 0;
   int max_num_stream_ = 0;
-  int default_cuda_stream_priority_ = 0;
   int next_logical_id_ = 0;
   int next_internal_logical_id_ = -1;
   QueueSizes prefetch_queue_depth_{};
93 changes: 0 additions & 93 deletions dali/pipeline/util/stream_pool.h

This file was deleted.

16 changes: 6 additions & 10 deletions dali/python/backend_impl.cc
@@ -2024,13 +2024,12 @@ PYBIND11_MODULE(backend_impl, m) {
            bool pipelined_execution = true, int prefetch_queue_depth = 2,
            bool async_execution = true, bool dynamic_execution = false,
            size_t bytes_per_sample_hint = 0,
-           bool set_affinity = false, int max_num_stream = -1,
-           int default_cuda_stream_priority = 0) {
+           bool set_affinity = false, int max_num_stream = -1) {
          return std::make_unique<PyPipeline>(
              batch_size, num_threads, device_id, seed,
              pipelined_execution, prefetch_queue_depth, async_execution, dynamic_execution,
              bytes_per_sample_hint, set_affinity,
-             max_num_stream, default_cuda_stream_priority);
+             max_num_stream);
        }),
        "batch_size"_a,
        "num_threads"_a,
@@ -2042,23 +2041,21 @@ PYBIND11_MODULE(backend_impl, m) {
        "exec_dynamic"_a = false,
        "bytes_per_sample_hint"_a = 0,
        "set_affinity"_a = false,
-       "max_num_stream"_a = -1,
-       "default_cuda_stream_priority"_a = 0
+       "max_num_stream"_a = -1
        )
   // initialize from serialized pipeline
   .def(py::init(
       [](string serialized_pipe,
         int batch_size = -1, int num_threads = -1, int device_id = -1,
         bool pipelined_execution = true, int prefetch_queue_depth = 2,
         bool async_execution = true, bool dynamic_execution = false,
-        size_t bytes_per_sample_hint = 0, bool set_affinity = false, int max_num_stream = -1,
-        int default_cuda_stream_priority = 0) {
+        size_t bytes_per_sample_hint = 0, bool set_affinity = false, int max_num_stream = -1) {
         return std::make_unique<PyPipeline>(
             serialized_pipe,
             batch_size, num_threads, device_id, pipelined_execution,
             prefetch_queue_depth, async_execution, dynamic_execution,
             bytes_per_sample_hint, set_affinity,
-            max_num_stream, default_cuda_stream_priority);
+            max_num_stream);
        }),
        "serialized_pipe"_a,
        "batch_size"_a = -1,
@@ -2070,8 +2067,7 @@ PYBIND11_MODULE(backend_impl, m) {
        "exec_dynamic"_a = true,
        "bytes_per_sample_hint"_a = 0,
        "set_affinity"_a = false,
-       "max_num_stream"_a = -1,
-       "default_cuda_stream_priority"_a = 0
+       "max_num_stream"_a = -1
        )
   .def("AddOperator",
        static_cast<int (Pipeline::*)(const OpSpec &, const std::string &)>
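The bindings above drop the keyword argument outright; the deprecation mentioned in the commit title lives in the Python frontend, whose diff is not shown on this page. As a hedged sketch of the general pattern (the module and class names below are made up for illustration, not DALI's actual code), a pybind11 binding can instead keep an ignored kwarg and warn when it is used:

// Hypothetical sketch, not DALI code: retain a deprecated keyword argument
// in a pybind11 binding, warn on non-default use, and ignore the value.
#include <memory>

#include <pybind11/pybind11.h>

namespace py = pybind11;

struct Pipeline {
  explicit Pipeline(int batch_size) : batch_size(batch_size) {}
  int batch_size;
};

PYBIND11_MODULE(example, m) {
  py::class_<Pipeline>(m, "Pipeline")
      .def(py::init([](int batch_size, int default_cuda_stream_priority) {
             if (default_cuda_stream_priority != 0) {
               // Emits a Python DeprecationWarning; execution continues.
               PyErr_WarnEx(PyExc_DeprecationWarning,
                            "`default_cuda_stream_priority` is deprecated "
                            "and has no effect.",
                            1);
             }
             return std::make_unique<Pipeline>(batch_size);
           }),
           py::arg("batch_size"),
           py::arg("default_cuda_stream_priority") = 0);
}

From Python, `example.Pipeline(8, default_cuda_stream_priority=1)` would then raise a `DeprecationWarning` while still constructing the object.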
(Diffs for the remaining 2 of the 14 changed files are not shown.)