Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions mooncake-store/include/transfer_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include <vector>

#include "transfer_engine.h"
#include "transport/transport.h"
#include "types.h"
#include "replica.h"
#include "storage_backend.h"
Expand Down Expand Up @@ -382,14 +381,14 @@ class TransferSubmitter {
* @return TransferFuture representing the async operation, or nullopt on
* failure
*/
std::optional<TransferFuture> submit(
const Replica::Descriptor& replica, std::vector<Slice>& slices,
Transport::TransferRequest::OpCode op_code);
std::optional<TransferFuture> submit(const Replica::Descriptor& replica,
std::vector<Slice>& slices,
TransferRequest::OpCode op_code);

std::optional<TransferFuture> submit_batch(
const std::vector<Replica::Descriptor>& replicas,
std::vector<std::vector<Slice>>& all_slices,
Transport::TransferRequest::OpCode op_code);
TransferRequest::OpCode op_code);

private:
TransferEngine& engine_;
Expand Down Expand Up @@ -423,27 +422,27 @@ class TransferSubmitter {
*/
std::optional<TransferFuture> submitMemcpyOperation(
const std::vector<AllocatedBuffer::Descriptor>& handles,
std::vector<Slice>& slices, Transport::TransferRequest::OpCode op_code);
std::vector<Slice>& slices, TransferRequest::OpCode op_code);

/**
* @brief Submit transfer engine operation asynchronously
*/
std::optional<TransferFuture> submitTransferEngineOperation(
const std::vector<AllocatedBuffer::Descriptor>& handles,
std::vector<Slice>& slices, Transport::TransferRequest::OpCode op_code);
std::vector<Slice>& slices, TransferRequest::OpCode op_code);

std::optional<TransferFuture> submitFileReadOperation(
const Replica::Descriptor& replica, std::vector<Slice>& slices,
Transport::TransferRequest::OpCode op_code);
TransferRequest::OpCode op_code);

/**
* @brief Calculate total bytes for transfer operation and update metrics
*/
void updateTransferMetrics(const std::vector<Slice>& slices,
Transport::TransferRequest::OpCode op);
TransferRequest::OpCode op);

std::optional<TransferFuture> submitTransfer(
std::vector<Transport::TransferRequest>& requests);
std::vector<TransferRequest>& requests);
};

} // namespace mooncake
1 change: 0 additions & 1 deletion mooncake-store/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

#include "transfer_engine.h"
#include "transfer_task.h"
#include "transport/transport.h"
#include "config.h"
#include "types.h"

Expand Down
42 changes: 20 additions & 22 deletions mooncake-store/src/transfer_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <algorithm>
#include <cstdlib>
#include "transfer_engine.h"

namespace mooncake {

Expand Down Expand Up @@ -387,7 +388,7 @@ TransferSubmitter::TransferSubmitter(TransferEngine& engine,

std::optional<TransferFuture> TransferSubmitter::submit(
const Replica::Descriptor& replica, std::vector<Slice>& slices,
Transport::TransferRequest::OpCode op_code) {
TransferRequest::OpCode op_code) {
std::optional<TransferFuture> future;

if (replica.is_memory_replica()) {
Expand Down Expand Up @@ -428,9 +429,9 @@ std::optional<TransferFuture> TransferSubmitter::submit(
std::optional<TransferFuture> TransferSubmitter::submit_batch(
const std::vector<Replica::Descriptor>& replicas,
std::vector<std::vector<Slice>>& all_slices,
Transport::TransferRequest::OpCode op_code) {
TransferRequest::OpCode op_code) {
std::optional<TransferFuture> future;
std::vector<Transport::TransferRequest> requests;
std::vector<TransferRequest> requests;
for (size_t i = 0; i < replicas.size(); ++i) {
auto& replica = replicas[i];
auto& slices = all_slices[i];
Expand All @@ -441,15 +442,14 @@ std::optional<TransferFuture> TransferSubmitter::submit_batch(
}
auto handle = mem_desc.buffer_descriptors[0];
uint64_t offset = 0;
Transport::SegmentHandle seg =
engine_.openSegment(handle.transport_endpoint_);
SegmentHandle seg = engine_.openSegment(handle.transport_endpoint_);
if (seg == static_cast<uint64_t>(ERR_INVALID_ARGUMENT)) {
LOG(ERROR) << "Failed to open segment "
<< handle.transport_endpoint_;
return std::nullopt;
}
for (auto slice : slices) {
Transport::TransferRequest request;
TransferRequest request;
request.opcode = op_code;
request.source = static_cast<char*>(slice.ptr);
request.target_id = seg;
Expand All @@ -471,7 +471,7 @@ std::optional<TransferFuture> TransferSubmitter::submit_batch(

std::optional<TransferFuture> TransferSubmitter::submitMemcpyOperation(
const std::vector<AllocatedBuffer::Descriptor>& handles,
std::vector<Slice>& slices, Transport::TransferRequest::OpCode op_code) {
std::vector<Slice>& slices, TransferRequest::OpCode op_code) {
auto state = std::make_shared<MemcpyOperationState>();

// Create memcpy operations
Expand All @@ -487,7 +487,7 @@ std::optional<TransferFuture> TransferSubmitter::submitMemcpyOperation(
void* dest;
const void* src;

if (op_code == Transport::TransferRequest::READ) {
if (op_code == TransferRequest::READ) {
// READ: from handle (remote buffer) to slice (local
// buffer)
dest = slice.ptr;
Expand All @@ -513,11 +513,11 @@ std::optional<TransferFuture> TransferSubmitter::submitMemcpyOperation(
}

std::optional<TransferFuture> TransferSubmitter::submitTransfer(
std::vector<Transport::TransferRequest>& requests) {
std::vector<TransferRequest>& requests) {
// Allocate batch ID
const size_t batch_size = requests.size();
BatchID batch_id = engine_.allocateBatchID(batch_size);
if (batch_id == Transport::INVALID_BATCH_ID) {
if (batch_id == INVALID_BATCH_ID) {
LOG(ERROR) << "Failed to allocate batch ID";
return std::nullopt;
}
Expand All @@ -534,7 +534,7 @@ std::optional<TransferFuture> TransferSubmitter::submitTransfer(
return std::nullopt;
}

if (batch_id == Transport::INVALID_BATCH_ID) {
if (batch_id == INVALID_BATCH_ID) { // INVALID_BATCH_ID
LOG(ERROR) << "Invalid batch ID for transfer engine operation";
return std::nullopt;
}
Expand All @@ -549,9 +549,9 @@ std::optional<TransferFuture> TransferSubmitter::submitTransfer(

std::optional<TransferFuture> TransferSubmitter::submitTransferEngineOperation(
const std::vector<AllocatedBuffer::Descriptor>& handles,
std::vector<Slice>& slices, Transport::TransferRequest::OpCode op_code) {
std::vector<Slice>& slices, TransferRequest::OpCode op_code) {
// Create transfer requests
std::vector<Transport::TransferRequest> requests;
std::vector<TransferRequest> requests;
requests.reserve(handles.size());

for (size_t i = 0; i < handles.size(); ++i) {
Expand All @@ -566,16 +566,15 @@ std::optional<TransferFuture> TransferSubmitter::submitTransferEngineOperation(
return std::nullopt;
}

Transport::SegmentHandle seg =
engine_.openSegment(handle.transport_endpoint_);
SegmentHandle seg = engine_.openSegment(handle.transport_endpoint_);

if (seg == static_cast<uint64_t>(ERR_INVALID_ARGUMENT)) {
LOG(ERROR) << "Failed to open segment for endpoint='"
<< handle.transport_endpoint_ << "'";
return std::nullopt;
}

Transport::TransferRequest request;
TransferRequest request;
request.opcode = op_code;
request.source = static_cast<char*>(slice.ptr);
request.target_id = seg;
Expand All @@ -589,7 +588,7 @@ std::optional<TransferFuture> TransferSubmitter::submitTransferEngineOperation(

std::optional<TransferFuture> TransferSubmitter::submitFileReadOperation(
const Replica::Descriptor& replica, std::vector<Slice>& slices,
Transport::TransferRequest::OpCode op_code) {
TransferRequest::OpCode op_code) {
auto state = std::make_shared<FilereadOperationState>();
auto disk_replica = replica.get_disk_descriptor();
std::string file_path = disk_replica.file_path;
Expand Down Expand Up @@ -675,9 +674,8 @@ bool TransferSubmitter::validateTransferParams(
return true;
}

void TransferSubmitter::updateTransferMetrics(
const std::vector<Slice>& slices,
Transport::TransferRequest::OpCode op_code) {
void TransferSubmitter::updateTransferMetrics(const std::vector<Slice>& slices,
TransferRequest::OpCode op_code) {
size_t total_bytes = 0;
for (const auto& slice : slices) {
total_bytes += slice.size;
Expand All @@ -687,10 +685,10 @@ void TransferSubmitter::updateTransferMetrics(
return;
}

if (op_code == Transport::TransferRequest::READ) {
if (op_code == TransferRequest::READ) {
transfer_metric_->total_read_bytes.inc(total_bytes);

} else if (op_code == Transport::TransferRequest::WRITE) {
} else if (op_code == TransferRequest::WRITE) {
transfer_metric_->total_write_bytes.inc(total_bytes);
}
}
Expand Down
2 changes: 0 additions & 2 deletions mooncake-transfer-engine/include/multi_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ class MultiTransport {
using TransferStatus = Transport::TransferStatus;
using BatchDesc = Transport::BatchDesc;

const static BatchID INVALID_BATCH_ID = Transport::INVALID_BATCH_ID;

MultiTransport(std::shared_ptr<TransferMetadata> metadata,
std::string &local_server_name);

Expand Down
2 changes: 1 addition & 1 deletion mooncake-transfer-engine/include/transfer_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <thread>
Expand All @@ -45,6 +44,7 @@ using TransferStatusEnum = Transport::TransferStatusEnum;
using SegmentHandle = Transport::SegmentHandle;
using SegmentID = Transport::SegmentID;
using BatchID = Transport::BatchID;
const static BatchID INVALID_BATCH_ID = UINT64_MAX;
using BufferEntry = Transport::BufferEntry;

class TransferEngine {
Expand Down
1 change: 0 additions & 1 deletion mooncake-transfer-engine/include/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class Transport {
using SegmentHandle = SegmentID;

using BatchID = uint64_t;
const static BatchID INVALID_BATCH_ID = UINT64_MAX;

using BufferDesc = TransferMetadata::BufferDesc;
using SegmentDesc = TransferMetadata::SegmentDesc;
Expand Down
Loading