Skip to content

Commit

Permalink
minor updates
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Jul 30, 2024
1 parent 64a7a44 commit 7cf88ef
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 93 deletions.
65 changes: 65 additions & 0 deletions src/commands/cmd_hll.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include <types/redis_hyperloglog.h>

#include <algorithm>

#include "commander.h"
#include "commands/command_parser.h"
#include "commands/error_constants.h"
#include "error_constants.h"
#include "parse_util.h"
#include "server/redis_reply.h"
#include "server/server.h"
#include "storage/redis_metadata.h"

namespace redis {

/// PFADD key [element [element ...]]
/// Complexity: O(1) for each element added.
///
/// Hashes every element argument with HyperLogLog::HllHash and hands the hashes to
/// HyperLogLog::Add for `key`. The reply is the integer that Add writes to its
/// out-parameter.
class CommandPfAdd final : public Commander {
 public:
  /// @param srv    server whose storage backs the HyperLogLog.
  /// @param conn   connection that supplies the namespace.
  /// @param output receives the integer reply.
  /// @return Status::OK() on success, or Status::RedisExecErr carrying the storage error text.
  Status Execute(Server *srv, Connection *conn, std::string *output) override {
    // args_[0] is the command name itself. The key lives at args_[1] (the command is
    // registered with key range 1..1) and the elements start at args_[2]. The registered
    // arity of -2 guarantees that args_[1] exists.
    constexpr size_t kKeyIndex = 1;
    constexpr size_t kFirstElementIndex = 2;

    redis::HyperLogLog hll(srv->storage, conn->GetNamespace());

    // Zero elements is valid ("PFADD key"); avoid the unsigned underflow of size() - 2.
    std::vector<uint64_t> hashes;
    if (args_.size() > kFirstElementIndex) {
      hashes.reserve(args_.size() - kFirstElementIndex);
    }
    for (size_t i = kFirstElementIndex; i < args_.size(); i++) {
      hashes.push_back(redis::HyperLogLog::HllHash(args_[i]));
    }

    uint64_t ret{};
    auto s = hll.Add(args_[kKeyIndex], hashes, &ret);
    // NotFound is tolerated here rather than reported as an execution error.
    if (!s.ok() && !s.IsNotFound()) {
      return {Status::RedisExecErr, s.ToString()};
    }
    *output = redis::Integer(ret);
    return Status::OK();
  }
};

/// PFCOUNT key [key ...]
/// Complexity: O(1) with a very small average constant time when called with a single key.
/// O(N) with N being the number of keys, and much bigger constant times,
/// when called with multiple keys.
///
/// NOTE(review): this class declares no members and does not override Execute, so the
/// command currently behaves however the Commander base class does. It looks like a
/// placeholder for an implementation that has not been written yet -- TODO confirm.
class CommandPfCount final : public Commander {};

// Registers the HyperLogLog commands. After the name, arity and flags, the three integers
// describe where keys sit in the argument list: first key index, last key index, step.
// PFADD takes exactly one key (1, 1, 1). PFCOUNT accepts "key [key ...]", so its last key
// index is -1, meaning every argument from index 1 to the end is a key.
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandPfAdd>("pfadd", -2, "write", 1, 1, 1),
                        MakeCmdAttr<CommandPfCount>("pfcount", -2, "write", 1, -1, 1), );

} // namespace redis
12 changes: 9 additions & 3 deletions src/storage/redis_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -490,19 +490,25 @@ rocksdb::Status SearchMetadata::Decode(Slice *input) {
return rocksdb::Status::OK();
}

void HyperloglogMetadata::Encode(std::string *dst) const {
void HyperLogLogMetadata::Encode(std::string *dst) const {
Metadata::Encode(dst);
PutFixed8(dst, static_cast<uint8_t>(encode_type_));
}

rocksdb::Status HyperloglogMetadata::Decode(Slice *input) {
rocksdb::Status HyperLogLogMetadata::Decode(Slice *input) {
if (auto s = Metadata::Decode(input); !s.ok()) {
return s;
}

if (!GetFixed8(input, reinterpret_cast<uint8_t *>(&encode_type_))) {
uint8_t encoded_type = 0;
if (!GetFixed8(input, &encoded_type)) {
return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
}
// Check validity of encode type
if (encoded_type > 0) {
return rocksdb::Status::InvalidArgument(fmt::format("Invalid encode type {}", encoded_type));
}
encode_type_ = static_cast<EncodeType>(encoded_type);

return rocksdb::Status::OK();
}
15 changes: 9 additions & 6 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,20 +332,23 @@ class SearchMetadata : public Metadata {
rocksdb::Status Decode(Slice *input) override;
};

class HyperloglogMetadata : public Metadata {
class HyperLogLogMetadata : public Metadata {
public:
enum class EncodeType : uint8_t {
DENSE = 0, // dense encoding implement as sub keys to store registers by segment in data column family.
SPARSE = 1, // TODO sparse encoding implement as a compressed string to store registers in metadata column family.
// Redis-style dense encoding, implemented as bitmap-like sub keys that
// store the registers by segment in the data column family.
// Registers are stored in 6-bit format; each segment holds 1024
// registers packed into 768 bytes.
DENSE = 0,
// TODO: sparse encoding
// SPARSE = 1,
};

explicit HyperloglogMetadata(bool generate_version = true) : Metadata(kRedisHyperLogLog, generate_version) {}
explicit HyperLogLogMetadata(bool generate_version = true) : Metadata(kRedisHyperLogLog, generate_version) {}

void Encode(std::string *dst) const override;
using Metadata::Decode;
rocksdb::Status Decode(Slice *input) override;

private:
// TODO optimize for converting storage encoding automatically
EncodeType encode_type_ = EncodeType::DENSE;
};
42 changes: 14 additions & 28 deletions src/types/hyperloglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,17 @@

#include "hyperloglog.h"

#include "murmurhash2.h"
#include "vendor/murmurhash2.h"

/* Store the value of the register at position 'index' into variable 'val'.
* 'registers' is an array of unsigned bytes. */
uint8_t HllDenseGetRegister(const uint8_t *registers, uint32_t index) {
uint32_t byte = index * kHyperLogLogBits / 8;
uint8_t fb = index * kHyperLogLogBits & 7;
uint32_t byte = (index * kHyperLogLogRegisterBits) / 8;
uint8_t fb = (index * kHyperLogLogRegisterBits) & 7;
uint8_t fb8 = 8 - fb;
uint8_t b0 = registers[byte];
uint8_t b1 = 0;
if (fb > 8 - kHyperLogLogBits) {
if (fb > 8 - kHyperLogLogRegisterBits) {
b1 = registers[byte + 1];
}
return ((b0 >> fb) | (b1 << fb8)) & kHyperLogLogRegisterMax;
Expand All @@ -72,28 +72,21 @@ uint8_t HllDenseGetRegister(const uint8_t *registers, uint32_t index) {
/* Set the value of the register at position 'index' to 'val'.
* 'registers' is an array of unsigned bytes. */
void HllDenseSetRegister(uint8_t *registers, uint32_t index, uint8_t val) {
uint32_t byte = index * kHyperLogLogBits / 8;
uint8_t fb = index * kHyperLogLogBits & 7;
uint32_t byte = index * kHyperLogLogRegisterBits / 8;
uint8_t fb = index * kHyperLogLogRegisterBits & 7;
uint8_t fb8 = 8 - fb;
uint8_t v = val;
registers[byte] &= ~(kHyperLogLogRegisterMax << fb);
registers[byte] |= v << fb;
if (fb > 8 - kHyperLogLogBits) {
if (fb > 8 - kHyperLogLogRegisterBits) {
registers[byte + 1] &= ~(kHyperLogLogRegisterMax >> fb8);
registers[byte + 1] |= v >> fb8;
}
}

/* ========================= HyperLogLog algorithm ========================= */

/* Given a string element to add to the HyperLogLog, returns the length
* of the pattern 000..1 of the element hash. As a side effect 'register_index' is
* set which the element hashes to. */
uint8_t HllPatLen(const std::vector<uint8_t> &element, uint32_t *register_index) {
int elesize = static_cast<int>(element.size());
uint64_t hash = 0, bit = 0, index = 0;
int count = 0;

DenseHllResult ExtractDenseHllResult(uint64_t hash) {
/* Count the number of zeroes starting from bit kHyperLogLogRegisterCount
* (that is a power of two corresponding to the first bit we don't use
* as index). The max run can be 64-kHyperLogLogRegisterCountPow+1 = kHyperLogLogHashBitCount+1 bits.
Expand All @@ -105,19 +98,12 @@ uint8_t HllPatLen(const std::vector<uint8_t> &element, uint32_t *register_index)
*
* This may sound like inefficient, but actually in the average case
* there are high probabilities to find a 1 after a few iterations. */
hash = MurmurHash64A(element.data(), elesize, kHyperLogLogHashSeed);
index = hash & kHyperLogLogRegisterCountMask; /* Register index. */
uint32_t index = hash & kHyperLogLogRegisterCountMask; /* Register index. */
DCHECK_LT(index, kHyperLogLogRegisterCount);
hash >>= kHyperLogLogRegisterCountPow; /* Remove bits used to address the register. */
hash |= ((uint64_t)1 << kHyperLogLogHashBitCount); /* Make sure the loop terminates
and count will be <= kHyperLogLogHashBitCount+1. */
bit = 1;
count = 1; /* Initialized to 1 since we count the "00000...1" pattern. */
while ((hash & bit) == 0) {
count++;
bit <<= 1;
}
*register_index = (int)index;
return count;
hash |= (static_cast<uint64_t>(1U) << kHyperLogLogHashBitCount);
uint8_t ctz = __builtin_ctzll(hash);
return DenseHllResult{index, ctz};
}

/* Compute the register histogram in the dense representation. */
Expand Down Expand Up @@ -233,7 +219,7 @@ uint64_t HllCount(const std::vector<uint8_t> &registers) {
z *= 0.5;
}
z += m * HllSigma(reghisto[0] / (double)m);
e = static_cast<double>(llroundl(kHyperLogLogAlphaInf * m * m / z));
e = static_cast<double>(llroundl(kHyperLogLogAlpha * m * m / z));

return (uint64_t)e;
}
Expand Down
36 changes: 25 additions & 11 deletions src/types/hyperloglog.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,45 @@
*
*/

#pragma once

#include <cstdint>
#include <vector>

#include "redis_bitmap.h"

constexpr uint32_t kHyperLogLogRegisterCountPow = 14; /* The greater is Pow, the smaller the error. */
constexpr uint32_t kHyperLogLogHashBitCount =
64 - kHyperLogLogRegisterCountPow; /* The number of bits of the hash value used for determining the number of
leading zeros. */
/* The greater Pow is, the smaller the error. */
constexpr uint32_t kHyperLogLogRegisterCountPow = 14;
/* The number of bits of the hash value used for determining the number of leading zeros. */
constexpr uint32_t kHyperLogLogHashBitCount = 50;
constexpr uint32_t kHyperLogLogRegisterCount = 1 << kHyperLogLogRegisterCountPow; /* With Pow=14, 16384 registers. */

// NOTICE: adapt to the requirements of use Bitmap::SegmentCacheStore
constexpr size_t kHyperLogLogSegmentBytes = 768;
constexpr size_t kHyperLogLogSegmentRegisters = 1024;

constexpr uint32_t kHyperLogLogRegisterCountPerSegment = redis::kBitmapSegmentBits / 8;

constexpr uint32_t kHyperLogLogSegmentCount = kHyperLogLogRegisterCount / kHyperLogLogRegisterCountPerSegment;
constexpr uint32_t kHyperLogLogBits = 6;
constexpr uint32_t kHyperLogLogRegisterBits = 6;
constexpr uint32_t kHyperLogLogRegisterCountMask = kHyperLogLogRegisterCount - 1; /* Mask to index register. */
constexpr uint32_t kHyperLogLogRegisterMax = ((1 << kHyperLogLogBits) - 1);
constexpr double kHyperLogLogAlphaInf = 0.721347520444481703680; /* constant for 0.5/ln(2) */
constexpr uint32_t kHyperLogLogRegisterBytesPerSegment = kHyperLogLogRegisterCountPerSegment * kHyperLogLogBits / 8;
constexpr uint32_t kHyperLogLogRegisterBytes = (kHyperLogLogRegisterCount * kHyperLogLogBits + 7) / 8;
constexpr uint32_t kHyperLogLogRegisterMax = ((1 << kHyperLogLogRegisterBits) - 1);
/* constant for 0.5/ln(2) */
constexpr double kHyperLogLogAlpha = 0.721347520444481703680;
constexpr uint32_t kHyperLogLogRegisterBytesPerSegment =
(kHyperLogLogRegisterCountPerSegment * kHyperLogLogRegisterBits) / 8;
constexpr uint32_t kHyperLogLogRegisterBytes = (kHyperLogLogRegisterCount * kHyperLogLogRegisterBits + 7) / 8;
// Copied from redis
// https://github.com/valkey-io/valkey/blob/14e09e981e0039edbf8c41a208a258c18624cbb7/src/hyperloglog.c#L472
constexpr uint32_t kHyperLogLogHashSeed = 0xadc83b19;

// Result of splitting one 64-bit element hash for the dense representation;
// produced by ExtractDenseHllResult.
struct DenseHllResult {
// Index of the register the hash maps to (hash & kHyperLogLogRegisterCountMask).
uint32_t register_index;
// Number of trailing zero bits in the hash after the register-index bits are
// shifted out and a sentinel bit is set at position kHyperLogLogHashBitCount.
// NOTE(review): the HllPatLen function this replaces returned that count plus
// one (the length of the "00..1" pattern) -- confirm callers account for it.
uint8_t hll_trailing_zero;
};

DenseHllResult ExtractDenseHllResult(uint64_t hash);

uint8_t HllDenseGetRegister(const uint8_t *registers, uint32_t index);
void HllDenseSetRegister(uint8_t *registers, uint32_t index, uint8_t val);
uint8_t HllPatLen(const std::vector<uint8_t> &element, uint32_t *register_index);
uint64_t HllCount(const std::vector<uint8_t> &registers);
void HllMerge(std::vector<uint8_t> *registers_max, const std::vector<uint8_t> &registers);
Loading

0 comments on commit 7cf88ef

Please sign in to comment.