Skip to content

Commit

Permalink
add DelByType Interface && bugfix in blackwidow::*Scan() (OpenAtomFou…
Browse files Browse the repository at this point in the history
…ndation#98)

* add DelByType() to blackwidow, for codis/pika migration (OpenAtomFoundation#3)

* bugfix in blackwidow::*Scan()
  • Loading branch information
Axlgrep authored Jul 29, 2018
1 parent eb2d2c7 commit f19970d
Show file tree
Hide file tree
Showing 6 changed files with 443 additions and 3 deletions.
6 changes: 6 additions & 0 deletions include/blackwidow/blackwidow.h
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,12 @@ class BlackWidow {
int64_t Del(const std::vector<std::string>& keys,
std::map<DataType, Status>* type_status);

// Removes the specified keys of the specified type
// return -1 operation exception errors happen in database
// return >=0 the number of keys that were removed
int64_t DelByType(const std::vector<std::string>& keys,
DataType type);

// Iterate over a collection of elements
// return an updated cursor that the user need to use as the cursor argument
// in the next call
Expand Down
78 changes: 78 additions & 0 deletions src/blackwidow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,84 @@ int64_t BlackWidow::Del(const std::vector<std::string>& keys,
}
}

int64_t BlackWidow::DelByType(const std::vector<std::string>& keys,
DataType type) {
Status s;
int64_t count = 0;
bool is_corruption = false;

for (const auto& key : keys) {
switch (type) {
// Strings
case DataType::kStrings:
{
s = strings_db_->Del(key);
if (s.ok()) {
count++;
} else if (!s.IsNotFound()) {
is_corruption = true;
}
break;
}
// Hashes
case DataType::kHashes:
{
s = hashes_db_->Del(key);
if (s.ok()) {
count++;
} else if (!s.IsNotFound()) {
is_corruption = true;
}
break;
}
// Sets
case DataType::kSets:
{
s = sets_db_->Del(key);
if (s.ok()) {
count++;
} else if (!s.IsNotFound()) {
is_corruption = true;
}
break;
}
// Lists
case DataType::kLists:
{
s = lists_db_->Del(key);
if (s.ok()) {
count++;
} else if (!s.IsNotFound()) {
is_corruption = true;
}
break;
}
// ZSets
case DataType::kZSets:
{
s = zsets_db_->Del(key);
if (s.ok()) {
count++;
} else if (!s.IsNotFound()) {
is_corruption = true;
}
break;
}
case DataType::kAll:
{
return -1;
}
}
}

if (is_corruption) {
return -1;
} else {
return count;
}
}


int64_t BlackWidow::Exists(const std::vector<std::string>& keys,
std::map<DataType, Status>* type_status) {
int64_t count = 0;
Expand Down
5 changes: 4 additions & 1 deletion src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ Status RedisHashes::HStrlen(const Slice& key,

Status RedisHashes::HScan(const Slice& key, int64_t cursor, const std::string& pattern,
int64_t count, std::vector<FieldValue>* field_values, int64_t* next_cursor) {
*next_cursor = 0;
field_values->clear();
if (cursor < 0) {
*next_cursor = 0;
Expand All @@ -715,6 +716,7 @@ Status RedisHashes::HScan(const Slice& key, int64_t cursor, const std::string& p
if (parsed_hashes_meta_value.IsStale()
|| parsed_hashes_meta_value.count() == 0) {
*next_cursor = 0;
return Status::NotFound();
} else {
std::string start_field;
int32_t version = parsed_hashes_meta_value.version();
Expand Down Expand Up @@ -749,8 +751,9 @@ Status RedisHashes::HScan(const Slice& key, int64_t cursor, const std::string& p
}
} else {
*next_cursor = 0;
return s;
}
return s;
return Status::OK();
}

Status RedisHashes::GetHScanStartField(const Slice& key, const Slice& pattern, int64_t cursor, std::string* start_field) {
Expand Down
5 changes: 4 additions & 1 deletion src/redis_sets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,7 @@ Status RedisSets::SUnionstore(const Slice& destination,

Status RedisSets::SScan(const Slice& key, int64_t cursor, const std::string& pattern,
int64_t count, std::vector<std::string>* members, int64_t* next_cursor) {
*next_cursor = 0;
members->clear();
if (cursor < 0) {
*next_cursor = 0;
Expand All @@ -1072,6 +1073,7 @@ Status RedisSets::SScan(const Slice& key, int64_t cursor, const std::string& pat
if (parsed_sets_meta_value.IsStale()
|| parsed_sets_meta_value.count() == 0) {
*next_cursor = 0;
return Status::NotFound();
} else {
std::string start_member;
int32_t version = parsed_sets_meta_value.version();
Expand Down Expand Up @@ -1107,8 +1109,9 @@ Status RedisSets::SScan(const Slice& key, int64_t cursor, const std::string& pat
}
} else {
*next_cursor = 0;
return s;
}
return s;
return Status::OK();
}

Status RedisSets::GetSScanStartMember(const Slice& key, const Slice& pattern, int64_t cursor, std::string* start_member) {
Expand Down
5 changes: 4 additions & 1 deletion src/redis_zsets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,7 @@ Status RedisZSets::Expireat(const Slice& key, int32_t timestamp) {

Status RedisZSets::ZScan(const Slice& key, int64_t cursor, const std::string& pattern,
int64_t count, std::vector<ScoreMember>* score_members, int64_t* next_cursor) {
*next_cursor = 0;
score_members->clear();
if (cursor < 0) {
*next_cursor = 0;
Expand All @@ -1356,6 +1357,7 @@ Status RedisZSets::ZScan(const Slice& key, int64_t cursor, const std::string& pa
s = GetZScanStartMember(key, pattern, cursor, &start_member);
if (s.IsNotFound()) {
cursor = 0;
return Status::OK();
}

ZSetsMemberKey zsets_member_prefix(key, version, Slice());
Expand Down Expand Up @@ -1388,8 +1390,9 @@ Status RedisZSets::ZScan(const Slice& key, int64_t cursor, const std::string& pa
}
} else {
*next_cursor = 0;
return s;
}
return s;
return Status::OK();
}

Status RedisZSets::GetZScanStartMember(const Slice& key, const Slice& pattern, int64_t cursor, std::string* start_member) {
Expand Down
Loading

0 comments on commit f19970d

Please sign in to comment.