From 60d2cb043d4fde85ba99f7db8d976ddb1e60e4c4 Mon Sep 17 00:00:00 2001 From: Cyprien Noel Date: Tue, 21 Apr 2015 17:31:44 -0700 Subject: [PATCH] Data_layer (#1933) update and URI sources --- Makefile | 18 +- Makefile.config.example | 3 + examples/mnist/convert_mnist_data.cpp | 9 + .../siamese/convert_mnist_siamese_data.cpp | 10 + include/caffe/common.hpp | 21 +- include/caffe/data_layers.hpp | 57 +++-- include/caffe/internal_thread.hpp | 24 +- include/caffe/scheme.hpp | 206 ++++++++++++++++ include/caffe/syncedmem.hpp | 4 + include/caffe/util/blocking_queue.hpp | 50 ++++ include/caffe/util/db.hpp | 6 + include/caffe/util/io.hpp | 43 ++++ src/caffe/common.cpp | 13 +- src/caffe/internal_thread.cpp | 63 +++-- src/caffe/layers/base_data_layer.cpp | 93 +++++--- src/caffe/layers/base_data_layer.cu | 15 +- src/caffe/layers/data_layer.cpp | 180 ++++++++------ src/caffe/layers/hdf5_data_layer.cpp | 5 + src/caffe/layers/hdf5_data_layer.cu | 4 + src/caffe/layers/hdf5_output_layer.cpp | 4 + src/caffe/layers/hdf5_output_layer.cu | 4 + src/caffe/layers/image_data_layer.cpp | 27 ++- src/caffe/layers/window_data_layer.cpp | 20 +- src/caffe/net.cpp | 20 ++ src/caffe/proto/caffe.proto | 16 +- src/caffe/scheme.cpp | 219 ++++++++++++++++++ src/caffe/syncedmem.cpp | 12 + src/caffe/test/test_data_layer.cpp | 174 ++++++++++---- src/caffe/test/test_data_transformer.cpp | 1 - src/caffe/test/test_hdf5_output_layer.cpp | 4 + src/caffe/test/test_hdf5data_layer.cpp | 4 + src/caffe/test/test_internal_thread.cpp | 4 +- src/caffe/test/test_io.cpp | 35 +++ src/caffe/test/test_upgrade_proto.cpp | 4 +- src/caffe/util/blocking_queue.cpp | 87 +++++++ src/caffe/util/db.cpp | 23 +- src/caffe/util/io.cpp | 81 +++++++ src/caffe/util/upgrade_proto.cpp | 18 +- 38 files changed, 1349 insertions(+), 232 deletions(-) create mode 100644 include/caffe/scheme.hpp create mode 100644 include/caffe/util/blocking_queue.hpp create mode 100644 src/caffe/scheme.cpp create mode 100644 src/caffe/util/blocking_queue.cpp diff --git a/Makefile b/Makefile index db0f531eaa0..948501de86c 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ PROJECT := caffe -CONFIG_FILE := Makefile.config +CONFIG_FILE ?= Makefile.config include $(CONFIG_FILE) BUILD_DIR_LINK := $(BUILD_DIR) @@ -161,8 +161,11 @@ ifneq ($(CPU_ONLY), 1) LIBRARY_DIRS += $(CUDA_LIB_DIR) LIBRARIES := cudart cublas curand endif -LIBRARIES += glog gflags protobuf leveldb snappy \ - lmdb boost_system hdf5_hl hdf5 m \ +ifneq ($(NO_IO_DEPENDENCIES), 1) + LIBRARIES += leveldb lmdb snappy hdf5_hl hdf5 # TODO opencv +endif +LIBRARIES += glog gflags protobuf m \ + boost_system \ opencv_core opencv_highgui opencv_imgproc PYTHON_LIBRARIES := boost_python python2.7 WARNINGS := -Wall -Wno-sign-compare @@ -271,6 +274,8 @@ endif # Debugging ifeq ($(DEBUG), 1) COMMON_FLAGS += -DDEBUG -g -O0 + # Compile issue in DEBUG on MAC (https://svn.boost.org/trac/boost/ticket/9392) + COMMON_FLAGS += -DBOOST_NOINLINE='__attribute__ ((noinline))' NVCCFLAGS += -G else COMMON_FLAGS += -DNDEBUG -O2 @@ -288,10 +293,15 @@ ifeq ($(CPU_ONLY), 1) TEST_OBJS := $(TEST_CXX_OBJS) TEST_BINS := $(TEST_CXX_BINS) ALL_WARNS := $(ALL_CXX_WARNS) - TEST_FILTER := --gtest_filter="-*GPU*" + TEST_FILTER += --gtest_filter="-*GPU*" COMMON_FLAGS += -DCPU_ONLY endif +# No IO dependencies +ifeq ($(NO_IO_DEPENDENCIES), 1) + COMMON_FLAGS += -DNO_IO_DEPENDENCIES +endif + # Python layer support ifeq ($(WITH_PYTHON_LAYER), 1) COMMON_FLAGS += -DWITH_PYTHON_LAYER diff --git a/Makefile.config.example b/Makefile.config.example index 7a8aafd7c9f..4d77f7f90e0 
100644
--- a/Makefile.config.example
+++ b/Makefile.config.example
@@ -7,6 +7,9 @@
 # CPU-only switch (uncomment to build without GPU support).
 # CPU_ONLY := 1
 
+# Removes IO dependencies and related code: lmdb, leveldb, snappy, hdf5
+# NO_IO_DEPENDENCIES := 1
+
 # To customize your choice of compiler, uncomment and set the following.
 # N.B. the default for Linux is g++ and the default for OSX is clang++
 # CUSTOM_CXX := g++
diff --git a/examples/mnist/convert_mnist_data.cpp b/examples/mnist/convert_mnist_data.cpp
index 2749e4521b6..856e518ced0 100644
--- a/examples/mnist/convert_mnist_data.cpp
+++ b/examples/mnist/convert_mnist_data.cpp
@@ -6,6 +6,8 @@
 // The MNIST dataset could be downloaded at
 //    http://yann.lecun.com/exdb/mnist/
 
+#ifndef NO_IO_DEPENDENCIES
+
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <google/protobuf/text_format.h>
@@ -196,3 +198,10 @@ int main(int argc, char** argv) {
   }
   return 0;
 }
+
+#else
+#include <cstdio>
+int main(int argc, char** argv) {
+  fprintf(stderr, "This tool was built with NO_IO_DEPENDENCIES and is disabled.\n");
+}
+#endif
diff --git a/examples/siamese/convert_mnist_siamese_data.cpp b/examples/siamese/convert_mnist_siamese_data.cpp
index 71c56a0ae61..489f215e42d 100644
--- a/examples/siamese/convert_mnist_siamese_data.cpp
+++ b/examples/siamese/convert_mnist_siamese_data.cpp
@@ -5,6 +5,9 @@
 //    convert_mnist_data input_image_file input_label_file output_db_file
 // The MNIST dataset could be downloaded at
 //    http://yann.lecun.com/exdb/mnist/
+
+#ifndef NO_IO_DEPENDENCIES
+
 #include <fstream>  // NOLINT(readability/streams)
 #include <string>
@@ -121,3 +124,10 @@ int main(int argc, char** argv) {
   }
   return 0;
 }
+
+#else
+#include <cstdio>
+int main(int argc, char** argv) {
+  fprintf(stderr, "This tool was built with NO_IO_DEPENDENCIES and is disabled.\n");
+}
+#endif
diff --git a/include/caffe/common.hpp b/include/caffe/common.hpp
index 6cf80a37bc1..062c67983b9 100644
--- a/include/caffe/common.hpp
+++ b/include/caffe/common.hpp
@@ -98,12 +98,12 @@ void GlobalInit(int* pargc, char*** pargv);
 class Caffe {
  public:
   ~Caffe();
-  inline static Caffe& Get() {
-    if (!singleton_.get()) {
-      singleton_.reset(new Caffe());
-    }
-    return *singleton_;
-  }
+
+  // Thread local context for Caffe. Moved to common.cpp instead of
+  // including boost/thread.hpp to avoid boost/NVCC issues (#1009, #1010)
+  // on OSX. Also fails on Linux with CUDA 7.0.18.
+  static Caffe& Get();
+
   enum Brew { CPU, GPU };
 
   // This random number generator facade hides boost and CUDA rng
@@ -149,6 +149,12 @@ class Caffe {
   static void SetDevice(const int device_id);
   // Prints the current GPU status.
   static void DeviceQuery();
+  // Parallel training info
+  inline static int solver_count() { return Get().solver_count_; }
+  inline static void set_solver_count(int val) { Get().solver_count_ = val; }
+  inline static bool root_solver() { return Get().solver_index_ == 0; }
+  inline static int solver_index() { return Get().solver_index_; }
+  inline static void set_solver_index(int val) { Get().solver_index_ = val; }
 
  protected:
 #ifndef CPU_ONLY
@@ -158,7 +164,8 @@ class Caffe {
   shared_ptr<RNG> random_generator_;
 
   Brew mode_;
-  static shared_ptr<Caffe> singleton_;
+  int solver_count_;
+  int solver_index_;
 
  private:
   // The private constructor to avoid duplicate instantiation.
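Reviewer note on the common.hpp change above: because Get() now returns a thread-local instance, mode, device and solver bookkeeping no longer leak between threads, and each worker must configure its own context. A minimal sketch of the resulting semantics, using only the accessors declared above (the worker function itself is hypothetical):

    #include <boost/thread.hpp>
    #include "caffe/common.hpp"

    using caffe::Caffe;

    static void worker() {
      // First Get() on this thread creates a fresh, independent context.
      Caffe::set_mode(Caffe::CPU);
      Caffe::set_solver_index(1);
      CHECK(!Caffe::root_solver());  // solver_index() != 0 on this thread
    }

    int main(int argc, char** argv) {
      caffe::GlobalInit(&argc, &argv);
      Caffe::set_mode(Caffe::GPU);  // affects the main thread's context only
      boost::thread t(worker);
      t.join();
      CHECK_EQ(Caffe::mode(), Caffe::GPU);  // unchanged by the worker
      return 0;
    }

This is also why InternalThread (further down in this patch) copies device, mode, random seed and solver info into the child thread explicitly before it starts.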
diff --git a/include/caffe/data_layers.hpp b/include/caffe/data_layers.hpp
index 2bb9d948169..f3f69a730fe 100644
--- a/include/caffe/data_layers.hpp
+++ b/include/caffe/data_layers.hpp
@@ -5,9 +5,11 @@
 #include <utility>
 #include <vector>
 
-#include "boost/scoped_ptr.hpp"
+#include "boost/random/uniform_real.hpp"
+#include "boost/random/variate_generator.hpp"
+#ifndef NO_IO_DEPENDENCIES
 #include "hdf5.h"
-
+#endif
 #include "caffe/blob.hpp"
 #include "caffe/common.hpp"
 #include "caffe/data_transformer.hpp"
@@ -16,7 +18,10 @@
 #include "caffe/layer.hpp"
 #include "caffe/net.hpp"
 #include "caffe/proto/caffe.pb.h"
+#include "caffe/scheme.hpp"
+#include "caffe/util/blocking_queue.hpp"
 #include "caffe/util/db.hpp"
+#include "caffe/util/rng.hpp"
 
 namespace caffe {
 
@@ -52,12 +57,17 @@ class BaseDataLayer : public Layer<Dtype> {
   bool output_labels_;
 };
 
+template <typename Dtype>
+class Batch {
+ public:
+  Blob<Dtype> data_, label_;
+};
+
 template <typename Dtype>
 class BasePrefetchingDataLayer :
     public BaseDataLayer<Dtype>, public InternalThread {
  public:
-  explicit BasePrefetchingDataLayer(const LayerParameter& param)
-      : BaseDataLayer<Dtype>(param) {}
+  explicit BasePrefetchingDataLayer(const LayerParameter& param);
   virtual ~BasePrefetchingDataLayer() {}
   // LayerSetUp: implements common data layer setup functionality, and calls
   // DataLayerSetUp to do special data layer setup for individual layer types.
@@ -70,22 +80,24 @@ class BasePrefetchingDataLayer :
   virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top);
 
-  virtual void CreatePrefetchThread();
-  virtual void JoinPrefetchThread();
-  // The thread's function
-  virtual void InternalThreadEntry() {}
+  // Number of batches prefetched ahead (async to GPU memory in GPU mode)
+  static const int PREFETCH_COUNT = 3;
 
  protected:
-  Blob<Dtype> prefetch_data_;
-  Blob<Dtype> prefetch_label_;
+  virtual void InternalThreadEntry();
+  virtual void load_batch(Batch<Dtype>* batch) = 0;
+
+  Batch<Dtype> prefetch_[PREFETCH_COUNT];
+  blocking_queue<Batch<Dtype>*> prefetch_free_;
+  blocking_queue<Batch<Dtype>*> prefetch_full_;
+
   Blob<Dtype> transformed_data_;
 };
 
 template <typename Dtype>
-class DataLayer : public BasePrefetchingDataLayer<Dtype> {
+class DataLayer: public BasePrefetchingDataLayer<Dtype> {
  public:
-  explicit DataLayer(const LayerParameter& param)
-      : BasePrefetchingDataLayer<Dtype>(param) {}
+  explicit DataLayer(const LayerParameter& param);
   virtual ~DataLayer();
   virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top);
@@ -96,10 +108,13 @@ class DataLayer : public BasePrefetchingDataLayer<Dtype> {
   virtual inline int MaxTopBlobs() const { return 2; }
 
  protected:
-  virtual void InternalThreadEntry();
+  virtual void load_batch(Batch<Dtype>* batch);
+  Reader* next_reader();
 
-  shared_ptr<db::DB> db_;
-  shared_ptr<db::Cursor> cursor_;
+  vector<shared_ptr<Reader> > readers_;
+  boost::uniform_real<float> random_distribution_;
+  shared_ptr<boost::variate_generator<caffe::rng_t*, boost::uniform_real<float> > >
+      variate_generator_;
 };
 
@@ -112,6 +127,7 @@ class DummyDataLayer : public Layer<Dtype> {
  public:
   explicit DummyDataLayer(const LayerParameter& param)
       : Layer<Dtype>(param) {}
+  virtual ~DummyDataLayer() {}
   virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
       const vector<Blob<Dtype>*>& top);
   // Data layers have no bottoms, so reshaping is trivial.
@@ -134,6 +150,8 @@ class DummyDataLayer : public Layer<Dtype> {
   vector<bool> refill_;
 };
 
+#ifndef NO_IO_DEPENDENCIES
+
 /**
  * @brief Provides data to the Net from HDF5 files.
  *
@@ -217,6 +235,8 @@ class HDF5OutputLayer : public Layer<Dtype> {
   Blob<Dtype> label_blob_;
 };
 
+#endif
+
 /**
  * @brief Provides data to the Net from image files.
* @@ -238,7 +258,7 @@ class ImageDataLayer : public BasePrefetchingDataLayer { protected: shared_ptr prefetch_rng_; virtual void ShuffleImages(); - virtual void InternalThreadEntry(); + virtual void load_batch(Batch* batch); vector > lines_; int lines_id_; @@ -254,6 +274,7 @@ class MemoryDataLayer : public BaseDataLayer { public: explicit MemoryDataLayer(const LayerParameter& param) : BaseDataLayer(param), has_new_data_(false) {} + virtual ~MemoryDataLayer() {} virtual void DataLayerSetUp(const vector*>& bottom, const vector*>& top); @@ -310,7 +331,7 @@ class WindowDataLayer : public BasePrefetchingDataLayer { protected: virtual unsigned int PrefetchRand(); - virtual void InternalThreadEntry(); + virtual void load_batch(Batch* batch); shared_ptr prefetch_rng_; vector > > image_database_; diff --git a/include/caffe/internal_thread.hpp b/include/caffe/internal_thread.hpp index 815ca54605e..cd76aa45775 100644 --- a/include/caffe/internal_thread.hpp +++ b/include/caffe/internal_thread.hpp @@ -14,18 +14,20 @@ namespace caffe { /** * Virtual class encapsulate boost::thread for use in base class * The child class will acquire the ability to run a single thread, - * by reimplementing the virutal function InternalThreadEntry. + * by reimplementing the virtual function InternalThreadEntry. */ class InternalThread { public: - InternalThread() : thread_() {} + InternalThread(); virtual ~InternalThread(); - /** Returns true if the thread was successfully started. **/ - bool StartInternalThread(); + // Caffe's thread local state will be initialized using the current + // thread values, e.g. device id, solver index etc. The random seed + // is initialized using caffe_rng_rand. + void StartInternalThread(); - /** Will not return until the internal thread has exited. */ - bool WaitForInternalThreadToExit(); + // Will not return until the thread has exited. + void StopInternalThread(); bool is_started() const; @@ -34,7 +36,17 @@ class InternalThread { with the code you want your thread to run. */ virtual void InternalThreadEntry() {} + bool must_stop(); + + private: + void entry(); + shared_ptr thread_; + int device_; + Caffe::Brew mode_; + int rand_seed_; + int solver_count_; + int solver_index_; }; } // namespace caffe diff --git a/include/caffe/scheme.hpp b/include/caffe/scheme.hpp new file mode 100644 index 00000000000..a49b67c114a --- /dev/null +++ b/include/caffe/scheme.hpp @@ -0,0 +1,206 @@ +#ifndef CAFFE_SCHEMES_HPP_ +#define CAFFE_SCHEMES_HPP_ + +#include +#include +#include + +#include "caffe/common.hpp" +#include "caffe/util/blocking_queue.hpp" + +namespace caffe { + +using boost::weak_ptr; + +// A URI scheme adds support for an input/output transport. +// E.g. lmdb://path, tcp://host:port, hdfs://path +class Scheme; + +// Reads datums to a queue available to data layers. 
+class Reader {
+ public:
+  ~Reader();
+
+  inline blocking_queue<Datum*>& free() {
+    return body_->free_;
+  }
+  inline blocking_queue<Datum*>& full() {
+    return body_->full_;
+  }
+
+  // Makes sure only one reader is created per source, needed in
+  // particular for parallel training
+  class Body : public InternalThread {
+   public:
+    Body(const DataParameter& param, int index);
+    virtual ~Body();
+
+   protected:
+    const DataParameter param_;
+    const int index_;
+    blocking_queue<Datum*> free_;
+    blocking_queue<Datum*> full_;
+
+    friend class Reader;
+
+    DISABLE_COPY_AND_ASSIGN(Body);
+  };
+
+ protected:
+  explicit Reader(const shared_ptr<Body>& body)
+      : body_(body) {
+  }
+
+  shared_ptr<Body> body_;
+
+  friend class Scheme;
+
+DISABLE_COPY_AND_ASSIGN(Reader);
+};
+
+// TODO Writer for output layers
+
+class Scheme {
+ public:
+  virtual ~Scheme() {
+  }
+
+  inline const vector<string>& names() const {
+    return names_;
+  }
+
+  static void add(const shared_ptr<Scheme>& scheme);
+  static const shared_ptr<Scheme>& get(const string& name);
+
+  shared_ptr<Reader> get_reader(const DataParameter& param, int index) const;
+
+ protected:
+  Scheme() {
+  }
+
+  virtual Reader::Body* new_reader(const DataParameter& param, int i) const = 0;
+
+  vector<string> names_;
+
+  static map<string, shared_ptr<Scheme> > schemes_;
+  static map<string, weak_ptr<Reader::Body> > readers_;
+
+  friend class Reader;
+
+DISABLE_COPY_AND_ASSIGN(Scheme);
+};
+
+#ifndef NO_IO_DEPENDENCIES
+
+class DefaultDatabases : public Scheme {
+ public:
+  DefaultDatabases() {
+    names_.push_back("lmdb");
+    names_.push_back("leveldb");
+  }
+  virtual ~DefaultDatabases() {
+  }
+
+ protected:
+  class DBReader : public Reader::Body {
+   public:
+    DBReader(const DataParameter& param, int index);
+    virtual ~DBReader() {
+    }
+
+    virtual void InternalThreadEntry();
+
+    using Reader::Body::free_;
+    using Reader::Body::full_;
+    using Reader::Body::param_;
+    using Reader::Body::index_;
+  };
+
+  virtual Reader::Body* new_reader(const DataParameter& param, int i) const {
+    return new DBReader(param, i);
+  }
+};
+
+#endif
+
+class FileScheme : public Scheme {
+ public:
+  FileScheme() {
+    names_.push_back("file");
+  }
+  virtual ~FileScheme() {
+  }
+
+ protected:
+  class DescriptorReader : public Reader::Body {
+   public:
+    DescriptorReader(const DataParameter& param, int index)
+        : Reader::Body(param, index) {
+    }
+    virtual ~DescriptorReader() {
+    }
+
+    void read_descriptor(int file_descriptor);
+
+    using Reader::Body::free_;
+    using Reader::Body::full_;
+    using Reader::Body::param_;
+    using Reader::Body::index_;
+  };
+
+  class FileReader : public DescriptorReader {
+   public:
+    FileReader(const DataParameter& param, int index);
+    virtual ~FileReader() {
+    }
+
+    virtual void InternalThreadEntry();
+  };
+
+  virtual Reader::Body* new_reader(const DataParameter& param, int i) const {
+    return new FileReader(param, i);
+  }
+};
+
+class SocketScheme : public FileScheme {
+ public:
+  SocketScheme() {
+    names_.clear();
+    names_.push_back("tcp");
+    names_.push_back("http");
+  }
+  virtual ~SocketScheme() {
+  }
+
+ protected:
+  class SocketReader : public DescriptorReader {
+   public:
+    SocketReader(const DataParameter& param, int index);
+    virtual ~SocketReader() {
+    }
+
+    virtual void InternalThreadEntry();
+  };
+
+  virtual Reader::Body* new_reader(const DataParameter& param, int i) const {
+    return new SocketReader(param, i);
+  }
+};
+
+// TODO hdf5, images
+
+// Loads default schemes
+class DefaultSchemes {
+ public:
+  DefaultSchemes() {
+#ifndef NO_IO_DEPENDENCIES
+    Scheme::add(shared_ptr<Scheme>(new DefaultDatabases()));
+#endif
+    Scheme::add(shared_ptr<Scheme>(new FileScheme()));
+    Scheme::add(shared_ptr<Scheme>(new SocketScheme()));
+  }
+};
+
+}  // namespace caffe
+
+#endif  // CAFFE_SCHEMES_HPP_
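scheme.hpp is the extension point of this patch, so a sketch of a third-party transport may help review. MyScheme, MyReader and the "my" scheme name are invented for illustration; only Scheme::add, Reader::Body, the free_/full_ queues and the new_reader virtual come from the header above:

    // Hypothetical transport, usable as source: "my://some/address"
    class MyReader : public caffe::Reader::Body {
     public:
      MyReader(const caffe::DataParameter& param, int index)
          : caffe::Reader::Body(param, index) {
        StartInternalThread();
      }
      virtual void InternalThreadEntry() {
        while (!must_stop()) {
          caffe::Datum* datum = free_.pop();  // recycle a preallocated datum
          // ... fill *datum from the custom transport here ...
          full_.push(datum);  // hand it to the data layers
        }
      }
    };

    class MyScheme : public caffe::Scheme {
     public:
      MyScheme() { names_.push_back("my"); }
     protected:
      virtual caffe::Reader::Body* new_reader(const caffe::DataParameter& param,
          int i) const {
        return new MyReader(param, i);
      }
    };

    // Registered once at startup, mirroring DefaultSchemes above:
    //   caffe::Scheme::add(caffe::shared_ptr<caffe::Scheme>(new MyScheme()));

The free_/full_ pair gives back-pressure for free: once prefetch * batch_size datums are in flight, the reader blocks on free_.pop() until a data layer recycles one.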
diff --git a/include/caffe/syncedmem.hpp b/include/caffe/syncedmem.hpp
index 1b726de9564..4d339bf4e57 100644
--- a/include/caffe/syncedmem.hpp
+++ b/include/caffe/syncedmem.hpp
@@ -56,6 +56,10 @@ class SyncedMemory {
   SyncedHead head() { return head_; }
   size_t size() { return size_; }
 
+#ifndef CPU_ONLY
+  void async_gpu_push(const cudaStream_t& stream);
+#endif
+
  private:
   void to_cpu();
   void to_gpu();
diff --git a/include/caffe/util/blocking_queue.hpp b/include/caffe/util/blocking_queue.hpp
new file mode 100644
index 00000000000..96e83a1f105
--- /dev/null
+++ b/include/caffe/util/blocking_queue.hpp
@@ -0,0 +1,50 @@
+#ifndef CAFFE_UTIL_BLOCKING_QUEUE_H_
+#define CAFFE_UTIL_BLOCKING_QUEUE_H_
+
+#include <queue>
+#include <string>
+
+#include "caffe/common.hpp"
+
+namespace caffe {
+
+template<typename T>
+class blocking_queue {
+ public:
+  explicit blocking_queue();
+  virtual ~blocking_queue();
+
+  void push(const T& t);
+
+  bool empty() const;
+
+  bool try_pop(T* t);
+
+  T pop(const string& log_on_wait = "");
+
+  // Return element without removing it
+  T peek();
+
+  inline uint64_t pops() {
+    return pops_;
+  }
+
+ protected:
+  /**
+   Move synchronization fields out instead of including boost/thread.hpp
+   to avoid boost/NVCC issues (#1009, #1010) on OSX. Also fails on
+   Linux CUDA 7.0.18.
+   */
+  class sync;
+
+  std::queue<T> queue_;
+  shared_ptr<sync> sync_;
+  time_t last_wait_log_;
+  uint64_t pops_;
+
+DISABLE_COPY_AND_ASSIGN(blocking_queue);
+};
+
+}  // namespace caffe
+
+#endif
diff --git a/include/caffe/util/db.hpp b/include/caffe/util/db.hpp
index afdb8d2c4f8..86571f71ec5 100644
--- a/include/caffe/util/db.hpp
+++ b/include/caffe/util/db.hpp
@@ -3,9 +3,11 @@
 
 #include <string>
 
+#ifndef NO_IO_DEPENDENCIES
 #include "leveldb/db.h"
 #include "leveldb/write_batch.h"
 #include "lmdb.h"
+#endif
 
 #include "caffe/common.hpp"
 #include "caffe/proto/caffe.pb.h"
@@ -49,6 +51,8 @@ class DB {
   DISABLE_COPY_AND_ASSIGN(DB);
 };
 
+#ifndef NO_IO_DEPENDENCIES
+
 class LevelDBCursor : public Cursor {
  public:
   explicit LevelDBCursor(leveldb::Iterator* iter)
@@ -181,6 +185,8 @@ class LMDB : public DB {
   MDB_dbi mdb_dbi_;
 };
 
+#endif
+
 DB* GetDB(DataParameter::DB backend);
 DB* GetDB(const string& backend);
diff --git a/include/caffe/util/io.hpp b/include/caffe/util/io.hpp
index 3a62c3c9fa9..45db3025298 100644
--- a/include/caffe/util/io.hpp
+++ b/include/caffe/util/io.hpp
@@ -5,8 +5,10 @@
 #include <string>
 
 #include "google/protobuf/message.h"
+#ifndef NO_IO_DEPENDENCIES
 #include "hdf5.h"
 #include "hdf5_hl.h"
+#endif
 
 #include "caffe/blob.hpp"
 #include "caffe/common.hpp"
@@ -140,6 +142,7 @@
 cv::Mat DecodeDatumToCVMat(const Datum& datum, bool is_color);
 
 void CVMatToDatum(const cv::Mat& cv_img, Datum* datum);
 
+#ifndef NO_IO_DEPENDENCIES
 template <typename Dtype>
 void hdf5_load_nd_dataset_helper(
     hid_t file_id, const char* dataset_name_, int min_dim, int max_dim,
@@ -153,6 +156,46 @@ void hdf5_load_nd_dataset(
 template <typename Dtype>
 void hdf5_save_nd_dataset(
     const hid_t file_id, const string& dataset_name, const Blob<Dtype>& blob);
+#endif
+
+// Avoiding another dependency for now, but could be swapped for
+// something like cpp-netlib
+class URI {
+ public:
+  URI(const string& uri, bool no_host = false);
+  const string& scheme() const { return scheme_; }
+  const string& host() const { return host_; }
+  const string& port() const { return port_; }
+  const string& path() const { return path_; }
+
+ protected:
+  string scheme_;
+  string host_;
+  string port_;
+  string path_;
+};
+
+// Simple wrapper over C file (protobuf requires a file
descriptor) +class File { + public: + File(const string& path, int flags); + ~File(); + int descriptor() { return fd_; } + + protected: + int fd_; +}; + +// Simple wrapper over C socket (protobuf requires a file descriptor) +class Socket { + public: + explicit Socket(const URI& uri); + ~Socket(); + int descriptor() { return fd_; } + + protected: + int fd_; +}; } // namespace caffe diff --git a/src/caffe/common.cpp b/src/caffe/common.cpp index af96cac40aa..c6a695a53a8 100644 --- a/src/caffe/common.cpp +++ b/src/caffe/common.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -7,7 +8,15 @@ namespace caffe { -shared_ptr Caffe::singleton_; +// Make sure each thread can have different values. +static boost::thread_specific_ptr thread_instance_; + +Caffe& Caffe::Get() { + if (!thread_instance_.get()) { + thread_instance_.reset(new Caffe()); + } + return *(thread_instance_.get()); +} // random seeding int64_t cluster_seedgen(void) { @@ -86,7 +95,7 @@ void* Caffe::RNG::generator() { Caffe::Caffe() : cublas_handle_(NULL), curand_generator_(NULL), random_generator_(), - mode_(Caffe::CPU) { + mode_(Caffe::CPU), solver_count_(1), solver_index_() { // Try to create a cublas handler, and report an error if failed (but we will // keep the program running as one might just want to run CPU code). if (cublasCreate(&cublas_handle_) != CUBLAS_STATUS_SUCCESS) { diff --git a/src/caffe/internal_thread.cpp b/src/caffe/internal_thread.cpp index c2d19d433b4..37a141600eb 100644 --- a/src/caffe/internal_thread.cpp +++ b/src/caffe/internal_thread.cpp @@ -1,40 +1,73 @@ #include +#include + #include "caffe/internal_thread.hpp" +#include "caffe/util/math_functions.hpp" namespace caffe { +InternalThread::InternalThread() + : thread_(), + device_(), + mode_(), + rand_seed_(), + solver_count_(), + solver_index_() { +} + InternalThread::~InternalThread() { - WaitForInternalThreadToExit(); + StopInternalThread(); } bool InternalThread::is_started() const { return thread_.get() != NULL && thread_->joinable(); } +bool InternalThread::must_stop() { + return thread_->interruption_requested(); +} + +void InternalThread::StartInternalThread() { + StopInternalThread(); + +#ifndef CPU_ONLY + CUDA_CHECK(cudaGetDevice(&device_)); +#endif + mode_ = Caffe::mode(); + rand_seed_ = caffe_rng_rand(); + solver_count_ = Caffe::solver_count(); + solver_index_ = Caffe::solver_index(); -bool InternalThread::StartInternalThread() { - if (!WaitForInternalThreadToExit()) { - return false; - } try { - thread_.reset( - new boost::thread(&InternalThread::InternalThreadEntry, this)); - } catch (...) { - return false; + thread_.reset(new boost::thread(&InternalThread::entry, this)); + } catch (boost::thread_interrupted&) { + } catch (std::exception& e) { + CHECK(false) << e.what(); } - return true; } -/** Will not return until the internal thread has exited. */ -bool InternalThread::WaitForInternalThreadToExit() { +void InternalThread::entry() { +#ifndef CPU_ONLY + CUDA_CHECK(cudaSetDevice(device_)); +#endif + Caffe::set_mode(mode_); + Caffe::set_random_seed(rand_seed_); + Caffe::set_solver_count(solver_count_); + Caffe::set_solver_index(solver_index_); + + InternalThreadEntry(); +} + +void InternalThread::StopInternalThread() { if (is_started()) { + thread_->interrupt(); try { thread_->join(); - } catch (...) 
{
-      return false;
+    } catch (boost::thread_interrupted&) {
+    } catch (std::exception& e) {
+      CHECK(false) << e.what();
     }
   }
-  return true;
 }
 
 }  // namespace caffe
diff --git a/src/caffe/layers/base_data_layer.cpp b/src/caffe/layers/base_data_layer.cpp
index 352200915d7..97f5265bc20 100644
--- a/src/caffe/layers/base_data_layer.cpp
+++ b/src/caffe/layers/base_data_layer.cpp
@@ -1,3 +1,4 @@
+#include <boost/thread.hpp>
 #include <string>
 #include <vector>
 
@@ -10,7 +11,8 @@ namespace caffe {
 template <typename Dtype>
 BaseDataLayer<Dtype>::BaseDataLayer(const LayerParameter& param)
     : Layer<Dtype>(param),
-      transform_param_(param.transform_param()) {
+      transform_param_(param.transform_param()),
+      output_labels_() {
 }
 
 template <typename Dtype>
@@ -28,54 +30,90 @@ void BaseDataLayer<Dtype>::LayerSetUp(const vector<Blob<Dtype>*>& bottom,
   data_transformer_->InitRand();
 }
 
+template <typename Dtype>
+BasePrefetchingDataLayer<Dtype>::BasePrefetchingDataLayer(
+    const LayerParameter& param)
+    : BaseDataLayer<Dtype>(param),
+      prefetch_free_(), prefetch_full_() {
+  for (int i = 0; i < PREFETCH_COUNT; ++i)
+    prefetch_free_.push(&prefetch_[i]);
+}
+
 template <typename Dtype>
 void BasePrefetchingDataLayer<Dtype>::LayerSetUp(
     const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
   BaseDataLayer<Dtype>::LayerSetUp(bottom, top);
-  // Now, start the prefetch thread. Before calling prefetch, we make two
-  // cpu_data calls so that the prefetch thread does not accidentally make
-  // simultaneous cudaMalloc calls when the main thread is running. In some
-  // GPUs this seems to cause failures if we do not so.
-  this->prefetch_data_.mutable_cpu_data();
-  if (this->output_labels_) {
-    this->prefetch_label_.mutable_cpu_data();
+
+  // Before starting the prefetch thread, we make cpu_data and gpu_data
+  // calls so that the prefetch thread does not accidentally make simultaneous
+  // cudaMalloc calls when the main thread is running. In some GPUs this
+  // seems to cause failures if we do not do so.
+ for (int i = 0; i < PREFETCH_COUNT; ++i) { + prefetch_[i].data_.mutable_cpu_data(); + if (this->output_labels_) { + prefetch_[i].label_.mutable_cpu_data(); + } } +#ifndef CPU_ONLY + if (Caffe::mode() == Caffe::GPU) { + for (int i = 0; i < PREFETCH_COUNT; ++i) { + prefetch_[i].data_.mutable_gpu_data(); + if (this->output_labels_) { + prefetch_[i].label_.mutable_gpu_data(); + } + } + } +#endif + DLOG(INFO) << "Initializing prefetch"; - this->CreatePrefetchThread(); + this->data_transformer_->InitRand(); + StartInternalThread(); DLOG(INFO) << "Prefetch initialized."; } template -void BasePrefetchingDataLayer::CreatePrefetchThread() { - this->data_transformer_->InitRand(); - CHECK(StartInternalThread()) << "Thread execution failed"; -} +void BasePrefetchingDataLayer::InternalThreadEntry() { +#ifndef CPU_ONLY + cudaStream_t stream; + if (Caffe::mode() == Caffe::GPU) { + cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking); + } +#endif -template -void BasePrefetchingDataLayer::JoinPrefetchThread() { - CHECK(WaitForInternalThreadToExit()) << "Thread joining failed"; + try { + while (!must_stop()) { + Batch* batch = prefetch_free_.pop(); + load_batch(batch); +#ifndef CPU_ONLY + if (Caffe::mode() == Caffe::GPU) { + batch->data_.data().get()->async_gpu_push(stream); + cudaStreamSynchronize(stream); + } +#endif + prefetch_full_.push(batch); + } + } catch (boost::thread_interrupted&) { + // Interrupted exception is expected on shutdown + } } template void BasePrefetchingDataLayer::Forward_cpu( const vector*>& bottom, const vector*>& top) { - // First, join the thread - JoinPrefetchThread(); - DLOG(INFO) << "Thread joined"; + Batch* batch = prefetch_full_.pop("Data layer prefetch queue empty"); // Reshape to loaded data. - top[0]->Reshape(this->prefetch_data_.num(), this->prefetch_data_.channels(), - this->prefetch_data_.height(), this->prefetch_data_.width()); + top[0]->Reshape(batch->data_.num(), batch->data_.channels(), + batch->data_.height(), batch->data_.width()); // Copy the data - caffe_copy(prefetch_data_.count(), prefetch_data_.cpu_data(), + caffe_copy(batch->data_.count(), batch->data_.cpu_data(), top[0]->mutable_cpu_data()); DLOG(INFO) << "Prefetch copied"; if (this->output_labels_) { - caffe_copy(prefetch_label_.count(), prefetch_label_.cpu_data(), - top[1]->mutable_cpu_data()); + caffe_copy(batch->label_.count(), batch->label_.cpu_data(), + top[1]->mutable_cpu_data()); } - // Start a new prefetch thread - DLOG(INFO) << "CreatePrefetchThread"; - CreatePrefetchThread(); + + prefetch_free_.push(batch); } #ifdef CPU_ONLY @@ -83,6 +121,7 @@ STUB_GPU_FORWARD(BasePrefetchingDataLayer, Forward); #endif INSTANTIATE_CLASS(BaseDataLayer); +INSTANTIATE_CLASS(Batch); INSTANTIATE_CLASS(BasePrefetchingDataLayer); } // namespace caffe diff --git a/src/caffe/layers/base_data_layer.cu b/src/caffe/layers/base_data_layer.cu index 775f6c47f7e..52085d007a7 100644 --- a/src/caffe/layers/base_data_layer.cu +++ b/src/caffe/layers/base_data_layer.cu @@ -7,20 +7,19 @@ namespace caffe { template void BasePrefetchingDataLayer::Forward_gpu( const vector*>& bottom, const vector*>& top) { - // First, join the thread - JoinPrefetchThread(); + Batch* batch = prefetch_full_.pop("Data layer prefetch queue empty"); // Reshape to loaded data. 
- top[0]->Reshape(this->prefetch_data_.num(), this->prefetch_data_.channels(), - this->prefetch_data_.height(), this->prefetch_data_.width()); + top[0]->Reshape(batch->data_.num(), batch->data_.channels(), + batch->data_.height(), batch->data_.width()); // Copy the data - caffe_copy(prefetch_data_.count(), prefetch_data_.cpu_data(), + caffe_copy(batch->data_.count(), batch->data_.gpu_data(), top[0]->mutable_gpu_data()); if (this->output_labels_) { - caffe_copy(prefetch_label_.count(), prefetch_label_.cpu_data(), + caffe_copy(batch->label_.count(), batch->label_.gpu_data(), top[1]->mutable_gpu_data()); } - // Start a new prefetch thread - CreatePrefetchThread(); + + prefetch_free_.push(batch); } INSTANTIATE_LAYER_GPU_FORWARD(BasePrefetchingDataLayer); diff --git a/src/caffe/layers/data_layer.cpp b/src/caffe/layers/data_layer.cpp index 0f2d66776a9..1e49eaae2f5 100644 --- a/src/caffe/layers/data_layer.cpp +++ b/src/caffe/layers/data_layer.cpp @@ -1,7 +1,11 @@ +#include #include #include +#include +#include +#include #include #include @@ -17,106 +21,104 @@ namespace caffe { template -DataLayer::~DataLayer() { - this->JoinPrefetchThread(); +DataLayer::DataLayer(const LayerParameter& param) + : BasePrefetchingDataLayer(param), + random_distribution_(), + variate_generator_() { + const DataParameter& data = param.data_param(); + if (data.probability_size()) { + CHECK_EQ(data.source().size(), data.probability().size()) + << "Invalid DataParameter, there should be one probability per source"; + float sum = 0; + for (int i = 0; i < data.probability().size(); ++i) { + sum += data.probability(i); + } + CHECK_LT(fabsf(sum - 1.0f), 1e-6f) + << "Invalid DataParameter, probabilities do not sum to 1"; + } + for (int i = 0; i < data.source().size(); ++i) { + URI uri(data.source(i)); + const shared_ptr& scheme(Scheme::get(uri.scheme())); + readers_.push_back(scheme->get_reader(data, i)); + } +} + +template +DataLayer::~DataLayer() { + this->StopInternalThread(); } template void DataLayer::DataLayerSetUp(const vector*>& bottom, const vector*>& top) { - // Initialize DB - db_.reset(db::GetDB(this->layer_param_.data_param().backend())); - db_->Open(this->layer_param_.data_param().source(), db::READ); - cursor_.reset(db_->NewCursor()); - - // Check if we should randomly skip a few data points - if (this->layer_param_.data_param().rand_skip()) { - unsigned int skip = caffe_rng_rand() % - this->layer_param_.data_param().rand_skip(); - LOG(INFO) << "Skipping first " << skip << " data points."; - while (skip-- > 0) { - cursor_->Next(); - } - } - // Read a data point, and use it to initialize the top blob. - Datum datum; - datum.ParseFromString(cursor_->value()); + // Look at first data point to initialize the top blob. 
+ Datum* datum = readers_[0].get()->full().peek(); bool force_color = this->layer_param_.data_param().force_encoded_color(); - if ((force_color && DecodeDatum(&datum, true)) || - DecodeDatumNative(&datum)) { + if ((force_color && DecodeDatum(datum, true)) || + DecodeDatumNative(datum)) { LOG(INFO) << "Decoding Datum"; } // image - int crop_size = this->layer_param_.transform_param().crop_size(); + const int crop_size = this->layer_param_.transform_param().crop_size(); + const int batch_size = this->layer_param_.data_param().batch_size(); if (crop_size > 0) { - top[0]->Reshape(this->layer_param_.data_param().batch_size(), - datum.channels(), crop_size, crop_size); - this->prefetch_data_.Reshape(this->layer_param_.data_param().batch_size(), - datum.channels(), crop_size, crop_size); - this->transformed_data_.Reshape(1, datum.channels(), crop_size, crop_size); + top[0]->Reshape(batch_size, datum->channels(), crop_size, crop_size); + for (int i = 0; i < this->PREFETCH_COUNT; ++i) { + this->prefetch_[i].data_.Reshape(batch_size, datum->channels(), + crop_size, crop_size); + } + this->transformed_data_.Reshape(1, datum->channels(), + crop_size, crop_size); } else { - top[0]->Reshape( - this->layer_param_.data_param().batch_size(), datum.channels(), - datum.height(), datum.width()); - this->prefetch_data_.Reshape(this->layer_param_.data_param().batch_size(), - datum.channels(), datum.height(), datum.width()); - this->transformed_data_.Reshape(1, datum.channels(), - datum.height(), datum.width()); + top[0]->Reshape(batch_size, datum->channels(), + datum->height(), datum->width()); + for (int i = 0; i < this->PREFETCH_COUNT; ++i) { + this->prefetch_[i].data_.Reshape(batch_size, datum->channels(), + datum->height(), datum->width()); + } + this->transformed_data_.Reshape(1, datum->channels(), + datum->height(), datum->width()); } LOG(INFO) << "output data size: " << top[0]->num() << "," << top[0]->channels() << "," << top[0]->height() << "," << top[0]->width(); // label if (this->output_labels_) { - vector label_shape(1, this->layer_param_.data_param().batch_size()); + vector label_shape(1, batch_size); top[1]->Reshape(label_shape); - this->prefetch_label_.Reshape(label_shape); + for (int i = 0; i < this->PREFETCH_COUNT; ++i) { + this->prefetch_[i].label_.Reshape(label_shape); + } } } -// This function is used to create a thread that prefetches the data. +// This function is called on prefetch thread template -void DataLayer::InternalThreadEntry() { +void DataLayer::load_batch(Batch* batch) { CPUTimer batch_timer; batch_timer.Start(); double read_time = 0; double trans_time = 0; CPUTimer timer; - CHECK(this->prefetch_data_.count()); + CHECK(batch->data_.count()); CHECK(this->transformed_data_.count()); - // Reshape on single input batches for inputs of varying dimension. 
const int batch_size = this->layer_param_.data_param().batch_size(); const int crop_size = this->layer_param_.transform_param().crop_size(); bool force_color = this->layer_param_.data_param().force_encoded_color(); - if (batch_size == 1 && crop_size == 0) { - Datum datum; - datum.ParseFromString(cursor_->value()); - if (datum.encoded()) { - if (force_color) { - DecodeDatum(&datum, true); - } else { - DecodeDatumNative(&datum); - } - } - this->prefetch_data_.Reshape(1, datum.channels(), - datum.height(), datum.width()); - this->transformed_data_.Reshape(1, datum.channels(), - datum.height(), datum.width()); - } - - Dtype* top_data = this->prefetch_data_.mutable_cpu_data(); - Dtype* top_label = NULL; // suppress warnings about uninitialized variables - - if (this->output_labels_) { - top_label = this->prefetch_label_.mutable_cpu_data(); - } for (int item_id = 0; item_id < batch_size; ++item_id) { timer.Start(); - // get a blob - Datum datum; - datum.ParseFromString(cursor_->value()); + Reader* reader = next_reader(); + const Datum& datum = *(reader->full().pop("Waiting on data reader")); + + // Reshape on single input batches for inputs of varying dimension. + if (batch_size == 1 && crop_size == 0) { + batch->data_.Reshape(1, datum.channels(), + datum.height(), datum.width()); + this->transformed_data_.Reshape(1, datum.channels(), + datum.height(), datum.width()); + } cv::Mat cv_img; if (datum.encoded()) { @@ -136,7 +138,8 @@ void DataLayer::InternalThreadEntry() { timer.Start(); // Apply data transformations (mirror, scale, crop...) - int offset = this->prefetch_data_.offset(item_id); + Dtype* top_data = batch->data_.mutable_cpu_data(); + int offset = batch->data_.offset(item_id); this->transformed_data_.set_cpu_data(top_data + offset); if (datum.encoded()) { this->data_transformer_->Transform(cv_img, &(this->transformed_data_)); @@ -144,15 +147,11 @@ void DataLayer::InternalThreadEntry() { this->data_transformer_->Transform(datum, &(this->transformed_data_)); } if (this->output_labels_) { - top_label[item_id] = datum.label(); + batch->label_.mutable_cpu_data()[item_id] = datum.label(); } trans_time += timer.MicroSeconds(); - // go to the next iter - cursor_->Next(); - if (!cursor_->valid()) { - DLOG(INFO) << "Restarting data prefetching from start."; - cursor_->SeekToFirst(); - } + + reader->free().push(const_cast(&datum)); } batch_timer.Stop(); DLOG(INFO) << "Prefetch batch: " << batch_timer.MilliSeconds() << " ms."; @@ -160,6 +159,41 @@ void DataLayer::InternalThreadEntry() { DLOG(INFO) << "Transform time: " << trans_time / 1000 << " ms."; } +// This function is called on prefetch thread +template +Reader* DataLayer::next_reader() { + const DataParameter& data = this->layer_param().data_param(); + // Default case without probabilities, try to find a reader with + // data ready, or return first one + if (data.probability_size() == 0) { + for (int i = 0; i < readers_.size(); ++i) { + Reader* reader = readers_[i].get(); + if (!reader->full().empty()) { + return reader; + } + } + } else { + // Create RNG on current thread if first run + if (!variate_generator_) { + variate_generator_.reset( + new boost::variate_generator >( + caffe_rng(), random_distribution_)); + } + // Pick reader randomly with probability + boost::variate_generator >& rng = + *variate_generator_.get(); + float rand = rng(); + for (int i = 0; i < data.probability().size(); ++i) { + rand -= data.probability(i); + if (rand < 0) { + return readers_[i].get(); + } + } + } + // If no data ready, or rounding error on 
probabilities + return readers_[0].get(); +} + INSTANTIATE_CLASS(DataLayer); REGISTER_LAYER_CLASS(Data); diff --git a/src/caffe/layers/hdf5_data_layer.cpp b/src/caffe/layers/hdf5_data_layer.cpp index 8a782f7e524..20315d1ecf7 100644 --- a/src/caffe/layers/hdf5_data_layer.cpp +++ b/src/caffe/layers/hdf5_data_layer.cpp @@ -6,6 +6,9 @@ :: don't forget to update hdf5_daa_layer.cu accordingly - add ability to shuffle filenames if flag is set */ + +#ifndef NO_IO_DEPENDENCIES + #include // NOLINT(readability/streams) #include #include @@ -165,3 +168,5 @@ INSTANTIATE_CLASS(HDF5DataLayer); REGISTER_LAYER_CLASS(HDF5Data); } // namespace caffe + +#endif diff --git a/src/caffe/layers/hdf5_data_layer.cu b/src/caffe/layers/hdf5_data_layer.cu index 5e3e4ced141..1420e649cbb 100644 --- a/src/caffe/layers/hdf5_data_layer.cu +++ b/src/caffe/layers/hdf5_data_layer.cu @@ -3,6 +3,8 @@ TODO: - only load parts of the file, in accordance with a prototxt param "max_mem" */ +#ifndef NO_IO_DEPENDENCIES + #include #include #include @@ -51,3 +53,5 @@ void HDF5DataLayer::Forward_gpu(const vector*>& bottom, INSTANTIATE_LAYER_GPU_FUNCS(HDF5DataLayer); } // namespace caffe + +#endif diff --git a/src/caffe/layers/hdf5_output_layer.cpp b/src/caffe/layers/hdf5_output_layer.cpp index f63375c3dc6..a55f5869c90 100644 --- a/src/caffe/layers/hdf5_output_layer.cpp +++ b/src/caffe/layers/hdf5_output_layer.cpp @@ -1,3 +1,5 @@ +#ifndef NO_IO_DEPENDENCIES + #include #include "hdf5.h" @@ -75,3 +77,5 @@ INSTANTIATE_CLASS(HDF5OutputLayer); REGISTER_LAYER_CLASS(HDF5Output); } // namespace caffe + +#endif diff --git a/src/caffe/layers/hdf5_output_layer.cu b/src/caffe/layers/hdf5_output_layer.cu index ae497c34fc2..3998138a176 100644 --- a/src/caffe/layers/hdf5_output_layer.cu +++ b/src/caffe/layers/hdf5_output_layer.cu @@ -1,3 +1,5 @@ +#ifndef NO_IO_DEPENDENCIES + #include #include "hdf5.h" @@ -41,3 +43,5 @@ void HDF5OutputLayer::Backward_gpu(const vector*>& top, INSTANTIATE_LAYER_GPU_FUNCS(HDF5OutputLayer); } // namespace caffe + +#endif diff --git a/src/caffe/layers/image_data_layer.cpp b/src/caffe/layers/image_data_layer.cpp index 38ebbd5ec14..50187bbe5ce 100644 --- a/src/caffe/layers/image_data_layer.cpp +++ b/src/caffe/layers/image_data_layer.cpp @@ -17,7 +17,7 @@ namespace caffe { template ImageDataLayer::~ImageDataLayer() { - this->JoinPrefetchThread(); + this->StopInternalThread(); } template @@ -70,11 +70,14 @@ void ImageDataLayer::DataLayerSetUp(const vector*>& bottom, const int batch_size = this->layer_param_.image_data_param().batch_size(); if (crop_size > 0) { top[0]->Reshape(batch_size, channels, crop_size, crop_size); - this->prefetch_data_.Reshape(batch_size, channels, crop_size, crop_size); + for (int i = 0; i < this->PREFETCH_COUNT; ++i) + this->prefetch_[i].data_.Reshape(batch_size, channels, + crop_size, crop_size); this->transformed_data_.Reshape(1, channels, crop_size, crop_size); } else { top[0]->Reshape(batch_size, channels, height, width); - this->prefetch_data_.Reshape(batch_size, channels, height, width); + for (int i = 0; i < this->PREFETCH_COUNT; ++i) + this->prefetch_[i].data_.Reshape(batch_size, channels, height, width); this->transformed_data_.Reshape(1, channels, height, width); } LOG(INFO) << "output data size: " << top[0]->num() << "," @@ -83,7 +86,9 @@ void ImageDataLayer::DataLayerSetUp(const vector*>& bottom, // label vector label_shape(1, batch_size); top[1]->Reshape(label_shape); - this->prefetch_label_.Reshape(label_shape); + for (int i = 0; i < this->PREFETCH_COUNT; ++i) { + 
this->prefetch_[i].label_.Reshape(label_shape); + } } template @@ -93,15 +98,15 @@ void ImageDataLayer::ShuffleImages() { shuffle(lines_.begin(), lines_.end(), prefetch_rng); } -// This function is used to create a thread that prefetches the data. +// This function is called on prefetch thread template -void ImageDataLayer::InternalThreadEntry() { +void ImageDataLayer::load_batch(Batch* batch) { CPUTimer batch_timer; batch_timer.Start(); double read_time = 0; double trans_time = 0; CPUTimer timer; - CHECK(this->prefetch_data_.count()); + CHECK(batch->data_.count()); CHECK(this->transformed_data_.count()); ImageDataParameter image_data_param = this->layer_param_.image_data_param(); const int batch_size = image_data_param.batch_size(); @@ -115,14 +120,14 @@ void ImageDataLayer::InternalThreadEntry() { if (batch_size == 1 && crop_size == 0 && new_height == 0 && new_width == 0) { cv::Mat cv_img = ReadImageToCVMat(root_folder + lines_[lines_id_].first, 0, 0, is_color); - this->prefetch_data_.Reshape(1, cv_img.channels(), + batch->data_.Reshape(1, cv_img.channels(), cv_img.rows, cv_img.cols); this->transformed_data_.Reshape(1, cv_img.channels(), cv_img.rows, cv_img.cols); } - Dtype* prefetch_data = this->prefetch_data_.mutable_cpu_data(); - Dtype* prefetch_label = this->prefetch_label_.mutable_cpu_data(); + Dtype* prefetch_data = batch->data_.mutable_cpu_data(); + Dtype* prefetch_label = batch->label_.mutable_cpu_data(); // datum scales const int lines_size = lines_.size(); @@ -136,7 +141,7 @@ void ImageDataLayer::InternalThreadEntry() { read_time += timer.MicroSeconds(); timer.Start(); // Apply transformations (mirror, crop...) to the image - int offset = this->prefetch_data_.offset(item_id); + int offset = batch->data_.offset(item_id); this->transformed_data_.set_cpu_data(prefetch_data + offset); this->data_transformer_->Transform(cv_img, &(this->transformed_data_)); trans_time += timer.MicroSeconds(); diff --git a/src/caffe/layers/window_data_layer.cpp b/src/caffe/layers/window_data_layer.cpp index c127d56bc46..f637f2ec6d4 100644 --- a/src/caffe/layers/window_data_layer.cpp +++ b/src/caffe/layers/window_data_layer.cpp @@ -27,7 +27,7 @@ namespace caffe { template WindowDataLayer::~WindowDataLayer() { - this->JoinPrefetchThread(); + this->StopInternalThread(); } template @@ -171,7 +171,9 @@ void WindowDataLayer::DataLayerSetUp(const vector*>& bottom, CHECK_GT(crop_size, 0); const int batch_size = this->layer_param_.window_data_param().batch_size(); top[0]->Reshape(batch_size, channels, crop_size, crop_size); - this->prefetch_data_.Reshape(batch_size, channels, crop_size, crop_size); + for (int i = 0; i < this->PREFETCH_COUNT; ++i) + this->prefetch_[i].data_.Reshape( + batch_size, channels, crop_size, crop_size); LOG(INFO) << "output data size: " << top[0]->num() << "," << top[0]->channels() << "," << top[0]->height() << "," @@ -179,7 +181,9 @@ void WindowDataLayer::DataLayerSetUp(const vector*>& bottom, // label vector label_shape(1, batch_size); top[1]->Reshape(label_shape); - this->prefetch_label_.Reshape(label_shape); + for (int i = 0; i < this->PREFETCH_COUNT; ++i) { + this->prefetch_[i].label_.Reshape(label_shape); + } // data mean has_mean_file_ = this->transform_param_.has_mean_file(); @@ -217,9 +221,9 @@ unsigned int WindowDataLayer::PrefetchRand() { return (*prefetch_rng)(); } -// Thread fetching the data +// This function is called on prefetch thread template -void WindowDataLayer::InternalThreadEntry() { +void WindowDataLayer::load_batch(Batch* batch) { // At each iteration, sample N 
windows where N*p are foreground (object) // windows and N*(1-p) are background (non-object) windows CPUTimer batch_timer; @@ -227,8 +231,8 @@ void WindowDataLayer::InternalThreadEntry() { double read_time = 0; double trans_time = 0; CPUTimer timer; - Dtype* top_data = this->prefetch_data_.mutable_cpu_data(); - Dtype* top_label = this->prefetch_label_.mutable_cpu_data(); + Dtype* top_data = batch->data_.mutable_cpu_data(); + Dtype* top_label = batch->label_.mutable_cpu_data(); const Dtype scale = this->layer_param_.window_data_param().scale(); const int batch_size = this->layer_param_.window_data_param().batch_size(); const int context_pad = this->layer_param_.window_data_param().context_pad(); @@ -252,7 +256,7 @@ void WindowDataLayer::InternalThreadEntry() { bool use_square = (crop_mode == "square") ? true : false; // zero out batch - caffe_set(this->prefetch_data_.count(), Dtype(0), top_data); + caffe_set(batch->data_.count(), Dtype(0), top_data); const int num_fg = static_cast(static_cast(batch_size) * fg_fraction); diff --git a/src/caffe/net.cpp b/src/caffe/net.cpp index fd00b122630..6f530479715 100644 --- a/src/caffe/net.cpp +++ b/src/caffe/net.cpp @@ -39,6 +39,26 @@ void Net::Init(const NetParameter& in_param) { // the current NetState. NetParameter filtered_param; FilterNet(in_param, &filtered_param); + // Make sure sources are in URI format + for (int layer_id = 0; layer_id < filtered_param.layer_size(); ++layer_id) { + LayerParameter* layer_param = filtered_param.mutable_layer(layer_id); + if (layer_param->has_data_param()) { + const DataParameter& data = layer_param->data_param(); + for (int i = 0; i < data.source().size(); ++i) { + URI uri(data.source(i)); + if (uri.scheme().size() == 0) { + DataParameter_DB db = DataParameter_DB_LEVELDB; + if (data.backend_size() == data.source_size()) { + db = data.backend(i); + } + string uri = db == DataParameter_DB_LMDB ? "lmdb://" : "leveldb://"; + uri += data.source(i); + layer_param->mutable_data_param()->set_source(i, uri); + } + } + layer_param->mutable_data_param()->clear_backend(); + } + } LOG(INFO) << "Initializing net from parameters: " << std::endl << filtered_param.DebugString(); // Create a copy of filtered_param with splits added where necessary. diff --git a/src/caffe/proto/caffe.proto b/src/caffe/proto/caffe.proto index 5b21cf20028..9dcdf985f89 100644 --- a/src/caffe/proto/caffe.proto +++ b/src/caffe/proto/caffe.proto @@ -432,21 +432,25 @@ message ConvolutionParameter { } // Message that stores parameters used by DataLayer +// next available ID: 12 (last added: probability) message DataParameter { + // DEPRECATED enum DB { LEVELDB = 0; LMDB = 1; } // Specify the data source. - optional string source = 1; + repeated string source = 1; // Specify the batch size. optional uint32 batch_size = 4; // The rand_skip variable is for the data layer to skip a few data points // to avoid all asynchronous sgd clients to start at the same point. The skip // point would be set as rand_skip * rand(0,1). Note that rand_skip should not // be larger than the number of keys in the database. + // DEPRECATED. SGD solvers get different examples from each database. optional uint32 rand_skip = 7 [default = 0]; - optional DB backend = 8 [default = LEVELDB]; + // DEPRECATED. Use a URI scheme instead, e.g. lmdb://path + repeated DB backend = 8; // DEPRECATED. See TransformationParameter. For data pre-processing, we can do // simple scaling and subtracting the data mean, if provided. Note that the // mean subtraction is always carried out before scaling. 
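To make the source migration concrete before the DataParameter changes continue below, here is a hedged sketch of the two accepted spellings (paths are placeholders; the accessors are the generated protobuf API for the fields shown above):

    caffe::LayerParameter layer;
    layer.set_type("Data");
    caffe::DataParameter* data = layer.mutable_data_param();
    data->set_batch_size(64);
    // New style: the transport is chosen from the URI scheme.
    data->add_source("lmdb://examples/mnist/mnist_train_lmdb");
    // Legacy style (a bare path in source plus backend: LMDB) still loads;
    // Net::Init() above rewrites it to the lmdb:// form and then clears
    // the deprecated backend field.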
@@ -460,6 +464,14 @@ message DataParameter { optional bool mirror = 6 [default = false]; // Force the encoded image to have 3 color channels optional bool force_encoded_color = 9 [default = false]; + // Prefetch queue (Number of batches to prefetch to host memory + // from each source, increase if read bandwidth has glitches). + optional uint32 prefetch = 10 [default = 4]; + // If multiple sources are given, a probability can be set on each source. + // Samples will be picked from it with given probability, and the label will + // be set to the source index. This allows experimenting with different + // class ratios at runtime without rebuilding datasets. + repeated float probability = 11; } // Message that stores parameters used by DropoutLayer diff --git a/src/caffe/scheme.cpp b/src/caffe/scheme.cpp new file mode 100644 index 00000000000..59c6d885718 --- /dev/null +++ b/src/caffe/scheme.cpp @@ -0,0 +1,219 @@ +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "caffe/common.hpp" +#include "caffe/data_layers.hpp" +#include "caffe/layer.hpp" +#include "caffe/proto/caffe.pb.h" +#include "caffe/util/benchmark.hpp" +#include "caffe/util/io.hpp" +#include "caffe/util/math_functions.hpp" +#include "caffe/util/rng.hpp" + +namespace caffe { + +using boost::weak_ptr; +using google::protobuf::io::FileInputStream; +using google::protobuf::io::CodedInputStream; + +map > Scheme::schemes_; +map > Scheme::readers_; +static boost::mutex scheme_mutex_; + +// Load default schemes +static DefaultSchemes schemes_; + +void Scheme::add(const shared_ptr& scheme) { + boost::mutex::scoped_lock lock(scheme_mutex_); + for (int i = 0; i < scheme->names_.size(); ++i) { + schemes_[scheme->names_[i]] = scheme; + } +} + +const shared_ptr& Scheme::get(const string& name) { + boost::mutex::scoped_lock lock(scheme_mutex_); + const shared_ptr& instance = schemes_[name]; + CHECK(instance) << "Unknown URI scheme: " << name; + return instance; +} + +shared_ptr Scheme::get_reader(const DataParameter& param, + int index) const { + boost::mutex::scoped_lock lock(scheme_mutex_); + const string& source = param.source(index); + weak_ptr weak = readers_[source]; + shared_ptr shared = weak.lock(); + if (!shared) { + shared.reset(new_reader(param, index)); + readers_[source] = weak_ptr(shared); + } + return shared_ptr(new Reader(shared)); +} + +Reader::~Reader() { + boost::mutex::scoped_lock lock(scheme_mutex_); + string source = body_->param_.source(body_->index_); + body_.reset(); + if (Scheme::readers_[source].expired()) + Scheme::readers_.erase(source); +} + +// + +Reader::Body::Body(const DataParameter& param, int index) + : param_(param), + index_(index), + free_(), + full_() { + // Add prefetch datums to layer free queue + int prefetch = param.prefetch() * param.batch_size(); + for (int i = 0; i < prefetch; ++i) { + free_.push(new Datum()); + } +} + +Reader::Body::~Body() { + StopInternalThread(); + Datum* datum; + while (free_.try_pop(&datum)) { + delete datum; + } + while (full_.try_pop(&datum)) { + delete datum; + } +} + +// + +#ifndef NO_IO_DEPENDENCIES + +DefaultDatabases::DBReader::DBReader(const DataParameter& param, int index) + : Reader::Body(param, index) { + StartInternalThread(); +} + +void DefaultDatabases::DBReader::InternalThreadEntry() { + URI uri(param_.source(index_), true); + LOG(INFO)<< "path " << uri.path(); + DataParameter_DB backend = + uri.scheme() == "lmdb" ? 
DataParameter::LMDB : DataParameter::LEVELDB;
+  shared_ptr<db::DB> db(db::GetDB(backend));
+  db->Open(uri.path(), db::READ);
+  shared_ptr<db::Cursor> cursor(db->NewCursor());
+
+  // Check if we should randomly skip a few data points
+  if (param_.rand_skip()) {
+    unsigned int skip = caffe_rng_rand() % param_.rand_skip();
+    LOG(INFO) << "Skipping first " << skip << " data points.";
+    while (skip-- > 0) {
+      cursor->Next();
+    }
+  }
+  try {
+    while (!must_stop()) {
+      Datum* datum = free_.pop();
+      // TODO deserialize in-place instead of copy?
+      datum->ParseFromString(cursor->value());
+      full_.push(datum);
+
+      // go to the next iter
+      cursor->Next();
+      if (!cursor->valid()) {
+        DLOG(INFO) << "Restarting data prefetching from start.";
+        cursor->SeekToFirst();
+      }
+    }
+  } catch (boost::thread_interrupted&) {
+    // Interrupted exception is expected on shutdown
+  }
+}
+
+#endif
+
+//
+
+void FileScheme::DescriptorReader::read_descriptor(int file_descriptor) {
+  FileInputStream input(file_descriptor);
+  while (!must_stop()) {
+    Datum* datum = free_.pop();
+    CodedInputStream coded(&input);
+    uint32_t length;
+    CHECK(coded.ReadVarint32(&length));
+    if (!length) {
+      break;
+    }
+    coded.PushLimit(length);
+    CHECK(datum->ParseFromCodedStream(&coded));
+    CHECK((datum->data().size() > 0 || datum->float_data().size() > 0) &&
+          datum->has_label()) << "Received invalid datum";
+    full_.push(datum);
+  }
+}
+
+FileScheme::FileReader::FileReader(const DataParameter& param, int index)
+    : DescriptorReader(param, index) {
+  StartInternalThread();
+}
+
+void FileScheme::FileReader::InternalThreadEntry() {
+  try {
+    while (!must_stop()) {
+      URI uri(param_.source(index_), true);
+      File file(uri.path(), O_RDONLY);
+      DLOG(INFO) << "Opened file " << uri.path();
+      read_descriptor(file.descriptor());
+    }
+  } catch (boost::thread_interrupted&) {
+    // Interrupted exception is expected on shutdown
+  }
+}
+
+//
+
+SocketScheme::SocketReader::SocketReader(const DataParameter& param, int index)
+    : DescriptorReader(param, index) {
+  StartInternalThread();
+}
+
+void SocketScheme::SocketReader::InternalThreadEntry() {
+  try {
+    while (!must_stop()) {
+      URI uri(param_.source(index_));
+      Socket socket(uri);
+      DLOG(INFO) << "Connected to " << uri.host() << ":" << uri.port();
+      if (uri.scheme() == "http") {
+        string get = "GET " + uri.path() + " HTTP/1.1\r\n\r\n";
+        size_t len = get.size();
+        CHECK_EQ(write(socket.descriptor(), get.c_str(), len), len);
+        // Skip headers
+        for (;;) {
+          int line = 0;
+          char c = 0;
+          while (c != '\n') {
+            CHECK_EQ(read(socket.descriptor(), &c, 1), 1);
+            line++;
+          }
+          if (line == 2)  // Break on the bare "\r\n" that ends the headers
+            break;
+        }
+      }
+      read_descriptor(socket.descriptor());
+    }
+  } catch (boost::thread_interrupted&) {
+    // Interrupted exception is expected on shutdown
+  }
+}
+
+}  // namespace caffe
diff --git a/src/caffe/syncedmem.cpp b/src/caffe/syncedmem.cpp
index 7617ccfb27f..0da7a3bac79 100644
--- a/src/caffe/syncedmem.cpp
+++ b/src/caffe/syncedmem.cpp
@@ -108,6 +108,18 @@ void* SyncedMemory::mutable_gpu_data() {
 #endif
 }
 
+#ifndef CPU_ONLY
+void SyncedMemory::async_gpu_push(const cudaStream_t& stream) {
+  CHECK(head_ == HEAD_AT_CPU);
+  if (gpu_ptr_ == NULL) {
+    CUDA_CHECK(cudaMalloc(&gpu_ptr_, size_));
+  }
+  const cudaMemcpyKind put = cudaMemcpyHostToDevice;
+  CUDA_CHECK(cudaMemcpyAsync(gpu_ptr_, cpu_ptr_, size_, put, stream));
+  // Assume caller will synchronize on the stream before use
+  head_ = SYNCED;
+}
+#endif
 
 }  // namespace caffe
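A short usage sketch for the async_gpu_push() added above, mirroring how BasePrefetchingDataLayer::InternalThreadEntry drives it (blob stands for any host-filled Blob<float>; error handling reduced to CUDA_CHECK):

    cudaStream_t stream;
    CUDA_CHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));

    // The blob was just written on the CPU, so head() == HEAD_AT_CPU here.
    blob.data()->async_gpu_push(stream);        // enqueues the H2D copy only
    CUDA_CHECK(cudaStreamSynchronize(stream));  // caller must sync before use

    CUDA_CHECK(cudaStreamDestroy(stream));

The dedicated non-blocking stream keeps the copy from serializing against the solver's default-stream work, which is the point of prefetching into GPU memory.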
diff --git a/src/caffe/test/test_data_layer.cpp b/src/caffe/test/test_data_layer.cpp
index afe2a40d227..28ed69af742 100644
--- a/src/caffe/test/test_data_layer.cpp
+++ b/src/caffe/test/test_data_layer.cpp
@@ -9,6 +9,7 @@
 #include "caffe/data_layers.hpp"
 #include "caffe/filler.hpp"
 #include "caffe/proto/caffe.pb.h"
+#include "caffe/scheme.hpp"
 #include "caffe/util/db.hpp"
 #include "caffe/util/io.hpp"
 
@@ -24,14 +25,15 @@ class DataLayerTest : public MultiDeviceTest<TypeParam> {
 
  protected:
   DataLayerTest()
-      : backend_(DataParameter_DB_LEVELDB),
-        blob_top_data_(new Blob<Dtype>()),
+      : blob_top_data_(new Blob<Dtype>()),
         blob_top_label_(new Blob<Dtype>()),
         seed_(1701) {}
   virtual void SetUp() {
-    filename_.reset(new string());
-    MakeTempDir(filename_.get());
-    *filename_ += "/db";
+    for (int i = 0; i < SOURCES_COUNT; ++i) {
+      sources_[i].reset(new string());
+      MakeTempDir(sources_[i].get());
+      *sources_[i] += "/db";
+    }
     blob_top_vec_.push_back(blob_top_data_);
     blob_top_vec_.push_back(blob_top_label_);
   }
@@ -39,13 +41,12 @@ class DataLayerTest : public MultiDeviceTest<TypeParam> {
   // Fill the DB with data: if unique_pixels, each pixel is unique but
   // all images are the same; else each image is unique but all pixels within
   // an image are the same.
-  void Fill(const bool unique_pixels, DataParameter_DB backend) {
-    backend_ = backend;
-    LOG(INFO) << "Using temporary dataset " << *filename_;
+  void Fill(const bool unique_pixels, DataParameter_DB backend, int index = 0) {
+    LOG(INFO) << "Using temporary dataset " << *sources_[index];
     scoped_ptr<db::DB> db(db::GetDB(backend));
-    db->Open(*filename_, db::NEW);
+    db->Open(*sources_[index], db::NEW);
     scoped_ptr<db::Transaction> txn(db->NewTransaction());
-    for (int i = 0; i < 5; ++i) {
+    for (int i = 0; i < BATCH_SIZE; ++i) {
       Datum datum;
       datum.set_label(i);
       datum.set_channels(2);
@@ -64,6 +65,9 @@ class DataLayerTest : public MultiDeviceTest<TypeParam> {
     }
     txn->Commit();
     db->Close();
+
+    string scheme = backend == DataParameter_DB_LMDB ? "lmdb://" : "leveldb://";
+    (*sources_[index]).insert(0, scheme);
   }
 
   void TestRead() {
@@ -71,9 +75,8 @@ class DataLayerTest : public MultiDeviceTest<TypeParam> {
     LayerParameter param;
     param.set_phase(TRAIN);
     DataParameter* data_param = param.mutable_data_param();
-    data_param->set_batch_size(5);
-    data_param->set_source(filename_->c_str());
-    data_param->set_backend(backend_);
+    data_param->set_batch_size(BATCH_SIZE);
+    data_param->add_source(*sources_[0]);
 
     TransformationParameter* transform_param =
         param.mutable_transform_param();
@@ -92,10 +95,10 @@ class DataLayerTest : public MultiDeviceTest<TypeParam> {
     for (int iter = 0; iter < 100; ++iter) {
       layer.Forward(blob_bottom_vec_, blob_top_vec_);
-      for (int i = 0; i < 5; ++i) {
+      for (int i = 0; i < BATCH_SIZE; ++i) {
         EXPECT_EQ(i, blob_top_label_->cpu_data()[i]);
       }
-      for (int i = 0; i < 5; ++i) {
+      for (int i = 0; i < BATCH_SIZE; ++i) {
         for (int j = 0; j < 24; ++j) {
           EXPECT_EQ(scale * i, blob_top_data_->cpu_data()[i * 24 + j])
               << "debug: iter " << iter << " i " << i << " j " << j;
@@ -107,9 +110,9 @@ class DataLayerTest : public MultiDeviceTest<TypeParam> {
   void TestReshape(DataParameter_DB backend) {
     const int num_inputs = 5;
     // Save data of varying shapes.
-    LOG(INFO) << "Using temporary dataset " << *filename_;
+    LOG(INFO) << "Using temporary dataset " << *sources_[0];
     scoped_ptr<db::DB> db(db::GetDB(backend));
-    db->Open(*filename_, db::NEW);
+    db->Open(*sources_[0], db::NEW);
     scoped_ptr<db::Transaction> txn(db->NewTransaction());
     for (int i = 0; i < num_inputs; ++i) {
       Datum datum;
@@ -131,13 +134,15 @@ class DataLayerTest : public MultiDeviceTest<TypeParam> {
     txn->Commit();
     db->Close();
 
+    string scheme = backend == DataParameter_DB_LMDB ?
"lmdb://" : "leveldb://"; + (*sources_[0]).insert(0, scheme); + // Load and check data of various shapes. LayerParameter param; param.set_phase(TEST); DataParameter* data_param = param.mutable_data_param(); data_param->set_batch_size(1); - data_param->set_source(filename_->c_str()); - data_param->set_backend(backend); + data_param->add_source(sources_[0]->c_str()); DataLayer layer(param); layer.SetUp(blob_bottom_vec_, blob_top_vec_); @@ -171,14 +176,14 @@ class DataLayerTest : public MultiDeviceTest { void TestReadCrop(Phase phase) { const Dtype scale = 3; + const int batch = BATCH_SIZE; LayerParameter param; param.set_phase(phase); Caffe::set_random_seed(1701); DataParameter* data_param = param.mutable_data_param(); - data_param->set_batch_size(5); - data_param->set_source(filename_->c_str()); - data_param->set_backend(backend_); + data_param->set_batch_size(batch); + data_param->add_source(*sources_[0]); TransformationParameter* transform_param = param.mutable_transform_param(); @@ -187,22 +192,22 @@ class DataLayerTest : public MultiDeviceTest { DataLayer layer(param); layer.SetUp(blob_bottom_vec_, blob_top_vec_); - EXPECT_EQ(blob_top_data_->num(), 5); + EXPECT_EQ(blob_top_data_->num(), batch); EXPECT_EQ(blob_top_data_->channels(), 2); EXPECT_EQ(blob_top_data_->height(), 1); EXPECT_EQ(blob_top_data_->width(), 1); - EXPECT_EQ(blob_top_label_->num(), 5); + EXPECT_EQ(blob_top_label_->num(), batch); EXPECT_EQ(blob_top_label_->channels(), 1); EXPECT_EQ(blob_top_label_->height(), 1); EXPECT_EQ(blob_top_label_->width(), 1); for (int iter = 0; iter < 2; ++iter) { layer.Forward(blob_bottom_vec_, blob_top_vec_); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < batch; ++i) { EXPECT_EQ(i, blob_top_label_->cpu_data()[i]); } int num_with_center_value = 0; - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < batch; ++i) { for (int j = 0; j < 2; ++j) { const Dtype center_value = scale * (j ? 
17 : 5); num_with_center_value += @@ -227,9 +232,8 @@ class DataLayerTest : public MultiDeviceTest<TypeParam> { LayerParameter param; param.set_phase(TRAIN); DataParameter* data_param = param.mutable_data_param(); - data_param->set_batch_size(5); - data_param->set_source(filename_->c_str()); - data_param->set_backend(backend_); + data_param->set_batch_size(BATCH_SIZE); + data_param->add_source(*sources_[0]); TransformationParameter* transform_param = param.mutable_transform_param(); @@ -244,11 +248,11 @@ layer1.SetUp(blob_bottom_vec_, blob_top_vec_); for (int iter = 0; iter < 2; ++iter) { layer1.Forward(blob_bottom_vec_, blob_top_vec_); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < BATCH_SIZE; ++i) { EXPECT_EQ(i, blob_top_label_->cpu_data()[i]); } vector<Dtype> iter_crop_sequence; - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < BATCH_SIZE; ++i) { for (int j = 0; j < 2; ++j) { iter_crop_sequence.push_back( blob_top_data_->cpu_data()[i * 2 + j]); @@ -265,10 +269,10 @@ layer2.SetUp(blob_bottom_vec_, blob_top_vec_); for (int iter = 0; iter < 2; ++iter) { layer2.Forward(blob_bottom_vec_, blob_top_vec_); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < BATCH_SIZE; ++i) { EXPECT_EQ(i, blob_top_label_->cpu_data()[i]); } - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < BATCH_SIZE; ++i) { for (int j = 0; j < 2; ++j) { EXPECT_EQ(crop_sequence[iter][i * 2 + j], blob_top_data_->cpu_data()[i * 2 + j]) @@ -282,9 +286,8 @@ LayerParameter param; param.set_phase(TRAIN); DataParameter* data_param = param.mutable_data_param(); - data_param->set_batch_size(5); - data_param->set_source(filename_->c_str()); - data_param->set_backend(backend_); + data_param->set_batch_size(BATCH_SIZE); + data_param->add_source(*sources_[0]); TransformationParameter* transform_param = param.mutable_transform_param(); @@ -300,11 +303,11 @@ layer1.SetUp(blob_bottom_vec_, blob_top_vec_); for (int iter = 0; iter < 2; ++iter) { layer1.Forward(blob_bottom_vec_, blob_top_vec_); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < BATCH_SIZE; ++i) { EXPECT_EQ(i, blob_top_label_->cpu_data()[i]); } vector<Dtype> iter_crop_sequence; - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < BATCH_SIZE; ++i) { for (int j = 0; j < 2; ++j) { iter_crop_sequence.push_back( blob_top_data_->cpu_data()[i * 2 + j]); @@ -321,11 +324,11 @@ layer2.SetUp(blob_bottom_vec_, blob_top_vec_); for (int iter = 0; iter < 2; ++iter) { layer2.Forward(blob_bottom_vec_, blob_top_vec_); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < BATCH_SIZE; ++i) { EXPECT_EQ(i, blob_top_label_->cpu_data()[i]); } int num_sequence_matches = 0; - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < BATCH_SIZE; ++i) { for (int j = 0; j < 2; ++j) { num_sequence_matches += (crop_sequence[iter][i * 2 + j] == blob_top_data_->cpu_data()[i * 2 + j]); @@ -335,10 +338,89 @@ } } + void TestProbabilities() { + LayerParameter param; + DataParameter* data_param = param.mutable_data_param(); + data_param->set_batch_size(BATCH_SIZE); + for (int i = 0; i < SOURCES_COUNT; ++i) { + data_param->add_source(*sources_[i]); + data_param->add_probability(0); + } + + Caffe::set_random_seed(544432); + int counts[SOURCES_COUNT]; + + // Balanced two + data_param->set_probability(0, .5f); + data_param->set_probability(1, .5f); + 
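// For context on the counts asserted below: DataLayer draws a source for
// each batch according to these probabilities. The selection code itself is
// in data_layer.cpp and not part of this hunk; a minimal sketch of the
// technique (inverse-CDF sampling over DataParameter's repeated probability
// field, with an illustrative function name) might look like:
//
//   int pick_source(const DataParameter& param) {
//     // Uniform draw in [0, 1]; caffe_rng_rand() covers the full
//     // unsigned int range (UINT_MAX is from <climits>).
//     float r = static_cast<float>(caffe_rng_rand()) / UINT_MAX;
//     float cumulative = 0;
//     for (int i = 0; i < param.probability_size(); ++i) {
//       cumulative += param.probability(i);
//       if (r < cumulative) {
//         return i;
//       }
//     }
//     // Guard against probabilities summing to slightly less than 1.
//     return param.source_size() - 1;
//   }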
caffe_memset(sizeof(counts), 0, counts); + probabilities_run(param, counts); + EXPECT_EQ(56, counts[0]); + EXPECT_EQ(59, counts[1]); + EXPECT_EQ(0, counts[2]); + + // Balanced three + data_param->set_probability(0, .33333333f); + data_param->set_probability(1, .33333333f); + data_param->set_probability(2, .33333333f); + caffe_memset(sizeof(counts), 0, counts); + probabilities_run(param, counts); + EXPECT_EQ(36, counts[0]); + EXPECT_EQ(34, counts[1]); + EXPECT_EQ(45, counts[2]); + + // Only one + data_param->set_probability(0, 0); + data_param->set_probability(1, 0); + data_param->set_probability(2, 1); + caffe_memset(sizeof(counts), 0, counts); + probabilities_run(param, counts); + EXPECT_EQ(0, counts[0]); + EXPECT_EQ(0, counts[1]); + EXPECT_EQ(115, counts[2]); + } + + void probabilities_run(LayerParameter param, int* counts) { + const int batch = BATCH_SIZE; + DataLayer<Dtype> layer(param); + layer.SetUp(blob_bottom_vec_, blob_top_vec_); + EXPECT_EQ(blob_top_data_->num(), batch); + EXPECT_EQ(blob_top_data_->channels(), 2); + EXPECT_EQ(blob_top_data_->height(), 3); + EXPECT_EQ(blob_top_data_->width(), 4); + EXPECT_EQ(blob_top_label_->num(), batch); + EXPECT_EQ(blob_top_label_->channels(), 1); + EXPECT_EQ(blob_top_label_->height(), 1); + EXPECT_EQ(blob_top_label_->width(), 1); + + const int examples = 100; + for (int iter = 0; iter < examples / batch; ++iter) { + layer.Forward(blob_bottom_vec_, blob_top_vec_); + } + + for (;;) { + int total = 0; + for (int i = 0; i < SOURCES_COUNT; ++i) { + URI uri(param.data_param().source(i)); + const shared_ptr<Scheme>& scheme = Scheme::get(uri.scheme()); + shared_ptr<Reader> reader(scheme->get_reader(param.data_param(), i)); + counts[i] = reader->full().pops(); + total += counts[i]; + } + // Wait until prefetch queue refills, for reproducibility + const int prefetch = BasePrefetchingDataLayer<Dtype>::PREFETCH_COUNT; + if (total == examples + prefetch * batch) { + break; + } + usleep(1000); + } + } + virtual ~DataLayerTest() { delete blob_top_data_; delete blob_top_label_; } - DataParameter_DB backend_; - shared_ptr<string> filename_; + static const int BATCH_SIZE = 5; + static const int SOURCES_COUNT = 3; + shared_ptr<string> sources_[SOURCES_COUNT]; Blob<Dtype>* const blob_top_data_; Blob<Dtype>* const blob_top_label_; vector<Blob<Dtype>*> blob_bottom_vec_; @@ -424,4 +506,12 @@ TYPED_TEST(DataLayerTest, TestReadCropTestLMDB) { this->TestReadCrop(TEST); } +TYPED_TEST(DataLayerTest, TestTwoProbabilities) { + const bool unique_pixels = true; // all images the same; pixels different + this->Fill(unique_pixels, DataParameter_DB_LMDB, 0); + this->Fill(unique_pixels, DataParameter_DB_LEVELDB, 1); + this->Fill(unique_pixels, DataParameter_DB_LMDB, 2); + this->TestProbabilities(); +} + } // namespace caffe
diff --git a/src/caffe/test/test_data_transformer.cpp b/src/caffe/test/test_data_transformer.cpp index 16570e20356..e89c9fdcf00 100644 --- a/src/caffe/test/test_data_transformer.cpp +++ b/src/caffe/test/test_data_transformer.cpp @@ -2,7 +2,6 @@ #include #include "gtest/gtest.h" -#include "leveldb/db.h" #include "caffe/blob.hpp" #include "caffe/common.hpp"
diff --git a/src/caffe/test/test_hdf5_output_layer.cpp b/src/caffe/test/test_hdf5_output_layer.cpp index a23034f284a..914b01e37af 100644 --- a/src/caffe/test/test_hdf5_output_layer.cpp +++ b/src/caffe/test/test_hdf5_output_layer.cpp @@ -1,3 +1,5 @@ +#ifndef NO_IO_DEPENDENCIES + #include #include @@ -118,3 +120,5 @@ TYPED_TEST(HDF5OutputLayerTest, TestForward) { } } // namespace caffe + +#endif
diff --git a/src/caffe/test/test_hdf5data_layer.cpp 
b/src/caffe/test/test_hdf5data_layer.cpp index c9b027f88cf..7e42fcf0b7a 100644 --- a/src/caffe/test/test_hdf5data_layer.cpp +++ b/src/caffe/test/test_hdf5data_layer.cpp @@ -1,3 +1,5 @@ +#ifndef NO_IO_DEPENDENCIES + #include #include @@ -133,3 +135,5 @@ TYPED_TEST(HDF5DataLayerTest, TestRead) { } } // namespace caffe + +#endif
diff --git a/src/caffe/test/test_internal_thread.cpp b/src/caffe/test/test_internal_thread.cpp index 31882b6db1d..fe7cd36671d 100644 --- a/src/caffe/test/test_internal_thread.cpp +++ b/src/caffe/test/test_internal_thread.cpp @@ -13,9 +13,9 @@ class InternalThreadTest : public ::testing::Test {}; TEST_F(InternalThreadTest, TestStartAndExit) { InternalThread thread; EXPECT_FALSE(thread.is_started()); - EXPECT_TRUE(thread.StartInternalThread()); + thread.StartInternalThread(); EXPECT_TRUE(thread.is_started()); - EXPECT_TRUE(thread.WaitForInternalThreadToExit()); + thread.StopInternalThread(); EXPECT_FALSE(thread.is_started()); }
diff --git a/src/caffe/test/test_io.cpp b/src/caffe/test/test_io.cpp index 4ab96311bbc..17dd8cc56bc 100644 --- a/src/caffe/test/test_io.cpp +++ b/src/caffe/test/test_io.cpp @@ -419,4 +419,39 @@ TEST_F(IOTest, TestDecodeDatumToCVMatContentNative) { } } +TEST_F(IOTest, TestURI) { + { + URI uri("file:///absolute/path", true); + EXPECT_EQ("file", uri.scheme()); + EXPECT_EQ("", uri.host()); + EXPECT_EQ("", uri.port()); + EXPECT_EQ("/absolute/path", uri.path()); + } + + { + URI uri("file://relative/path", true); + EXPECT_EQ("file", uri.scheme()); + EXPECT_EQ("", uri.host()); + EXPECT_EQ("", uri.port()); + EXPECT_EQ("relative/path", uri.path()); + } + + { + URI uri("tcp://host/path", false); + EXPECT_EQ("tcp", uri.scheme()); + EXPECT_EQ("host", uri.host()); + EXPECT_EQ("", uri.port()); + EXPECT_EQ("/path", uri.path()); + } +} + } // namespace caffe + + + + + + + + +
diff --git a/src/caffe/test/test_upgrade_proto.cpp b/src/caffe/test/test_upgrade_proto.cpp index eec627656ef..4361089d465 100644 --- a/src/caffe/test/test_upgrade_proto.cpp +++ b/src/caffe/test/test_upgrade_proto.cpp @@ -1274,7 +1274,7 @@ TEST_F(NetUpgradeTest, TestSimple) { " name: 'data' " " type: 'Data' " " data_param { " - " source: '/home/jiayq/Data/ILSVRC12/train-leveldb' " + " source: 'leveldb:///home/jiayq/Data/ILSVRC12/train-leveldb' " " batch_size: 256 " " } " " transform_param { " @@ -2541,7 +2541,7 @@ TEST_F(NetUpgradeTest, TestImageNet) { " name: 'data' " " type: 'Data' " " data_param { " - " source: '/home/jiayq/Data/ILSVRC12/train-leveldb' " + " source: 'leveldb:///home/jiayq/Data/ILSVRC12/train-leveldb' " " batch_size: 256 " " } " " transform_param { "
diff --git a/src/caffe/util/blocking_queue.cpp b/src/caffe/util/blocking_queue.cpp new file mode 100644 index 00000000000..db4c983e2b0 --- /dev/null +++ b/src/caffe/util/blocking_queue.cpp @@ -0,0 +1,87 @@ +#include <boost/thread.hpp> +#include <string> + +#include "caffe/data_layers.hpp" +#include "caffe/util/blocking_queue.hpp" + +namespace caffe { + +template<typename T> +class blocking_queue<T>::sync { + public: + mutable boost::mutex mutex_; + boost::condition_variable condition_; +}; + +template<typename T> +blocking_queue<T>::blocking_queue() + : sync_(new sync()), + last_wait_log_(time(0)), + pops_() { +} + +template<typename T> +blocking_queue<T>::~blocking_queue() { +} + +template<typename T> +void blocking_queue<T>::push(const T& t) { + boost::mutex::scoped_lock lock(sync_.get()->mutex_); + queue_.push(t); + lock.unlock(); + sync_.get()->condition_.notify_one(); +} + +template<typename T> +bool blocking_queue<T>::empty() const { + boost::mutex::scoped_lock lock(sync_.get()->mutex_); + return queue_.empty(); }
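// The readers earlier in this patch pair two of these queues to recycle
// Datum buffers: "free" carries empty buffers from consumer back to
// producer, "full" carries filled ones the other way, so allocation happens
// once and back-pressure is automatic (the producer blocks on pop() when the
// consumer lags). A minimal sketch of that pattern; the function names are
// illustrative, not part of the patch, and caffe/util/db.hpp is assumed
// included:
static void produce_one(blocking_queue<Datum*>* free_q,
                        blocking_queue<Datum*>* full_q,
                        db::Cursor* cursor) {
  Datum* datum = free_q->pop();             // Blocks until a buffer returns.
  datum->ParseFromString(cursor->value());  // Refill the recycled buffer.
  full_q->push(datum);                      // Hand it to the consumer.
}
static void consume_one(blocking_queue<Datum*>* free_q,
                        blocking_queue<Datum*>* full_q,
                        Datum* copy_to) {
  Datum* datum = full_q->pop("Waiting on data");  // Logs at most every ~5s.
  copy_to->CopyFrom(*datum);                      // Use the buffer...
  free_q->push(datum);                            // ...then recycle it.
}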
+template<typename T> +bool blocking_queue<T>::try_pop(T* t) { + boost::mutex::scoped_lock lock(sync_.get()->mutex_); + + if (queue_.empty()) + return false; + + *t = queue_.front(); + queue_.pop(); + return true; +} + +template<typename T> +T blocking_queue<T>::pop(const string& log_on_wait) { + boost::mutex::scoped_lock lock(sync_.get()->mutex_); + + while (queue_.empty()) { + if (!log_on_wait.empty()) { + time_t now = time(0); + if (now - last_wait_log_ > 5) { + last_wait_log_ = now; + LOG(INFO) << log_on_wait; + } + } + sync_.get()->condition_.wait(lock); + } + + T t = queue_.front(); + queue_.pop(); + pops_++; + return t; +} + +template<typename T> +T blocking_queue<T>::peek() { + boost::mutex::scoped_lock lock(sync_.get()->mutex_); + + while (queue_.empty()) + sync_.get()->condition_.wait(lock); + + return queue_.front(); +} + +template class blocking_queue<Blob<float>*>; +template class blocking_queue<Blob<double>*>; +template class blocking_queue<Datum*>; + +} // namespace caffe
diff --git a/src/caffe/util/db.cpp b/src/caffe/util/db.cpp index 7f7018107ec..fbe10f4abea 100644 --- a/src/caffe/util/db.cpp +++ b/src/caffe/util/db.cpp @@ -5,6 +5,8 @@ namespace caffe { namespace db { +#ifndef NO_IO_DEPENDENCIES + const size_t LMDB_MAP_SIZE = 1099511627776; // 1 TB void LevelDB::Open(const string& source, Mode mode) { @@ -28,7 +30,17 @@ void LMDB::Open(const string& source, Mode mode) { } int flags = 0; if (mode == READ) { - flags = MDB_RDONLY | MDB_NOTLS; + // No locking; assume the DB is not written to concurrently. Otherwise + // LMDB tries to lock the file, which fails on a read-only filesystem. + flags = MDB_RDONLY | MDB_NOTLS | MDB_NOLOCK; + } + // Allow the DB to be a stand-alone file + { + struct stat st_buf; + // Only trust st_buf if stat() succeeded. + if (stat(source.c_str(), &st_buf) == 0 && S_ISREG(st_buf.st_mode)) { + flags |= MDB_NOSUBDIR; + } } MDB_CHECK(mdb_env_open(mdb_env_, source.c_str(), flags, 0664)); LOG(INFO) << "Opened lmdb " << source; @@ -59,25 +71,30 @@ void LMDBTransaction::Put(const string& key, const string& value) { MDB_CHECK(mdb_put(mdb_txn_, *mdb_dbi_, &mdb_key, &mdb_value, 0)); } +#endif + DB* GetDB(DataParameter::DB backend) { switch (backend) { +#ifndef NO_IO_DEPENDENCIES case DataParameter_DB_LEVELDB: return new LevelDB(); case DataParameter_DB_LMDB: return new LMDB(); +#endif default: LOG(FATAL) << "Unknown database backend"; } } DB* GetDB(const string& backend) { +#ifndef NO_IO_DEPENDENCIES if (backend == "leveldb") { return new LevelDB(); } else if (backend == "lmdb") { return new LMDB(); - } else { - LOG(FATAL) << "Unknown database backend"; } +#endif + LOG(FATAL) << "Unknown database backend"; } } // namespace db
diff --git a/src/caffe/util/io.cpp b/src/caffe/util/io.cpp index 77ef7f257f4..cae125b3989 100644 --- a/src/caffe/util/io.cpp +++ b/src/caffe/util/io.cpp @@ -1,15 +1,20 @@ +#include #include #include #include #include +#include #include #include #include #include #include +#include #include +#include #include // NOLINT(readability/streams) +#include #include #include @@ -228,6 +233,8 @@ void CVMatToDatum(const cv::Mat& cv_img, Datum* datum) { datum->set_data(buffer); } +#ifndef NO_IO_DEPENDENCIES + 
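// Related to the stream readers in scheme.cpp above: read_descriptor()
// expects each Datum framed as a varint32 byte count followed by the
// serialized message, with a zero length terminating the stream. The patch
// shows no writer in this section; a minimal sketch of one under that
// framing (name illustrative; assumes protobuf's zero-copy stream headers
// are available):
static void write_datum_stream(int fd, const vector<Datum>& datums) {
  using google::protobuf::io::FileOutputStream;
  using google::protobuf::io::CodedOutputStream;
  FileOutputStream output(fd);
  CodedOutputStream coded(&output);
  for (size_t i = 0; i < datums.size(); ++i) {
    string bytes;
    CHECK(datums[i].SerializeToString(&bytes));
    coded.WriteVarint32(static_cast<uint32_t>(bytes.size()));  // Length prefix.
    coded.WriteString(bytes);                                  // Datum payload.
  }
  coded.WriteVarint32(0);  // A zero length tells the reader to stop.
}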
// Verifies format of data stored in HDF5 file and reshapes blob accordingly. template <typename Dtype> void hdf5_load_nd_dataset_helper( @@ -303,4 +310,78 @@ void hdf5_save_nd_dataset( CHECK_GE(status, 0) << "Failed to make double dataset " << dataset_name; } +#endif + +// http://stackoverflow.com/questions/2616011/easy-way-to-parse-a-url-in-c-cross-platform +URI::URI(const string& uri, bool no_host) + : scheme_(), host_(), port_(), path_() { + typedef string::const_iterator it; + it uriEnd = uri.end(); + + it schemeStart = uri.begin(); + it schemeEnd = find(schemeStart, uriEnd, ':'); + if (schemeEnd != uriEnd) { + string scheme = &*(schemeEnd); // Rest of the string, starting at ':' + if ((scheme.length() > 3) && (scheme.substr(0, 3) == "://")) { + scheme_ = string(schemeStart, schemeEnd); + schemeEnd += 3; + } else { + schemeEnd = uri.begin(); + } + } else { + schemeEnd = uri.begin(); + } + + if (no_host) { + path_ = string(schemeEnd, uriEnd); + } else { + it hostStart = schemeEnd; + it pathStart = find(hostStart, uriEnd, '/'); + it hostEnd = find(schemeEnd, pathStart, ':'); // check for port + host_ = string(hostStart, hostEnd); + if ((hostEnd != uriEnd) && ((&*(hostEnd))[0] == ':')) { + hostEnd++; + port_ = string(hostEnd, pathStart); + } + if (pathStart != uriEnd) { + path_ = string(pathStart, uriEnd); + } + } +} + +File::File(const string& path, int flags) { + fd_ = open(path.c_str(), flags); + CHECK_GE(fd_, 0) << "Could not open file " << path; +} + +File::~File() { + close(fd_); +} + +Socket::Socket(const URI& uri) { + addrinfo *res; + addrinfo hints; + caffe_memset(sizeof hints, 0, &hints); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + // getaddrinfo() returns 0 on success and a nonzero error code otherwise. + int n = getaddrinfo(uri.host().c_str(), uri.port().c_str(), &hints, &res); + CHECK_EQ(n, 0) << gai_strerror(n) << ": " << uri.host() << ":" << uri.port(); + fd_ = -1; + for (addrinfo* t = res; t; t = t->ai_next) { + fd_ = socket(t->ai_family, t->ai_socktype, t->ai_protocol); + if (fd_ >= 0) { + if (!connect(fd_, t->ai_addr, t->ai_addrlen)) + break; + close(fd_); + fd_ = -1; + } + } + freeaddrinfo(res); + CHECK_GE(fd_, 0) << "Could not connect to " << uri.host() << ":" << uri.port(); +} + +Socket::~Socket() { + close(fd_); +} + } // namespace caffe
diff --git a/src/caffe/util/upgrade_proto.cpp b/src/caffe/util/upgrade_proto.cpp index 38a06026adf..f672d95843b 100644 --- a/src/caffe/util/upgrade_proto.cpp +++ b/src/caffe/util/upgrade_proto.cpp @@ -303,7 +303,7 @@ bool UpgradeV0LayerParameter(const V1LayerParameter& v0_layer_connection, } if (v0_layer_param.has_source()) { if (type == "data") { - layer_param->mutable_data_param()->set_source(v0_layer_param.source()); + layer_param->mutable_data_param()->add_source(v0_layer_param.source()); } else if (type == "hdf5_data") { layer_param->mutable_hdf5_data_param()->set_source( v0_layer_param.source()); @@ -731,6 +731,22 @@ bool UpgradeV1LayerParameter(const V1LayerParameter& v1_layer_param, if (v1_layer_param.has_data_param()) { layer_param->mutable_data_param()->CopyFrom( v1_layer_param.data_param()); + for (int i = 0; i < layer_param->data_param().source().size(); ++i) { + DataParameter_DB db = DataParameter_DB_LEVELDB; + if (layer_param->data_param().backend_size() == + layer_param->data_param().source().size()) { + db = layer_param->data_param().backend(i); + } + if (db == DataParameter_DB_LMDB) { + string s = "lmdb://" + layer_param->mutable_data_param()->source(i); + layer_param->mutable_data_param()->set_source(i, s); + } + if (db == DataParameter_DB_LEVELDB) { + string s = "leveldb://" + layer_param->mutable_data_param()->source(i); + layer_param->mutable_data_param()->set_source(i, s); + } + } + 
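// Worked example of this upgrade (illustrative path): a V1 definition such as
//   data_param { source: "/data/ilsvrc12_train" backend: LMDB }
// becomes
//   data_param { source: "lmdb:///data/ilsvrc12_train" }
// after which the now-redundant backend field is cleared below. A source with
// no matching backend entry gets the leveldb:// prefix, mirroring
// DataParameter's historical default backend.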
layer_param->mutable_data_param()->clear_backend(); } if (v1_layer_param.has_dropout_param()) { layer_param->mutable_dropout_param()->CopyFrom(