Skip to content

Commit

Permalink
chore: implement QList::Erase functionality (#4092)
Browse files Browse the repository at this point in the history
* chore: implement QList::Erase functionality

    Also add a parametrized test covering fill/compress options.
    Fix a compression bug introduced when copying the code.
    Introduce move c'tor/operator.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

* chore: implement QList::Replace functionality

Also add a parametrized test covering fill/compress options.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange authored Nov 9, 2024
1 parent d3ef0a3 commit 1819e51
Show file tree
Hide file tree
Showing 3 changed files with 317 additions and 58 deletions.
216 changes: 187 additions & 29 deletions src/core/qlist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ using namespace std;
* size, the maximum limit is bigger, but still safe.
* 8k is a recommended / default size limit */
#define SIZE_SAFETY_LIMIT 8192
#define sizeMeetsSafetyLimit(sz) ((sz) <= SIZE_SAFETY_LIMIT)

/* Maximum estimate of the listpack entry overhead.
* Although in the worst case(sz < 64), we will waste 6 bytes in one
Expand Down Expand Up @@ -54,7 +53,7 @@ using namespace std;
if ((_node)->recompress) \
CompressNode((_node)); \
else \
Compress(_node); \
this->Compress(_node); \
} while (0)

namespace dfly {
Expand Down Expand Up @@ -90,7 +89,7 @@ bool IsLargeElement(size_t sz, int fill) {
if (ABSL_PREDICT_FALSE(packed_threshold != 0))
return sz >= packed_threshold;
if (fill >= 0)
return !sizeMeetsSafetyLimit(sz);
return sz > SIZE_SAFETY_LIMIT;
else
return sz > NodeNegFillLimit(fill);
}
Expand Down Expand Up @@ -178,15 +177,15 @@ void NodeUpdateSz(quicklistNode* node) {
* Returns true if listpack compressed successfully.
* Returns false if compression failed or if listpack too small to compress. */
bool CompressNode(quicklistNode* node) {
DCHECK(node->encoding == QUICKLIST_NODE_ENCODING_RAW);
DCHECK(!node->dont_compress);
#ifdef SERVER_TEST
node->attempted_compress = 1;
#endif
if (node->dont_compress)
return false;

/* validate that the node is neither
* tail nor head (it has prev and next)*/
assert(node->prev && node->next);
DCHECK(node->prev && node->next);

node->recompress = 0;
/* Don't bother compressing small values */
Expand All @@ -213,6 +212,14 @@ bool CompressNode(quicklistNode* node) {
return true;
}

bool CompressNodeIfNeeded(quicklistNode* node) {
DCHECK(node);
if (node->encoding == QUICKLIST_NODE_ENCODING_RAW && !node->dont_compress) {
return CompressNode(node);
}
return false;
}

/* Uncompress the listpack in 'node' and update encoding details.
* Returns 1 on successful decode, 0 on failure to decode. */
bool DecompressNode(bool recompress, quicklistNode* node) {
Expand Down Expand Up @@ -246,15 +253,6 @@ void RecompressOnly(quicklistNode* node) {
}
}

bool ElemCompare(const QList::Entry& entry, string_view elem) {
if (entry.value) {
return entry.view() == elem;
}

absl::AlphaNum an(entry.longval);
return elem == an.Piece();
}

quicklistNode* SplitNode(quicklistNode* node, int offset, bool after) {
size_t zl_sz = node->sz;

Expand Down Expand Up @@ -293,23 +291,53 @@ QList::QList() : fill_(-2), compress_(0), bookmark_count_(0) {
QList::QList(int fill, int compress) : fill_(fill), compress_(compress), bookmark_count_(0) {
}

QList::QList(QList&& other)
: head_(other.head_),
tail_(other.tail_),
count_(other.count_),
len_(other.len_),
fill_(other.fill_),
compress_(other.compress_),
bookmark_count_(other.bookmark_count_) {
other.head_ = other.tail_ = nullptr;
other.len_ = other.count_ = 0;
}

QList::~QList() {
unsigned long len;
quicklistNode *current, *next;
Clear();
}

QList& QList::operator=(QList&& other) {
if (this != &other) {
Clear();
head_ = other.head_;
tail_ = other.tail_;
len_ = other.len_;
count_ = other.count_;
fill_ = other.fill_;
compress_ = other.compress_;
bookmark_count_ = other.bookmark_count_;

other.head_ = other.tail_ = nullptr;
other.len_ = other.count_ = 0;
}
return *this;
}

current = head_;
len = len_;
while (len--) {
next = current->next;
void QList::Clear() {
quicklistNode* current = head_;

zfree(current->entry);
count_ -= current->count;
while (len_) {
quicklistNode* next = current->next;

zfree(current->entry);
zfree(current);

len_--;
current = next;
}
head_ = tail_ = nullptr;
count_ = 0;
}

void QList::Push(string_view value, Where where) {
Expand Down Expand Up @@ -353,7 +381,7 @@ bool QList::Insert(std::string_view pivot, std::string_view elem, InsertOpt opt)
Iterator it = GetIterator(HEAD);

while (it.Next()) {
if (ElemCompare(it.Get(), pivot)) {
if (it.Get() == pivot) {
Insert(it, elem, opt);
return true;
}
Expand Down Expand Up @@ -435,7 +463,6 @@ bool QList::PushTail(string_view value) {
} else {
quicklistNode* node = CreateNode();
node->entry = LP_FromElem(value);

NodeUpdateSz(node);
InsertNode(orig, node, AFTER);
}
Expand Down Expand Up @@ -599,7 +626,63 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {
}

void QList::Replace(Iterator it, std::string_view elem) {
// TODO
quicklistNode* node = it.current_;
unsigned char* newentry;
size_t sz = elem.size();

if (ABSL_PREDICT_TRUE(!QL_NODE_IS_PLAIN(node) && !IsLargeElement(sz, fill_) &&
(newentry = lpReplace(node->entry, &it.zi_, uint_ptr(elem), sz)) != NULL)) {
node->entry = newentry;
NodeUpdateSz(node);
/* quicklistNext() and quicklistGetIteratorEntryAtIdx() provide an uncompressed node */
quicklistCompress(node);
} else if (QL_NODE_IS_PLAIN(node)) {
if (IsLargeElement(sz, fill_)) {
zfree(node->entry);
node->entry = (uint8_t*)zmalloc(sz);
node->sz = sz;
memcpy(node->entry, elem.data(), sz);
quicklistCompress(node);
} else {
Insert(it, elem, AFTER);
DelNode(node);
}
} else { /* The node is full or data is a large element */
quicklistNode *split_node = NULL, *new_node;
node->dont_compress = 1; /* Prevent compression in InsertNode() */

/* If the entry is not at the tail, split the node at the entry's offset. */
if (it.offset_ != node->count - 1 && it.offset_ != -1)
split_node = SplitNode(node, it.offset_, 1);

/* Create a new node and insert it after the original node.
* If the original node was split, insert the split node after the new node. */
new_node = CreateNode(IsLargeElement(sz, fill_) ? QUICKLIST_NODE_CONTAINER_PLAIN
: QUICKLIST_NODE_CONTAINER_PACKED,
elem);
InsertNode(node, new_node, AFTER);
if (split_node)
InsertNode(new_node, split_node, AFTER);
count_++;

/* Delete the replaced element. */
if (node->count == 1) {
DelNode(node);
} else {
unsigned char* p = lpSeek(node->entry, -1);
DelPackedIndex(node, &p);
node->dont_compress = 0; /* Re-enable compression */
new_node = MergeNodes(new_node);
/* We can't know if the current node and its sibling nodes are correctly compressed,
* and we don't know if they are within the range of compress depth, so we need to
* use quicklistCompress() for compression, which checks if node is within compress
* depth before compressing. */
quicklistCompress(new_node);
quicklistCompress(new_node->prev);
if (new_node->next)
quicklistCompress(new_node->next);
}
}
}

/* Force 'quicklist' to meet compression guidelines set by compress depth.
Expand Down Expand Up @@ -642,11 +725,11 @@ void QList::Compress(quicklistNode* node) {
}

if (!in_depth)
CompressNode(node);
CompressNodeIfNeeded(node);

/* At this point, forward and reverse are one node beyond depth */
CompressNode(forward);
CompressNode(reverse);
CompressNodeIfNeeded(forward);
CompressNodeIfNeeded(reverse);
}

/* Attempt to merge listpacks within two nodes on either side of 'center'.
Expand Down Expand Up @@ -770,6 +853,32 @@ void QList::DelNode(quicklistNode* node) {
zfree(node);
}

/* Delete one entry from list given the node for the entry and a pointer
* to the entry in the node.
*
* Note: DelPackedIndex() *requires* uncompressed nodes because you
* already had to get *p from an uncompressed node somewhere.
*
* Returns 1 if the entire node was deleted, 0 if node still exists.
* Also updates in/out param 'p' with the next offset in the listpack. */
bool QList::DelPackedIndex(quicklistNode* node, uint8_t** p) {
DCHECK(!QL_NODE_IS_PLAIN(node));

bool gone = false;
node->entry = lpDelete(node->entry, *p, p);
node->count--;
if (node->count == 0) {
gone = true;
DelNode(node);
} else {
NodeUpdateSz(node);
}
count_--;

/* If we deleted the node, the original node is no longer valid */
return gone;
}

auto QList::GetIterator(Where where) const -> Iterator {
Iterator it;
it.owner_ = this;
Expand Down Expand Up @@ -839,6 +948,55 @@ auto QList::GetIterator(long idx) const -> Iterator {
return iter;
}

auto QList::Erase(Iterator it) -> Iterator {
DCHECK(it.current_);

quicklistNode* node = it.current_;
quicklistNode* prev = node->prev;
quicklistNode* next = node->next;

bool deleted_node = false;
if (QL_NODE_IS_PLAIN(node)) {
DelNode(node);
deleted_node = true;
} else {
deleted_node = DelPackedIndex(node, &it.zi_);
}

/* after delete, the zi is now invalid for any future usage. */
it.zi_ = NULL;

/* If current node is deleted, we must update iterator node and offset. */
if (deleted_node) {
if (it.direction_ == AL_START_HEAD) {
it.current_ = next;
it.offset_ = 0;
} else if (it.direction_ == AL_START_TAIL) {
it.current_ = prev;
it.offset_ = -1;
}
}

/* else if (!deleted_node), no changes needed.
* we already reset iter->zi above, and the existing iter->offset
* doesn't move again because:
* - [1, 2, 3] => delete offset 1 => [1, 3]: next element still offset 1
* - [1, 2, 3] => delete offset 0 => [2, 3]: next element still offset 0
* if we deleted the last element at offset N and now
* length of this listpack is N-1, the next call into
* quicklistNext() will jump to the next node. */
return it;
}

bool QList::Entry::operator==(std::string_view sv) const {
if (std::holds_alternative<int64_t>(value_)) {
char buf[absl::numbers_internal::kFastToBufferSize];
char* end = absl::numbers_internal::FastIntToBuffer(std::get<int64_t>(value_), buf);
return sv == std::string_view(buf, end - buf);
}
return view() == sv;
}

bool QList::Iterator::Next() {
if (!current_)
return false;
Expand Down
Loading

0 comments on commit 1819e51

Please sign in to comment.