From 09541816c4f287d34eabd9528e9b039062700915 Mon Sep 17 00:00:00 2001
From: Cyprien Noel
Date: Wed, 29 Apr 2015 10:52:45 -0700
Subject: [PATCH] Added DataReader for parallel training with one DB session

- Makes sure each solver accesses a different subset of the data
- Sequential reading of DB, for performance
- Prefetches a configurable amount of data to host memory
- Distributes data to solvers in a round-robin way for determinism
---
 include/caffe/data_layers.hpp         |   8 +-
 include/caffe/data_reader.hpp         |  82 ++++++++++++++++
 src/caffe/data_reader.cpp             | 133 ++++++++++++++++++++++++++
 src/caffe/layers/data_layer.cpp       |  40 +++-----
 src/caffe/proto/caffe.proto           |   5 +
 src/caffe/test/test_layer_factory.cpp |  14 ++-
 src/caffe/test/test_upgrade_proto.cpp |  12 +++
 src/caffe/util/blocking_queue.cpp     |   1 +
 8 files changed, 260 insertions(+), 35 deletions(-)
 create mode 100644 include/caffe/data_reader.hpp
 create mode 100644 src/caffe/data_reader.cpp

diff --git a/include/caffe/data_layers.hpp b/include/caffe/data_layers.hpp
index c68c0e794ae..75005624288 100644
--- a/include/caffe/data_layers.hpp
+++ b/include/caffe/data_layers.hpp
@@ -5,11 +5,11 @@
 #include <utility>
 #include <vector>
 
-#include "boost/scoped_ptr.hpp"
 #include "hdf5.h"
 
 #include "caffe/blob.hpp"
 #include "caffe/common.hpp"
+#include "caffe/data_reader.hpp"
 #include "caffe/data_transformer.hpp"
 #include "caffe/filler.hpp"
 #include "caffe/internal_thread.hpp"
@@ -93,8 +93,7 @@ class BasePrefetchingDataLayer :
 template <typename Dtype>
 class DataLayer : public BasePrefetchingDataLayer<Dtype> {
  public:
-  explicit DataLayer(const LayerParameter& param)
-      : BasePrefetchingDataLayer<Dtype>(param) {}
+  explicit DataLayer(const LayerParameter& param);
   virtual ~DataLayer();
   virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top);
@@ -107,8 +106,7 @@ class DataLayer : public BasePrefetchingDataLayer<Dtype> {
  protected:
   virtual void load_batch(Batch<Dtype>* batch);
 
-  shared_ptr<db::DB> db_;
-  shared_ptr<db::Cursor> cursor_;
+  DataReader reader_;
 };
 
 /**
diff --git a/include/caffe/data_reader.hpp b/include/caffe/data_reader.hpp
new file mode 100644
index 00000000000..10694e230ea
--- /dev/null
+++ b/include/caffe/data_reader.hpp
@@ -0,0 +1,82 @@
+#ifndef CAFFE_DATA_READER_HPP_
+#define CAFFE_DATA_READER_HPP_
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "caffe/common.hpp"
+#include "caffe/internal_thread.hpp"
+#include "caffe/util/blocking_queue.hpp"
+#include "caffe/util/db.hpp"
+
+namespace caffe {
+
+/**
+ * @brief Reads data from a source to queues available to data layers.
+ * A single reading thread is created per source, even if multiple solvers
+ * are running in parallel, e.g. for multi-GPU training. This makes sure
+ * databases are read sequentially, and that each solver accesses a different
+ * subset of the database. Data is distributed to solvers in a round-robin
+ * way to keep parallel training deterministic.
+ */
+class DataReader {
+ public:
+  explicit DataReader(const LayerParameter& param);
+  ~DataReader();
+
+  inline BlockingQueue<Datum*>& free() const {
+    return queues_->free_;
+  }
+  inline BlockingQueue<Datum*>& full() const {
+    return queues_->full_;
+  }
+
+ protected:
+  // Queue pairs are shared between a body and its readers
+  class QueuePair {
+   public:
+    explicit QueuePair(int size);
+    ~QueuePair();
+
+    BlockingQueue<Datum*> free_;
+    BlockingQueue<Datum*> full_;
+
+  DISABLE_COPY_AND_ASSIGN(QueuePair);
+  };
+
+  // A single body is created per source
+  class Body : public InternalThread {
+   public:
+    explicit Body(const LayerParameter& param);
+    virtual ~Body();
+
+   protected:
+    void InternalThreadEntry();
+    void read_one(db::Cursor* cursor, int index);
+
+    const LayerParameter param_;
+    vector<shared_ptr<QueuePair> > reader_queues_;
+
+    friend class DataReader;
+
+  DISABLE_COPY_AND_ASSIGN(Body);
+  };
+
+  // A source is uniquely identified by its layer name + path, in case
+  // the same database is read from two different locations in the net.
+  static inline string source_key(const LayerParameter& param) {
+    return param.name() + ":" + param.data_param().source();
+  }
+
+  const shared_ptr<QueuePair> queues_;
+  shared_ptr<Body> body_;
+
+  static map<const string, boost::weak_ptr<DataReader::Body> > bodies_;
+
+DISABLE_COPY_AND_ASSIGN(DataReader);
+};
+
+}  // namespace caffe
+
+#endif  // CAFFE_DATA_READER_HPP_
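Note on the free_/full_ pattern above: each DataLayer owns a QueuePair; the
reading thread pops pre-allocated Datum buffers from free_, fills them from
the DB, and pushes them to full_, while the layer returns consumed buffers to
free_. The stand-alone sketch below illustrates that recycling under
simplified assumptions -- std::queue plus C++11 synchronization in place of
caffe::BlockingQueue, and a string in place of a Datum; none of these names
are part of the patch.

    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <string>

    // Simplified stand-in for caffe::BlockingQueue.
    template <typename T>
    class SimpleBlockingQueue {
     public:
      void push(const T& t) {
        {
          std::lock_guard<std::mutex> lock(mutex_);
          queue_.push(t);
        }
        condition_.notify_one();
      }
      T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        condition_.wait(lock, [this] { return !queue_.empty(); });
        T t = queue_.front();
        queue_.pop();
        return t;
      }
     private:
      std::queue<T> queue_;
      std::mutex mutex_;
      std::condition_variable condition_;
    };

    int main() {
      SimpleBlockingQueue<std::string*> free_queue, full_queue;
      std::string buffer;        // stands in for a pre-allocated Datum
      free_queue.push(&buffer);  // QueuePair pre-fills free_ on construction

      std::string* datum = free_queue.pop();  // producer side, as in read_one()
      *datum = "record-0";                    // "deserialize" into the buffer
      full_queue.push(datum);

      std::string* got = full_queue.pop();    // consumer side, as in load_batch()
      std::cout << *got << std::endl;
      free_queue.push(got);                   // recycle the buffer
      return 0;
    }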
diff --git a/src/caffe/data_reader.cpp b/src/caffe/data_reader.cpp
new file mode 100644
index 00000000000..24c3945b443
--- /dev/null
+++ b/src/caffe/data_reader.cpp
@@ -0,0 +1,133 @@
+#include <boost/thread.hpp>
+#include <map>
+#include <string>
+#include <vector>
+
+#include "caffe/common.hpp"
+#include "caffe/data_layers.hpp"
+#include "caffe/data_reader.hpp"
+#include "caffe/proto/caffe.pb.h"
+
+namespace caffe {
+
+using boost::weak_ptr;
+
+map<const string, weak_ptr<DataReader::Body> > DataReader::bodies_;
+static boost::mutex bodies_mutex_;
+
+// TODO single solver until multi-gpu merge
+static const int solver_count = 1;
+
+DataReader::DataReader(const LayerParameter& param)
+    : queues_(new QueuePair(  //
+        param.data_param().prefetch() * param.data_param().batch_size())) {
+  // Get or create a body
+  boost::mutex::scoped_lock lock(bodies_mutex_);
+  string key = source_key(param);
+  weak_ptr<Body>& weak = bodies_[key];
+  body_ = weak.lock();
+  if (!body_) {
+    body_.reset(new Body(param));
+    bodies_[key] = weak_ptr<Body>(body_);
+  }
+  body_->reader_queues_.push_back(queues_);
+  // Check a single net is trained at a time per process, whether single
+  // or multi solver. This might also happen if two data layers have the
+  // same name and same source.
+  CHECK(body_->reader_queues_.size() <= solver_count);
+}
+
+DataReader::~DataReader() {
+  string key = source_key(body_->param_);
+  body_.reset();
+  boost::mutex::scoped_lock lock(bodies_mutex_);
+  if (bodies_[key].expired()) {
+    bodies_.erase(key);
+  }
+}
+
+//
+
+DataReader::QueuePair::QueuePair(int size) {
+  // Initialize the free queue with requested number of datums
+  for (int i = 0; i < size; ++i) {
+    free_.push(new Datum());
+  }
+}
+
+DataReader::QueuePair::~QueuePair() {
+  Datum* datum;
+  while (free_.try_pop(&datum)) {
+    delete datum;
+  }
+  while (full_.try_pop(&datum)) {
+    delete datum;
+  }
+}
+
+//
+
+DataReader::Body::Body(const LayerParameter& param)
+    : param_(param),
+      reader_queues_() {
+  StartInternalThread();
+}
+
+DataReader::Body::~Body() {
+  StopInternalThread();
+}
+
+void DataReader::Body::InternalThreadEntry() {
+  shared_ptr<db::DB> db(db::GetDB(param_.data_param().backend()));
+  db->Open(param_.data_param().source(), db::READ);
+  shared_ptr<db::Cursor> cursor(db->NewCursor());
+  try {
+    // Synchronize with main thread to make sure we see at least one queue
+    {
+      boost::mutex::scoped_lock lock(bodies_mutex_);
+      CHECK_GE(reader_queues_.size(), 1);
+    }
+    // To ensure deterministic runs, only start running once all solvers
+    // are ready. But solvers need to peek on one item during initialization,
+    // so to allow the root solver to start before the other solvers are
+    // created, read one item.
+    int index = 0;
+    if (param_.phase() == TRAIN) {
+      read_one(cursor.get(), index++);
+
+      // Wait on remaining solvers
+      while (!must_stop()) {
+        usleep(100 * 1000);
+        boost::mutex::scoped_lock lock(bodies_mutex_);
+        if (reader_queues_.size() == solver_count) {
+          break;
+        }
+      }
+    }
+    // Main loop
+    while (!must_stop()) {
+      if (index == reader_queues_.size()) {
+        index = 0;
+      }
+      read_one(cursor.get(), index++);
+    }
+  } catch (boost::thread_interrupted&) {
+    // Interrupted exception is expected on shutdown
+  }
+}
+
+void DataReader::Body::read_one(db::Cursor* cursor, int index) {
+  Datum* datum = reader_queues_[index]->free_.pop();
+  // TODO deserialize in-place instead of copy?
+  datum->ParseFromString(cursor->value());
+  reader_queues_[index]->full_.push(datum);
+
+  // go to the next iter
+  cursor->Next();
+  if (!cursor->valid()) {
+    DLOG(INFO) << "Restarting data prefetching from start.";
+    cursor->SeekToFirst();
+  }
+}
+
+}  // namespace caffe
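The constructor above uses a get-or-create idiom: a static map of weak_ptrs
keyed by layer name + source hands every reader of the same source the same
Body, and the Body dies with its last reader. A minimal sketch of that idiom,
using boost as the patch does (Body here is a stub and get_or_create a
made-up helper, not functions from the patch):

    #include <iostream>
    #include <map>
    #include <string>

    #include <boost/shared_ptr.hpp>
    #include <boost/thread/mutex.hpp>
    #include <boost/weak_ptr.hpp>

    // Stub standing in for DataReader::Body.
    struct Body {
      explicit Body(const std::string& source) : source_(source) {}
      std::string source_;
    };

    static std::map<std::string, boost::weak_ptr<Body> > bodies;
    static boost::mutex bodies_mutex;

    boost::shared_ptr<Body> get_or_create(const std::string& key) {
      boost::mutex::scoped_lock lock(bodies_mutex);
      boost::weak_ptr<Body>& weak = bodies[key];
      boost::shared_ptr<Body> body = weak.lock();
      if (!body) {
        body.reset(new Body(key));
        bodies[key] = body;  // store a weak reference only
      }
      return body;
    }

    int main() {
      boost::shared_ptr<Body> a = get_or_create("data:examples/train_lmdb");
      boost::shared_ptr<Body> b = get_or_create("data:examples/train_lmdb");
      std::cout << (a == b) << std::endl;  // prints 1: one Body per source
      return 0;
    }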
diff --git a/src/caffe/layers/data_layer.cpp b/src/caffe/layers/data_layer.cpp
index 321dbba18e1..25571379f57 100644
--- a/src/caffe/layers/data_layer.cpp
+++ b/src/caffe/layers/data_layer.cpp
@@ -11,11 +11,15 @@
 #include "caffe/proto/caffe.pb.h"
 #include "caffe/util/benchmark.hpp"
 #include "caffe/util/io.hpp"
-#include "caffe/util/math_functions.hpp"
-#include "caffe/util/rng.hpp"
 
 namespace caffe {
 
+template <typename Dtype>
+DataLayer<Dtype>::DataLayer(const LayerParameter& param)
+  : BasePrefetchingDataLayer<Dtype>(param),
+    reader_(param) {
+}
+
 template <typename Dtype>
 DataLayer<Dtype>::~DataLayer() {
   this->StopInternalThread();
@@ -24,23 +28,8 @@ DataLayer<Dtype>::~DataLayer() {
 template <typename Dtype>
 void DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top) {
-  // Initialize DB
-  db_.reset(db::GetDB(this->layer_param_.data_param().backend()));
-  db_->Open(this->layer_param_.data_param().source(), db::READ);
-  cursor_.reset(db_->NewCursor());
-
-  // Check if we should randomly skip a few data points
-  if (this->layer_param_.data_param().rand_skip()) {
-    unsigned int skip = caffe_rng_rand() %
-        this->layer_param_.data_param().rand_skip();
-    LOG(INFO) << "Skipping first " << skip << " data points.";
-    while (skip-- > 0) {
-      cursor_->Next();
-    }
-  }
   // Read a data point, and use it to initialize the top blob.
-  Datum datum;
-  datum.ParseFromString(cursor_->value());
+  Datum& datum = *(reader_.full().peek());
 
   bool force_color = this->layer_param_.data_param().force_encoded_color();
   if ((force_color && DecodeDatum(&datum, true)) ||
@@ -97,8 +86,7 @@ void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
   const int crop_size = this->layer_param_.transform_param().crop_size();
   bool force_color = this->layer_param_.data_param().force_encoded_color();
   if (batch_size == 1 && crop_size == 0) {
-    Datum datum;
-    datum.ParseFromString(cursor_->value());
+    Datum& datum = *(reader_.full().peek());
     if (datum.encoded()) {
       if (force_color) {
         DecodeDatum(&datum, true);
@@ -121,9 +109,7 @@ void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
   for (int item_id = 0; item_id < batch_size; ++item_id) {
     timer.Start();
     // get a blob
-    Datum datum;
-    datum.ParseFromString(cursor_->value());
-
+    Datum& datum = *(reader_.full().pop("Waiting for data"));
     cv::Mat cv_img;
     if (datum.encoded()) {
       if (force_color) {
@@ -153,12 +139,8 @@ void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
       top_label[item_id] = datum.label();
     }
     trans_time += timer.MicroSeconds();
-    // go to the next iter
-    cursor_->Next();
-    if (!cursor_->valid()) {
-      DLOG(INFO) << "Restarting data prefetching from start.";
-      cursor_->SeekToFirst();
-    }
+
+    reader_.free().push(const_cast<Datum*>(&datum));
   }
   batch_timer.Stop();
   DLOG(INFO) << "Prefetch batch: " << batch_timer.MilliSeconds() << " ms.";
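The reading thread's main loop is what the commit message means by
round-robin distribution: it cycles index over reader_queues_, so solver i
deterministically receives records i, i + solver_count, i + 2 * solver_count,
and so on. A toy sketch of that distribution, with plain vectors standing in
for the blocking queues:

    #include <iostream>
    #include <vector>

    int main() {
      const int solver_count = 2;  // e.g. two GPUs, one solver each
      std::vector<std::vector<int> > queues(solver_count);
      int index = 0;
      for (int record = 0; record < 8; ++record) {  // cursor->Next() stand-in
        if (index == solver_count) {
          index = 0;
        }
        queues[index++].push_back(record);
      }
      for (int i = 0; i < solver_count; ++i) {
        std::cout << "solver " << i << ":";
        for (size_t j = 0; j < queues[i].size(); ++j) {
          std::cout << " " << queues[i][j];
        }
        std::cout << std::endl;  // solver 0: 0 2 4 6 / solver 1: 1 3 5 7
      }
      return 0;
    }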
diff --git a/src/caffe/proto/caffe.proto b/src/caffe/proto/caffe.proto
index 5b21cf20028..1aad1bf3c22 100644
--- a/src/caffe/proto/caffe.proto
+++ b/src/caffe/proto/caffe.proto
@@ -432,6 +432,7 @@ message ConvolutionParameter {
 }
 
 // Message that stores parameters used by DataLayer
+// next available ID: 11 (last added: prefetch)
 message DataParameter {
   enum DB {
     LEVELDB = 0;
@@ -445,6 +446,7 @@ message DataParameter {
   // to avoid all asynchronous sgd clients to start at the same point. The skip
   // point would be set as rand_skip * rand(0,1). Note that rand_skip should not
   // be larger than the number of keys in the database.
+  // DEPRECATED. Each solver accesses a different subset of the database.
   optional uint32 rand_skip = 7 [default = 0];
   optional DB backend = 8 [default = LEVELDB];
   // DEPRECATED. See TransformationParameter. For data pre-processing, we can do
@@ -460,6 +462,9 @@ message DataParameter {
   optional bool mirror = 6 [default = false];
   // Force the encoded image to have 3 color channels
   optional bool force_encoded_color = 9 [default = false];
+  // Prefetch queue (Number of batches to prefetch to host memory, increase if
+  // data access bandwidth varies).
+  optional uint32 prefetch = 10 [default = 4];
 }
 
 // Message that stores parameters used by DropoutLayer
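Note how the new field is consumed: the DataReader constructor sizes its free
queue as prefetch * batch_size datums, so the default of 4 keeps four batches
buffered in host memory. A hypothetical snippet building such a parameter
programmatically against the generated caffe.pb.h (the source path is
invented):

    #include <iostream>

    #include "caffe/proto/caffe.pb.h"

    int main() {
      caffe::LayerParameter param;
      param.set_name("data");
      param.set_type("Data");
      caffe::DataParameter* data = param.mutable_data_param();
      data->set_source("examples/mnist/mnist_train_lmdb");  // invented path
      data->set_batch_size(64);
      data->set_prefetch(8);  // queue sized 8 * 64 = 512 datums in host memory
      std::cout << param.DebugString();
      return 0;
    }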
diff --git a/src/caffe/test/test_layer_factory.cpp b/src/caffe/test/test_layer_factory.cpp
index efb1b37ac42..c86fafd000c 100644
--- a/src/caffe/test/test_layer_factory.cpp
+++ b/src/caffe/test/test_layer_factory.cpp
@@ -1,11 +1,14 @@
 #include <map>
 #include <string>
 
+#include "boost/scoped_ptr.hpp"
 #include "gtest/gtest.h"
 
 #include "caffe/common.hpp"
 #include "caffe/layer.hpp"
 #include "caffe/layer_factory.hpp"
+#include "caffe/util/db.hpp"
+#include "caffe/util/io.hpp"
 
 #include "caffe/test/test_caffe_main.hpp"
 
@@ -21,11 +24,20 @@ TYPED_TEST(LayerFactoryTest, TestCreateLayer) {
   typename LayerRegistry<Dtype>::CreatorRegistry& registry =
       LayerRegistry<Dtype>::Registry();
   shared_ptr<Layer<Dtype> > layer;
-  LayerParameter layer_param;
   for (typename LayerRegistry<Dtype>::CreatorRegistry::iterator iter =
        registry.begin(); iter != registry.end(); ++iter) {
     // Special case: PythonLayer is checked by pytest
     if (iter->first == "Python") { continue; }
+    LayerParameter layer_param;
+    // Data layers expect a DB
+    if (iter->first == "Data") {
+      string tmp;
+      MakeTempDir(&tmp);
+      boost::scoped_ptr<db::DB> db(db::GetDB(DataParameter_DB_LEVELDB));
+      db->Open(tmp, db::NEW);
+      db->Close();
+      layer_param.mutable_data_param()->set_source(tmp);
+    }
     layer_param.set_type(iter->first);
     layer = LayerRegistry<Dtype>::CreateLayer(layer_param);
     EXPECT_EQ(iter->first, layer->type());
diff --git a/src/caffe/test/test_upgrade_proto.cpp b/src/caffe/test/test_upgrade_proto.cpp
index eec627656ef..006720231a5 100644
--- a/src/caffe/test/test_upgrade_proto.cpp
+++ b/src/caffe/test/test_upgrade_proto.cpp
@@ -2,12 +2,15 @@
 #include <string>
 #include <vector>
 
+#include "boost/scoped_ptr.hpp"
 #include "google/protobuf/text_format.h"
 #include "gtest/gtest.h"
 
 #include "caffe/blob.hpp"
 #include "caffe/common.hpp"
 #include "caffe/layer.hpp"
+#include "caffe/util/db.hpp"
+#include "caffe/util/io.hpp"
 #include "caffe/util/upgrade_proto.hpp"
 
 #include "caffe/test/test_caffe_main.hpp"
@@ -2901,6 +2904,15 @@ TEST_F(NetUpgradeTest, TestUpgradeV1LayerType) {
       continue;  // Empty string isn't actually a valid layer type.
     }
     layer_param.set_type(v2_layer_type);
+    // Data layers expect a DB
+    if (v2_layer_type == "Data") {
+      string tmp;
+      MakeTempDir(&tmp);
+      boost::scoped_ptr<db::DB> db(db::GetDB(DataParameter_DB_LEVELDB));
+      db->Open(tmp, db::NEW);
+      db->Close();
+      layer_param.mutable_data_param()->set_source(tmp);
+    }
     layer = LayerRegistry<float>::CreateLayer(layer_param);
     EXPECT_EQ(v2_layer_type, layer->type());
   }
diff --git a/src/caffe/util/blocking_queue.cpp b/src/caffe/util/blocking_queue.cpp
index e017260dcfe..737e99542d2 100644
--- a/src/caffe/util/blocking_queue.cpp
+++ b/src/caffe/util/blocking_queue.cpp
@@ -86,5 +86,6 @@ size_t BlockingQueue<T>::size() const {
 
 template class BlockingQueue<Batch<float>*>;
 template class BlockingQueue<Batch<double>*>;
+template class BlockingQueue<Datum*>;
 
 }  // namespace caffe
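The one-line change above is needed because BlockingQueue's member functions
are defined in blocking_queue.cpp rather than the header, so every template
argument used elsewhere -- now including Datum* -- must be instantiated
explicitly there. A single-file sketch of the pattern (illustrative file
layout and names, not the actual caffe sources):

    #include <iostream>
    #include <queue>

    // queue.hpp part: declaration only, as in blocking_queue.hpp.
    template <typename T>
    class Queue {
     public:
      void push(const T& t);
      T pop();
     private:
      std::queue<T> queue_;
    };

    // queue.cpp part: out-of-line definitions, as in blocking_queue.cpp...
    template <typename T>
    void Queue<T>::push(const T& t) {
      queue_.push(t);
    }

    template <typename T>
    T Queue<T>::pop() {
      T t = queue_.front();
      queue_.pop();
      return t;
    }

    // ...so every T used by other translation units needs an explicit
    // instantiation; without it, Queue<int>::push would fail to link.
    template class Queue<int>;

    int main() {
      Queue<int> q;
      q.push(42);
      std::cout << q.pop() << std::endl;
      return 0;
    }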