Skip to content

Commit

Permalink
Optimize SpatialDB
Browse files Browse the repository at this point in the history
Summary:
Two things:
1. Use hash-based index for data column family
2. Use Get() instead of Iterator Seek() when DB is opened read-only

Test Plan: added read-only test in unit test

Reviewers: yinwang

Reviewed By: yinwang

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D22323
  • Loading branch information
igorcanadi committed Aug 26, 2014
1 parent 2386185 commit ff6ec0e
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 105 deletions.
244 changes: 173 additions & 71 deletions utilities/spatialdb/spatial_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <unordered_set>

#include "rocksdb/cache.h"
#include "rocksdb/options.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "rocksdb/db.h"
#include "rocksdb/utilities/stackable_db.h"
Expand Down Expand Up @@ -244,13 +246,76 @@ std::string FeatureSet::DebugString() const {
return out + "}";
}

class ValueGetter {
public:
ValueGetter() {}
virtual ~ValueGetter() {}

virtual bool Get(uint64_t id) = 0;
virtual const Slice value() const = 0;

virtual Status status() const = 0;
};

class ValueGetterFromDB : public ValueGetter {
public:
ValueGetterFromDB(DB* db, ColumnFamilyHandle* cf) : db_(db), cf_(cf) {}

virtual bool Get(uint64_t id) override {
std::string encoded_id;
PutFixed64BigEndian(&encoded_id, id);
status_ = db_->Get(ReadOptions(), cf_, encoded_id, &value_);
if (status_.IsNotFound()) {
status_ = Status::Corruption("Index inconsistency");
return false;
}

return true;
}

virtual const Slice value() const override { return value_; }

virtual Status status() const override { return status_; }

private:
std::string value_;
DB* db_;
ColumnFamilyHandle* cf_;
Status status_;
};

class ValueGetterFromIterator : public ValueGetter {
public:
explicit ValueGetterFromIterator(Iterator* iterator) : iterator_(iterator) {}

virtual bool Get(uint64_t id) override {
std::string encoded_id;
PutFixed64BigEndian(&encoded_id, id);
iterator_->Seek(encoded_id);

if (!iterator_->Valid() || iterator_->key() != Slice(encoded_id)) {
status_ = Status::Corruption("Index inconsistency");
return false;
}

return true;
}

virtual const Slice value() const override { return iterator_->value(); }

virtual Status status() const override { return status_; }

private:
std::unique_ptr<Iterator> iterator_;
Status status_;
};

class SpatialIndexCursor : public Cursor {
public:
// tile_box is inclusive
SpatialIndexCursor(Iterator* spatial_iterator, Iterator* data_iterator,
SpatialIndexCursor(Iterator* spatial_iterator, ValueGetter* value_getter,
const BoundingBox<uint64_t>& tile_bbox, uint32_t tile_bits)
: data_iterator_(data_iterator),
valid_(true) {
: value_getter_(value_getter), valid_(true) {
// calculate quad keys we'll need to query
std::vector<uint64_t> quad_keys;
quad_keys.reserve((tile_bbox.max_x - tile_bbox.min_x + 1) *
Expand Down Expand Up @@ -329,7 +394,7 @@ class SpatialIndexCursor : public Cursor {
if (!status_.ok()) {
return status_;
}
return data_iterator_->status();
return value_getter_->status();
}

private:
Expand All @@ -356,32 +421,23 @@ class SpatialIndexCursor : public Cursor {
return true;
}

// doesn't return anything, but sets valid_ and status_ on corruption
void ExtractData() {
assert(valid_);
std::string encoded_id;
PutFixed64BigEndian(&encoded_id, *primary_keys_iterator_);
valid_ = value_getter_->Get(*primary_keys_iterator_);

data_iterator_->Seek(encoded_id);

if (!data_iterator_->Valid() ||
data_iterator_->key() != Slice(encoded_id)) {
status_ = Status::Corruption("Index inconsistency");
valid_ = false;
return;
if (valid_) {
Slice data = value_getter_->value();
current_feature_set_.Clear();
if (!GetLengthPrefixedSlice(&data, &current_blob_) ||
!current_feature_set_.Deserialize(data)) {
status_ = Status::Corruption("Primary key column family corruption");
valid_ = false;
}
}

Slice data = data_iterator_->value();
current_feature_set_.Clear();
if (!GetLengthPrefixedSlice(&data, &current_blob_) ||
!current_feature_set_.Deserialize(data)) {
status_ = Status::Corruption("Primary key column family corruption");
valid_ = false;
return;
}
}

unique_ptr<Iterator> data_iterator_;
unique_ptr<ValueGetter> value_getter_;
bool valid_;
Status status_;

Expand Down Expand Up @@ -427,10 +483,11 @@ class SpatialDBImpl : public SpatialDB {
DB* db, ColumnFamilyHandle* data_column_family,
const std::vector<std::pair<SpatialIndexOptions, ColumnFamilyHandle*>>&
spatial_indexes,
uint64_t next_id)
uint64_t next_id, bool read_only)
: SpatialDB(db),
data_column_family_(data_column_family),
next_id_(next_id) {
next_id_(next_id),
read_only_(read_only) {
for (const auto& index : spatial_indexes) {
name_to_index_.insert(
{index.first.name, IndexColumnFamily(index.first, index.second)});
Expand Down Expand Up @@ -521,17 +578,26 @@ class SpatialDBImpl : public SpatialDB {
return new ErrorCursor(Status::InvalidArgument(
"Spatial index " + spatial_index + " not found"));
}
const auto& si = itr->second.index;
Iterator* spatial_iterator;
ValueGetter* value_getter;

std::vector<Iterator*> iterators;
Status s = NewIterators(read_options,
{data_column_family_, itr->second.column_family},
&iterators);
if (!s.ok()) {
return new ErrorCursor(s);
}
if (read_only_) {
spatial_iterator = NewIterator(read_options, itr->second.column_family);
value_getter = new ValueGetterFromDB(this, data_column_family_);
} else {
std::vector<Iterator*> iterators;
Status s = NewIterators(read_options,
{data_column_family_, itr->second.column_family},
&iterators);
if (!s.ok()) {
return new ErrorCursor(s);
}

const auto& si = itr->second.index;
return new SpatialIndexCursor(iterators[1], iterators[0],
spatial_iterator = iterators[1];
value_getter = new ValueGetterFromIterator(iterators[0]);
}
return new SpatialIndexCursor(spatial_iterator, value_getter,
GetTileBoundingBox(si, bbox), si.tile_bits);
}

Expand All @@ -548,31 +614,61 @@ class SpatialDBImpl : public SpatialDB {
std::unordered_map<std::string, IndexColumnFamily> name_to_index_;

std::atomic<uint64_t> next_id_;
bool read_only_;
};

namespace {
Options GetRocksDBOptionsFromOptions(const SpatialDBOptions& options) {
Options rocksdb_options;
rocksdb_options.IncreaseParallelism(options.num_threads);
rocksdb_options.write_buffer_size = 256 * 1024 * 1024; // 256MB
rocksdb_options.max_bytes_for_level_base = 1024 * 1024 * 1024; // 1 GB
DBOptions GetDBOptions(const SpatialDBOptions& options) {
DBOptions db_options;
db_options.IncreaseParallelism(options.num_threads);
if (options.bulk_load) {
db_options.disableDataSync = true;
}
return db_options;
}

ColumnFamilyOptions GetColumnFamilyOptions(const SpatialDBOptions& options,
std::shared_ptr<Cache> block_cache) {
ColumnFamilyOptions column_family_options;
column_family_options.write_buffer_size = 256 * 1024 * 1024; // 256MB
column_family_options.max_bytes_for_level_base = 1024 * 1024 * 1024; // 1 GB
// only compress levels >= 1
rocksdb_options.compression_per_level.resize(rocksdb_options.num_levels);
for (int i = 0; i < rocksdb_options.num_levels; ++i) {
column_family_options.compression_per_level.resize(
column_family_options.num_levels);
for (int i = 0; i < column_family_options.num_levels; ++i) {
if (i == 0) {
rocksdb_options.compression_per_level[i] = kNoCompression;
column_family_options.compression_per_level[i] = kNoCompression;
} else {
rocksdb_options.compression_per_level[i] = kLZ4Compression;
column_family_options.compression_per_level[i] = kLZ4Compression;
}
}
BlockBasedTableOptions table_options;
table_options.block_cache = NewLRUCache(options.cache_size);
rocksdb_options.table_factory.reset(NewBlockBasedTableFactory(table_options));
table_options.block_cache = block_cache;
column_family_options.table_factory.reset(
NewBlockBasedTableFactory(table_options));
if (options.bulk_load) {
rocksdb_options.PrepareForBulkLoad();
}
return rocksdb_options;
column_family_options.level0_file_num_compaction_trigger = (1 << 30);
column_family_options.level0_slowdown_writes_trigger = (1 << 30);
column_family_options.level0_stop_writes_trigger = (1 << 30);
column_family_options.disable_auto_compactions = true;
column_family_options.source_compaction_factor = (1 << 30);
column_family_options.num_levels = 2;
column_family_options.target_file_size_base = 256 * 1024 * 1024;
column_family_options.max_mem_compaction_level = 0;
}
return column_family_options;
}

ColumnFamilyOptions OptimizeOptionsForDataColumnFamily(
ColumnFamilyOptions options, std::shared_ptr<Cache> block_cache) {
options.prefix_extractor.reset(NewNoopTransform());
BlockBasedTableOptions block_based_options;
block_based_options.index_type = BlockBasedTableOptions::kHashSearch;
block_based_options.block_cache = block_cache;
options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
return options;
}

} // namespace

class MetadataStorage {
Expand Down Expand Up @@ -618,26 +714,30 @@ class MetadataStorage {
Status SpatialDB::Create(
const SpatialDBOptions& options, const std::string& name,
const std::vector<SpatialIndexOptions>& spatial_indexes) {
Options rocksdb_options = GetRocksDBOptionsFromOptions(options);
rocksdb_options.create_if_missing = true;
rocksdb_options.create_missing_column_families = true;
rocksdb_options.error_if_exists = true;
DBOptions db_options = GetDBOptions(options);
db_options.create_if_missing = true;
db_options.create_missing_column_families = true;
db_options.error_if_exists = true;

auto block_cache = NewLRUCache(options.cache_size);
ColumnFamilyOptions column_family_options =
GetColumnFamilyOptions(options, block_cache);

std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(ColumnFamilyDescriptor(
kDefaultColumnFamilyName, ColumnFamilyOptions(rocksdb_options)));
column_families.push_back(ColumnFamilyDescriptor(
kMetadataColumnFamilyName, ColumnFamilyOptions(rocksdb_options)));
kDefaultColumnFamilyName,
OptimizeOptionsForDataColumnFamily(column_family_options, block_cache)));
column_families.push_back(
ColumnFamilyDescriptor(kMetadataColumnFamilyName, column_family_options));

for (const auto& index : spatial_indexes) {
column_families.emplace_back(GetSpatialIndexColumnFamilyName(index.name),
ColumnFamilyOptions(rocksdb_options));
column_family_options);
}

std::vector<ColumnFamilyHandle*> handles;
DB* base_db;
Status s = DB::Open(DBOptions(rocksdb_options), name, column_families,
&handles, &base_db);
Status s = DB::Open(db_options, name, column_families, &handles, &base_db);
if (!s.ok()) {
return s;
}
Expand All @@ -659,13 +759,15 @@ Status SpatialDB::Create(

Status SpatialDB::Open(const SpatialDBOptions& options, const std::string& name,
SpatialDB** db, bool read_only) {
Options rocksdb_options = GetRocksDBOptionsFromOptions(options);
DBOptions db_options = GetDBOptions(options);
auto block_cache = NewLRUCache(options.cache_size);
ColumnFamilyOptions column_family_options =
GetColumnFamilyOptions(options, block_cache);

Status s;
std::vector<std::string> existing_column_families;
std::vector<std::string> spatial_indexes;
s = DB::ListColumnFamilies(DBOptions(rocksdb_options), name,
&existing_column_families);
s = DB::ListColumnFamilies(db_options, name, &existing_column_families);
if (!s.ok()) {
return s;
}
Expand All @@ -678,22 +780,22 @@ Status SpatialDB::Open(const SpatialDBOptions& options, const std::string& name,

std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(ColumnFamilyDescriptor(
kDefaultColumnFamilyName, ColumnFamilyOptions(rocksdb_options)));
column_families.push_back(ColumnFamilyDescriptor(
kMetadataColumnFamilyName, ColumnFamilyOptions(rocksdb_options)));
kDefaultColumnFamilyName,
OptimizeOptionsForDataColumnFamily(column_family_options, block_cache)));
column_families.push_back(
ColumnFamilyDescriptor(kMetadataColumnFamilyName, column_family_options));

for (const auto& index : spatial_indexes) {
column_families.emplace_back(GetSpatialIndexColumnFamilyName(index),
ColumnFamilyOptions(rocksdb_options));
column_family_options);
}
std::vector<ColumnFamilyHandle*> handles;
DB* base_db;
if (read_only) {
s = DB::OpenForReadOnly(DBOptions(rocksdb_options), name, column_families,
&handles, &base_db);
s = DB::OpenForReadOnly(db_options, name, column_families, &handles,
&base_db);
} else {
s = DB::Open(DBOptions(rocksdb_options), name, column_families, &handles,
&base_db);
s = DB::Open(db_options, name, column_families, &handles, &base_db);
}
if (!s.ok()) {
return s;
Expand Down Expand Up @@ -730,13 +832,13 @@ Status SpatialDB::Open(const SpatialDBOptions& options, const std::string& name,
for (auto h : handles) {
delete h;
}
delete db;
delete base_db;
return s;
}

// I don't need metadata column family any more, so delete it
delete handles[1];
*db = new SpatialDBImpl(base_db, handles[0], index_cf, next_id);
*db = new SpatialDBImpl(base_db, handles[0], index_cf, next_id, read_only);
return Status::OK();
}

Expand Down
Loading

0 comments on commit ff6ec0e

Please sign in to comment.