Added DataReader for parallel training with one DB session
- Makes sure each solver accesses a different subset of the data
- Reads the DB sequentially, for performance
- Prefetches a configurable amount of data to host memory
- Distributes data to solvers in a round-robin way for determinism
cypof committed Apr 29, 2015
1 parent 2f439ac commit 0954181
Showing 8 changed files with 260 additions and 35 deletions.
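
For orientation before the diffs: once this commit is applied, a data layer drives the reader through a pair of blocking queues. A minimal consumer-side sketch (assuming a populated LayerParameter named layer_param; this code is illustrative, not part of the commit):

#include "caffe/data_reader.hpp"

// Constructing a DataReader attaches this layer to the single reading
// thread (Body) for its source, creating that thread on first use.
DataReader reader(layer_param);

// The reading thread deserializes DB entries into the 'full' queue; the
// layer pops one, consumes it, then recycles the object via 'free'.
Datum* datum = reader.full().pop();  // blocks until an entry is ready
// ... copy/transform *datum into the batch ...
reader.free().push(datum);           // return the Datum to the pool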
8 changes: 3 additions & 5 deletions include/caffe/data_layers.hpp
@@ -5,11 +5,11 @@
 #include <utility>
 #include <vector>
 
-#include "boost/scoped_ptr.hpp"
 #include "hdf5.h"
 
 #include "caffe/blob.hpp"
 #include "caffe/common.hpp"
+#include "caffe/data_reader.hpp"
 #include "caffe/data_transformer.hpp"
 #include "caffe/filler.hpp"
 #include "caffe/internal_thread.hpp"
@@ -93,8 +93,7 @@ class BasePrefetchingDataLayer :
 template <typename Dtype>
 class DataLayer : public BasePrefetchingDataLayer<Dtype> {
  public:
-  explicit DataLayer(const LayerParameter& param)
-      : BasePrefetchingDataLayer<Dtype>(param) {}
+  explicit DataLayer(const LayerParameter& param);
   virtual ~DataLayer();
   virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top);
@@ -107,8 +106,7 @@ class DataLayer : public BasePrefetchingDataLayer<Dtype> {
  protected:
   virtual void load_batch(Batch<Dtype>* batch);
 
-  shared_ptr<db::DB> db_;
-  shared_ptr<db::Cursor> cursor_;
+  DataReader reader_;
 };
 
 /**
82 changes: 82 additions & 0 deletions include/caffe/data_reader.hpp
@@ -0,0 +1,82 @@
#ifndef CAFFE_DATA_READER_HPP_
#define CAFFE_DATA_READER_HPP_

#include <map>
#include <string>
#include <vector>

#include "caffe/common.hpp"
#include "caffe/internal_thread.hpp"
#include "caffe/util/blocking_queue.hpp"
#include "caffe/util/db.hpp"

namespace caffe {

/**
 * @brief Reads data from a source to queues available to data layers.
 * A single reading thread is created per source, even if multiple solvers
 * are running in parallel, e.g. for multi-GPU training. This makes sure
 * databases are read sequentially, and that each solver accesses a different
 * subset of the database. Data is distributed to solvers in a round-robin
 * way to keep parallel training deterministic.
 */
class DataReader {
 public:
  explicit DataReader(const LayerParameter& param);
  ~DataReader();

  inline BlockingQueue<Datum*>& free() const {
    return queues_->free_;
  }
  inline BlockingQueue<Datum*>& full() const {
    return queues_->full_;
  }

 protected:
  // Queue pairs are shared between a body and its readers
  class QueuePair {
   public:
    explicit QueuePair(int size);
    ~QueuePair();

    BlockingQueue<Datum*> free_;
    BlockingQueue<Datum*> full_;

    DISABLE_COPY_AND_ASSIGN(QueuePair);
  };

  // A single body is created per source
  class Body : public InternalThread {
   public:
    explicit Body(const LayerParameter& param);
    virtual ~Body();

   protected:
    void InternalThreadEntry();
    void read_one(db::Cursor* cursor, int index);

    const LayerParameter param_;
    vector<shared_ptr<QueuePair> > reader_queues_;

    friend class DataReader;

    DISABLE_COPY_AND_ASSIGN(Body);
  };

  // A source is uniquely identified by its layer name + path, in case
  // the same database is read from two different locations in the net.
  static inline string source_key(const LayerParameter& param) {
    return param.name() + ":" + param.data_param().source();
  }

  const shared_ptr<QueuePair> queues_;
  shared_ptr<Body> body_;

  static map<const string, boost::weak_ptr<DataReader::Body> > bodies_;

  DISABLE_COPY_AND_ASSIGN(DataReader);
};

}  // namespace caffe

#endif  // CAFFE_DATA_READER_HPP_
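
The free_/full_ pair gives each reader a bounded Datum pool: the DataReader constructor (below, in data_reader.cpp) preallocates prefetch * batch_size Datum objects into free_, the Body moves them to full_ as it reads, and the consuming layer moves them back. A sketch of how two layers sharing one source would share one Body — hypothetical names and an example path; note this commit still pins solver_count to 1 (see the TODO in data_reader.cpp), so the second reader only becomes legal after the multi-GPU merge:

LayerParameter p;
p.set_name("data");
p.mutable_data_param()->set_source("examples/mnist/mnist_train_lmdb");  // example path

DataReader r0(p);  // creates the Body for key "data:examples/mnist/mnist_train_lmdb"
DataReader r1(p);  // would attach to the same Body and add a second queue
// The Body would then deal DB entries round-robin: r0, r1, r0, r1, ...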
133 changes: 133 additions & 0 deletions src/caffe/data_reader.cpp
@@ -0,0 +1,133 @@
#include <boost/thread.hpp>
#include <map>
#include <string>
#include <vector>

#include "caffe/common.hpp"
#include "caffe/data_layers.hpp"
#include "caffe/data_reader.hpp"
#include "caffe/proto/caffe.pb.h"

namespace caffe {

using boost::weak_ptr;

map<const string, weak_ptr<DataReader::Body> > DataReader::bodies_;
static boost::mutex bodies_mutex_;

// TODO single solver until multi-gpu merge
static const int solver_count = 1;

DataReader::DataReader(const LayerParameter& param)
    : queues_(new QueuePair(  //
        param.data_param().prefetch() * param.data_param().batch_size())) {
  // Get or create a body
  boost::mutex::scoped_lock lock(bodies_mutex_);
  string key = source_key(param);
  weak_ptr<Body>& weak = bodies_[key];
  body_ = weak.lock();
  if (!body_) {
    body_.reset(new Body(param));
    bodies_[key] = weak_ptr<Body>(body_);
  }
  body_->reader_queues_.push_back(queues_);
  // Check a single net is trained at a time per process, whether single
  // or multi solver. This might also happen if two data layers have same
  // name and same source.
  CHECK(body_->reader_queues_.size() <= solver_count);
}

DataReader::~DataReader() {
  string key = source_key(body_->param_);
  body_.reset();
  boost::mutex::scoped_lock lock(bodies_mutex_);
  if (bodies_[key].expired()) {
    bodies_.erase(key);
  }
}

//

DataReader::QueuePair::QueuePair(int size) {
  // Initialize the free queue with requested number of datums
  for (int i = 0; i < size; ++i) {
    free_.push(new Datum());
  }
}

DataReader::QueuePair::~QueuePair() {
  Datum* datum;
  while (free_.try_pop(&datum)) {
    delete datum;
  }
  while (full_.try_pop(&datum)) {
    delete datum;
  }
}

//

DataReader::Body::Body(const LayerParameter& param)
    : param_(param),
      reader_queues_() {
  StartInternalThread();
}

DataReader::Body::~Body() {
  StopInternalThread();
}

void DataReader::Body::InternalThreadEntry() {
  shared_ptr<db::DB> db(db::GetDB(param_.data_param().backend()));
  db->Open(param_.data_param().source(), db::READ);
  shared_ptr<db::Cursor> cursor(db->NewCursor());
  try {
    // Synchronize with main thread to make sure we see at least one queue
    {
      boost::mutex::scoped_lock lock(bodies_mutex_);
      CHECK_GE(reader_queues_.size(), 1);
    }
    // To ensure deterministic runs, only start running once all solvers
    // are ready. But solvers need to peek on one item during initialization,
    // so to allow the root solver to start before the other solvers are
    // created, read one item.
    int index = 0;
    if (param_.phase() == TRAIN) {
      read_one(cursor.get(), index++);

      // Wait on remaining solvers
      while (!must_stop()) {
        usleep(100 * 1000);
        boost::mutex::scoped_lock lock(bodies_mutex_);
        if (reader_queues_.size() == solver_count) {
          break;
        }
      }
    }
    // Main loop
    while (!must_stop()) {
      if (index == reader_queues_.size()) {
        index = 0;
      }
      read_one(cursor.get(), index++);
    }
  } catch (boost::thread_interrupted&) {
    // Interrupted exception is expected on shutdown
  }
}

void DataReader::Body::read_one(db::Cursor* cursor, int index) {
  Datum* datum = reader_queues_[index]->free_.pop();
  // TODO deserialize in-place instead of copy?
  datum->ParseFromString(cursor->value());
  reader_queues_[index]->full_.push(datum);

  // go to the next iter
  cursor->Next();
  if (!cursor->valid()) {
    DLOG(INFO) << "Restarting data prefetching from start.";
    cursor->SeekToFirst();
  }
}

}  // namespace caffe
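
The round-robin dealing in the main loop above is what makes parallel runs deterministic. A worked example, assuming solver_count == 2:

// DB entry:  0  1  2  3  4  5  ...
// queue:     0  1  0  1  0  1  ...
// Solver 0 always trains on entries 0, 2, 4, ... and solver 1 on 1, 3, 5, ...,
// regardless of how the solver threads are scheduled. A slow solver simply
// stalls the reading thread when its free queue runs out of Datums, since
// read_one blocks on free_.pop() for that queue.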
40 changes: 11 additions & 29 deletions src/caffe/layers/data_layer.cpp
@@ -11,11 +11,15 @@
 #include "caffe/proto/caffe.pb.h"
 #include "caffe/util/benchmark.hpp"
 #include "caffe/util/io.hpp"
-#include "caffe/util/math_functions.hpp"
-#include "caffe/util/rng.hpp"
 
 namespace caffe {
 
 template <typename Dtype>
+DataLayer<Dtype>::DataLayer(const LayerParameter& param)
+    : BasePrefetchingDataLayer<Dtype>(param),
+      reader_(param) {
+}
+
+template <typename Dtype>
 DataLayer<Dtype>::~DataLayer() {
   this->StopInternalThread();
@@ -24,23 +28,8 @@ DataLayer<Dtype>::~DataLayer() {
 template <typename Dtype>
 void DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
     const vector<Blob<Dtype>*>& top) {
-  // Initialize DB
-  db_.reset(db::GetDB(this->layer_param_.data_param().backend()));
-  db_->Open(this->layer_param_.data_param().source(), db::READ);
-  cursor_.reset(db_->NewCursor());
-
-  // Check if we should randomly skip a few data points
-  if (this->layer_param_.data_param().rand_skip()) {
-    unsigned int skip = caffe_rng_rand() %
-        this->layer_param_.data_param().rand_skip();
-    LOG(INFO) << "Skipping first " << skip << " data points.";
-    while (skip-- > 0) {
-      cursor_->Next();
-    }
-  }
   // Read a data point, and use it to initialize the top blob.
-  Datum datum;
-  datum.ParseFromString(cursor_->value());
+  Datum& datum = *(reader_.full().peek());
 
   bool force_color = this->layer_param_.data_param().force_encoded_color();
   if ((force_color && DecodeDatum(&datum, true)) ||
@@ -97,8 +86,7 @@ void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
   const int crop_size = this->layer_param_.transform_param().crop_size();
   bool force_color = this->layer_param_.data_param().force_encoded_color();
   if (batch_size == 1 && crop_size == 0) {
-    Datum datum;
-    datum.ParseFromString(cursor_->value());
+    Datum& datum = *(reader_.full().peek());
     if (datum.encoded()) {
       if (force_color) {
         DecodeDatum(&datum, true);
@@ -121,9 +109,7 @@ void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
   for (int item_id = 0; item_id < batch_size; ++item_id) {
     timer.Start();
     // get a blob
-    Datum datum;
-    datum.ParseFromString(cursor_->value());
-
+    Datum& datum = *(reader_.full().pop("Waiting for data"));
     cv::Mat cv_img;
     if (datum.encoded()) {
       if (force_color) {
@@ -153,12 +139,8 @@ void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
       top_label[item_id] = datum.label();
     }
     trans_time += timer.MicroSeconds();
-    // go to the next iter
-    cursor_->Next();
-    if (!cursor_->valid()) {
-      DLOG(INFO) << "Restarting data prefetching from start.";
-      cursor_->SeekToFirst();
-    }
+
+    reader_.free().push(const_cast<Datum*>(&datum));
  }
   batch_timer.Stop();
   DLOG(INFO) << "Prefetch batch: " << batch_timer.MilliSeconds() << " ms.";
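
Note the peek/pop asymmetry in the new layer code: DataLayerSetUp and the batch_size == 1 fast path only need a datum's shape, so they peek() and leave the entry queued for training, while the per-item loop takes ownership and must recycle. Restating that contract from the diff:

Datum& datum = *(reader_.full().pop("Waiting for data"));
// ... copy/transform into the batch ...
reader_.free().push(const_cast<Datum*>(&datum));  // required, or the pool drains and the reader stalls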
5 changes: 5 additions & 0 deletions src/caffe/proto/caffe.proto
@@ -432,6 +432,7 @@ message ConvolutionParameter {
 }
 
 // Message that stores parameters used by DataLayer
+// next available ID: 11 (last added: prefetch)
 message DataParameter {
   enum DB {
     LEVELDB = 0;
@@ -445,6 +446,7 @@ message DataParameter {
   // to avoid all asynchronous sgd clients to start at the same point. The skip
   // point would be set as rand_skip * rand(0,1). Note that rand_skip should not
   // be larger than the number of keys in the database.
+  // DEPRECATED. Each solver accesses a different subset of the database.
   optional uint32 rand_skip = 7 [default = 0];
   optional DB backend = 8 [default = LEVELDB];
   // DEPRECATED. See TransformationParameter. For data pre-processing, we can do
@@ -460,6 +462,9 @@ message DataParameter {
   optional bool mirror = 6 [default = false];
   // Force the encoded image to have 3 color channels
   optional bool force_encoded_color = 9 [default = false];
+  // Prefetch queue (Number of batches to prefetch to host memory, increase if
+  // data access bandwidth varies).
+  optional uint32 prefetch = 10 [default = 4];
 }
 
 // Message that stores parameters used by DropoutLayer
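
The new prefetch field multiplies into the reader's queue capacity (see the DataReader constructor above: prefetch * batch_size Datums are preallocated). A hypothetical configuration through the generated C++ proto API — the setters follow from the fields in this diff, and the source path is just an example:

LayerParameter param;
param.set_name("data");
param.set_type("Data");
param.mutable_data_param()->set_source("examples/mnist/mnist_train_lmdb");
param.mutable_data_param()->set_batch_size(64);
param.mutable_data_param()->set_prefetch(8);  // buffer 8 batches = 512 Datums in host memory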
14 changes: 13 additions & 1 deletion src/caffe/test/test_layer_factory.cpp
@@ -1,11 +1,14 @@
 #include <map>
 #include <string>
 
+#include "boost/scoped_ptr.hpp"
 #include "gtest/gtest.h"
 
 #include "caffe/common.hpp"
 #include "caffe/layer.hpp"
 #include "caffe/layer_factory.hpp"
+#include "caffe/util/db.hpp"
+#include "caffe/util/io.hpp"
 
 #include "caffe/test/test_caffe_main.hpp"
 
@@ -21,11 +24,20 @@ TYPED_TEST(LayerFactoryTest, TestCreateLayer) {
   typename LayerRegistry<Dtype>::CreatorRegistry& registry =
       LayerRegistry<Dtype>::Registry();
   shared_ptr<Layer<Dtype> > layer;
-  LayerParameter layer_param;
   for (typename LayerRegistry<Dtype>::CreatorRegistry::iterator iter =
        registry.begin(); iter != registry.end(); ++iter) {
     // Special case: PythonLayer is checked by pytest
     if (iter->first == "Python") { continue; }
+    LayerParameter layer_param;
+    // Data layers expect a DB
+    if (iter->first == "Data") {
+      string tmp;
+      MakeTempDir(&tmp);
+      boost::scoped_ptr<db::DB> db(db::GetDB(DataParameter_DB_LEVELDB));
+      db->Open(tmp, db::NEW);
+      db->Close();
+      layer_param.mutable_data_param()->set_source(tmp);
+    }
     layer_param.set_type(iter->first);
     layer = LayerRegistry<Dtype>::CreateLayer(layer_param);
     EXPECT_EQ(iter->first, layer->type());