Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support BITOP command #471

Merged
merged 3 commits into from
Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions src/batch_extractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,36 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
case kRedisBitmap: {
auto args = log_data_.GetArguments();
if (args->size() < 1) {
LOG(ERROR) << "Fail to parse write_batch in putcf cmd setbit : args error ,should contain setbit offset";
LOG(ERROR) << "Fail to parse write_batch in putcf type bitmap : args error ,should at least contain cmd";
return rocksdb::Status::OK();
}
bool bit_value = Redis::Bitmap::GetBitFromValueAndOffset(value.ToString(), std::stoi((*args)[0]));
command_args = {"SETBIT", user_key, (*args)[0], bit_value ? "1" : "0"};
RedisCommand cmd = static_cast<RedisCommand >(std::stoi((*args)[0]));
switch (cmd) {
case kRedisCmdSetBit: {
if (args->size() < 2) {
LOG(ERROR) << "Fail to parse write_batch in putcf cmd setbit : args error ,should contain setbit offset";
return rocksdb::Status::OK();
}
bool bit_value = Redis::Bitmap::GetBitFromValueAndOffset(value.ToString(), std::stoi((*args)[1]));
command_args = {"SETBIT", user_key, (*args)[1], bit_value ? "1" : "0"};
break;
}
case kRedisCmdBitOp:
if (first_seen_) {
if (args->size() < 4) {
LOG(ERROR)
<< "Fail to parse write_batch in putcf cmd bitop : args error, should at least contain srckey";
return rocksdb::Status::OK();
}
command_args = {"BITOP", (*args)[1], user_key};
command_args.insert(command_args.end(), args->begin() + 2, args->end());
first_seen_ = false;
}
break;
default:
LOG(ERROR) << "Fail to parse write_batch in putcf type bitmap : cmd error";
return rocksdb::Status::OK();
}
break;
}
case kRedisSortedint: {
Expand Down
189 changes: 188 additions & 1 deletion src/redis_bitmap.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "redis_bitmap.h"
#include <vector>
#include <memory>
#include <utility>
#include <algorithm>

#include "redis_bitmap_string.h"
Expand Down Expand Up @@ -183,7 +185,7 @@ rocksdb::Status Bitmap::SetBit(const Slice &user_key, uint32_t offset, bool new_
value[byte_index] &= ~(1 << bit_offset);
}
rocksdb::WriteBatch batch;
WriteBatchLogData log_data(kRedisBitmap, {std::to_string(offset)});
WriteBatchLogData log_data(kRedisBitmap, {std::to_string(kRedisCmdSetBit), std::to_string(offset)});
batch.PutLogData(log_data.Encode());
batch.Put(sub_key, value);
if (metadata.size != bitmap_size) {
Expand Down Expand Up @@ -313,6 +315,191 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool bit, int64_t start,
return rocksdb::Status::OK();
}

rocksdb::Status Bitmap::BitOp(BitOpFlags op_flag, const std::string &op_name,
const Slice &user_key, const std::vector<Slice> &op_keys, int64_t *len) {
std::string ns_key, raw_value, ns_op_key;
AppendNamespacePrefix(user_key, &ns_key);
LockGuard guard(storage_->GetLockManager(), ns_key);

rocksdb::Status s;
std::vector<std::pair<std::string, BitmapMetadata>> meta_pairs;
uint64_t max_size = 0, num_keys = op_keys.size();

for (const auto &op_key : op_keys) {
BitmapMetadata metadata(false);
AppendNamespacePrefix(op_key, &ns_op_key);
s = GetMetadata(ns_op_key, &metadata, &raw_value);
if (!s.ok()) {
if (s.IsNotFound()) {
num_keys--;
continue;
}
return s;
}
if (metadata.Type() == kRedisString) {
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
return rocksdb::Status::InvalidArgument(kErrMsgWrongType);
}
if (metadata.size > max_size) max_size = metadata.size;
meta_pairs.emplace_back(std::make_pair(ns_op_key, metadata));
}

rocksdb::WriteBatch batch;
ColinChamber marked this conversation as resolved.
Show resolved Hide resolved
if (max_size == 0) {
batch.Delete(metadata_cf_handle_, ns_key);
return storage_->Write(rocksdb::WriteOptions(), &batch);
}
std::vector<std::string> log_args = {std::to_string(kRedisCmdBitOp), op_name};
for (const auto &op_key : op_keys) {
log_args.emplace_back(op_key.ToString());
}
WriteBatchLogData log_data(kRedisBitmap, std::move(log_args));
batch.PutLogData(log_data.Encode());

BitmapMetadata res_metadata;
if (num_keys == op_keys.size() || op_flag != kBitOpAnd) {
uint64_t i, frag_numkeys = num_keys, stop_index = (max_size -1)/kBitmapSegmentBytes;
std::unique_ptr<unsigned char[]> frag_res(new unsigned char[kBitmapSegmentBytes]);
uint16_t frag_maxlen = 0, frag_minlen = 0;
std::string sub_key, fragment;
unsigned char output, byte;
std::vector<std::string> fragments;

LatestSnapShot ss(db_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
for (uint64_t frag_index = 0; frag_index <= stop_index; frag_index++) {
for (const auto &meta_pair : meta_pairs) {
InternalKey(meta_pair.first, std::to_string(frag_index * kBitmapSegmentBytes),
meta_pair.second.version, storage_->IsSlotIdEncoded()).Encode(&sub_key);
auto s = db_->Get(read_options, sub_key, &fragment);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
if (s.IsNotFound()) {
frag_numkeys--;
if (op_flag == kBitOpAnd) {
frag_maxlen = 0;
break;
}
} else {
if (frag_maxlen < fragment.size()) frag_maxlen = fragment.size();
if (fragment.size() < frag_minlen || frag_minlen == 0) frag_minlen = fragment.size();
fragments.emplace_back(fragment);
}
}

if (frag_maxlen != 0 || op_flag == kBitOpNot) {
uint16_t j = 0;
if (op_flag == kBitOpNot) {
memset(frag_res.get(), UCHAR_MAX, kBitmapSegmentBytes);
} else {
memset(frag_res.get(), 0, frag_maxlen);
}

#ifndef USE_ALIGNED_ACCESS
if (frag_minlen >= sizeof(uint64_t)*4 && frag_numkeys <= 16) {
uint64_t *lres = reinterpret_cast<uint64_t*>(frag_res.get());
const uint64_t *lp[16];
for (i = 0; i < frag_numkeys; i++) {
lp[i] = reinterpret_cast<const uint64_t*>(fragments[i].data());
}
memcpy(frag_res.get(), fragments[0].data(), frag_minlen);

if (op_flag == kBitOpAnd) {
while (frag_minlen >= sizeof(uint64_t)*4) {
for (i = 1; i < frag_numkeys; i++) {
lres[0] &= lp[i][0];
lres[1] &= lp[i][1];
lres[2] &= lp[i][2];
lres[3] &= lp[i][3];
lp[i]+=4;
}
lres+=4;
j += sizeof(uint64_t)*4;
frag_minlen -= sizeof(uint64_t)*4;
}
} else if (op_flag == kBitOpOr) {
while (frag_minlen >= sizeof(uint64_t)*4) {
for (i = 1; i < frag_numkeys; i++) {
lres[0] |= lp[i][0];
lres[1] |= lp[i][1];
lres[2] |= lp[i][2];
lres[3] |= lp[i][3];
lp[i]+=4;
}
lres+=4;
j += sizeof(uint64_t)*4;
frag_minlen -= sizeof(uint64_t)*4;
}
} else if (op_flag == kBitOpXor) {
while (frag_minlen >= sizeof(uint64_t)*4) {
for (i = 1; i < frag_numkeys; i++) {
lres[0] ^= lp[i][0];
lres[1] ^= lp[i][1];
lres[2] ^= lp[i][2];
lres[3] ^= lp[i][3];
lp[i]+=4;
}
lres+=4;
j += sizeof(uint64_t)*4;
frag_minlen -= sizeof(uint64_t)*4;
}
} else if (op_flag == kBitOpNot) {
while (frag_minlen >= sizeof(uint64_t)*4) {
lres[0] = ~lres[0];
lres[1] = ~lres[1];
lres[2] = ~lres[2];
lres[3] = ~lres[3];
lres+=4;
j += sizeof(uint64_t)*4;
frag_minlen -= sizeof(uint64_t)*4;
}
}
}
#endif

for (; j < frag_maxlen; j++) {
output = (fragments[0].size() <= j) ? 0 : fragments[0][j];
if (op_flag == kBitOpNot) output = ~output;
for (i = 1; i < frag_numkeys; i++) {
byte = (fragments[i].size() <= j) ? 0 : fragments[i][j];
switch (op_flag) {
case kBitOpAnd: output &= byte; break;
case kBitOpOr: output |= byte; break;
case kBitOpXor: output ^= byte; break;
default: break;
}
}
frag_res[j] = output;
}

if (op_flag == kBitOpNot) {
if (frag_index == stop_index) {
frag_maxlen = max_size % kBitmapSegmentBytes;
} else {
frag_maxlen = kBitmapSegmentBytes;
}
}
InternalKey(ns_key, std::to_string(frag_index * kBitmapSegmentBytes),
res_metadata.version, storage_->IsSlotIdEncoded()).Encode(&sub_key);
batch.Put(sub_key, Slice(reinterpret_cast<char*>(frag_res.get()), frag_maxlen));
}

frag_maxlen = 0;
frag_minlen = 0;
frag_numkeys = num_keys;
fragments.clear();
}
}

std::string bytes;
res_metadata.size = max_size;
res_metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
*len = max_size;
return storage_->Write(rocksdb::WriteOptions(), &batch);
}

bool Bitmap::GetBitFromValueAndOffset(const std::string &value, uint32_t offset) {
bool bit = false;
uint32_t byte_index = (offset / 8) % kBitmapSegmentBytes;
Expand Down
13 changes: 13 additions & 0 deletions src/redis_bitmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@
#include <string>
#include <vector>

#if defined(__sparc__) || defined(__arm__)
#define USE_ALIGNED_ACCESS
#endif

enum BitOpFlags {
kBitOpAnd,
kBitOpOr,
kBitOpXor,
kBitOpNot,
};

namespace Redis {

class Bitmap : public Database {
Expand All @@ -16,6 +27,8 @@ class Bitmap : public Database {
rocksdb::Status SetBit(const Slice &user_key, uint32_t offset, bool new_bit, bool *old_bit);
rocksdb::Status BitCount(const Slice &user_key, int64_t start, int64_t stop, uint32_t *cnt);
rocksdb::Status BitPos(const Slice &user_key, bool bit, int64_t start, int64_t stop, bool stop_given, int64_t *pos);
rocksdb::Status BitOp(BitOpFlags op_flag, const std::string &op_name,
const Slice &user_key, const std::vector<Slice> &op_keys, int64_t *len);
static bool GetBitFromValueAndOffset(const std::string &value, const uint32_t offset);
static bool IsEmptySegment(const Slice &segment);
private:
Expand Down
39 changes: 39 additions & 0 deletions src/redis_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,44 @@ class CommandBitPos: public Commander {
bool bit_ = false, stop_given_ = false;
};

class CommandBitOp : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
std::string opname = Util::ToLower(args[1]);
if (opname == "and")
op_flag_ = kBitOpAnd;
else if (opname == "or")
op_flag_ = kBitOpOr;
else if (opname == "xor")
op_flag_ = kBitOpXor;
else if (opname == "not")
op_flag_ = kBitOpNot;
else
return Status(Status::RedisInvalidCmd, "Unknown bit operation");
if (op_flag_ == kBitOpNot && args.size() != 4) {
return Status(Status::RedisInvalidCmd,
"BITOP NOT must be called with a single source key.");
}
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
int64_t destkey_len = 0;
Redis::Bitmap bitmap_db(svr->storage_, conn->GetNamespace());
std::vector<Slice> op_keys;
for (uint64_t i = 3; i < args_.size(); i++) {
op_keys.emplace_back(Slice(args_[i]));
}
rocksdb::Status s = bitmap_db.BitOp(op_flag_, args_[1], args_[2], op_keys, &destkey_len);
if (!s.ok()) return Status(Status::RedisExecErr, s.ToString());
*output = Redis::Integer(destkey_len);
return Status::OK();
}

private:
BitOpFlags op_flag_;
};

class CommandType : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -4614,6 +4652,7 @@ CommandAttributes redisCommandTable[] = {
ADD_CMD("setbit", 4, "write", 1, 1, 1, CommandSetBit),
ADD_CMD("bitcount", -2, "read-only", 1, 1, 1, CommandBitCount),
ADD_CMD("bitpos", -3, "read-only", 1, 1, 1, CommandBitPos),
ADD_CMD("bitop", -4, "write", 2, -1, 1, CommandBitOp),

ADD_CMD("hget", 3, "read-only", 1, 1, 1, CommandHGet),
ADD_CMD("hincrby", 4, "write", 1, 1, 1, CommandHIncrBy),
Expand Down
2 changes: 2 additions & 0 deletions src/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ enum RedisCommand {
kRedisCmdLPush,
kRedisCmdRPush,
kRedisCmdExpire,
kRedisCmdSetBit,
kRedisCmdBitOp,
};

const std::vector<std::string> RedisTypeNames = {
Expand Down
4 changes: 2 additions & 2 deletions tests/tcl/tests/unit/command.tcl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
start_server {tags {"command"}} {
test {kvrocks has 167 commands currently} {
test {kvrocks has 168 commands currently} {
r command count
} {168}
} {169}

test {acquire GET command info by COMMAND INFO} {
set e [lindex [r command info get] 0]
Expand Down
Loading