Skip to content

Commit

Permalink
DeleteFilesInRange before DeleteRange (#94)
Browse files Browse the repository at this point in the history
* fix cstore bugs and primary decode bug

* DeleteFilesInRange before DeleteRange
  • Loading branch information
lgqss authored Jul 6, 2020
1 parent 210a85a commit af87665
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 11 deletions.
10 changes: 9 additions & 1 deletion include/engine/rocks_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <string>
#include "rocksdb/db.h"
#include "rocksdb/convenience.h"
#include "rocksdb/slice.h"
#include "rocksdb/cache.h"
#include "rocksdb/options.h"
Expand Down Expand Up @@ -110,7 +111,14 @@ class RocksWrapper {
rocksdb::Status remove_range(const rocksdb::WriteOptions& options,
rocksdb::ColumnFamilyHandle* column_family,
const rocksdb::Slice& begin,
const rocksdb::Slice& end) {
const rocksdb::Slice& end,
bool delete_files_in_range) {
if (delete_files_in_range) {
auto s = rocksdb::DeleteFilesInRange(_txn_db, column_family, &begin, &end, false);
if (!s.ok()) {
return s;
}
}
return _txn_db->DeleteRange(options, column_family, begin, end);
}

Expand Down
1 change: 1 addition & 0 deletions src/engine/rocks_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ int32_t RocksWrapper::init(const std::string& path) {
db_options.max_open_files = FLAGS_rocks_max_open_files;
db_options.WAL_ttl_seconds = 10 * 60;
db_options.WAL_size_limit_MB = 0;
db_options.avoid_unnecessary_blocking_io = true;
db_options.max_background_compactions = 20;
if (FLAGS_rocks_kSkipAnyCorruptedRecords) {
db_options.wal_recovery_mode = rocksdb::WALRecoveryMode::kSkipAnyCorruptedRecords;
Expand Down
5 changes: 4 additions & 1 deletion src/exec/truncate_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,15 @@ int TruncateNode::open(RuntimeState* state) {

rocksdb::Slice begin(region_start.data());
rocksdb::Slice end(region_end.data());
auto res = _db->remove_range(write_options, _data_cf, begin, end);
TimeCost cost;
auto res = _db->remove_range(write_options, _data_cf, begin, end, true);
if (!res.ok()) {
DB_WARNING_STATE(state, "truncate table failed: table:%ld, region:%ld, code=%d, msg=%s",
_table_id, _region_id, res.code(), res.ToString().c_str());
return -1;
}
DB_WARNING_STATE(state, "truncate table:%ld, region:%ld, cost:%ld",
_table_id, _region_id, cost.get_time());
/*
res = _db->compact_range(rocksdb::CompactRangeOptions(), _data_cf, &begin, &end);
if (!res.ok()) {
Expand Down
3 changes: 2 additions & 1 deletion src/meta_server/meta_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,8 @@ int MetaStateMachine::on_snapshot_load(braft::SnapshotReader* reader) {
auto status = RocksWrapper::get_instance()->remove_range(options,
RocksWrapper::get_instance()->get_meta_info_handle(),
remove_start_key,
MetaServer::MAX_IDENTIFY);
MetaServer::MAX_IDENTIFY,
false);
if (!status.ok()) {
DB_FATAL("remove_range error when on snapshot load: code=%d, msg=%s",
status.code(), status.ToString().c_str());
Expand Down
3 changes: 2 additions & 1 deletion src/raft/my_raft_log_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,8 @@ int MyRaftLogStorage::truncate_prefix(const int64_t first_index_kept) {
auto status = _db->remove_range(rocksdb::WriteOptions(),
_handle,
rocksdb::Slice(start_key, LOG_DATA_KEY_SIZE),
rocksdb::Slice(end_key, LOG_DATA_KEY_SIZE));
rocksdb::Slice(end_key, LOG_DATA_KEY_SIZE),
true);
if (!status.ok()) {
DB_WARNING("tuncate log entry fail, region_id: %ld, truncate to first index kept:%ld from first log index:%ld",
_region_id, first_index_kept, _first_log_index.load());
Expand Down
6 changes: 3 additions & 3 deletions src/store/meta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ int MetaWriter::clear_txn_log_index(int64_t region_id) {
std::string start_key = transcation_log_index_key(region_id, 0);
std::string end_key = transcation_log_index_key(region_id, UINT64_MAX);
auto status = _rocksdb->remove_range(MetaWriter::write_options, _meta_cf,
start_key, end_key);
start_key, end_key, false);
if (!status.ok()) {
DB_WARNING("remove_range error: code=%d, msg=%s, region_id: %ld",
status.code(), status.ToString().c_str(), region_id);
Expand All @@ -303,7 +303,7 @@ int MetaWriter::clear_txn_infos(int64_t region_id) {
std::string start_key = transcation_pb_key(region_id, 0, 0);
std::string end_key = transcation_pb_key(region_id, UINT64_MAX, INT64_MAX);
auto status = _rocksdb->remove_range(MetaWriter::write_options, _meta_cf,
start_key, end_key);
start_key, end_key, false);
if (!status.ok()) {
DB_WARNING("remove_range error: code=%d, msg=%s, region_id: %ld",
status.code(), status.ToString().c_str(), region_id);
Expand All @@ -316,7 +316,7 @@ int MetaWriter::clear_pre_commit_infos(int64_t region_id) {
std::string start_key = pre_commit_key(region_id, 0);
std::string end_key = pre_commit_key(region_id, UINT64_MAX);
auto status = _rocksdb->remove_range(MetaWriter::write_options, _meta_cf,
start_key, end_key);
start_key, end_key, false);
if (!status.ok()) {
DB_WARNING("remove_range error: code=%d, msg=%s, region_id: %ld",
status.code(), status.ToString().c_str(), region_id);
Expand Down
2 changes: 1 addition & 1 deletion src/store/region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5015,7 +5015,7 @@ void Region::delete_local_rocksdb_for_ddl() {
MutTableKey end_key;
begin_key.append_i64(_region_id).append_i64(_ddl_param.index_id);
end_key.append_i64(_region_id).append_i64(_ddl_param.index_id).append_u64(0xFFFFFFFFFFFFFFFF);
auto res = _rocksdb->remove_range(write_options, _data_cf, begin_key.data(), end_key.data());
auto res = _rocksdb->remove_range(write_options, _data_cf, begin_key.data(), end_key.data(), true);
if (!res.ok()) {
DB_FATAL("DDL_LOG remove_index error: code=%d, msg=%s, region_id: %ld",
res.code(), res.ToString().c_str(), _region_id);
Expand Down
9 changes: 6 additions & 3 deletions src/store/region_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ int RegionControl::remove_data(int64_t drop_region_id) {
}
TimeCost cost;
auto res = rocksdb->remove_range(options, data_cf,
start_key.data(), end_key.data());
start_key.data(), end_key.data(), true);
if (!res.ok()) {
DB_WARNING("remove_range error: code=%d, msg=%s, region_id: %ld",
res.code(), res.ToString().c_str(), drop_region_id);
Expand Down Expand Up @@ -89,7 +89,7 @@ void RegionControl::compact_data_in_queue(int64_t region_id) {
}
in_compact_regions[region_id] = true;
Store::get_instance()->compact_queue().run([region_id]() {
if (in_compact_regions.count(region_id) == 0) {
if (in_compact_regions.count(region_id) == 1) {
if (!Store::get_instance()->is_shutdown()) {
RegionControl::compact_data(region_id);
in_compact_regions.erase(region_id);
Expand All @@ -116,7 +116,8 @@ int RegionControl::remove_log_entry(int64_t drop_region_id) {
auto status = rocksdb->remove_range(options,
rocksdb->get_raft_log_handle(),
start_key.data(),
end_key.data());
end_key.data(),
true);
if (!status.ok()) {
DB_WARNING("remove_range error: code=%d, msg=%s, region_id: %ld",
status.code(), status.ToString().c_str(), drop_region_id);
Expand Down Expand Up @@ -145,6 +146,8 @@ int RegionControl::remove_snapshot_path(int64_t drop_region_id) {
return 0;
}
int RegionControl::clear_all_infos_for_region(int64_t drop_region_id) {
DB_WARNING("region_id: %ld, clear_all_infos_for_region do compact in queue", drop_region_id);
compact_data_in_queue(drop_region_id);
remove_data(drop_region_id);
remove_meta(drop_region_id);
remove_snapshot_path(drop_region_id);
Expand Down

0 comments on commit af87665

Please sign in to comment.