diff --git a/mooncake-store/include/transfer_task.h b/mooncake-store/include/transfer_task.h index f6e253dcb..a97b47bc8 100644 --- a/mooncake-store/include/transfer_task.h +++ b/mooncake-store/include/transfer_task.h @@ -14,7 +14,6 @@ #include #include "transfer_engine.h" -#include "transport/transport.h" #include "types.h" #include "replica.h" #include "storage_backend.h" @@ -382,14 +381,14 @@ class TransferSubmitter { * @return TransferFuture representing the async operation, or nullopt on * failure */ - std::optional submit( - const Replica::Descriptor& replica, std::vector& slices, - Transport::TransferRequest::OpCode op_code); + std::optional submit(const Replica::Descriptor& replica, + std::vector& slices, + TransferRequest::OpCode op_code); std::optional submit_batch( const std::vector& replicas, std::vector>& all_slices, - Transport::TransferRequest::OpCode op_code); + TransferRequest::OpCode op_code); private: TransferEngine& engine_; @@ -423,27 +422,27 @@ class TransferSubmitter { */ std::optional submitMemcpyOperation( const std::vector& handles, - std::vector& slices, Transport::TransferRequest::OpCode op_code); + std::vector& slices, TransferRequest::OpCode op_code); /** * @brief Submit transfer engine operation asynchronously */ std::optional submitTransferEngineOperation( const std::vector& handles, - std::vector& slices, Transport::TransferRequest::OpCode op_code); + std::vector& slices, TransferRequest::OpCode op_code); std::optional submitFileReadOperation( const Replica::Descriptor& replica, std::vector& slices, - Transport::TransferRequest::OpCode op_code); + TransferRequest::OpCode op_code); /** * @brief Calculate total bytes for transfer operation and update metrics */ void updateTransferMetrics(const std::vector& slices, - Transport::TransferRequest::OpCode op); + TransferRequest::OpCode op); std::optional submitTransfer( - std::vector& requests); + std::vector& requests); }; } // namespace mooncake \ No newline at end of file diff --git a/mooncake-store/src/client.cpp b/mooncake-store/src/client.cpp index 349291213..52bf75215 100644 --- a/mooncake-store/src/client.cpp +++ b/mooncake-store/src/client.cpp @@ -13,7 +13,6 @@ #include "transfer_engine.h" #include "transfer_task.h" -#include "transport/transport.h" #include "config.h" #include "types.h" diff --git a/mooncake-store/src/transfer_task.cpp b/mooncake-store/src/transfer_task.cpp index b09fc15cc..158487d5b 100644 --- a/mooncake-store/src/transfer_task.cpp +++ b/mooncake-store/src/transfer_task.cpp @@ -4,6 +4,7 @@ #include #include +#include "transfer_engine.h" namespace mooncake { @@ -387,7 +388,7 @@ TransferSubmitter::TransferSubmitter(TransferEngine& engine, std::optional TransferSubmitter::submit( const Replica::Descriptor& replica, std::vector& slices, - Transport::TransferRequest::OpCode op_code) { + TransferRequest::OpCode op_code) { std::optional future; if (replica.is_memory_replica()) { @@ -428,9 +429,9 @@ std::optional TransferSubmitter::submit( std::optional TransferSubmitter::submit_batch( const std::vector& replicas, std::vector>& all_slices, - Transport::TransferRequest::OpCode op_code) { + TransferRequest::OpCode op_code) { std::optional future; - std::vector requests; + std::vector requests; for (size_t i = 0; i < replicas.size(); ++i) { auto& replica = replicas[i]; auto& slices = all_slices[i]; @@ -441,15 +442,14 @@ std::optional TransferSubmitter::submit_batch( } auto handle = mem_desc.buffer_descriptors[0]; uint64_t offset = 0; - Transport::SegmentHandle seg = - engine_.openSegment(handle.transport_endpoint_); + SegmentHandle seg = engine_.openSegment(handle.transport_endpoint_); if (seg == static_cast(ERR_INVALID_ARGUMENT)) { LOG(ERROR) << "Failed to open segment " << handle.transport_endpoint_; return std::nullopt; } for (auto slice : slices) { - Transport::TransferRequest request; + TransferRequest request; request.opcode = op_code; request.source = static_cast(slice.ptr); request.target_id = seg; @@ -471,7 +471,7 @@ std::optional TransferSubmitter::submit_batch( std::optional TransferSubmitter::submitMemcpyOperation( const std::vector& handles, - std::vector& slices, Transport::TransferRequest::OpCode op_code) { + std::vector& slices, TransferRequest::OpCode op_code) { auto state = std::make_shared(); // Create memcpy operations @@ -487,7 +487,7 @@ std::optional TransferSubmitter::submitMemcpyOperation( void* dest; const void* src; - if (op_code == Transport::TransferRequest::READ) { + if (op_code == TransferRequest::READ) { // READ: from handle (remote buffer) to slice (local // buffer) dest = slice.ptr; @@ -513,11 +513,11 @@ std::optional TransferSubmitter::submitMemcpyOperation( } std::optional TransferSubmitter::submitTransfer( - std::vector& requests) { + std::vector& requests) { // Allocate batch ID const size_t batch_size = requests.size(); BatchID batch_id = engine_.allocateBatchID(batch_size); - if (batch_id == Transport::INVALID_BATCH_ID) { + if (batch_id == INVALID_BATCH_ID) { LOG(ERROR) << "Failed to allocate batch ID"; return std::nullopt; } @@ -534,7 +534,7 @@ std::optional TransferSubmitter::submitTransfer( return std::nullopt; } - if (batch_id == Transport::INVALID_BATCH_ID) { + if (batch_id == INVALID_BATCH_ID) { // INVALID_BATCH_ID LOG(ERROR) << "Invalid batch ID for transfer engine operation"; return std::nullopt; } @@ -549,9 +549,9 @@ std::optional TransferSubmitter::submitTransfer( std::optional TransferSubmitter::submitTransferEngineOperation( const std::vector& handles, - std::vector& slices, Transport::TransferRequest::OpCode op_code) { + std::vector& slices, TransferRequest::OpCode op_code) { // Create transfer requests - std::vector requests; + std::vector requests; requests.reserve(handles.size()); for (size_t i = 0; i < handles.size(); ++i) { @@ -566,8 +566,7 @@ std::optional TransferSubmitter::submitTransferEngineOperation( return std::nullopt; } - Transport::SegmentHandle seg = - engine_.openSegment(handle.transport_endpoint_); + SegmentHandle seg = engine_.openSegment(handle.transport_endpoint_); if (seg == static_cast(ERR_INVALID_ARGUMENT)) { LOG(ERROR) << "Failed to open segment for endpoint='" @@ -575,7 +574,7 @@ std::optional TransferSubmitter::submitTransferEngineOperation( return std::nullopt; } - Transport::TransferRequest request; + TransferRequest request; request.opcode = op_code; request.source = static_cast(slice.ptr); request.target_id = seg; @@ -589,7 +588,7 @@ std::optional TransferSubmitter::submitTransferEngineOperation( std::optional TransferSubmitter::submitFileReadOperation( const Replica::Descriptor& replica, std::vector& slices, - Transport::TransferRequest::OpCode op_code) { + TransferRequest::OpCode op_code) { auto state = std::make_shared(); auto disk_replica = replica.get_disk_descriptor(); std::string file_path = disk_replica.file_path; @@ -675,9 +674,8 @@ bool TransferSubmitter::validateTransferParams( return true; } -void TransferSubmitter::updateTransferMetrics( - const std::vector& slices, - Transport::TransferRequest::OpCode op_code) { +void TransferSubmitter::updateTransferMetrics(const std::vector& slices, + TransferRequest::OpCode op_code) { size_t total_bytes = 0; for (const auto& slice : slices) { total_bytes += slice.size; @@ -687,10 +685,10 @@ void TransferSubmitter::updateTransferMetrics( return; } - if (op_code == Transport::TransferRequest::READ) { + if (op_code == TransferRequest::READ) { transfer_metric_->total_read_bytes.inc(total_bytes); - } else if (op_code == Transport::TransferRequest::WRITE) { + } else if (op_code == TransferRequest::WRITE) { transfer_metric_->total_write_bytes.inc(total_bytes); } } diff --git a/mooncake-transfer-engine/include/multi_transport.h b/mooncake-transfer-engine/include/multi_transport.h index b5214b58c..b4135d8d1 100644 --- a/mooncake-transfer-engine/include/multi_transport.h +++ b/mooncake-transfer-engine/include/multi_transport.h @@ -27,8 +27,6 @@ class MultiTransport { using TransferStatus = Transport::TransferStatus; using BatchDesc = Transport::BatchDesc; - const static BatchID INVALID_BATCH_ID = Transport::INVALID_BATCH_ID; - MultiTransport(std::shared_ptr metadata, std::string &local_server_name); diff --git a/mooncake-transfer-engine/include/transfer_engine.h b/mooncake-transfer-engine/include/transfer_engine.h index 0807ef690..117416843 100644 --- a/mooncake-transfer-engine/include/transfer_engine.h +++ b/mooncake-transfer-engine/include/transfer_engine.h @@ -24,7 +24,6 @@ #include #include #include -#include #include #include #include @@ -45,6 +44,7 @@ using TransferStatusEnum = Transport::TransferStatusEnum; using SegmentHandle = Transport::SegmentHandle; using SegmentID = Transport::SegmentID; using BatchID = Transport::BatchID; +const static BatchID INVALID_BATCH_ID = UINT64_MAX; using BufferEntry = Transport::BufferEntry; class TransferEngine { diff --git a/mooncake-transfer-engine/include/transport/transport.h b/mooncake-transfer-engine/include/transport/transport.h index 1aaac3cf7..dcccc7789 100644 --- a/mooncake-transfer-engine/include/transport/transport.h +++ b/mooncake-transfer-engine/include/transport/transport.h @@ -44,7 +44,6 @@ class Transport { using SegmentHandle = SegmentID; using BatchID = uint64_t; - const static BatchID INVALID_BATCH_ID = UINT64_MAX; using BufferDesc = TransferMetadata::BufferDesc; using SegmentDesc = TransferMetadata::SegmentDesc;