Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datum db #1568

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions include/caffe/data_layers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include "caffe/blob.hpp"
#include "caffe/common.hpp"
#include "caffe/data_transformer.hpp"
#include "caffe/dataset.hpp"
#include "caffe/datum_DB.hpp"
#include "caffe/filler.hpp"
#include "caffe/internal_thread.hpp"
#include "caffe/layer.hpp"
Expand Down Expand Up @@ -100,8 +100,8 @@ class DataLayer : public BasePrefetchingDataLayer<Dtype> {
protected:
virtual void InternalThreadEntry();

shared_ptr<Dataset<string, Datum> > dataset_;
Dataset<string, Datum>::const_iterator iter_;
shared_ptr<DatumDB> datumdb_;
shared_ptr<DatumDBCursor> datum_cursor_;
};

/**
Expand Down
142 changes: 142 additions & 0 deletions include/caffe/datum_DB.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
#ifndef CAFFE_DATUMDB_HPP
#define CAFFE_DATUMDB_HPP

#include <string>

#include "leveldb/db.h"
#include "lmdb.h"

#include "caffe/common.hpp"
#include "caffe/datum_DB_factory.hpp"
#include "caffe/proto/caffe.pb.h"

namespace caffe {

class DatumDBCursor {
public:
explicit DatumDBCursor(const DatumDBParameter& param)
: param_(param) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel too strongly about this, but I think it could be argued that param passing should be left up to subclasses, and subclasses should be explicit about what information needs to be given to cursors. E.g., it seems that LMDB and LevelDB only need the single boolean param_.loop(), but doing it this way obscures that fact and makes one wonder if params might affect cursors in arbitrary ways.

~DatumDBCursor() {
LOG(INFO) << "Closing DatumDBCursor on " << param_.source();
}

virtual bool Valid() = 0;
virtual void SeekToFirst() = 0;
virtual void Next() = 0;
virtual string key() = 0;
virtual Datum value() = 0;

protected:
DatumDBParameter param_;
DISABLE_COPY_AND_ASSIGN(DatumDBCursor);
};

class DatumDB {
public:
explicit DatumDB(const DatumDBParameter& param)
: param_(param),
is_opened_(new bool(false)) {}
virtual ~DatumDB() {
if (is_opened_.unique()) {
DatumDBRegistry::RemoveSource(param_.source());
}
}

virtual bool Get(const string& key, Datum* value) = 0;
virtual void Put(const string& key, const Datum& value) = 0;
virtual void Commit() = 0;
virtual DatumDBCursor* NewCursor() = 0;

protected:
virtual void Open() = 0;
virtual void Close() = 0;

DatumDBParameter param_;
shared_ptr<bool> is_opened_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm baffled by this... why shared_ptr<bool>?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason is shared is because I just want to open the DatumDB once per source and close it when the last Generator or reference is destroyed.
So all the DatumDB passed to Generators will share the same opened state.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surely this trickery is no longer necessary, esp. given that copy is explicitly disabled on the next line?

DISABLE_COPY_AND_ASSIGN(DatumDB);
};

class DatumLevelDB : public DatumDB {
public:
explicit DatumLevelDB(const DatumDBParameter& param)
: DatumDB(param) { Open(); }
virtual ~DatumLevelDB() { Close(); }

virtual bool Get(const string& key, Datum* datum);
virtual void Put(const string& key, const Datum& datum);
virtual void Commit();
virtual DatumDBCursor* NewCursor();

protected:
virtual void Open();
virtual void Close();

shared_ptr<leveldb::DB> db_;
shared_ptr<leveldb::WriteBatch> batch_;
};

class DatumLevelDBCursor : public DatumDBCursor {
public:
explicit DatumLevelDBCursor(const DatumDBParameter& param,
leveldb::Iterator* iter)
: DatumDBCursor(param),
iter_(iter) { CHECK_NOTNULL(iter_); SeekToFirst(); }
~DatumLevelDBCursor() {
LOG(INFO) << "Closing DatumLevelDBCursor";
}
virtual bool Valid();
virtual void SeekToFirst();
virtual void Next();
virtual string key();
virtual Datum value();

protected:
leveldb::Iterator* iter_;
};

class DatumLMDB : public DatumDB {
public:
explicit DatumLMDB(const DatumDBParameter& param)
: DatumDB(param) { Open(); }
virtual ~DatumLMDB() { Close(); }

virtual bool Get(const string& key, Datum* datum);
virtual void Put(const string& key, const Datum& datum);
virtual void Commit();
virtual DatumDBCursor* NewCursor();

protected:
virtual void Open();
virtual void Close();

MDB_env* mdb_env_;
MDB_txn* mdb_txn_;
MDB_dbi mdb_dbi_;
};

class DatumLMDBCursor : public DatumDBCursor {
public:
explicit DatumLMDBCursor(const DatumDBParameter& param,
MDB_cursor* mdb_cursor)
: DatumDBCursor(param),
mdb_cursor_(mdb_cursor) { CHECK_NOTNULL(mdb_cursor_); SeekToFirst(); }
~DatumLMDBCursor() {
LOG(INFO) << "Closing DatumLMDBCursor";
mdb_cursor_close(mdb_cursor_);
}
virtual bool Valid();
virtual void SeekToFirst();
virtual void Next();
virtual string key();
virtual Datum value();

protected:
MDB_cursor* mdb_cursor_;
MDB_val mdb_key_, mdb_value_;
int mdb_status_;
};


} // namespace caffe

#endif // CAFFE_DATUMDB_HPP
98 changes: 98 additions & 0 deletions include/caffe/datum_DB_factory.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#ifndef CAFFE_DATUMDB_FACTORY_H_
#define CAFFE_DATUMDB_FACTORY_H_

#include <map>
#include <string>

#include "caffe/common.hpp"
#include "caffe/proto/caffe.pb.h"

namespace caffe {

class DatumDB;

class DatumDBRegistry {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to keep with the registry pattern, I wonder if we could template it away? That might get overly abstract though...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we could, but I made it a bit more specific to store only one DatumDB per source, this avoids opening twice the same DB.

But it took me a while to get this right, so maybe it will be a good idea to template it.

public:
typedef DatumDB* (*Creator)(const DatumDBParameter&);
typedef std::map<string, Creator> CreatorRegistry;
typedef std::map<string, DatumDB* > SourceRegistry;

static CreatorRegistry& Registry() {
static CreatorRegistry* g_registry_ = new CreatorRegistry();
return *g_registry_;
}

static SourceRegistry& Sources() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is here to prevent multiple opens of the same DB? If so, it seems like an odd place for that... what I expect in a *Registry class is code dealing with the static-variable registration trick, but database paths come at run time. I think if we need this it would make more sense in the DB class itself (or its subclasses, if we need it for only some types of DBs).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it is there to prevent multiple opens of the same DB. So yeah, it maybe possible to move to the Open method of the datumDB, but then there will need to be a static SourceRegistry
The good side will be that the *Registry class will become more similar to the LayerRegistry and could be abstracted away if want to.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, there's nothing wrong with using static members in non-factory classes where appropriate (which is admittedly rare).

static SourceRegistry* s_registry_ = new SourceRegistry();
return *s_registry_;
}

// Adds a creator.
static void AddCreator(const string& backend,
Creator creator) {
CreatorRegistry& registry = Registry();
CHECK_EQ(registry.count(backend), 0)
<< "DatumDB backend " << backend << " already registered.";
registry[backend] = creator;
}

// Get a DatumDB using a DatumDBParameter.
static DatumDB* GetDatumDB(const DatumDBParameter& param) {
SourceRegistry& sources = Sources();
SourceRegistry::iterator it = sources.find(param.source());
if (it != sources.end()) {
LOG(INFO) << "Reusing DatumDB " << param.source();
return (*it).second;
} else {
LOG(INFO) << "Creating DatumDB " << param.source();
const string& backend = param.backend();
CreatorRegistry& registry = Registry();
CHECK_EQ(registry.count(backend), 1);
DatumDB* datumdb = registry[backend](param);
if (param.unique_source()) {
sources[param.source()] = datumdb;
}
return datumdb;
}
}

static bool RemoveSource(const string& source) {
SourceRegistry& sources = Sources();
SourceRegistry::iterator it = sources.find(source);
if (it != sources.end()) {
LOG(INFO) << "Removing Source " << source;
sources.erase(it);
return true;
}
return false;
}

private:
// Layer registry should never be instantiated - everything is done with its
// static variables.
DatumDBRegistry() {}
};

class DatumDBRegisterer {
public:
DatumDBRegisterer(const string& backend,
DatumDB* (*creator)(const DatumDBParameter&)) {
LOG(INFO) << "Registering DatumDB backend: " << backend;
DatumDBRegistry::AddCreator(backend, creator);
}
};


#define REGISTER_DATUMDB_CREATOR(backend, creator) \
static DatumDBRegisterer g_datumdb_##creator(backend, creator);

#define REGISTER_DATUMDB_CLASS(backend, clsname) \
DatumDB* Creator_##clsname(const DatumDBParameter& param) { \
return new clsname(param); \
} \
REGISTER_DATUMDB_CREATOR(backend, Creator_##clsname)


} // namespace caffe

#endif // CAFFE_DATUMDB_FACTORY_H_
63 changes: 63 additions & 0 deletions include/caffe/datum_imagesdb.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#ifndef CAFFE_IMAGESDB_HPP
#define CAFFE_IMAGESDB_HPP

#include <fstream> // NOLINT(readability/streams)
#include <iterator>
#include <map>
#include <string>
#include <utility>
#include <vector>

#include "caffe/datum_DB.hpp"

namespace caffe {

class DatumImagesDB : public DatumDB {
public:
explicit DatumImagesDB(const DatumDBParameter& param)
: DatumDB(param) { Open(); }
virtual ~DatumImagesDB() { Close(); }

virtual bool Get(const string& key, Datum* datum);
virtual void Put(const string& key, const Datum& datum);
virtual void Commit();
virtual DatumDBCursor* NewCursor();

protected:
virtual void Open();
virtual void Close();

std::fstream file_;
shared_ptr<map<string, Datum> > datum_database_;
vector<string> keys_;
vector<pair<string, Datum> > batch_;
};

class DatumImagesDBCursor : public DatumDBCursor {
public:
explicit DatumImagesDBCursor(const DatumDBParameter& param,
shared_ptr<map<string, Datum> > datum_database, vector<string> keys)
: DatumDBCursor(param),
datum_database_(datum_database),
keys_(keys) { SeekToFirst(); }
~DatumImagesDBCursor() {
LOG(INFO) << "Closing DatumImagesDBCursor";
}

virtual bool Valid();
virtual void SeekToFirst();
virtual void Next();
virtual string key();
virtual Datum value();

protected:
virtual void ShuffleKeys();

shared_ptr<map<string, Datum> > datum_database_;
vector<string> keys_;
vector<string>::iterator read_it_;
};

} // namespace caffe

#endif // CAFFE_IMAGESDB_HPP
Loading