Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 123 additions & 4 deletions mooncake-store/include/offset_allocator/offset_allocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <memory>
#include <optional>
#include <glog/logging.h>

#include "mutex.h"

Expand Down Expand Up @@ -94,6 +95,8 @@ class OffsetAllocationHandle {
// The real base and requested size of the allocated memory.
uint64_t real_base;
uint64_t requested_size;

friend class OffsetAllocatorTest; // for unit tests
};

struct OffsetAllocatorMetrics {
Expand Down Expand Up @@ -155,6 +158,13 @@ class OffsetAllocator : public std::enable_shared_from_this<OffsetAllocator> {
[[nodiscard]]
OffsetAllocatorMetrics get_metrics() const;

// Serialize the allocator with serializer.
template <typename T>
void serialize_to(T& serializer) const;

template <typename T>
static std::shared_ptr<OffsetAllocator> deserialize_from(T& serializer);

private:
friend class OffsetAllocationHandle;

Expand All @@ -166,11 +176,11 @@ class OffsetAllocator : public std::enable_shared_from_this<OffsetAllocator> {
OffsetAllocatorMetrics get_metrics_internal() const;

std::unique_ptr<__Allocator> m_allocator GUARDED_BY(m_mutex);
const uint64_t m_base;
uint64_t m_base;
// The real offset and size of the allocated memory need to be multiplied by
// m_multiplier
const uint64_t m_multiplier_bits;
const uint64_t m_capacity;
uint64_t m_multiplier_bits;
uint64_t m_capacity;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the allocator always allocates a max_capacity array, I believe m_current_capacity serves no purpose?

mutable Mutex m_mutex;

// Lightweight metrics maintained during allocation/deallocation
Expand All @@ -179,12 +189,20 @@ class OffsetAllocator : public std::enable_shared_from_this<OffsetAllocator> {

// Private constructor - use create() factory method instead
OffsetAllocator(uint64_t base, size_t size, uint32 init_capacity,
uint32 max_capacity);
uint32 max_capacity);

// Private constructor - initialize from serialized data
template <typename T>
OffsetAllocator(T& serializer);

friend class OffsetAllocatorTest; // for unit tests
};

class __Allocator {
public:
__Allocator(uint32 size, uint32 init_capacity, uint32 max_capacity);
template <typename T>
__Allocator(T& serializer) noexcept(false);
__Allocator(__Allocator&& other);
~__Allocator();
void reset();
Expand All @@ -196,6 +214,10 @@ class __Allocator {
OffsetAllocStorageReport storageReport() const;
OffsetAllocStorageReportFull storageReportFull() const;

// Serialize the allocator with serializer.
template <typename T>
void serialize_to(T& serializer) const;

private:
uint32 insertNodeIntoBin(uint32 size, uint32 dataOffset);
void removeNodeFromBin(uint32 nodeIndex);
Expand Down Expand Up @@ -224,6 +246,103 @@ class __Allocator {
Node* m_nodes;
NodeIndex* m_freeNodes;
uint32 m_freeOffset;

friend class OffsetAllocatorTest; // for unit tests
};

// Template method implementations
template <typename T>
void OffsetAllocator::serialize_to(T& serializer) const {
MutexLocker guard(&m_mutex);

if (!m_allocator) {
serializer.set_error("Allocator is not initialized");
return;
}

// Basic member variables
serializer.write(&m_base, sizeof(m_base));
serializer.write(&m_multiplier_bits, sizeof(m_multiplier_bits));
serializer.write(&m_capacity, sizeof(m_capacity));
serializer.write(&m_allocated_size, sizeof(m_allocated_size));
serializer.write(&m_allocated_num, sizeof(m_allocated_num));
// Serialize the allocator
m_allocator->serialize_to(serializer);
}

template <typename T>
std::shared_ptr<OffsetAllocator> OffsetAllocator::deserialize_from(
T& serializer) {
return std::shared_ptr<OffsetAllocator>(new OffsetAllocator(serializer));
}

template <typename T>
OffsetAllocator::OffsetAllocator(T& serializer) {
// serializer.read() will throw an exception if the buffer is corrupted.
try {
serializer.read(&m_base, sizeof(m_base));
serializer.read(&m_multiplier_bits, sizeof(m_multiplier_bits));
serializer.read(&m_capacity, sizeof(m_capacity));
serializer.read(&m_allocated_size, sizeof(m_allocated_size));
serializer.read(&m_allocated_num, sizeof(m_allocated_num));
m_allocator = std::make_unique<__Allocator>(serializer);
} catch (const std::exception& e) {
LOG(ERROR) << "Deserializing OffsetAllocator failed, error="
<< e.what();
throw std::runtime_error("Deserializing OffsetAllocator failed");
}
}

template <typename T>
void __Allocator::serialize_to(T& serializer) const {
if (!m_nodes || !m_freeNodes) {
serializer.set_error("Allocator is not initialized");
return;
}

serializer.write(&m_size, sizeof(m_size));
serializer.write(&m_current_capacity, sizeof(m_current_capacity));
serializer.write(&m_max_capacity, sizeof(m_max_capacity));
serializer.write(&m_freeStorage, sizeof(m_freeStorage));
serializer.write(&m_usedBinsTop, sizeof(m_usedBinsTop));
serializer.write(&m_usedBins, sizeof(m_usedBins));
serializer.write(&m_binIndices, sizeof(m_binIndices));
serializer.write(&m_freeOffset, sizeof(m_freeOffset));
serializer.write(m_nodes, m_current_capacity * sizeof(Node));
serializer.write(m_freeNodes, m_current_capacity * sizeof(NodeIndex));
}

template <typename T>
__Allocator::__Allocator(T& serializer) {
m_nodes = nullptr;
m_freeNodes = nullptr;

// serializer.read() will throw an exception if the buffer is corrupted.
try {
// Deserialize basic member variables
serializer.read(&m_size, sizeof(m_size));
serializer.read(&m_current_capacity, sizeof(m_current_capacity));
serializer.read(&m_max_capacity, sizeof(m_max_capacity));
serializer.read(&m_freeStorage, sizeof(m_freeStorage));
serializer.read(&m_usedBinsTop, sizeof(m_usedBinsTop));
serializer.read(&m_usedBins, sizeof(m_usedBins));
serializer.read(&m_binIndices, sizeof(m_binIndices));
serializer.read(&m_freeOffset, sizeof(m_freeOffset));

// Allocate memory for nodes and freeNodes
m_nodes = new Node[m_max_capacity];
m_freeNodes = new NodeIndex[m_max_capacity];

// Deserialize the arrays
serializer.read(m_nodes, m_current_capacity * sizeof(Node));
serializer.read(m_freeNodes, m_current_capacity * sizeof(NodeIndex));
} catch (const std::exception& e) {
// Free memory if deserialization fails
LOG(ERROR) << "Deserializing __Allocator failed, error=" << e.what();
if (m_nodes) delete[] m_nodes;
if (m_freeNodes) delete[] m_freeNodes;
throw std::runtime_error("Deserializing __Allocator failed");
}
}

} // namespace mooncake::offset_allocator
Loading
Loading