diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc index 3dc1f2259eb..09b220ae3c9 100644 --- a/src/cluster/slot_migrate.cc +++ b/src/cluster/slot_migrate.cc @@ -474,8 +474,8 @@ Status SlotMigrator::checkSingleResponse(int sock_fd) { return checkMultipleResp // Commands | Response | Instance // ++++++++++++++++++++++++++++++++++++++++ -// set Redis::Integer :1/r/n -// hset Redis::SimpleString +OK/r/n +// set Redis::Integer :1\r\n +// hset Redis::SimpleString +OK\r\n // sadd Redis::Integer // zadd Redis::Integer // siadd Redis::Integer @@ -497,6 +497,7 @@ Status SlotMigrator::checkSingleResponse(int sock_fd) { return checkMultipleResp // sirem Redis::Integer // del Redis::Integer // xadd Redis::BulkString +// bitfield Redis::Array *1\r\n:0 Status SlotMigrator::checkMultipleResponses(int sock_fd, int total) { if (sock_fd < 0 || total <= 0) { return {Status::NotOK, fmt::format("invalid arguments: sock_fd={}, count={}", sock_fd, total)}; @@ -509,7 +510,7 @@ Status SlotMigrator::checkMultipleResponses(int sock_fd, int total) { setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); // Start checking response - size_t bulk_len = 0; + size_t bulk_or_array_len = 0; int cnt = 0; parser_state_ = ParserState::ArrayLen; UniqueEvbuf evbuf; @@ -534,14 +535,20 @@ Status SlotMigrator::checkMultipleResponses(int sock_fd, int total) { if (line[0] == '-') { return {Status::NotOK, fmt::format("got invalid response of length {}: {}", line.length, line.get())}; - } else if (line[0] == '$') { + } else if (line[0] == '$' || line[0] == '*') { auto parse_result = ParseInt(std::string(line.get() + 1, line.length - 1), 10); if (!parse_result) { return {Status::NotOK, "protocol error: expected integer value"}; } - bulk_len = *parse_result; - parser_state_ = bulk_len > 0 ? ParserState::BulkData : ParserState::OneRspEnd; + bulk_or_array_len = *parse_result; + if (bulk_or_array_len <= 0) { + parser_state_ = ParserState::OneRspEnd; + } else if (line[0] == '$') { + parser_state_ = ParserState::BulkData; + } else { + parser_state_ = ParserState::ArrayData; + } } else if (line[0] == '+' || line[0] == ':') { parser_state_ = ParserState::OneRspEnd; } else { @@ -552,17 +559,33 @@ Status SlotMigrator::checkMultipleResponses(int sock_fd, int total) { } // Handle bulk string response case ParserState::BulkData: { - if (evbuffer_get_length(evbuf.get()) < bulk_len + 2) { + if (evbuffer_get_length(evbuf.get()) < bulk_or_array_len + 2) { LOG(INFO) << "[migrate] Bulk data in event buffer is not complete, read socket again"; run = false; break; } // TODO(chrisZMF): Check tail '\r\n' - evbuffer_drain(evbuf.get(), bulk_len + 2); - bulk_len = 0; + evbuffer_drain(evbuf.get(), bulk_or_array_len + 2); + bulk_or_array_len = 0; parser_state_ = ParserState::OneRspEnd; break; } + case ParserState::ArrayData: { + while (run && bulk_or_array_len > 0) { + evbuffer_ptr ptr = evbuffer_search_eol(evbuf.get(), nullptr, nullptr, EVBUFFER_EOL_CRLF_STRICT); + if (ptr.pos < 0) { + LOG(INFO) << "[migrate] Array data in event buffer is not complete, read socket again"; + run = false; + break; + } + evbuffer_drain(evbuf.get(), ptr.pos + 2); + --bulk_or_array_len; + } + if (run) { + parser_state_ = ParserState::OneRspEnd; + } + break; + } case ParserState::OneRspEnd: { cnt++; if (cnt >= total) { diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h index 4ac00ad58d1..9aad1dcc5ac 100644 --- a/src/cluster/slot_migrate.h +++ b/src/cluster/slot_migrate.h @@ -138,7 +138,7 @@ class SlotMigrator : public redis::Database { void resumeSyncCtx(const Status &migrate_result); - enum class ParserState { ArrayLen, BulkLen, BulkData, OneRspEnd }; + enum class ParserState { ArrayLen, BulkLen, BulkData, ArrayData, OneRspEnd }; enum class ThreadState { Uninitialized, Running, Terminated }; static const int kDefaultMaxPipelineSize = 16; diff --git a/src/commands/cmd_bit.cc b/src/commands/cmd_bit.cc index f0a25d391e9..c1d72462e42 100644 --- a/src/commands/cmd_bit.cc +++ b/src/commands/cmd_bit.cc @@ -19,6 +19,7 @@ */ #include "commander.h" +#include "commands/command_parser.h" #include "error_constants.h" #include "server/server.h" #include "types/redis_bitmap.h" @@ -132,6 +133,8 @@ class CommandBitCount : public Commander { class CommandBitPos : public Commander { public: + using Commander::Parse; + Status Parse(const std::vector &args) override { if (args.size() >= 4) { auto parse_start = ParseInt(args[3], 10); @@ -225,10 +228,158 @@ class CommandBitOp : public Commander { BitOpFlags op_flag_; }; +class CommandBitfield : public Commander { + public: + Status Parse(const std::vector &args) override { + BitfieldOperation cmd; + + read_only_ = true; + // BITFIELD [commands...] + for (CommandParser group(args, 2); group.Good();) { + auto remains = group.Remains(); + + std::string opcode = util::ToLower(group[0]); + if (opcode == "get") { + cmd.type = BitfieldOperation::Type::kGet; + } else if (opcode == "set") { + cmd.type = BitfieldOperation::Type::kSet; + read_only_ = false; + } else if (opcode == "incrby") { + cmd.type = BitfieldOperation::Type::kIncrBy; + read_only_ = false; + } else if (opcode == "overflow") { + constexpr auto kOverflowCmdSize = 2; + if (remains < kOverflowCmdSize) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + auto s = parseOverflowSubCommand(group[1], &cmd); + if (!s.IsOK()) { + return s; + } + + group.Skip(kOverflowCmdSize); + continue; + } else { + return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments}; + } + + if (remains < 3) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + + // parse encoding + auto encoding = parseBitfieldEncoding(group[1]); + if (!encoding.IsOK()) { + return encoding.ToStatus(); + } + cmd.encoding = encoding.GetValue(); + + // parse offset + if (!GetBitOffsetFromArgument(group[2], &cmd.offset).IsOK()) { + return {Status::RedisParseErr, "bit offset is not an integer or out of range"}; + } + + if (cmd.type != BitfieldOperation::Type::kGet) { + if (remains < 4) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + + auto value = ParseInt(group[3], 10); + if (!value.IsOK()) { + return value.ToStatus(); + } + cmd.value = value.GetValue(); + + // SET|INCRBY + group.Skip(4); + } else { + // GET + group.Skip(3); + } + + cmds_.push_back(cmd); + } + + return Commander::Parse(args); + } + + Status Execute(Server *srv, Connection *conn, std::string *output) override { + redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace()); + std::vector> rets; + rocksdb::Status s; + if (read_only_) { + s = bitmap_db.BitfieldReadOnly(args_[1], cmds_, &rets); + } else { + s = bitmap_db.Bitfield(args_[1], cmds_, &rets); + } + std::vector str_rets(rets.size()); + for (size_t i = 0; i != rets.size(); ++i) { + if (rets[i].has_value()) { + if (rets[i]->Encoding().IsSigned()) { + str_rets[i] = redis::Integer(CastToSignedWithoutBitChanges(rets[i]->Value())); + } else { + str_rets[i] = redis::Integer(rets[i]->Value()); + } + } else { + str_rets[i] = redis::NilString(); + } + } + *output = redis::Array(str_rets); + return Status::OK(); + } + + private: + static Status parseOverflowSubCommand(const std::string &overflow, BitfieldOperation *cmd) { + std::string lower = util::ToLower(overflow); + if (lower == "wrap") { + cmd->overflow = BitfieldOverflowBehavior::kWrap; + } else if (lower == "sat") { + cmd->overflow = BitfieldOverflowBehavior::kSat; + } else if (lower == "fail") { + cmd->overflow = BitfieldOverflowBehavior::kFail; + } else { + return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments}; + } + return Status::OK(); + } + + static StatusOr parseBitfieldEncoding(const std::string &token) { + if (token.empty()) { + return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments}; + } + + auto sign = std::tolower(token[0]); + if (sign != 'u' && sign != 'i') { + return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments}; + } + + auto type = BitfieldEncoding::Type::kUnsigned; + if (sign == 'i') { + type = BitfieldEncoding::Type::kSigned; + } + + auto bits_parse = ParseInt(token.substr(1), 10); + if (!bits_parse.IsOK()) { + return bits_parse.ToStatus(); + } + uint8_t bits = bits_parse.GetValue(); + + auto encoding = BitfieldEncoding::Create(type, bits); + if (!encoding.IsOK()) { + return {Status::RedisParseErr, errUnknownSubcommandOrWrongArguments}; + } + return encoding.GetValue(); + } + + std::vector cmds_; + bool read_only_; +}; + REDIS_REGISTER_COMMANDS(MakeCmdAttr("getbit", 3, "read-only", 1, 1, 1), MakeCmdAttr("setbit", 4, "write", 1, 1, 1), MakeCmdAttr("bitcount", -2, "read-only", 1, 1, 1), MakeCmdAttr("bitpos", -3, "read-only", 1, 1, 1), - MakeCmdAttr("bitop", -4, "write", 2, -1, 1), ) + MakeCmdAttr("bitop", -4, "write", 2, -1, 1), + MakeCmdAttr("bitfield", -2, "write", 1, 1, 1), ) } // namespace redis diff --git a/src/commands/command_parser.h b/src/commands/command_parser.h index 483778e2a52..3aa31ae0931 100644 --- a/src/commands/command_parser.h +++ b/src/commands/command_parser.h @@ -41,6 +41,9 @@ struct CommandParser { public: using ValueType = typename Iter::value_type; + static constexpr bool IsRandomAccessIter = + std::is_base_of_v::iterator_category>; + CommandParser(Iter begin, Iter end) : begin_(std::move(begin)), end_(std::move(end)) {} template @@ -56,12 +59,41 @@ struct CommandParser { decltype(auto) RawPeek() const { return *begin_; } + decltype(auto) operator[](size_t index) const { + Iter iter = begin_; + std::advance(iter, index); + return *iter; + } + decltype(auto) RawTake() { return *begin_++; } decltype(auto) RawNext() { ++begin_; } bool Good() const { return begin_ != end_; } + std::enable_if_t Remains() const { + // O(1) iff Iter is random access iterator. + auto d = std::distance(begin_, end_); + DCHECK(d >= 0); + return d; + } + + size_t Skip(size_t count) { + if constexpr (IsRandomAccessIter) { + size_t steps = std::min(Remains(), count); + begin_ += steps; + return steps; + } else { + size_t steps = 0; + while (count != 0 && Good()) { + ++begin_; + ++steps; + --count; + } + return steps; + } + } + template bool EatPred(Pred&& pred) { if (Good() && std::forward(pred)(RawPeek())) { diff --git a/src/common/bitfield_util.cc b/src/common/bitfield_util.cc new file mode 100644 index 00000000000..bd8a549fd56 --- /dev/null +++ b/src/common/bitfield_util.cc @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "bitfield_util.h" + +namespace detail { + +static uint64_t WrappedSignedBitfieldPlus(uint64_t value, int64_t incr, uint8_t bits) { + uint64_t res = value + static_cast(incr); + if (bits < 64) { + auto mask = std::numeric_limits::max() << bits; + if ((res & (1 << (bits - 1))) != 0) { + res |= mask; + } else { + res &= ~mask; + } + } + return res; +} + +// See also https://github.com/redis/redis/blob/7f4bae817614988c43c3024402d16edcbf3b3277/src/bitops.c#L325 +StatusOr SignedBitfieldPlus(uint64_t value, int64_t incr, uint8_t bits, BitfieldOverflowBehavior overflow, + uint64_t *dst) { + Status bits_status(BitfieldEncoding::CheckSupportedBitLengths(BitfieldEncoding::Type::kSigned, bits)); + if (!bits_status) { + return bits_status; + } + + auto max = std::numeric_limits::max(); + if (bits != 64) { + max = (static_cast(1) << (bits - 1)) - 1; + } + int64_t min = -max - 1; + + int64_t signed_value = CastToSignedWithoutBitChanges(value); + int64_t max_incr = CastToSignedWithoutBitChanges(static_cast(max) - value); + int64_t min_incr = min - signed_value; + + if (signed_value > max || (bits != 64 && incr > max_incr) || (signed_value >= 0 && incr >= 0 && incr > max_incr)) { + if (overflow == BitfieldOverflowBehavior::kWrap) { + *dst = WrappedSignedBitfieldPlus(value, incr, bits); + } else if (overflow == BitfieldOverflowBehavior::kSat) { + *dst = max; + } else { + DCHECK(overflow == BitfieldOverflowBehavior::kFail); + } + return true; + } else if (signed_value < min || (bits != 64 && incr < min_incr) || + (signed_value < 0 && incr < 0 && incr < min_incr)) { + if (overflow == BitfieldOverflowBehavior::kWrap) { + *dst = WrappedSignedBitfieldPlus(value, incr, bits); + } else if (overflow == BitfieldOverflowBehavior::kSat) { + *dst = min; + } else { + DCHECK(overflow == BitfieldOverflowBehavior::kFail); + } + return true; + } + + *dst = signed_value + incr; + return false; +} + +static uint64_t WrappedUnsignedBitfieldPlus(uint64_t value, int64_t incr, uint8_t bits) { + uint64_t mask = std::numeric_limits::max() << bits; + uint64_t res = value + incr; + res &= ~mask; + return res; +} + +// See also https://github.com/redis/redis/blob/7f4bae817614988c43c3024402d16edcbf3b3277/src/bitops.c#L288 +StatusOr UnsignedBitfieldPlus(uint64_t value, int64_t incr, uint8_t bits, BitfieldOverflowBehavior overflow, + uint64_t *dst) { + Status bits_status(BitfieldEncoding::CheckSupportedBitLengths(BitfieldEncoding::Type::kUnsigned, bits)); + if (!bits_status) { + return bits_status; + } + + auto max = (static_cast(1) << bits) - 1; + int64_t max_incr = CastToSignedWithoutBitChanges(max - value); + int64_t min_incr = CastToSignedWithoutBitChanges((~value) + 1); + + if (value > max || (incr > 0 && incr > max_incr)) { + if (overflow == BitfieldOverflowBehavior::kWrap) { + *dst = WrappedUnsignedBitfieldPlus(value, incr, bits); + } else if (overflow == BitfieldOverflowBehavior::kSat) { + *dst = max; + } else { + DCHECK(overflow == BitfieldOverflowBehavior::kFail); + } + return true; + } else if (incr < 0 && incr < min_incr) { + if (overflow == BitfieldOverflowBehavior::kWrap) { + *dst = WrappedUnsignedBitfieldPlus(value, incr, bits); + } else if (overflow == BitfieldOverflowBehavior::kSat) { + *dst = 0; + } else { + DCHECK(overflow == BitfieldOverflowBehavior::kFail); + } + return true; + } + + *dst = value + incr; + return false; +} + +} // namespace detail diff --git a/src/common/bitfield_util.h b/src/common/bitfield_util.h new file mode 100644 index 00000000000..dbb44b1db76 --- /dev/null +++ b/src/common/bitfield_util.h @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "status.h" + +enum class BitfieldOverflowBehavior : uint8_t { kWrap, kSat, kFail }; + +class BitfieldEncoding { + public: + enum class Type : uint8_t { kSigned, kUnsigned }; + + // check whether the bit length is fit to given number sign. + // Redis has bits length limitation to bitfield. + // Quote: + // The supported encodings are up to 64 bits for signed integers, and up to 63 bits for unsigned integers. This + // limitation with unsigned integers is due to the fact that currently the Redis protocol is unable to return 64 bit + // unsigned integers as replies. + // see also https://redis.io/commands/bitfield/ + static Status CheckSupportedBitLengths(Type type, uint8_t bits) noexcept { + uint8_t max_bits = 64; + if (type == Type::kUnsigned) { + max_bits = 63; + } + if (1 <= bits && bits <= max_bits) { + return Status::OK(); + } + + return {Status::NotOK, "Unsupported new bits length, only i1~i64, u1~u63 are supported."}; + } + + static StatusOr Create(Type type, uint8_t bits) noexcept { + Status bits_status(CheckSupportedBitLengths(type, bits)); + if (!bits_status) { + return bits_status; + } + + BitfieldEncoding enc; + enc.type_ = type; + enc.bits_ = bits; + return enc; + } + + Type GetType() const noexcept { return type_; } + + bool IsSigned() const noexcept { return type_ == Type::kSigned; } + + bool IsUnsigned() const noexcept { return type_ == Type::kUnsigned; } + + uint8_t Bits() const noexcept { return bits_; } + + Status SetBitsCount(uint8_t new_bits) noexcept { + Status bits_status(CheckSupportedBitLengths(type_, new_bits)); + if (!bits_status) { + return bits_status; + } + + bits_ = new_bits; + return Status::OK(); + } + + Status SetType(Type new_type) noexcept { + Status bits_status(CheckSupportedBitLengths(new_type, bits_)); + if (!bits_status) { + return bits_status; + } + + type_ = new_type; + return Status::OK(); + } + + std::string ToString() const noexcept { return (IsSigned() ? "i" : "u") + std::to_string(static_cast(bits_)); } + + private: + BitfieldEncoding() = default; + + Type type_; + uint8_t bits_; +}; + +struct BitfieldOperation { + // see https://redis.io/commands/bitfield/ to get more details. + enum class Type : uint8_t { kGet, kSet, kIncrBy }; + + Type type; + BitfieldOverflowBehavior overflow{BitfieldOverflowBehavior::kWrap}; + BitfieldEncoding encoding{BitfieldEncoding::Create(BitfieldEncoding::Type::kSigned, 32).GetValue()}; + uint32_t offset; + // INCRBY amount or SET value + int64_t value; +}; + +namespace detail { +// Let value add incr, according to the bits limit and overflow rule. The value is reguarded as a signed integer. +// Return true if overflow. Status is not ok iff calling BitfieldEncoding::IsSupportedBitLengths() +// for given op return false. +StatusOr SignedBitfieldPlus(uint64_t value, int64_t incr, uint8_t bits, BitfieldOverflowBehavior overflow, + uint64_t *dst); + +// Let value add incr, according to the bits limit and overflow rule. The value is reguarded as an unsigned integer. +// Return true if overflow. Status is not ok iff calling BitfieldEncoding::IsSupportedBitLengths() +// for given op return false. +StatusOr UnsignedBitfieldPlus(uint64_t value, int64_t incr, uint8_t bits, BitfieldOverflowBehavior overflow, + uint64_t *dst); +} // namespace detail + +// safe cast from unsigned to signed, without any bit changes. +// see also "Integral conversions" on https://en.cppreference.com/w/cpp/language/implicit_conversion +// If the destination type is signed, the result when overflow is implementation-defined until C++20 +inline int64_t CastToSignedWithoutBitChanges(uint64_t x) { + int64_t res = 0; + memcpy(&res, &x, sizeof(res)); + return res; +} + +class BitfieldValue { + public: + BitfieldValue(BitfieldEncoding encoding, uint64_t value) noexcept : encoding_(encoding), value_(value) {} + + template , int> = 0> + bool operator==(T rhs) const { + return value_ == static_cast(rhs); + } + + template + friend bool operator==(T lhs, const BitfieldValue &rhs) { + return rhs == lhs; + } + + BitfieldEncoding Encoding() const noexcept { return encoding_; } + + uint64_t Value() const noexcept { return value_; } + + private: + BitfieldEncoding encoding_; + uint64_t value_; +}; + +// Let value add incr, according to the encoding and overflow rule. +// return true if overflow. Status is not ok iff calling BitfieldEncoding::IsSupportedBitLengths() +// for given op return false. +inline StatusOr BitfieldPlus(uint64_t value, int64_t incr, BitfieldEncoding enc, + BitfieldOverflowBehavior overflow, uint64_t *dst) { + if (enc.IsSigned()) { + return detail::SignedBitfieldPlus(value, incr, enc.Bits(), overflow, dst); + } + return detail::UnsignedBitfieldPlus(value, incr, enc.Bits(), overflow, dst); +} + +// return true if successful. Status is not ok iff calling BitfieldEncoding::IsSupportedBitLengths() +// for given op return false. +inline StatusOr BitfieldOp(BitfieldOperation op, uint64_t old_value, uint64_t *new_value) { + if (op.type == BitfieldOperation::Type::kGet) { + *new_value = old_value; + return true; + } + + bool overflow = false; + if (op.type == BitfieldOperation::Type::kSet) { + overflow = GET_OR_RET(BitfieldPlus(op.value, 0, op.encoding, op.overflow, new_value)); + } else { + overflow = GET_OR_RET(BitfieldPlus(old_value, op.value, op.encoding, op.overflow, new_value)); + } + + return op.overflow != BitfieldOverflowBehavior::kFail || !overflow; +} + +// Use a small buffer to store a range of bytes on a bitmap. +// If you try to visit other place on bitmap, it will failed. +class ArrayBitfieldBitmap { + public: + // byte_offset is the byte offset of view in entire bitmap. + explicit ArrayBitfieldBitmap(uint32_t byte_offset = 0) noexcept : byte_offset_(byte_offset) { Reset(); } + + ArrayBitfieldBitmap(const ArrayBitfieldBitmap &) = delete; + ArrayBitfieldBitmap &operator=(const ArrayBitfieldBitmap &) = delete; + ~ArrayBitfieldBitmap() = default; + + // Change the position this represents. + void SetByteOffset(uint64_t byte_offset) noexcept { byte_offset_ = byte_offset; } + + void Reset() { memset(buf_, 0, sizeof(buf_)); } + + Status Set(uint32_t byte_offset, uint32_t bytes, const uint8_t *src) { + Status bound_status(checkLegalBound(byte_offset, bytes)); + if (!bound_status) { + return bound_status; + } + byte_offset -= byte_offset_; + memcpy(buf_ + byte_offset, src, bytes); + return Status::OK(); + } + + Status Get(uint32_t byte_offset, uint32_t bytes, uint8_t *dst) const { + Status bound_status(checkLegalBound(byte_offset, bytes)); + if (!bound_status) { + return bound_status; + } + byte_offset -= byte_offset_; + memcpy(dst, buf_ + byte_offset, bytes); + return Status::OK(); + } + + StatusOr GetUnsignedBitfield(uint64_t bit_offset, uint64_t bits) const { + Status bits_status(BitfieldEncoding::CheckSupportedBitLengths(BitfieldEncoding::Type::kUnsigned, bits)); + if (!bits_status) { + return bits_status; + } + return getBitfield(bit_offset, bits); + } + + StatusOr GetSignedBitfield(uint64_t bit_offset, uint64_t bits) const { + Status bits_status(BitfieldEncoding::CheckSupportedBitLengths(BitfieldEncoding::Type::kSigned, bits)); + if (!bits_status) { + return bits_status; + } + uint64_t bitfield = GET_OR_RET(getBitfield(bit_offset, bits)); + int64_t value = CastToSignedWithoutBitChanges(bitfield); + + // for a bits of k signed number, 1 << (bits - 1) is the MSB (most-significant bit). + // the number is a negative when the MSB is "1". + auto msb = static_cast(1) << (bits - 1); // NOLINT + if ((value & msb) != 0) { + // The way of enlarge width of a signed integer is sign-extended. + // The values of higher bits should all "1", when the number is negative. + // For example: + // constexpr int32_t a = -128; + // static_assert(a == 0xffffff80); + // constexpr int64_t b = a; // b is -128 too. + // static_assert(b == 0xffffffffffffff80); + value |= CastToSignedWithoutBitChanges(std::numeric_limits::max() << bits); + } + return value; + } + + Status SetBitfield(uint32_t bit_offset, uint32_t bits, uint64_t value) { + uint32_t first_byte = bit_offset / 8; + uint32_t last_byte = (bit_offset + bits - 1) / 8 + 1; + Status bound_status(checkLegalBound(first_byte, last_byte - first_byte)); + if (!bound_status) { + return bound_status; + } + + bit_offset -= byte_offset_ * 8; + while (bits--) { + bool v = (value & (uint64_t(1) << bits)) != 0; + uint32_t byte = bit_offset >> 3; + uint32_t bit = 7 - (bit_offset & 7); + uint32_t byteval = buf_[byte]; + byteval &= ~(1 << bit); + byteval |= int(v) << bit; + buf_[byte] = char(byteval); + bit_offset++; + } + return Status::OK(); + } + + private: + // The bit field cannot exceed 64 bits, takes an extra byte when it unaligned. + static constexpr uint32_t kSize = 8 + 1; + + Status checkLegalBound(uint32_t byte_offset, uint32_t bytes) const noexcept { + if (byte_offset < byte_offset_ || byte_offset_ + kSize < byte_offset + bytes) { + return {Status::NotOK, "The range [offset, offset + bytes) is out of bitfield."}; + } + return Status::OK(); + } + + StatusOr getBitfield(uint32_t bit_offset, uint8_t bits) const { + uint32_t first_byte = bit_offset / 8; + uint32_t last_byte = (bit_offset + bits - 1) / 8 + 1; + Status bound_status(checkLegalBound(first_byte, last_byte - first_byte)); + if (!bound_status) { + return bound_status; + } + + bit_offset -= byte_offset_ * 8; + uint64_t value = 0; + while (bits--) { + uint32_t byte = bit_offset >> 3; + uint32_t bit = 7 - (bit_offset & 0x7); + uint32_t byteval = buf_[byte]; + uint32_t bitval = (byteval >> bit) & 1; + value = (value << 1) | bitval; + bit_offset++; + } + return value; + } + + uint8_t buf_[kSize]; + uint32_t byte_offset_; +}; diff --git a/src/common/lock_manager.h b/src/common/lock_manager.h index ee4e17422be..782d6c8254a 100644 --- a/src/common/lock_manager.h +++ b/src/common/lock_manager.h @@ -92,10 +92,18 @@ class LockGuard { LockGuard(const LockGuard &) = delete; LockGuard &operator=(const LockGuard &) = delete; - LockGuard(LockGuard &&guard) : lock_(guard.lock_) { guard.lock_ = nullptr; } + LockGuard(LockGuard &&guard) noexcept : lock_(guard.lock_) { guard.lock_ = nullptr; } + + LockGuard &operator=(LockGuard &&other) noexcept { + if (&other != this) { + std::destroy_at(this); + new (this) LockGuard(std::move(other)); + } + return *this; + } private: - std::mutex *lock_; + std::mutex *lock_{nullptr}; }; class MultiLockGuard { diff --git a/src/common/parse_util.h b/src/common/parse_util.h index 63c03c00a94..3e3cf116c3f 100644 --- a/src/common/parse_util.h +++ b/src/common/parse_util.h @@ -33,13 +33,18 @@ namespace details { template struct ParseIntFunc; +template <> +struct ParseIntFunc { // NOLINT + constexpr static const auto value = std::strtol; +}; + template <> struct ParseIntFunc { // NOLINT constexpr static const auto value = std::strtol; }; template <> -struct ParseIntFunc { +struct ParseIntFunc { // NOLINT constexpr static const auto value = std::strtol; }; @@ -53,13 +58,18 @@ struct ParseIntFunc { // NOLINT constexpr static const auto value = std::strtoll; }; +template <> +struct ParseIntFunc { // NOLINT + constexpr static const auto value = std::strtoul; +}; + template <> struct ParseIntFunc { // NOLINT constexpr static const auto value = std::strtoul; }; template <> -struct ParseIntFunc { +struct ParseIntFunc { // NOLINT constexpr static const auto value = std::strtoul; }; diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc index c66aca80eef..db7174d0839 100644 --- a/src/storage/batch_extractor.cc +++ b/src/storage/batch_extractor.cc @@ -232,6 +232,10 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic first_seen_ = false; } break; + case kRedisCmdBitfield: + command_args = {"BITFIELD", user_key}; + command_args.insert(command_args.end(), args->begin() + 1, args->end()); + break; default: LOG(ERROR) << "Failed to parse write_batch in PutCF. Type=Bitmap: unhandled command with code " << *parsed_cmd; diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h index 48c40f67c09..cbdb7a41e6d 100644 --- a/src/storage/redis_metadata.h +++ b/src/storage/redis_metadata.h @@ -61,6 +61,7 @@ enum RedisCommand { kRedisCmdExpire, kRedisCmdSetBit, kRedisCmdBitOp, + kRedisCmdBitfield, kRedisCmdLMove, }; diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc index a4c71cd2c16..b0fc65f6b0c 100644 --- a/src/types/redis_bitmap.cc +++ b/src/types/redis_bitmap.cc @@ -38,6 +38,25 @@ const char kErrBitmapStringOutOfRange[] = "The size of the bitmap string exceeds the " "configuration item max-bitmap-to-string-mb"; +// Resize the segment to makes its new length at least min_bytes, new bytes will be set to 0. +// min_bytes can not more than kBitmapSegmentBytes +void ExpandBitmapSegment(std::string *segment, size_t min_bytes) { + assert(min_bytes <= kBitmapSegmentBytes); + + auto old_size = segment->size(); + if (min_bytes > old_size) { + size_t new_size = 0; + if (min_bytes > old_size * 2) { + new_size = min_bytes; + } else if (old_size * 2 > kBitmapSegmentBytes) { + new_size = kBitmapSegmentBytes; + } else { + new_size = old_size * 2; + } + segment->resize(new_size, 0); + } +} + rocksdb::Status Bitmap::GetMetadata(const Slice &ns_key, BitmapMetadata *metadata, std::string *raw_value) { std::string old_metadata; metadata->Encode(&old_metadata); @@ -187,17 +206,7 @@ rocksdb::Status Bitmap::SetBit(const Slice &user_key, uint32_t offset, bool new_ uint32_t byte_index = (offset / 8) % kBitmapSegmentBytes; uint64_t used_size = index + byte_index + 1; uint64_t bitmap_size = std::max(used_size, metadata.size); - if (byte_index >= value.size()) { // expand the bitmap - size_t expand_size = 0; - if (byte_index >= value.size() * 2) { - expand_size = byte_index - value.size() + 1; - } else if (value.size() * 2 > kBitmapSegmentBytes) { - expand_size = kBitmapSegmentBytes - value.size(); - } else { - expand_size = value.size(); - } - value.append(expand_size, 0); - } + ExpandBitmapSegment(&value, byte_index + 1); uint32_t bit_offset = offset % 8; *old_bit = (value[byte_index] & (1 << bit_offset)) != 0; if (new_bit) { @@ -539,6 +548,321 @@ rocksdb::Status Bitmap::BitOp(BitOpFlags op_flag, const std::string &op_name, co return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); } +// SegmentCacheStore is used to read segments from storage. +class Bitmap::SegmentCacheStore { + public: + SegmentCacheStore(engine::Storage *storage, rocksdb::ColumnFamilyHandle *metadata_cf_handle, + std::string namespace_key, const Metadata &bitmap_metadata) + : storage_(storage), + metadata_cf_handle_(metadata_cf_handle), + ns_key_(std::move(namespace_key)), + metadata_(bitmap_metadata) {} + + // Get a read-only segment by given index + rocksdb::Status Get(uint32_t index, const std::string **cache) { + std::string *res = nullptr; + auto s = get(index, /*set_dirty=*/false, &res); + if (s.ok()) { + *cache = res; + } + return s; + } + + // Get a segment by given index, and mark it dirty. + rocksdb::Status GetMut(uint32_t index, std::string **cache) { return get(index, /*set_dirty=*/true, cache); } + + // Add all dirty segments into write batch. + void BatchForFlush(ObserverOrUniquePtr &batch) { + uint64_t used_size = 0; + for (auto &[index, content] : cache_) { + if (content.first) { + std::string sub_key = + InternalKey(ns_key_, getSegmentSubKey(index), metadata_.version, storage_->IsSlotIdEncoded()).Encode(); + batch->Put(sub_key, content.second); + used_size = std::max(used_size, static_cast(index) * kBitmapSegmentBytes + content.second.size()); + } + } + if (used_size > metadata_.size) { + metadata_.size = used_size; + std::string bytes; + metadata_.Encode(&bytes); + batch->Put(metadata_cf_handle_, ns_key_, bytes); + } + } + + private: + rocksdb::Status get(uint32_t index, bool set_dirty, std::string **cache) { + auto [seg_itor, no_cache] = cache_.try_emplace(index); + auto &[is_dirty, str] = seg_itor->second; + + if (no_cache) { + is_dirty = false; + std::string sub_key = + InternalKey(ns_key_, getSegmentSubKey(index), metadata_.version, storage_->IsSlotIdEncoded()).Encode(); + rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), sub_key, &str); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + } + + is_dirty |= set_dirty; + *cache = &str; + return rocksdb::Status::OK(); + } + + static std::string getSegmentSubKey(uint32_t index) { return std::to_string(index * kBitmapSegmentBytes); } + + engine::Storage *storage_; + rocksdb::ColumnFamilyHandle *metadata_cf_handle_; + std::string ns_key_; + Metadata metadata_; + // Segment index -> [is_dirty, segment_cache_string] + std::unordered_map> cache_; +}; + +// Copy a range of bytes from entire bitmap and store them into ArrayBitfieldBitmap. +static rocksdb::Status CopySegmentsBytesToBitfield(Bitmap::SegmentCacheStore &store, uint32_t byte_offset, + uint32_t bytes, ArrayBitfieldBitmap *bitfield) { + bitfield->SetByteOffset(byte_offset); + bitfield->Reset(); + + uint32_t segment_index = byte_offset / kBitmapSegmentBytes; + int64_t remain_bytes = bytes; + // the byte_offset in current segment. + auto segment_byte_offset = static_cast(byte_offset % kBitmapSegmentBytes); + for (; remain_bytes > 0; ++segment_index) { + const std::string *cache = nullptr; + auto cache_status = store.Get(segment_index, &cache); + if (!cache_status.ok()) { + return cache_status; + } + + auto cache_size = static_cast(cache->size()); + auto copyable = std::max(0, cache_size - segment_byte_offset); + auto copy_count = std::min(static_cast(remain_bytes), copyable); + auto src = reinterpret_cast(cache->data() + segment_byte_offset); + auto status = bitfield->Set(byte_offset, copy_count, src); + if (!status) { + return rocksdb::Status::InvalidArgument(); + } + + // next segment will copy from its front. + byte_offset = (segment_index + 1) * kBitmapSegmentBytes; + // maybe negative, but still correct. + remain_bytes -= kBitmapSegmentBytes - segment_byte_offset; + segment_byte_offset = 0; + } + + return rocksdb::Status::OK(); +} + +static rocksdb::Status GetBitfieldInteger(const ArrayBitfieldBitmap &bitfield, uint32_t bit_offset, + BitfieldEncoding enc, uint64_t *res) { + if (enc.IsSigned()) { + auto status = bitfield.GetSignedBitfield(bit_offset, enc.Bits()); + if (!status) { + return rocksdb::Status::InvalidArgument(); + } + *res = status.GetValue(); + } else { + auto status = bitfield.GetUnsignedBitfield(bit_offset, enc.Bits()); + if (!status) { + return rocksdb::Status::InvalidArgument(); + } + *res = status.GetValue(); + } + return rocksdb::Status::OK(); +} + +static rocksdb::Status CopyBitfieldBytesToSegments(Bitmap::SegmentCacheStore &store, + const ArrayBitfieldBitmap &bitfield, uint32_t byte_offset, + uint32_t bytes) { + uint32_t segment_index = byte_offset / kBitmapSegmentBytes; + auto segment_byte_offset = static_cast(byte_offset % kBitmapSegmentBytes); + auto remain_bytes = static_cast(bytes); + for (; remain_bytes > 0; ++segment_index) { + std::string *cache = nullptr; + auto cache_status = store.GetMut(segment_index, &cache); + if (!cache_status.ok()) { + return cache_status; + } + + auto copy_count = std::min(remain_bytes, static_cast(kBitmapSegmentBytes - segment_byte_offset)); + if (static_cast(cache->size()) < segment_byte_offset + copy_count) { + cache->resize(segment_byte_offset + copy_count); + } + + auto dst = reinterpret_cast(cache->data()) + segment_byte_offset; + auto status = bitfield.Get(byte_offset, copy_count, dst); + if (!status) { + return rocksdb::Status::InvalidArgument(); + } + + // next segment will copy from its front. + byte_offset = (segment_index + 1) * kBitmapSegmentBytes; + // maybe negative, but still correct. + remain_bytes -= static_cast(kBitmapSegmentBytes - segment_byte_offset); + segment_byte_offset = 0; + } + return rocksdb::Status::OK(); +} + +template +rocksdb::Status Bitmap::bitfield(const Slice &user_key, const std::vector &ops, + std::vector> *rets) { + std::string ns_key = AppendNamespacePrefix(user_key); + + std::optional guard; + if constexpr (!ReadOnly) { + guard = LockGuard(storage_->GetLockManager(), ns_key); + } + + BitmapMetadata metadata; + std::string raw_value; + auto s = GetMetadata(ns_key, &metadata, &raw_value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + + if (metadata.Type() == RedisType::kRedisString) { + if constexpr (ReadOnly) { + s = BitmapString::BitfieldReadOnly(ns_key, raw_value, ops, rets); + } else { + s = BitmapString(storage_, namespace_).Bitfield(ns_key, &raw_value, ops, rets); + } + return s; + } + + if (metadata.Type() != RedisType::kRedisBitmap) { + return rocksdb::Status::InvalidArgument("The value is not a bitmap or string."); + } + + // We firstly do the bitfield operation by fetching segments into memory. + // Use SegmentCacheStore to record dirty segments. (if not read-only mode) + SegmentCacheStore cache(storage_, metadata_cf_handle_, ns_key, metadata); + runBitfieldOperationsWithCache(cache, ops, rets); + + if constexpr (!ReadOnly) { + // Write changes into storage. + auto batch = storage_->GetWriteBatchBase(); + if (bitfieldWriteAheadLog(batch, ops)) { + cache.BatchForFlush(batch); + return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); + } + } + return rocksdb::Status::OK(); +} + +template +rocksdb::Status Bitmap::runBitfieldOperationsWithCache(SegmentCacheStore &cache, + const std::vector &ops, + std::vector> *rets) { + ArrayBitfieldBitmap bitfield; + for (BitfieldOperation op : ops) { + // found all bytes that contents the bitfield. + uint32_t first_byte = op.offset / 8; + uint32_t last_bytes = (op.offset + op.encoding.Bits() - 1) / 8 + 1; + uint32_t bytes = last_bytes - first_byte; + + auto segment_status = CopySegmentsBytesToBitfield(cache, first_byte, bytes, &bitfield); + if (!segment_status.ok()) { + return segment_status; + } + + // Covert the bitfield from a buffer to an integer. + uint64_t unsigned_old_value = 0; + auto s = GetBitfieldInteger(bitfield, op.offset, op.encoding, &unsigned_old_value); + if (!s.ok()) { + return s; + } + + if constexpr (ReadOnly) { + rets->emplace_back() = {op.encoding, unsigned_old_value}; + continue; + } + + auto &ret = rets->emplace_back(); + uint64_t unsigned_new_value = 0; + // BitfieldOp failed only when the length or bits illegal. + // BitfieldOperation already check above case in construction function. + if (BitfieldOp(op, unsigned_old_value, &unsigned_new_value).GetValue()) { + if (op.type != BitfieldOperation::Type::kGet) { + Status _ = bitfield.SetBitfield(op.offset, op.encoding.Bits(), unsigned_new_value); + s = CopyBitfieldBytesToSegments(cache, bitfield, first_byte, bytes); + if (!s.ok()) { + return s; + } + } + + if (op.type == BitfieldOperation::Type::kSet) { + unsigned_new_value = unsigned_old_value; + } + + ret = {op.encoding, unsigned_new_value}; + } + } + + return rocksdb::Status::OK(); +} + +template rocksdb::Status Bitmap::bitfield(const Slice &, const std::vector &, + std::vector> *); +template rocksdb::Status Bitmap::bitfield(const Slice &, const std::vector &, + std::vector> *); + +// Return true if there are any write operation to bitmap. Otherwise return false. +bool Bitmap::bitfieldWriteAheadLog(const ObserverOrUniquePtr &batch, + const std::vector &ops) { + std::vector cmd_args{std::to_string(kRedisCmdBitfield)}; + auto current_overflow = BitfieldOverflowBehavior::kWrap; + for (BitfieldOperation op : ops) { + if (op.type == BitfieldOperation::Type::kGet) { + continue; + } + if (current_overflow != op.overflow) { + current_overflow = op.overflow; + std::string overflow_str; + switch (op.overflow) { + case BitfieldOverflowBehavior::kWrap: + overflow_str = "WRAP"; + break; + case BitfieldOverflowBehavior::kSat: + overflow_str = "SAT"; + break; + case BitfieldOverflowBehavior::kFail: + overflow_str = "FAIL"; + break; + } + cmd_args.emplace_back("OVERFLOW"); + cmd_args.emplace_back(std::move(overflow_str)); + } + + if (op.type == BitfieldOperation::Type::kSet) { + cmd_args.emplace_back("SET"); + } else { + cmd_args.emplace_back("INCRBY"); + } + cmd_args.push_back(op.encoding.ToString()); + cmd_args.push_back(std::to_string(op.offset)); + if (op.type == BitfieldOperation::Type::kSet) { + if (op.encoding.IsSigned()) { + cmd_args.push_back(std::to_string(op.value)); + } else { + cmd_args.push_back(std::to_string(static_cast(op.value))); + } + } else { + cmd_args.push_back(std::to_string(op.value)); + } + } + + if (cmd_args.size() > 1) { + WriteBatchLogData log_data(kRedisBitmap, std::move(cmd_args)); + batch->PutLogData(log_data.Encode()); + return true; + } + return false; +} + bool Bitmap::GetBitFromValueAndOffset(const std::string &value, uint32_t offset) { bool bit = false; uint32_t byte_index = (offset / 8) % kBitmapSegmentBytes; diff --git a/src/types/redis_bitmap.h b/src/types/redis_bitmap.h index 7b12834fd5c..a0ada45bfe0 100644 --- a/src/types/redis_bitmap.h +++ b/src/types/redis_bitmap.h @@ -20,9 +20,11 @@ #pragma once +#include #include #include +#include "common/bitfield_util.h" #include "storage/redis_db.h" #include "storage/redis_metadata.h" @@ -41,6 +43,8 @@ namespace redis { class Bitmap : public Database { public: + class SegmentCacheStore; + Bitmap(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {} rocksdb::Status GetBit(const Slice &user_key, uint32_t offset, bool *bit); rocksdb::Status GetString(const Slice &user_key, uint32_t max_btos_size, std::string *value); @@ -49,11 +53,31 @@ class Bitmap : public Database { rocksdb::Status BitPos(const Slice &user_key, bool bit, int64_t start, int64_t stop, bool stop_given, int64_t *pos); rocksdb::Status BitOp(BitOpFlags op_flag, const std::string &op_name, const Slice &user_key, const std::vector &op_keys, int64_t *len); + rocksdb::Status Bitfield(const Slice &user_key, const std::vector &ops, + std::vector> *rets) { + return bitfield(user_key, ops, rets); + } + // read-only version for Bitfield(), if there is a write operation in ops, the function will return with failed + // status. + rocksdb::Status BitfieldReadOnly(const Slice &user_key, const std::vector &ops, + std::vector> *rets) { + return bitfield(user_key, ops, rets); + } static bool GetBitFromValueAndOffset(const std::string &value, uint32_t offset); static bool IsEmptySegment(const Slice &segment); private: + template + rocksdb::Status bitfield(const Slice &user_key, const std::vector &ops, + std::vector> *rets); + static bool bitfieldWriteAheadLog(const ObserverOrUniquePtr &batch, + const std::vector &ops); rocksdb::Status GetMetadata(const Slice &ns_key, BitmapMetadata *metadata, std::string *raw_value); + + template + static rocksdb::Status runBitfieldOperationsWithCache(SegmentCacheStore &cache, + const std::vector &ops, + std::vector> *rets); }; } // namespace redis diff --git a/src/types/redis_bitmap_string.cc b/src/types/redis_bitmap_string.cc index 38a0c8f38ec..d9d77114985 100644 --- a/src/types/redis_bitmap_string.cc +++ b/src/types/redis_bitmap_string.cc @@ -25,6 +25,7 @@ #include #include "redis_string.h" +#include "server/redis_reply.h" #include "storage/redis_metadata.h" #include "type_util.h" @@ -204,4 +205,88 @@ int64_t BitmapString::RawBitpos(const uint8_t *c, int64_t count, bool bit) { return res; } +rocksdb::Status BitmapString::Bitfield(const Slice &ns_key, std::string *raw_value, + const std::vector &ops, + std::vector> *rets) { + auto header_offset = Metadata::GetOffsetAfterExpire((*raw_value)[0]); + std::string string_value = raw_value->substr(header_offset); + for (BitfieldOperation op : ops) { + // [first_byte, last_byte) + uint32_t first_byte = op.offset / 8; + uint32_t last_byte = (op.offset + op.encoding.Bits() - 1) / 8 + 1; + + // expand string if need. + if (string_value.size() < last_byte) { + string_value.resize(last_byte); + } + + ArrayBitfieldBitmap bitfield(first_byte); + auto str = reinterpret_cast(string_value.data() + first_byte); + auto s = bitfield.Set(first_byte, last_byte - first_byte, str); + + uint64_t unsigned_old_value = 0; + if (op.encoding.IsSigned()) { + unsigned_old_value = bitfield.GetSignedBitfield(op.offset, op.encoding.Bits()).GetValue(); + } else { + unsigned_old_value = bitfield.GetUnsignedBitfield(op.offset, op.encoding.Bits()).GetValue(); + } + + uint64_t unsigned_new_value = 0; + auto &ret = rets->emplace_back(); + if (BitfieldOp(op, unsigned_old_value, &unsigned_new_value).GetValue()) { + if (op.type != BitfieldOperation::Type::kGet) { + // never failed. + s = bitfield.SetBitfield(op.offset, op.encoding.Bits(), unsigned_new_value); + auto dst = reinterpret_cast(string_value.data()) + first_byte; + s = bitfield.Get(first_byte, last_byte - first_byte, dst); + } + + if (op.type == BitfieldOperation::Type::kSet) { + unsigned_new_value = unsigned_old_value; + } + + ret = {op.encoding, unsigned_new_value}; + } + } + + raw_value->resize(header_offset); + raw_value->append(string_value); + auto batch = storage_->GetWriteBatchBase(); + WriteBatchLogData log_data(kRedisString); + batch->PutLogData(log_data.Encode()); + batch->Put(metadata_cf_handle_, ns_key, *raw_value); + + return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + +rocksdb::Status BitmapString::BitfieldReadOnly(const Slice &ns_key, const std::string &raw_value, + const std::vector &ops, + std::vector> *rets) { + std::string_view string_value = raw_value; + string_value = string_value.substr(Metadata::GetOffsetAfterExpire(string_value[0])); + + for (BitfieldOperation op : ops) { + if (op.type != BitfieldOperation::Type::kGet) { + return rocksdb::Status::InvalidArgument("Write bitfield in read-only mode."); + } + + uint32_t first_byte = op.offset / 8; + uint32_t last_byte = (op.offset + op.encoding.Bits() - 1) / 8 + 1; + + ArrayBitfieldBitmap bitfield(first_byte); + auto s = bitfield.Set(first_byte, last_byte - first_byte, + reinterpret_cast(string_value.data() + first_byte)); + + if (op.encoding.IsSigned()) { + int64_t value = bitfield.GetSignedBitfield(op.offset, op.encoding.Bits()).GetValue(); + rets->emplace_back(std::in_place, op.encoding, static_cast(value)); + } else { + uint64_t value = bitfield.GetUnsignedBitfield(op.offset, op.encoding.Bits()).GetValue(); + rets->emplace_back(std::in_place, op.encoding, value); + } + } + + return rocksdb::Status::OK(); +} + } // namespace redis diff --git a/src/types/redis_bitmap_string.h b/src/types/redis_bitmap_string.h index 027c6615930..ab61c211bfa 100644 --- a/src/types/redis_bitmap_string.h +++ b/src/types/redis_bitmap_string.h @@ -20,9 +20,11 @@ #pragma once +#include #include #include +#include "common/bitfield_util.h" #include "storage/redis_db.h" #include "storage/redis_metadata.h" @@ -36,6 +38,11 @@ class BitmapString : public Database { static rocksdb::Status BitCount(const std::string &raw_value, int64_t start, int64_t stop, uint32_t *cnt); static rocksdb::Status BitPos(const std::string &raw_value, bool bit, int64_t start, int64_t stop, bool stop_given, int64_t *pos); + rocksdb::Status Bitfield(const Slice &ns_key, std::string *raw_value, const std::vector &ops, + std::vector> *rets); + static rocksdb::Status BitfieldReadOnly(const Slice &ns_key, const std::string &raw_value, + const std::vector &ops, + std::vector> *rets); static size_t RawPopcount(const uint8_t *p, int64_t count); static int64_t RawBitpos(const uint8_t *c, int64_t count, bool bit); diff --git a/tests/cppunit/bitfield_util.cc b/tests/cppunit/bitfield_util.cc new file mode 100644 index 00000000000..71976df4627 --- /dev/null +++ b/tests/cppunit/bitfield_util.cc @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "common/bitfield_util.h" + +#include + +#include +#include + +#include "common/encoding.h" + +TEST(BitfieldUtil, Get) { + std::vector big_endian_bitmap{0x00, 0xff, 0x00, 0xff, 0x00, 0xff, 0x00, 0xff}; + std::vector little_endian_bitmap(big_endian_bitmap); + std::reverse(little_endian_bitmap.begin(), little_endian_bitmap.end()); + + ArrayBitfieldBitmap bitfield(0); + auto s = bitfield.Set(0, big_endian_bitmap.size(), big_endian_bitmap.data()); + + for (int bits = 16; bits < 64; bits *= 2) { + for (uint64_t offset = 0; bits + offset <= big_endian_bitmap.size() * 8; offset += bits) { + uint64_t value = bitfield.GetUnsignedBitfield(offset, bits).GetValue(); + if (IsBigEndian()) { + EXPECT_EQ(0, memcmp(&value, big_endian_bitmap.data(), bits / 8)); + } else { + EXPECT_EQ(0, memcmp(&value, little_endian_bitmap.data(), bits / 8)); + } + } + } +} diff --git a/tests/cppunit/types/bitmap_test.cc b/tests/cppunit/types/bitmap_test.cc index 75f2d3f88a8..197f0ae9888 100644 --- a/tests/cppunit/types/bitmap_test.cc +++ b/tests/cppunit/types/bitmap_test.cc @@ -24,16 +24,21 @@ #include "test_base.h" #include "types/redis_bitmap.h" +#include "types/redis_string.h" class RedisBitmapTest : public TestBase { protected: - explicit RedisBitmapTest() { bitmap_ = std::make_unique(storage_, "bitmap_ns"); } + explicit RedisBitmapTest() { + bitmap_ = std::make_unique(storage_, "bitmap_ns"); + string_ = std::make_unique(storage_, "bitmap_ns"); + } ~RedisBitmapTest() override = default; void SetUp() override { key_ = "test_bitmap_key"; } void TearDown() override {} std::unique_ptr bitmap_; + std::unique_ptr string_; }; TEST_F(RedisBitmapTest, GetAndSetBit) { @@ -89,3 +94,365 @@ TEST_F(RedisBitmapTest, BitPosSetBit) { } auto s = bitmap_->Del(key_); } + +TEST_F(RedisBitmapTest, BitfieldGetSetTest) { + constexpr uint32_t magic = 0xdeadbeef; + + std::vector> rets; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kUnsigned, 32).GetValue(); + op.offset = 114514; + op.value = magic; + + EXPECT_TRUE(bitmap_->Bitfield(key_, {op}, &rets).ok()); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(0, rets[0].value()); + rets.clear(); + + op.type = BitfieldOperation::Type::kGet; + auto _ = op.encoding.SetBitsCount(1); + + // bitfield is stored in big-endian. + for (int i = 31; i != -1; --i) { + EXPECT_TRUE(bitmap_->Bitfield(key_, {op}, &rets).ok()); + EXPECT_EQ((magic >> i) & 1, rets[0].value()); + rets.clear(); + op.offset++; + } + + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, UnsignedBitfieldTest) { + constexpr uint8_t bits = 5; + static_assert(bits < 64); + constexpr uint64_t max = (uint64_t(1) << bits) - 1; + + std::vector> rets; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kUnsigned, bits).GetValue(); + op.offset = 8189; // the two bitmap segments divide the bitfield + op.value = 0; + + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(0, rets[0].value()); + rets.clear(); + + for (uint64_t i = 1; i <= max; ++i) { + op.value = int64_t(i); + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(i - 1, rets[0].value()); + rets.clear(); + } + + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, SignedBitfieldTest) { + constexpr uint8_t bits = 10; + constexpr int64_t max = (uint64_t(1) << (bits - 1)) - 1; + constexpr int64_t min = -max - 1; + + std::vector> rets; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kSigned, bits).GetValue(); + op.offset = 8189; // the two bitmap segments divide the bitfield + op.value = min; + + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(0, rets[0].value()); + rets.clear(); + + for (int64_t i = min + 1; i <= max; ++i) { + op.value = i; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(i - 1, rets[0].value()); + rets.clear(); + } + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, SignedBitfieldWrapSetTest) { + constexpr uint8_t bits = 6; + constexpr int64_t max = (int64_t(1) << (bits - 1)) - 1; + constexpr int64_t min = -max - 1; + constexpr int64_t loopback = int64_t(1) << bits; + + std::vector> rets; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kSigned, bits).GetValue(); + op.offset = 0; + op.value = max; + + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(0, rets[0].value()); + rets.clear(); + + op.type = BitfieldOperation::Type::kIncrBy; + op.value = 1; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(min, rets[0].value()); + rets.clear(); + + op.value = loopback; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(min, rets[0].value()); + rets.clear(); + + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, UnsignedBitfieldWrapSetTest) { + constexpr uint8_t bits = 6; + static_assert(bits < 64); + constexpr uint64_t max = (uint64_t(1) << bits) - 1; + constexpr int64_t loopback = int64_t(1) << bits; + + std::vector> rets; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kUnsigned, bits).GetValue(); + op.offset = 0; + op.value = max; + + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(0, rets[0].value()); + rets.clear(); + + op.type = BitfieldOperation::Type::kIncrBy; + op.value = 1; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(0, rets[0].value()); + rets.clear(); + + op.value = loopback; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(0, rets[0].value()); + rets.clear(); + + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, SignedBitfieldSatSetTest) { + constexpr uint8_t bits = 6; + constexpr int64_t max = (int64_t(1) << (bits - 1)) - 1; + + std::vector> rets; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kSigned, bits).GetValue(); + op.overflow = BitfieldOverflowBehavior::kSat; + op.offset = 0; + op.value = max * 2; + + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(0, rets[0].value()); + rets.clear(); + + op.type = BitfieldOperation::Type::kGet; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(max, rets[0].value()); + rets.clear(); + + op.type = BitfieldOperation::Type::kIncrBy; + for (int64_t i = 0; i <= max + 10; ++i) { + op.value = i; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(max, rets[0].value()); + rets.clear(); + } + + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, UnsignedBitfieldSatSetTest) { + constexpr uint8_t bits = 6; + static_assert(bits < 64); + constexpr uint64_t max = (uint64_t(1) << bits) - 1; + + std::vector> rets; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kUnsigned, bits).GetValue(); + op.overflow = BitfieldOverflowBehavior::kSat; + op.offset = 0; + op.value = max * 2; + + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(0, rets[0].value()); + rets.clear(); + + op.type = BitfieldOperation::Type::kGet; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(max, rets[0].value()); + rets.clear(); + + op.type = BitfieldOperation::Type::kIncrBy; + for (int64_t i = 0; i <= int64_t(max) + 10; ++i) { + op.value = i; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(max, rets[0].value()); + rets.clear(); + } + + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, SignedBitfieldFailSetTest) { + constexpr uint8_t bits = 5; + constexpr int64_t max = (int64_t(1) << (bits - 1)) - 1; + + std::vector> rets; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kSigned, bits).GetValue(); + op.overflow = BitfieldOverflowBehavior::kFail; + op.offset = 0; + op.value = max * 2; + + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_FALSE(rets[0].has_value()); + rets.clear(); + + op.value = max; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(0, rets[0].value()); + rets.clear(); + + op.type = BitfieldOperation::Type::kIncrBy; + for (int64_t i = 1; i <= max; ++i) { + op.value = i; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_FALSE(rets[0].has_value()); + rets.clear(); + } + + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, UnsignedBitfieldFailSetTest) { + constexpr uint8_t bits = 5; + constexpr int64_t max = (int64_t(1) << bits) - 1; + + std::vector> rets; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kSet; + op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kUnsigned, bits).GetValue(); + op.overflow = BitfieldOverflowBehavior::kFail; + op.offset = 0; + op.value = max * 2; + + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_FALSE(rets[0].has_value()); + rets.clear(); + + op.value = max; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(0, rets[0].value()); + rets.clear(); + + op.type = BitfieldOperation::Type::kIncrBy; + for (int64_t i = 1; i <= max; ++i) { + op.value = i; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_FALSE(rets[0].has_value()); + rets.clear(); + } + + auto s = bitmap_->Del(key_); +} + +TEST_F(RedisBitmapTest, BitfieldStringGetSetTest) { + std::string str = "dan yuan ren chang jiu, qian li gong chan juan."; + string_->Set(key_, str); + + std::vector> rets; + + BitfieldOperation op; + op.type = BitfieldOperation::Type::kGet; + op.encoding = BitfieldEncoding::Create(BitfieldEncoding::Type::kSigned, 8).GetValue(); + + int i = 0; + for (auto ch : str) { + op.offset = i; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(ch, rets[0].value()); + rets.clear(); + i += 8; + } + + for (; static_cast(i) <= str.size() + 10; i += 8) { + op.offset = i; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(0, rets[0].value()); + rets.clear(); + } + + // reverse all i8 in bitmap. + op.type = BitfieldOperation::Type::kSet; + for (int l = 0, r = static_cast(str.size() - 1); l < r; ++l, --r) { + op.offset = l * 8; + op.value = str[r]; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(str[l], rets[0].value()); + rets.clear(); + + op.offset = r * 8; + op.value = str[l]; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(str[r], rets[0].value()); + rets.clear(); + } + std::reverse(str.begin(), str.end()); + + // check reversed string. + i = 0; + op.type = BitfieldOperation::Type::kGet; + for (auto ch : str) { + op.offset = i; + bitmap_->Bitfield(key_, {op}, &rets); + EXPECT_EQ(1, rets.size()); + EXPECT_EQ(ch, rets[0].value()); + rets.clear(); + i += 8; + } +} diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go index ed990ae349c..b64a841febc 100644 --- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go +++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go @@ -912,6 +912,12 @@ func TestSlotMigrateDataType(t *testing.T) { for i := 10000; i < 11000; i += 2 { require.NoError(t, rdb0.SetBit(ctx, keys[8], int64(i), 1).Err()) } + for i := 20000; i < 21000; i += 5 { + res := rdb0.BitField(ctx, keys[8], "SET", "u5", strconv.Itoa(i), 23) + require.NoError(t, res.Err()) + require.EqualValues(t, 1, len(res.Val())) + require.EqualValues(t, 0, res.Val()[0]) + } // 7. type sortint require.NoError(t, rdb0.Do(ctx, "SIADD", keys[9], 2, 4, 1, 3).Err()) require.NoError(t, rdb0.Do(ctx, "SIREM", keys[9], 2).Err()) @@ -957,6 +963,12 @@ func TestSlotMigrateDataType(t *testing.T) { for i := 0; i < 20; i += 2 { require.EqualValues(t, 0, rdb1.GetBit(ctx, keys[8], int64(i)).Val()) } + for i := 20000; i < 21000; i += 5 { + res := rdb1.BitField(ctx, keys[8], "GET", "u5", strconv.Itoa(i)) + require.NoError(t, res.Err()) + require.EqualValues(t, 1, len(res.Val())) + require.EqualValues(t, 23, res.Val()[0]) + } // 7. type sortint require.EqualValues(t, siv, rdb1.Do(ctx, "SIRANGE", keys[9], 0, -1).Val()) diff --git a/tests/gocase/unit/type/bitmap/bitmap_test.go b/tests/gocase/unit/type/bitmap/bitmap_test.go index 5b23389c9c9..eb778622b7f 100644 --- a/tests/gocase/unit/type/bitmap/bitmap_test.go +++ b/tests/gocase/unit/type/bitmap/bitmap_test.go @@ -302,4 +302,14 @@ func TestBitmap(t *testing.T) { Set2SetBit(t, rdb, ctx, "a", []byte("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")) require.EqualValues(t, 32, rdb.BitOpOr(ctx, "x", "a", "b").Val()) }) + + t.Run("BITFIELD on string type", func(t *testing.T) { + str := "zhe ge ren hen lan, shen me dou mei you liu xia." + require.NoError(t, rdb.Set(ctx, "str", str, 0).Err()) + res := rdb.BitField(ctx, "str", "GET", "u8", "32", "SET", "u8", "32", 'r', "GET", "u8", "32") + require.NoError(t, res.Err()) + require.EqualValues(t, str[4], res.Val()[0]) + require.EqualValues(t, str[4], res.Val()[1]) + require.EqualValues(t, 'r', res.Val()[2]) + }) }