Skip to content

Commit

Permalink
Add prefetching support in RALI pipeline.
Browse files Browse the repository at this point in the history
  • Loading branch information
shobana-mcw committed Jun 2, 2021
1 parent 0e1a43a commit 0d5cf66
Show file tree
Hide file tree
Showing 21 changed files with 105 additions and 71 deletions.
10 changes: 6 additions & 4 deletions rocAL/rocAL/include/cifar10_data_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ class CIFAR10DataLoader : public LoaderModule
public:
#if ENABLE_HIP
explicit CIFAR10DataLoader(DeviceResourcesHip dev_resources);
#else
#else
explicit CIFAR10DataLoader(DeviceResources dev_resources);
#endif
#endif
~CIFAR10DataLoader() override;
LoaderModuleStatus load_next() override;
void initialize(ReaderConfig reader_cfg, DecoderConfig decoder_cfg, RaliMemType mem_type, unsigned batch_size, bool keep_orig_size=true) override;
Expand All @@ -46,6 +46,7 @@ class CIFAR10DataLoader : public LoaderModule
std::vector<std::string> get_id() override;
decoded_image_info get_decode_image_info() override;
Timing timing() override;
void set_prefetch_queue_depth(size_t prefetch_queue_depth) override;
private:
void increment_loader_idx();
bool is_out_of_data();
Expand All @@ -59,7 +60,7 @@ class CIFAR10DataLoader : public LoaderModule
#else
const DeviceResources _dev_resources;
#endif
decoded_image_info _raw_img_info; // image info to store the names. In this case the ID of image is stored in _roi_width field
decoded_image_info _raw_img_info; // image info to store the names. In this case the ID of image is stored in _roi_width field
decoded_image_info _output_decoded_img_info;
bool _initialized = false;
RaliMemType _mem_type;
Expand All @@ -72,7 +73,8 @@ class CIFAR10DataLoader : public LoaderModule
std::vector<size_t> _actual_read_size;
std::vector<std::string> _output_names;
CircularBuffer _circ_buff;
const static size_t CIRC_BUFFER_DEPTH = 3; // Used for circular buffer's internal buffer
// const static size_t CIRC_BUFFER_DEPTH = 3; // Used for circular buffer's internal buffer
size_t _prefetch_queue_depth;
TimingDBG _file_load_time, _swap_handle_time;
size_t _loader_idx;
size_t _shard_count = 1;
Expand Down
8 changes: 4 additions & 4 deletions rocAL/rocAL/include/circular_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ class CircularBuffer
{
public:
#if ENABLE_HIP
CircularBuffer(DeviceResourcesHip hipres, size_t buffer_depth );
CircularBuffer(DeviceResourcesHip hipres);
#else
CircularBuffer(DeviceResources ocl, size_t buffer_depth );
CircularBuffer(DeviceResources ocl);
#endif
~CircularBuffer();
void init(RaliMemType output_mem_type, size_t output_mem_size);
void init(RaliMemType output_mem_type, size_t output_mem_size, size_t buff_depth);
void sync();// Syncs device buffers with host
void unblock_reader();// Unblocks the thread currently waiting on a call to get_read_buffer
void unblock_writer();// Unblocks the thread currently waiting on get_write_buffer
Expand All @@ -69,7 +69,7 @@ class CircularBuffer
void increment_write_ptr();
bool full();
bool empty();
const size_t BUFF_DEPTH;
size_t BUFF_DEPTH;
decoded_image_info _last_image_info;
std::queue<decoded_image_info> _circ_image_info;//!< Stores the loaded images names, decoded_width and decoded_height(data is stored in the _circ_buff)
std::mutex _names_buff_lock;
Expand Down
4 changes: 2 additions & 2 deletions rocAL/rocAL/include/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ THE SOFTWARE.

struct Context
{
explicit Context(size_t batch_size, RaliAffinity affinity, int gpu_id , size_t cpu_thread_count, RaliTensorDataType output_tensor_type ):
explicit Context(size_t batch_size, RaliAffinity affinity, int gpu_id , size_t cpu_thread_count, size_t prefetch_queue_depth, RaliTensorDataType output_tensor_type ):
affinity(affinity),
_user_batch_size(batch_size)
{
LOG("Processing on " + STR(((affinity == RaliAffinity::CPU)?" CPU": " GPU")))
master_graph = std::make_shared<MasterGraph>(batch_size, affinity, gpu_id, cpu_thread_count, output_tensor_type);
master_graph = std::make_shared<MasterGraph>(batch_size, affinity, gpu_id, cpu_thread_count, prefetch_queue_depth, output_tensor_type);
_internal_batch_size = master_graph->internal_batch_size();
}
~Context()
Expand Down
9 changes: 5 additions & 4 deletions rocAL/rocAL/include/image_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/

#pragma once
#pragma once

#include <string>
#include <thread>
Expand All @@ -38,7 +38,7 @@ class ImageLoader : public LoaderModule {
explicit ImageLoader(DeviceResourcesHip dev_resources);
#else
explicit ImageLoader(DeviceResources dev_resources);
#endif
#endif
~ImageLoader() override;
LoaderModuleStatus load_next() override;
void initialize(ReaderConfig reader_cfg, DecoderConfig decoder_cfg, RaliMemType mem_type, unsigned batch_size, bool keep_orig_size=false) override;
Expand All @@ -52,6 +52,7 @@ class ImageLoader : public LoaderModule {
LoaderModuleStatus set_cpu_sched_policy(struct sched_param sched_policy);
std::vector<std::string> get_id() override;
decoded_image_info get_decode_image_info() override;
void set_prefetch_queue_depth(size_t prefetch_queue_depth) override;
private:
bool is_out_of_data();
void de_init();
Expand All @@ -75,10 +76,10 @@ class ImageLoader : public LoaderModule {
bool _is_initialized;
bool _stopped = false;
bool _loop;//<! If true the reader will wrap around at the end of the media (files/images/...) and wouldn't stop
const static size_t CIRC_BUFFER_DEPTH = 3; // Used for circular buffer's internal buffer
size_t CIRC_BUFFER_DEPTH; // Used for circular buffer's internal buffer
size_t _image_counter = 0;//!< How many images have been loaded already
size_t _remaining_image_count;//!< How many images are there yet to be loaded
bool _decoder_keep_original = false;
std::shared_ptr<RandomBBoxCrop_MetaDataReader> _randombboxcrop_meta_data_reader = nullptr;
std::shared_ptr<RandomBBoxCrop_MetaDataReader> _randombboxcrop_meta_data_reader = nullptr;
};

12 changes: 7 additions & 5 deletions rocAL/rocAL/include/image_loader_sharded.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ class ImageLoaderSharded : public LoaderModule
public:
#if ENABLE_HIP
explicit ImageLoaderSharded(DeviceResourcesHip dev_resources);
#else
#else
explicit ImageLoaderSharded(DeviceResources dev_resources);
#endif
#endif
~ImageLoaderSharded() override;
LoaderModuleStatus load_next() override;
void initialize(ReaderConfig reader_cfg, DecoderConfig decoder_cfg, RaliMemType mem_type, unsigned batch_size, bool keep_orig_size=false) override;
Expand All @@ -46,18 +46,20 @@ class ImageLoaderSharded : public LoaderModule
std::vector<std::string> get_id() override;
decoded_image_info get_decode_image_info() override;
Timing timing() override;
void set_prefetch_queue_depth(size_t prefetch_queue_depth) override;
private:
void increment_loader_idx();
#if ENABLE_HIP
#if ENABLE_HIP
const DeviceResourcesHip _dev_resources;
#else
#else
const DeviceResources _dev_resources;
#endif
#endif
bool _initialized = false;
std::vector<std::shared_ptr<ImageLoader>> _loaders;
size_t _loader_idx;
size_t _shard_count = 1;
void fast_forward_through_empty_loaders();
size_t _prefetch_queue_depth;

Image *_output_image;
std::shared_ptr<RandomBBoxCrop_MetaDataReader> _randombboxcrop_meta_data_reader = nullptr;
Expand Down
3 changes: 2 additions & 1 deletion rocAL/rocAL/include/loader_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ enum class LoaderModuleStatus
};

/*! \class LoaderModule The interface defining the API and requirements of loader modules*/
class LoaderModule
class LoaderModule
{
public:
virtual void initialize(ReaderConfig reader_config, DecoderConfig decoder_config, RaliMemType mem_type, unsigned batch_size, bool keep_orig_size) = 0;
Expand All @@ -55,6 +55,7 @@ class LoaderModule
virtual std::vector<std::string> get_id() = 0; // returns the id of the last batch of images/frames loaded
virtual void start_loading() = 0; // starts internal loading thread
virtual decoded_image_info get_decode_image_info() = 0;
virtual void set_prefetch_queue_depth(size_t prefetch_queue_depth) = 0;
// introduce meta data reader
virtual void set_random_bbox_data_reader(std::shared_ptr<RandomBBoxCrop_MetaDataReader> randombboxcrop_meta_data_reader) = 0;
};
Expand Down
9 changes: 7 additions & 2 deletions rocAL/rocAL/include/master_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class MasterGraph
{
public:
enum class Status { OK = 0, NOT_RUNNING = 1, NO_MORE_DATA = 2, NOT_IMPLEMENTED };
MasterGraph(size_t batch_size, RaliAffinity affinity, int gpu_id, size_t cpu_threads, RaliTensorDataType output_tensor_data_type);
MasterGraph(size_t batch_size, RaliAffinity affinity, int gpu_id, size_t cpu_threads, size_t prefetch_queue_depth, RaliTensorDataType output_tensor_data_type);
~MasterGraph();
Status reset();
size_t remaining_images_count();
Expand Down Expand Up @@ -135,7 +135,6 @@ class MasterGraph
std::shared_ptr<RandomBBoxCrop_MetaDataReader> _randombboxcrop_meta_data_reader = nullptr;
bool _first_run = true;
bool _processing;//!< Indicates if internal processing thread should keep processing or not
const static unsigned OUTPUT_RING_BUFFER_DEPTH = 3;
const static unsigned SAMPLE_SIZE = sizeof(unsigned char);
int _remaining_images_count;//!< Keeps the count of remaining images yet to be processed for the user,
bool _loop;//!< Indicates if user wants to indefinitely loops through images or not
Expand All @@ -145,6 +144,7 @@ class MasterGraph
bool _output_routine_finished_processing = false;
const RaliTensorDataType _out_data_type;
bool _is_random_bbox_crop = false;
size_t _prefetch_queue_depth;
};

template <typename T>
Expand Down Expand Up @@ -189,6 +189,7 @@ template<> inline std::shared_ptr<ImageLoaderNode> MasterGraph::add_node(const s
THROW("A loader already exists, cannot have more than one loader")
auto node = std::make_shared<ImageLoaderNode>(outputs[0], _device.resources());
_loader_module = node->get_loader_module();
_loader_module->set_prefetch_queue_depth(_prefetch_queue_depth);
_root_nodes.push_back(node);
for(auto& output: outputs)
_image_map.insert(make_pair(output, node));
Expand All @@ -201,6 +202,7 @@ template<> inline std::shared_ptr<ImageLoaderSingleShardNode> MasterGraph::add_n
THROW("A loader already exists, cannot have more than one loader")
auto node = std::make_shared<ImageLoaderSingleShardNode>(outputs[0], _device.resources());
_loader_module = node->get_loader_module();
_loader_module->set_prefetch_queue_depth(_prefetch_queue_depth);
_root_nodes.push_back(node);
for(auto& output: outputs)
_image_map.insert(make_pair(output, node));
Expand All @@ -213,6 +215,7 @@ template<> inline std::shared_ptr<FusedJpegCropNode> MasterGraph::add_node(const
THROW("A loader already exists, cannot have more than one loader")
auto node = std::make_shared<FusedJpegCropNode>(outputs[0], _device.resources());
_loader_module = node->get_loader_module();
_loader_module->set_prefetch_queue_depth(_prefetch_queue_depth);
_loader_module->set_random_bbox_data_reader(_randombboxcrop_meta_data_reader);
_root_nodes.push_back(node);
for(auto& output: outputs)
Expand All @@ -227,6 +230,7 @@ template<> inline std::shared_ptr<FusedJpegCropSingleShardNode> MasterGraph::add
THROW("A loader already exists, cannot have more than one loader")
auto node = std::make_shared<FusedJpegCropSingleShardNode>(outputs[0], _device.resources());
_loader_module = node->get_loader_module();
_loader_module->set_prefetch_queue_depth(_prefetch_queue_depth);
_loader_module->set_random_bbox_data_reader(_randombboxcrop_meta_data_reader);
_root_nodes.push_back(node);
for(auto& output: outputs)
Expand All @@ -244,6 +248,7 @@ template<> inline std::shared_ptr<Cifar10LoaderNode> MasterGraph::add_node(const
THROW("A loader already exists, cannot have more than one loader")
auto node = std::make_shared<Cifar10LoaderNode>(outputs[0], _device.resources());
_loader_module = node->get_loader_module();
_loader_module->set_prefetch_queue_depth(_prefetch_queue_depth);
_root_nodes.push_back(node);
for(auto& output: outputs)
_image_map.insert(make_pair(output, node));
Expand Down
10 changes: 5 additions & 5 deletions rocAL/rocAL/include/node_fused_jpeg_crop.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ class FusedJpegCropNode: public Node
/// \param device_resources shard count from user

/// internal_shard_count number of loader/decoders are created and each shard is loaded and decoded using separate and independent resources increasing the parallelism and performance.
#if ENABLE_HIP
#if ENABLE_HIP
FusedJpegCropNode(Image *output, DeviceResourcesHip device_resources_hip);
#else
#else
FusedJpegCropNode(Image *output, DeviceResources device_resources);
#endif
#endif
~FusedJpegCropNode() override;
FusedJpegCropNode() = delete;
///
Expand All @@ -59,8 +59,8 @@ class FusedJpegCropNode: public Node
Parameter<float>* _y_drift;
Parameter<float>* _area_factor;
Parameter<float>* _aspect_ratio;
constexpr static float X_DRIFT_RANGE [2] = {0, 1};
constexpr static float X_DRIFT_RANGE [2] = {0, 1};
constexpr static float Y_DRIFT_RANGE [2] = {0, 1};
constexpr static float AREA_FACTOR_RANGE[2] = {0.08, 0.99};
constexpr static float AREA_FACTOR_RANGE[2] = {0.08, 0.99};
constexpr static float ASPECT_RATIO_RANGE[2] = {0.75, 1.33};
};
8 changes: 4 additions & 4 deletions rocAL/rocAL/include/node_fused_jpeg_crop_single_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ class FusedJpegCropSingleShardNode: public Node
public:
#if ENABLE_HIP
FusedJpegCropSingleShardNode(Image *output, DeviceResourcesHip device_resources);
#else
#else
FusedJpegCropSingleShardNode(Image *output, DeviceResources device_resources);
#endif
#endif
~FusedJpegCropSingleShardNode() override;

/// \param user_shard_count shard count from user
Expand All @@ -35,8 +35,8 @@ class FusedJpegCropSingleShardNode: public Node
Parameter<float>* _y_drift;
Parameter<float>* _area_factor;
Parameter<float>* _aspect_ratio;
constexpr static float X_DRIFT_RANGE [2] = {0, 1};
constexpr static float X_DRIFT_RANGE [2] = {0, 1};
constexpr static float Y_DRIFT_RANGE [2] = {0, 1};
constexpr static float AREA_FACTOR_RANGE[2] = {0.08, 0.99};
constexpr static float AREA_FACTOR_RANGE[2] = {0.08, 0.99};
constexpr static float ASPECT_RATIO_RANGE[2] = {0.75, 1.33};
};
4 changes: 2 additions & 2 deletions rocAL/rocAL/include/node_image_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ class ImageLoaderNode: public Node
/// internal_shard_count number of loader/decoders are created and each shard is loaded and decoded using separate and independent resources increasing the parallelism and performance.
#if ENABLE_HIP
ImageLoaderNode(Image *output, DeviceResourcesHip device_resources);
#else
#else
ImageLoaderNode(Image *output, DeviceResources device_resources);
#endif
#endif
~ImageLoaderNode() override;
ImageLoaderNode() = delete;
///
Expand Down
4 changes: 2 additions & 2 deletions rocAL/rocAL/include/node_image_loader_single_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ class ImageLoaderSingleShardNode: public Node
public:
#if ENABLE_HIP
ImageLoaderSingleShardNode(Image *output, DeviceResourcesHip device_resources);
#else
#else
ImageLoaderSingleShardNode(Image *output, DeviceResources device_resources);
#endif
#endif
~ImageLoaderSingleShardNode() override;

/// \param user_shard_count shard count from user
Expand Down
2 changes: 1 addition & 1 deletion rocAL/rocAL/include/rali_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ THE SOFTWARE.
/// \param gpu_id
/// \param cpu_thread_count
/// \return
extern "C" RaliContext RALI_API_CALL raliCreate(size_t batch_size, RaliProcessMode affinity, int gpu_id = 0, size_t cpu_thread_count = 1, RaliTensorOutputType output_tensor_data_type = RaliTensorOutputType::RALI_FP32);
extern "C" RaliContext RALI_API_CALL raliCreate(size_t batch_size, RaliProcessMode affinity, int gpu_id = 0, size_t cpu_thread_count = 1, size_t prefetch_queue_depth = 3, RaliTensorOutputType output_tensor_data_type = RaliTensorOutputType::RALI_FP32);
//extern "C" RaliContext RALI_API_CALL raliCreate(size_t batch_size, RaliProcessMode affinity, int gpu_id = 0, size_t cpu_thread_count = 1);

///
Expand Down
Loading

0 comments on commit 0d5cf66

Please sign in to comment.