diff --git a/src/commands/cmd_bit.cc b/src/commands/cmd_bit.cc index f0a25d391e9..a47fc011733 100644 --- a/src/commands/cmd_bit.cc +++ b/src/commands/cmd_bit.cc @@ -225,10 +225,136 @@ class CommandBitOp : public Commander { BitOpFlags op_flag_; }; +class CommandBitfield : public Commander { + public: + Status Parse(const std::vector &args) override { + BitfieldOperation cmd; + + read_only_ = true; + // BITFIELD [commands...] + for (auto group = args.begin() + 2; group != args.end();) { + auto size = std::distance(group, args.end()); + + std::string opcode = util::ToLower(group[0]); + if (opcode == "get") { + cmd.type = BitfieldOperation::Type::kGet; + } else if (opcode == "set") { + cmd.type = BitfieldOperation::Type::kSet; + read_only_ = false; + } else if (opcode == "incrby") { + cmd.type = BitfieldOperation::Type::kIncrBy; + read_only_ = false; + } else if (opcode == "overflow") { + constexpr auto kOverflowCmdSize = 2; + if (size < kOverflowCmdSize) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + std::string overflow = util::ToLower(group[1]); + if (overflow == "wrap") { + cmd.overflow = BitfieldOverflowBehavior::kWrap; + } else if (overflow == "sat") { + cmd.overflow = BitfieldOverflowBehavior::kSat; + } else if (overflow == "fail") { + cmd.overflow = BitfieldOverflowBehavior::kFail; + } else { + return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments}; + } + group += kOverflowCmdSize; + continue; + } else { + return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments}; + } + + if (size < 3) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + + // parse encoding + auto encoding = parseBitfieldEncoding(group[1]); + if (!encoding.IsOK()) { + return encoding.ToStatus(); + } + cmd.encoding = encoding.GetValue(); + + // parse offset + if (!GetBitOffsetFromArgument(group[2], &cmd.offset).IsOK()) { + return {Status::RedisParseErr, "bit offset is not an integer or out of range"}; + } + + if (cmd.type != BitfieldOperation::Type::kGet) { + if (size < 4) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + + auto value = ParseInt(group[3], 10); + if (!value.IsOK()) { + return value.ToStatus(); + } + cmd.value = value.GetValue(); + } + + cmds_.push_back(cmd); + } + + return Commander::Parse(args); + } + + Status Execute(Server *srv, Connection *conn, std::string *output) override { + redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace()); + std::vector rets; + rocksdb::Status s; + if (read_only_) { + s = bitmap_db.BitfieldReadOnly(args_[1], cmds_, &rets); + } else { + s = bitmap_db.Bitfield(args_[1], cmds_, &rets); + } + *output = redis::Array(rets); + return Status::OK(); + } + + private: + static StatusOr parseBitfieldEncoding(const std::string &token) { + if (token.empty()) { + return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments}; + } + + auto sign = std::tolower(token[0]); + if (sign != 'u' && sign != 'i') { + return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments}; + } + + BitfieldEncoding enc; + enc.is_signed = false; + // u64 is unsupported for redis now. + // see also https://redis.io/commands/bitfield/ + int16_t max_bits = 63; + + if (sign == 'i') { + enc.is_signed = true; + max_bits = 64; + } + + auto res = ParseInt(token.substr(1), 10); + if (!res.IsOK()) { + return res.ToStatus(); + } + + enc.bits = res.GetValue(); + if (enc.bits < 1 || max_bits < enc.bits) { + return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments}; + } + return enc; + } + + std::vector cmds_; + bool read_only_; +}; + REDIS_REGISTER_COMMANDS(MakeCmdAttr("getbit", 3, "read-only", 1, 1, 1), MakeCmdAttr("setbit", 4, "write", 1, 1, 1), MakeCmdAttr("bitcount", -2, "read-only", 1, 1, 1), MakeCmdAttr("bitpos", -3, "read-only", 1, 1, 1), - MakeCmdAttr("bitop", -4, "write", 2, -1, 1), ) + MakeCmdAttr("bitop", -4, "write", 2, -1, 1), + MakeCmdAttr("bitfield", -2, "write", 1, 1, 1), ) } // namespace redis diff --git a/src/common/bitfield_util.cc b/src/common/bitfield_util.cc new file mode 100644 index 00000000000..6ac21b34b61 --- /dev/null +++ b/src/common/bitfield_util.cc @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "bitfield_util.h" + +namespace detail { +namespace { +// safe cast from unsigned to signed. +// see also "Integral conversions" on https://en.cppreference.com/w/cpp/language/implicit_conversion +// If the destination type is signed, the result when overflow is implementation-defined until C++20 +union SafeInt64 { + int64_t i; + uint64_t u; + + explicit SafeInt64(int64_t i) noexcept : i(i) {} + explicit SafeInt64(uint64_t u) noexcept : u(u) {} +}; +} // namespace + +bool SignedBitfieldPlus(uint64_t value, int64_t incr, uint8_t bits, BitfieldOverflowBehavior overflow, uint64_t *dst) { + auto max = std::numeric_limits::max(); + if (bits != 64) { + max = (static_cast(1) << (bits - 1)) - 1; + } + int64_t min = -max - 1; + + SafeInt64 safe_value(value); + + SafeInt64 max_incr(static_cast(max) - safe_value.u); + SafeInt64 min_incr(min - safe_value.i); + + auto wrap = [](uint64_t value, int64_t incr, uint8_t bits) { + uint64_t res = value + static_cast(incr); + if (bits < 64) { + auto mask = std::numeric_limits::max() << bits; + if ((res & (1 << (bits - 1))) != 0) { + res |= mask; + } else { + res &= ~mask; + } + } + return res; + }; + + if (safe_value.i > max || (bits != 64 && incr > max_incr.i) || + (safe_value.i >= 0 && incr >= 0 && incr > max_incr.i)) { + if (overflow == BitfieldOverflowBehavior::kWrap) { + *dst = wrap(safe_value.u, incr, bits); + } else { + *dst = max; + } + return true; + } else if (safe_value.i < min || (bits != 64 && incr < min_incr.i) || + (safe_value.i < 0 && incr < 0 && incr < min_incr.i)) { + if (overflow == BitfieldOverflowBehavior::kWrap) { + *dst = wrap(safe_value.u, incr, bits); + } else { + *dst = min; + } + return true; + } + + *dst = safe_value.i + incr; + return false; +} + +// return true if overflow. +bool UnsignedBitfieldPlus(uint64_t value, int64_t incr, uint8_t bits, BitfieldOverflowBehavior overflow, + uint64_t *dst) { + auto max = std::numeric_limits::max(); + if (bits != 64) { + max = (static_cast(1) << bits) - 1; + } + + SafeInt64 max_incr(max - value); + SafeInt64 min_incr(-value); + + auto wrap = [](uint64_t value, int64_t incr, uint8_t bits) { + uint64_t mask = std::numeric_limits::max() << bits; + uint64_t res = value + incr; + res &= ~mask; + return res; + }; + + if (value > max || (incr > 0 && incr > max_incr.i)) { + if (overflow == BitfieldOverflowBehavior::kWrap) { + *dst = wrap(value, incr, bits); + } else if (overflow == BitfieldOverflowBehavior::kSat) { + *dst = max; + } + return true; + } else if (incr < 0 && incr < min_incr.i) { + if (overflow == BitfieldOverflowBehavior::kWrap) { + *dst = wrap(value, incr, bits); + } else if (overflow == BitfieldOverflowBehavior::kSat) { + *dst = 0; + } + return true; + } + + *dst = value + incr; + return false; +} +} // namespace detail diff --git a/src/common/bitfield_util.h b/src/common/bitfield_util.h new file mode 100644 index 00000000000..e6ad6e08f7c --- /dev/null +++ b/src/common/bitfield_util.h @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include +#include +#include +#include + +enum class BitfieldOverflowBehavior : uint8_t { kWrap, kSat, kFail }; + +struct BitfieldEncoding { + bool is_signed; + // 1 ~ 63 if unsigned, 1 ~ 64 if signed. + uint8_t bits; +}; + +struct BitfieldOperation { + // see https://redis.io/commands/bitfield/ to get more details. + enum class Type : uint8_t { kGet, kSet, kIncrBy }; + + Type type; + BitfieldOverflowBehavior overflow{BitfieldOverflowBehavior::kWrap}; + BitfieldEncoding encoding; + uint32_t offset; + // INCRBY amount or SET value + int64_t value; +}; + +namespace detail { +// return true if overflow +bool SignedBitfieldPlus(uint64_t value, int64_t incr, uint8_t bits, BitfieldOverflowBehavior overflow, uint64_t *dst); + +// return true if overflow. +bool UnsignedBitfieldPlus(uint64_t value, int64_t incr, uint8_t bits, BitfieldOverflowBehavior overflow, uint64_t *dst); +} // namespace detail + +// return true if overflow. +inline auto BitfieldPlus(uint64_t value, int64_t incr, BitfieldEncoding enc, BitfieldOverflowBehavior overflow, + uint64_t *dst) { + if (enc.is_signed) { + return detail::SignedBitfieldPlus(value, incr, enc.bits, overflow, dst); + } + return detail::UnsignedBitfieldPlus(value, incr, enc.bits, overflow, dst); +} + +// return true if successful +inline bool BitfieldOp(BitfieldOperation op, uint64_t old_value, uint64_t *new_value) { + if (op.type == BitfieldOperation::Type::kGet) { + *new_value = old_value; + return true; + } + + bool overflow = false; + if (op.type == BitfieldOperation::Type::kSet) { + overflow = BitfieldPlus(op.value, 0, op.encoding, op.overflow, new_value); + } else { + overflow = BitfieldPlus(old_value, op.value, op.encoding, op.overflow, new_value); + } + + return op.overflow != BitfieldOverflowBehavior::kFail || !overflow; +} + +template +inline void SetBitfield(BitmapView view, uint32_t offset, uint32_t bits, uint64_t value) { + while (bits--) { + bool v = (value & (uint64_t(1) << bits)) != 0; + uint32_t byte = offset >> 3; + uint32_t bit = 7 - (offset & 7); + uint32_t byteval = view[byte]; + byteval &= ~(1 << bit); + byteval |= int(v) << bit; + view[byte] = char(byteval); + offset++; + } +} + +template +inline uint64_t GetUnsignedBitfield(BitmapView &&view, uint64_t offset, uint64_t bits) { + uint64_t value = 0; + while (bits--) { + uint32_t byte = offset >> 3; + uint32_t bit = 7 - (offset & 0x7); + uint32_t byteval = view[byte]; + uint32_t bitval = (byteval >> bit) & 1; + value = (value << 1) | bitval; + offset++; + } + return value; +} + +template +inline int64_t GetSignedBitfield(BitmapView &&view, uint64_t offset, uint64_t bits) { + union { + uint64_t u; + int64_t i; + } conv; + conv.u = GetUnsignedBitfield(std::forward(view), offset, bits); + int64_t value = conv.i; + + if ((value & (static_cast(1) << (bits - 1))) != 0) { + value |= static_cast(-1) << bits; + } + return value; +} diff --git a/src/common/lock_manager.h b/src/common/lock_manager.h index ee4e17422be..ce923268240 100644 --- a/src/common/lock_manager.h +++ b/src/common/lock_manager.h @@ -81,6 +81,8 @@ class LockManager { class LockGuard { public: + LockGuard() = default; + template explicit LockGuard(LockManager *lock_mgr, const KeyType &key) : lock_(lock_mgr->Get(key)) { lock_->lock(); @@ -94,8 +96,10 @@ class LockGuard { LockGuard(LockGuard &&guard) : lock_(guard.lock_) { guard.lock_ = nullptr; } + LockGuard &operator=(LockGuard &&) = default; + private: - std::mutex *lock_; + std::mutex *lock_{nullptr}; }; class MultiLockGuard { diff --git a/src/common/parse_util.h b/src/common/parse_util.h index 63c03c00a94..2226b580416 100644 --- a/src/common/parse_util.h +++ b/src/common/parse_util.h @@ -33,6 +33,11 @@ namespace details { template struct ParseIntFunc; +template <> +struct ParseIntFunc { + constexpr static const auto value = std::strtol; +}; + template <> struct ParseIntFunc { // NOLINT constexpr static const auto value = std::strtol; @@ -53,6 +58,11 @@ struct ParseIntFunc { // NOLINT constexpr static const auto value = std::strtoll; }; +template <> +struct ParseIntFunc { + constexpr static const auto value = std::strtol; +}; + template <> struct ParseIntFunc { // NOLINT constexpr static const auto value = std::strtoul; diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc index c66aca80eef..db7174d0839 100644 --- a/src/storage/batch_extractor.cc +++ b/src/storage/batch_extractor.cc @@ -232,6 +232,10 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic first_seen_ = false; } break; + case kRedisCmdBitfield: + command_args = {"BITFIELD", user_key}; + command_args.insert(command_args.end(), args->begin() + 1, args->end()); + break; default: LOG(ERROR) << "Failed to parse write_batch in PutCF. Type=Bitmap: unhandled command with code " << *parsed_cmd; diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h index 48c40f67c09..cbdb7a41e6d 100644 --- a/src/storage/redis_metadata.h +++ b/src/storage/redis_metadata.h @@ -61,6 +61,7 @@ enum RedisCommand { kRedisCmdExpire, kRedisCmdSetBit, kRedisCmdBitOp, + kRedisCmdBitfield, kRedisCmdLMove, }; diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc index a7b7384753b..fb297522a9c 100644 --- a/src/types/redis_bitmap.cc +++ b/src/types/redis_bitmap.cc @@ -28,6 +28,7 @@ #include "db_util.h" #include "parse_util.h" #include "redis_bitmap_string.h" +#include "server/redis_reply.h" namespace redis { @@ -38,6 +39,22 @@ const char kErrBitmapStringOutOfRange[] = "The size of the bitmap string exceeds the " "configuration item max-bitmap-to-string-mb"; +// min_bytes can not more than kBitmapSegmentBytes +void ExpandBitmapSegment(std::string *segment, size_t min_bytes) { + auto old_size = segment->size(); + if (min_bytes > old_size) { + size_t expand_size = 0; + if (min_bytes > old_size * 2) { + expand_size = min_bytes - old_size; + } else if (old_size * 2 > kBitmapSegmentBytes) { + expand_size = kBitmapSegmentBytes - old_size; + } else { + expand_size = old_size; + } + segment->append(expand_size, 0); + } +} + rocksdb::Status Bitmap::GetMetadata(const Slice &ns_key, BitmapMetadata *metadata, std::string *raw_value) { std::string old_metadata; metadata->Encode(&old_metadata); @@ -187,17 +204,7 @@ rocksdb::Status Bitmap::SetBit(const Slice &user_key, uint32_t offset, bool new_ uint32_t byte_index = (offset / 8) % kBitmapSegmentBytes; uint64_t used_size = index + byte_index + 1; uint64_t bitmap_size = std::max(used_size, metadata.size); - if (byte_index >= value.size()) { // expand the bitmap - size_t expand_size = 0; - if (byte_index >= value.size() * 2) { - expand_size = byte_index - value.size() + 1; - } else if (value.size() * 2 > kBitmapSegmentBytes) { - expand_size = kBitmapSegmentBytes - value.size(); - } else { - expand_size = value.size(); - } - value.append(expand_size, 0); - } + ExpandBitmapSegment(&value, byte_index + 1); uint32_t bit_offset = offset % 8; *old_bit = (value[byte_index] & (1 << bit_offset)) != 0; if (new_bit) { @@ -538,6 +545,297 @@ rocksdb::Status Bitmap::BitOp(BitOpFlags op_flag, const std::string &op_name, co return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); } +namespace { +union IntResult { + int64_t i; + uint64_t u; +}; +} // namespace + +namespace { +using SegmentCache = std::pair; + +// used to simplify accessing bitfield that occupies multiple continuous segments. +// you can access multiple segments like accessing in one big segments. +class BitmapSegmentsView { + public: + BitmapSegmentsView() = default; + explicit BitmapSegmentsView(std::vector segments, bool expand_in_advance = true) + : segments_(std::move(segments)) { + if (expand_in_advance && segments_.size() > 1) { + // expand segments during iteration (not in advance) maybe increase allocation times. + // we assume that all end of each segment except last one will be accessed. + // note: segments_.size() is unsigned. + for (size_t i = 0; i < segments_.size() - 1; ++i) { + ExpandBitmapSegment(&segments_[i]->second, kBitmapSegmentBytes); + } + } + } + + char &operator[](uint32_t index) noexcept { + SegmentCache &seg = GetSegment(index / kBitmapSegmentBytes); + index %= kBitmapSegmentBytes; + ExpandBitmapSegment(&seg.second, index + 1); + return seg.second[index]; + } + + SegmentCache &GetSegment(uint32_t index) noexcept { return *segments_[index]; } + + size_t LastSegmentSize() const noexcept { return segments_.back()->second.size(); } + + size_t SegmentsCount() const noexcept { return segments_.size(); } + + auto SegmentBegin() noexcept { return segments_.begin(); } + + auto SegmentEnd() noexcept { return segments_.end(); } + + private: + std::vector segments_; +}; +} // namespace + +template +rocksdb::Status Bitmap::bitfield(const Slice &user_key, const std::vector &ops, + std::vector *rets) { + std::string ns_key = AppendNamespacePrefix(user_key); + + LockGuard guard; + if constexpr (!ReadOnly) { + guard = LockGuard(storage_->GetLockManager(), ns_key); + } + + BitmapMetadata metadata; + std::string raw_value; + auto s = GetMetadata(ns_key, &metadata, &raw_value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + + if (metadata.Type() == RedisType::kRedisString) { + if constexpr (ReadOnly) { + s = BitmapString::BitfieldReadOnly(ns_key, raw_value, ops, rets); + } else { + s = BitmapString(storage_, namespace_).Bitfield(ns_key, &raw_value, ops, rets); + } + return s; + } + + if (metadata.Type() != RedisType::kRedisBitmap) { + return rocksdb::Status::InvalidArgument("The value is not a bitmap or string."); + } + + std::unordered_map seg_cache; + auto get_segment = [&seg_cache, &ns_key, v = metadata.version, this](uint32_t index, SegmentCache **res) { + auto [seg_itor, no_cache] = seg_cache.try_emplace(index); + auto &[sub_key, value] = seg_itor->second; + if (no_cache) { + auto sub_sub_key = std::to_string(index * kBitmapSegmentBytes); + sub_key = InternalKey(ns_key, sub_sub_key, v, storage_->IsSlotIdEncoded()).Encode(); + rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), sub_key, &value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + } + *res = &seg_itor->second; + return rocksdb::Status::OK(); + }; + + auto init_view = [&get_segment](uint32_t offset, uint32_t bits, BitmapSegmentsView *view) { + uint32_t first_segment = offset / kBitmapSegmentBits; + uint32_t last_segment = (offset + bits - 1) / kBitmapSegmentBits; + + std::vector segments(last_segment - first_segment + 1); + for (auto &seg : segments) { + auto s = get_segment(first_segment++, &seg); + if (!s.ok()) { + return s; + } + } + *view = BitmapSegmentsView(std::move(segments)); + return rocksdb::Status::OK(); + }; + + // get bitfield value from entire bitmap + auto get = [&init_view](uint32_t offset, BitfieldEncoding enc, uint64_t *res) { + BitmapSegmentsView view; + auto s = init_view(offset, enc.bits, &view); + if (!s.ok()) { + return s; + } + + uint32_t first_bit_segment_offset = offset % kBitmapSegmentBits; + if (enc.is_signed) { + *res = GetSignedBitfield(view, first_bit_segment_offset, enc.bits); + } else { + *res = GetUnsignedBitfield(view, first_bit_segment_offset, enc.bits); + } + + return rocksdb::Status::OK(); + }; + + if constexpr (ReadOnly) { + std::string ret; + for (BitfieldOperation op : ops) { + if (op.type != BitfieldOperation::Type::kGet) { + return rocksdb::Status::InvalidArgument("Write bitfield in read-only mode."); + } + + uint8_t max_bits = 64; + if (!op.encoding.is_signed) { + max_bits = 63; + } + if (op.encoding.bits < 1 || max_bits < op.encoding.bits) { + return rocksdb::Status::InvalidArgument("Uncorrected bitfield encoding length."); + } + + IntResult res; + s = get(op.offset, op.encoding, &res.u); + if (!s.ok()) { + return s; + } + + if (op.encoding.is_signed) { + ret = redis::Integer(res.i); + } else { + ret = redis::Integer(res.u); + } + + rets->push_back(std::move(ret)); + } + return rocksdb::Status::OK(); + } + + auto batch = storage_->GetWriteBatchBase(); + { + std::vector cmd_args{std::to_string(kRedisCmdBitfield)}; + auto current_overflow = BitfieldOverflowBehavior::kWrap; + for (BitfieldOperation op : ops) { + if (op.type == BitfieldOperation::Type::kGet) { + continue; + } + if (current_overflow != op.overflow) { + current_overflow = op.overflow; + std::string overflow_str; + switch (op.overflow) { + case BitfieldOverflowBehavior::kWrap: + overflow_str = "WRAP"; + break; + case BitfieldOverflowBehavior::kSat: + overflow_str = "SAT"; + break; + case BitfieldOverflowBehavior::kFail: + overflow_str = "FAIL"; + break; + } + cmd_args.emplace_back("OVERFLOW"); + cmd_args.emplace_back(std::move(overflow_str)); + } + + if (op.type == BitfieldOperation::Type::kSet) { + cmd_args.emplace_back("SET"); + } else { + cmd_args.emplace_back("INCRBY"); + } + cmd_args.push_back((op.encoding.is_signed ? "i" : "u") + std::to_string(static_cast(op.encoding.bits))); + cmd_args.push_back(std::to_string(op.offset)); + if (op.type == BitfieldOperation::Type::kSet) { + if (op.encoding.is_signed) { + cmd_args.push_back(std::to_string(static_cast(op.value))); + } else { + cmd_args.push_back(std::to_string(static_cast(op.value))); + } + } else { + cmd_args.push_back(std::to_string(op.value)); + } + } + + if (cmd_args.size() > 1) { + WriteBatchLogData log_data(kRedisBitmap, std::move(cmd_args)); + batch->PutLogData(log_data.Encode()); + } + } + + // set bitfield value among segments + auto set = [&init_view, &batch, &metadata, &ns_key, this](uint32_t offset, BitfieldEncoding enc, + BitfieldOverflowBehavior overflow, uint64_t value) { + BitmapSegmentsView view; + auto s = init_view(offset, enc.bits, &view); + if (!s.ok()) { + return s; + } + + uint32_t first_bit_segment_offset = offset % kBitmapSegmentBits; + SetBitfield(view, first_bit_segment_offset, enc.bits, value); + + for (auto segment = view.SegmentBegin(); segment != view.SegmentEnd(); ++segment) { + batch->Put((*segment)->first, (*segment)->second); + } + + uint64_t segments_cnt = view.SegmentsCount(); + uint64_t used_size = (offset / 8) + (segments_cnt - 1) * kBitmapSegmentBytes + view.LastSegmentSize(); + if (metadata.size < used_size) { + metadata.size = used_size; + std::string bytes; + metadata.Encode(&bytes); + batch->Put(metadata_cf_handle_, ns_key, bytes); + } + + return rocksdb::Status::OK(); + }; + + std::string ret; + for (BitfieldOperation op : ops) { + uint8_t max_bits = 64; + if (!op.encoding.is_signed) { + max_bits = 63; + } + if (op.encoding.bits < 1 || max_bits < op.encoding.bits) { + return rocksdb::Status::InvalidArgument("Uncorrected bitfield encoding length."); + } + + IntResult old_value; + s = get(op.offset, op.encoding, &old_value.u); + if (!s.ok()) { + return s; + } + + IntResult new_value; + if (BitfieldOp(op, old_value.u, &new_value.u)) { + if (op.type != BitfieldOperation::Type::kGet) { + s = set(op.offset, op.encoding, op.overflow, new_value.u); + if (!s.ok()) { + return s; + } + } + + if (op.type == BitfieldOperation::Type::kSet) { + if (op.encoding.is_signed) { + ret = redis::Integer(old_value.i); + } else { + ret = redis::Integer(old_value.u); + } + } else { + if (op.encoding.is_signed) { + ret = redis::Integer(new_value.i); + } else { + ret = redis::Integer(new_value.u); + } + } + } else { + ret = redis::NilString(); + } + + rets->push_back(std::move(ret)); + } + + return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + +template rocksdb::Status Bitmap::bitfield(const Slice &, const std::vector &, + std::vector *); +template rocksdb::Status Bitmap::bitfield(const Slice &, const std::vector &, + std::vector *); + bool Bitmap::GetBitFromValueAndOffset(const std::string &value, uint32_t offset) { bool bit = false; uint32_t byte_index = (offset / 8) % kBitmapSegmentBytes; diff --git a/src/types/redis_bitmap.h b/src/types/redis_bitmap.h index 7b12834fd5c..d12a5f329d5 100644 --- a/src/types/redis_bitmap.h +++ b/src/types/redis_bitmap.h @@ -21,8 +21,10 @@ #pragma once #include +#include #include +#include "common/bitfield_util.h" #include "storage/redis_db.h" #include "storage/redis_metadata.h" @@ -49,10 +51,23 @@ class Bitmap : public Database { rocksdb::Status BitPos(const Slice &user_key, bool bit, int64_t start, int64_t stop, bool stop_given, int64_t *pos); rocksdb::Status BitOp(BitOpFlags op_flag, const std::string &op_name, const Slice &user_key, const std::vector &op_keys, int64_t *len); + rocksdb::Status Bitfield(const Slice &user_key, const std::vector &ops, + std::vector *rets) { + return bitfield(user_key, ops, rets); + } + // read-only version for Bitfield(), if there is a write operation in ops, the function will return with failed + // status. + rocksdb::Status BitfieldReadOnly(const Slice &user_key, const std::vector &ops, + std::vector *rets) { + return bitfield(user_key, ops, rets); + } static bool GetBitFromValueAndOffset(const std::string &value, uint32_t offset); static bool IsEmptySegment(const Slice &segment); private: + template + rocksdb::Status bitfield(const Slice &user_key, const std::vector &ops, + std::vector *rets); rocksdb::Status GetMetadata(const Slice &ns_key, BitmapMetadata *metadata, std::string *raw_value); }; diff --git a/src/types/redis_bitmap_string.cc b/src/types/redis_bitmap_string.cc index 38a0c8f38ec..77a1471350b 100644 --- a/src/types/redis_bitmap_string.cc +++ b/src/types/redis_bitmap_string.cc @@ -25,6 +25,7 @@ #include #include "redis_string.h" +#include "server/redis_reply.h" #include "storage/redis_metadata.h" #include "type_util.h" @@ -204,4 +205,131 @@ int64_t BitmapString::RawBitpos(const uint8_t *c, int64_t count, bool bit) { return res; } +namespace { +class ReadOnlyStringBitmapView { + public: + explicit ReadOnlyStringBitmapView(std::string_view str) noexcept : str_(str) {} + + char operator[](uint32_t index) const noexcept { + if (index >= str_.size()) { + return 0; + } + return str_[index]; + } + + private: + std::string_view str_; +}; + +class ReadWriteStringBitmapView { + public: + explicit ReadWriteStringBitmapView(std::string *str) : str_(str) {} + + char &operator[](uint32_t index) { + if (index >= str_->size()) { + str_->append((index + 1) - str_->size(), 0); + } + return (*str_)[index]; + } + + private: + std::string *str_; +}; +} // namespace + +rocksdb::Status BitmapString::Bitfield(const Slice &ns_key, std::string *raw_value, + const std::vector &ops, std::vector *rets) { + union IntResult { + int64_t i; + uint64_t u; + }; + + auto header_offset = Metadata::GetOffsetAfterExpire((*raw_value)[0]); + std::string string_value = raw_value->substr(header_offset); + ReadWriteStringBitmapView view(&string_value); + std::string ret; + for (BitfieldOperation op : ops) { + uint8_t max_bits = 64; + if (!op.encoding.is_signed) { + max_bits = 63; + } + if (op.encoding.bits < 1 || max_bits < op.encoding.bits) { + return rocksdb::Status::InvalidArgument("Uncorrected bitfield encoding length."); + } + + IntResult old_value; + if (op.encoding.is_signed) { + old_value.i = GetSignedBitfield(view, op.offset, op.encoding.bits); + } else { + old_value.u = GetUnsignedBitfield(view, op.offset, op.encoding.bits); + } + + IntResult new_value; + if (BitfieldOp(op, old_value.u, &new_value.u)) { + if (op.type != BitfieldOperation::Type::kGet) { + SetBitfield(view, op.offset, op.encoding.bits, new_value.u); + } + + if (op.type == BitfieldOperation::Type::kSet) { + if (op.encoding.is_signed) { + ret = redis::Integer(old_value.i); + } else { + ret = redis::Integer(old_value.u); + } + } else { + if (op.encoding.is_signed) { + ret = redis::Integer(new_value.i); + } else { + ret = redis::Integer(new_value.u); + } + } + } else { + ret = redis::NilString(); + } + + rets->push_back(std::move(ret)); + } + + raw_value->resize(header_offset); + raw_value->append(string_value); + auto batch = storage_->GetWriteBatchBase(); + WriteBatchLogData log_data(kRedisString); + batch->PutLogData(log_data.Encode()); + batch->Put(metadata_cf_handle_, ns_key, *raw_value); + + return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + +rocksdb::Status BitmapString::BitfieldReadOnly(const Slice &ns_key, const std::string &raw_value, + const std::vector &ops, + std::vector *rets) { + std::string_view string_value = raw_value; + string_value = string_value.substr(Metadata::GetOffsetAfterExpire(string_value[0])); + + ReadOnlyStringBitmapView view(string_value); + for (BitfieldOperation op : ops) { + if (op.type != BitfieldOperation::Type::kGet) { + return rocksdb::Status::InvalidArgument("Write bitfield in read-only mode."); + } + + uint8_t max_bits = 64; + if (!op.encoding.is_signed) { + max_bits = 63; + } + if (op.encoding.bits < 1 || max_bits < op.encoding.bits) { + return rocksdb::Status::InvalidArgument("Uncorrected bitfield encoding length."); + } + + if (op.encoding.is_signed) { + int64_t value = GetSignedBitfield(view, op.offset, op.encoding.bits); + rets->push_back(redis::Integer(value)); + } else { + uint64_t value = GetUnsignedBitfield(view, op.offset, op.encoding.bits); + rets->push_back(redis::Integer(value)); + } + } + + return rocksdb::Status::OK(); +} + } // namespace redis diff --git a/src/types/redis_bitmap_string.h b/src/types/redis_bitmap_string.h index 027c6615930..f026655975e 100644 --- a/src/types/redis_bitmap_string.h +++ b/src/types/redis_bitmap_string.h @@ -23,6 +23,7 @@ #include #include +#include "common/bitfield_util.h" #include "storage/redis_db.h" #include "storage/redis_metadata.h" @@ -36,6 +37,10 @@ class BitmapString : public Database { static rocksdb::Status BitCount(const std::string &raw_value, int64_t start, int64_t stop, uint32_t *cnt); static rocksdb::Status BitPos(const std::string &raw_value, bool bit, int64_t start, int64_t stop, bool stop_given, int64_t *pos); + rocksdb::Status Bitfield(const Slice &ns_key, std::string *raw_value, const std::vector &ops, + std::vector *rets); + static rocksdb::Status BitfieldReadOnly(const Slice &ns_key, const std::string &raw_value, + const std::vector &ops, std::vector *rets); static size_t RawPopcount(const uint8_t *p, int64_t count); static int64_t RawBitpos(const uint8_t *c, int64_t count, bool bit); diff --git a/tests/cppunit/bitfield_util.cc b/tests/cppunit/bitfield_util.cc new file mode 100644 index 00000000000..1221440bd49 --- /dev/null +++ b/tests/cppunit/bitfield_util.cc @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "common/bitfield_util.h" + +#include + +#include +#include + +#include "common/encoding.h" + +TEST(BitfieldUtil, Get) { + std::vector big_endian_bitmap{0x00, 0xff, 0x00, 0xff, 0x00, 0xff, 0x00, 0xff}; + std::vector little_endian_bitmap(big_endian_bitmap); + std::reverse(little_endian_bitmap.begin(), little_endian_bitmap.end()); + + for (int bits = 16; bits <= 64; bits *= 2) { + for (uint64_t offset = 0; bits + offset <= big_endian_bitmap.size() * 8; offset += bits) { + uint64_t value = GetUnsignedBitfield(big_endian_bitmap.data(), offset, bits); + if (IsBigEndian()) { + EXPECT_EQ(0, memcmp(&value, big_endian_bitmap.data(), bits / 8)); + } else { + EXPECT_EQ(0, memcmp(&value, little_endian_bitmap.data(), bits / 8)); + } + } + } +} diff --git a/tests/cppunit/types/bitmap_test.cc b/tests/cppunit/types/bitmap_test.cc index 75f2d3f88a8..4a926d40152 100644 --- a/tests/cppunit/types/bitmap_test.cc +++ b/tests/cppunit/types/bitmap_test.cc @@ -22,6 +22,7 @@ #include +#include "server/redis_reply.h" #include "test_base.h" #include "types/redis_bitmap.h" @@ -89,3 +90,315 @@ TEST_F(RedisBitmapTest, BitPosSetBit) { } auto s = bitmap_->Del(key_); } + +TEST_F(RedisBitmapTest, BitfieldGetSetTest) { + constexpr uint32_t magic = 0xdeadbeef; + + std::vector rets; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding.bits = 32; + op.encoding.is_signed = false; + op.offset = 114514; + op.value = magic; + + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(redis::Integer(0), rets[0]); + rets.clear(); + + op.type = BitfieldOperation::Type::kGet; + op.encoding.bits = 1; + + // bitfield is stored in big-endian. + for (int i = 31; i != -1; --i) { + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(redis::Integer((magic >> i) & 1), rets[0]); + rets.clear(); + op.offset++; + } + + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, UnsignedBitfieldTest) { + constexpr uint8_t bits = 5; + static_assert(bits < 64); + constexpr uint64_t max = (uint64_t(1) << bits) - 1; + + std::vector rets; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding.bits = bits; + op.encoding.is_signed = false; + op.offset = 8189; // the two bitmap segments divide the bitfield + op.value = 0; + + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(redis::Integer(0), rets[0]); + rets.clear(); + + for (uint64_t i = 1; i <= max; ++i) { + op.value = int64_t(i); + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(redis::Integer(i - 1), rets[0]); + rets.clear(); + } + + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, SignedBitfieldTest) { + constexpr uint8_t bits = 10; + constexpr int64_t max = (uint64_t(1) << (bits - 1)) - 1; + constexpr int64_t min = -max - 1; + + std::vector rets; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding.bits = bits; + op.encoding.is_signed = true; + op.offset = 8189; // the two bitmap segments divide the bitfield + op.value = min; + + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(redis::Integer(0), rets[0]); + rets.clear(); + + for (int64_t i = min + 1; i <= max; ++i) { + op.value = i; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(redis::Integer(i - 1), rets[0]); + rets.clear(); + } + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, SignedBitfieldWrapSetTest) { + constexpr uint8_t bits = 6; + constexpr int64_t max = (int64_t(1) << (bits - 1)) - 1; + constexpr int64_t min = -max - 1; + constexpr int64_t loopback = int64_t(1) << bits; + + std::vector res; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding.bits = bits; + op.encoding.is_signed = true; + op.offset = 0; + op.value = max; + + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::Integer(0), res[0]); + res.clear(); + + op.type = BitfieldOperation::Type::kIncrBy; + op.value = 1; + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::Integer(min), res[0]); + res.clear(); + + op.value = loopback; + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::Integer(min), res[0]); + res.clear(); + + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, UnsignedBitfieldWrapSetTest) { + constexpr uint8_t bits = 6; + static_assert(bits < 64); + constexpr uint64_t max = (uint64_t(1) << bits) - 1; + constexpr int64_t loopback = int64_t(1) << bits; + + std::vector res; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding.bits = bits; + op.encoding.is_signed = false; + op.offset = 0; + op.value = max; + + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::Integer(0), res[0]); + res.clear(); + + op.type = BitfieldOperation::Type::kIncrBy; + op.value = 1; + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::Integer(0), res[0]); + res.clear(); + + op.value = loopback; + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::Integer(0), res[0]); + res.clear(); + + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, SignedBitfieldSatSetTest) { + constexpr uint8_t bits = 6; + constexpr int64_t max = (int64_t(1) << (bits - 1)) - 1; + // constexpr int64_t min = -max - 1; + + std::vector res; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding.bits = bits; + op.encoding.is_signed = true; + op.overflow = BitfieldOverflowBehavior::kSat; + op.offset = 0; + op.value = max * 2; + + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::Integer(0), res[0]); + res.clear(); + + op.type = BitfieldOperation::Type::kGet; + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::Integer(max), res[0]); + res.clear(); + + op.type = BitfieldOperation::Type::kIncrBy; + for (int64_t i = 0; i <= max + 10; ++i) { + op.value = i; + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::Integer(max), res[0]); + res.clear(); + } + + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, UnsignedBitfieldSatSetTest) { + constexpr uint8_t bits = 6; + static_assert(bits < 64); + constexpr uint64_t max = (uint64_t(1) << bits) - 1; + + std::vector res; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding.bits = bits; + op.encoding.is_signed = false; + op.overflow = BitfieldOverflowBehavior::kSat; + op.offset = 0; + op.value = max * 2; + + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::Integer(0), res[0]); + res.clear(); + + op.type = BitfieldOperation::Type::kGet; + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::Integer(max), res[0]); + res.clear(); + + op.type = BitfieldOperation::Type::kIncrBy; + for (int64_t i = 0; i <= int64_t(max) + 10; ++i) { + op.value = i; + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::Integer(max), res[0]); + res.clear(); + } + + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, SignedBitfieldFailSetTest) { + constexpr uint8_t bits = 5; + constexpr int64_t max = (int64_t(1) << (bits - 1)) - 1; + + std::vector res; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding.bits = bits; + op.encoding.is_signed = true; + op.overflow = BitfieldOverflowBehavior::kFail; + op.offset = 0; + op.value = max * 2; + + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::NilString(), res[0]); + res.clear(); + + op.value = max; + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::Integer(0), res[0]); + res.clear(); + + op.type = BitfieldOperation::Type::kIncrBy; + for (int64_t i = 1; i <= max; ++i) { + op.value = i; + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::NilString(), res[0]); + res.clear(); + } + + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, UnsignedBitfieldFailSetTest) { + constexpr uint8_t bits = 5; + constexpr int64_t max = (int64_t(1) << bits) - 1; + + std::vector res; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding.bits = bits; + op.encoding.is_signed = false; + op.overflow = BitfieldOverflowBehavior::kFail; + op.offset = 0; + op.value = max * 2; + + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::NilString(), res[0]); + res.clear(); + + op.value = max; + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::Integer(0), res[0]); + res.clear(); + + op.type = BitfieldOperation::Type::kIncrBy; + for (int64_t i = 1; i <= max; ++i) { + op.value = i; + bitmap_->Bitfield(key_, {op}, &res); + EXPECT_EQ(1, res.size()); + EXPECT_EQ(redis::NilString(), res[0]); + res.clear(); + } + + auto s = bitmap_->Del(key_); +}