From 6b80c6317629fa25b8b6417882f3eb7ded9c3c46 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 13 Mar 2024 16:26:43 +0000 Subject: [PATCH] run fix_all --- examples/doca/run.py | 12 +- morpheus/_lib/doca/include/common.hpp | 48 ++++--- morpheus/_lib/doca/include/doca_rx_pipe.hpp | 4 +- morpheus/_lib/doca/include/doca_semaphore.hpp | 2 +- morpheus/_lib/doca/include/doca_source.hpp | 10 +- .../_lib/doca/include/doca_source_kernels.hpp | 14 +- morpheus/_lib/doca/include/error.hpp | 4 +- morpheus/_lib/doca/module.cpp | 3 +- morpheus/_lib/doca/src/doca_context.cpp | 74 +++++----- morpheus/_lib/doca/src/doca_rx_pipe.cpp | 77 +++++----- morpheus/_lib/doca/src/doca_rx_queue.cpp | 9 +- morpheus/_lib/doca/src/doca_semaphore.cpp | 9 +- morpheus/_lib/doca/src/doca_source.cpp | 134 ++++++++++-------- morpheus/stages/doca/doca_source_stage.py | 9 +- 14 files changed, 226 insertions(+), 183 deletions(-) diff --git a/examples/doca/run.py b/examples/doca/run.py index 6b71b9d5f9..6a38c83fe1 100644 --- a/examples/doca/run.py +++ b/examples/doca/run.py @@ -71,8 +71,13 @@ help="UDP or TCP traffic", required=True, ) -def run_pipeline(pipeline_batch_size, model_max_batch_size, model_fea_length, out_file, nic_addr, - gpu_addr, traffic_type): +def run_pipeline(pipeline_batch_size, + model_max_batch_size, + model_fea_length, + out_file, + nic_addr, + gpu_addr, + traffic_type): # Enable the default logger configure_logging(log_level=logging.DEBUG) @@ -118,7 +123,8 @@ def run_pipeline(pipeline_batch_size, model_max_batch_size, model_fea_length, ou pipeline.add_stage( PreprocessNLPStage( config, - vocab_hash_file='/workspace/models/training-tuning-scripts/sid-models/resources/bert-base-uncased-hash.txt', + vocab_hash_file= + '/workspace/models/training-tuning-scripts/sid-models/resources/bert-base-uncased-hash.txt', do_lower_case=True, truncation=True, add_special_tokens=False, diff --git a/morpheus/_lib/doca/include/common.hpp b/morpheus/_lib/doca/include/common.hpp index 1a86babdb3..b2da26b088 100644 --- a/morpheus/_lib/doca/include/common.hpp +++ b/morpheus/_lib/doca/include/common.hpp @@ -19,14 +19,14 @@ #include #include -#include #include #include +#include uint32_t const PACKETS_PER_THREAD = 4; -uint32_t const THREADS_PER_BLOCK = 1024; //512 +uint32_t const THREADS_PER_BLOCK = 1024; // 512 uint32_t const PACKETS_PER_BLOCK = PACKETS_PER_THREAD * THREADS_PER_BLOCK; -uint32_t const PACKET_RX_TIMEOUT_NS = 1000000; //1ms //500us +uint32_t const PACKET_RX_TIMEOUT_NS = 1000000; // 1ms //500us uint32_t const MAX_PKT_RECEIVE = PACKETS_PER_BLOCK; uint32_t const MAX_PKT_SIZE = 4096; @@ -34,26 +34,28 @@ uint32_t const MAX_PKT_NUM = 65536; uint32_t const MAX_QUEUE = 4; uint32_t const MAX_SEM_X_QUEUE = 32; -enum doca_traffic_type { - DOCA_TRAFFIC_TYPE_UDP = 0, - DOCA_TRAFFIC_TYPE_TCP = 1, +enum doca_traffic_type +{ + DOCA_TRAFFIC_TYPE_UDP = 0, + DOCA_TRAFFIC_TYPE_TCP = 1, }; -struct packets_info { - int32_t packet_count_out; - int32_t payload_size_total_out; - - char *payload_buffer_out; - int32_t *payload_sizes_out; - - int64_t *src_mac_out; - int64_t *dst_mac_out; - int64_t *src_ip_out; - int64_t *dst_ip_out; - uint16_t *src_port_out; - uint16_t *dst_port_out; - int32_t *tcp_flags_out; - int32_t *ether_type_out; - int32_t *next_proto_id_out; - uint32_t *timestamp_out; +struct packets_info +{ + int32_t packet_count_out; + int32_t payload_size_total_out; + + char* payload_buffer_out; + int32_t* payload_sizes_out; + + int64_t* src_mac_out; + int64_t* dst_mac_out; + int64_t* src_ip_out; + int64_t* dst_ip_out; + 
uint16_t* src_port_out; + uint16_t* dst_port_out; + int32_t* tcp_flags_out; + int32_t* ether_type_out; + int32_t* next_proto_id_out; + uint32_t* timestamp_out; }; \ No newline at end of file diff --git a/morpheus/_lib/doca/include/doca_rx_pipe.hpp b/morpheus/_lib/doca/include/doca_rx_pipe.hpp index fca13e355f..39d44713f7 100644 --- a/morpheus/_lib/doca/include/doca_rx_pipe.hpp +++ b/morpheus/_lib/doca/include/doca_rx_pipe.hpp @@ -43,7 +43,9 @@ struct DocaRxPipe doca_flow_pipe* m_root_pipe; public: - DocaRxPipe(std::shared_ptr context, std::vector> rxq, doca_traffic_type const type); + DocaRxPipe(std::shared_ptr context, + std::vector> rxq, + doca_traffic_type const type); ~DocaRxPipe(); }; diff --git a/morpheus/_lib/doca/include/doca_semaphore.hpp b/morpheus/_lib/doca/include/doca_semaphore.hpp index 1c0c0cb172..9a1fbc3f6c 100644 --- a/morpheus/_lib/doca/include/doca_semaphore.hpp +++ b/morpheus/_lib/doca/include/doca_semaphore.hpp @@ -41,7 +41,7 @@ struct DocaSemaphore doca_gpu_semaphore_gpu* gpu_ptr(); uint16_t size(); - void * get_info_cpu(uint32_t idx); + void* get_info_cpu(uint32_t idx); bool is_ready(uint32_t idx); void set_free(uint32_t idx); }; diff --git a/morpheus/_lib/doca/include/doca_source.hpp b/morpheus/_lib/doca/include/doca_source.hpp index 714f821097..3b9d8ea9d1 100644 --- a/morpheus/_lib/doca/include/doca_source.hpp +++ b/morpheus/_lib/doca/include/doca_source.hpp @@ -17,9 +17,10 @@ #pragma once -#include "morpheus/messages/meta.hpp" #include "common.hpp" +#include "morpheus/messages/meta.hpp" + #include #include @@ -49,7 +50,9 @@ class DocaSourceStage : public mrc::pymrc::PythonSource integers_to_mac( rmm::cuda_stream_view stream = cudf::detail::default_stream_value, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); -void packet_receive_kernel( - doca_gpu_eth_rxq* rxq, - doca_gpu_semaphore_gpu* sem, - uint16_t sem_idx, - bool is_tcp, - uint32_t* exit_condition, - cudaStream_t stream -); +void packet_receive_kernel(doca_gpu_eth_rxq* rxq, + doca_gpu_semaphore_gpu* sem, + uint16_t sem_idx, + bool is_tcp, + uint32_t* exit_condition, + cudaStream_t stream); void packet_gather_kernel( int32_t packet_count, char* packet_buffer, int32_t* payload_sizes, char* payload_chars_out, cudaStream_t stream); diff --git a/morpheus/_lib/doca/include/error.hpp b/morpheus/_lib/doca/include/error.hpp index 8b6335bd39..90cfc97671 100644 --- a/morpheus/_lib/doca/include/error.hpp +++ b/morpheus/_lib/doca/include/error.hpp @@ -38,8 +38,8 @@ namespace detail { inline void throw_doca_error(doca_error_t error, const char* file, unsigned int line) { - throw morpheus::DocaError(MORPHEUS_CONCAT_STR( - "DOCA error encountered at: " << file << ":" << line << ": " << error << " " << doca_error_get_descr(error))); + throw morpheus::DocaError(MORPHEUS_CONCAT_STR("DOCA error encountered at: " << file << ":" << line << ": " << error + << " " << doca_error_get_descr(error))); } inline void throw_rte_error(int error, const char* file, unsigned int line) diff --git a/morpheus/_lib/doca/module.cpp b/morpheus/_lib/doca/module.cpp index 9189d59323..fdf06cdcff 100644 --- a/morpheus/_lib/doca/module.cpp +++ b/morpheus/_lib/doca/module.cpp @@ -42,8 +42,7 @@ PYBIND11_MODULE(doca, m) py::arg("name"), py::arg("nic_pci_address"), py::arg("gpu_pci_address"), - py::arg("traffic_type") - ); + py::arg("traffic_type")); } } // namespace morpheus diff --git a/morpheus/_lib/doca/src/doca_context.cpp b/morpheus/_lib/doca/src/doca_context.cpp index 6cb6eb75bd..7fd8b3b0a9 100644 --- 
a/morpheus/_lib/doca/src/doca_context.cpp +++ b/morpheus/_lib/doca/src/doca_context.cpp @@ -47,41 +47,45 @@ namespace { using jobs_check_t = doca_error_t (*)(doca_devinfo*); -static doca_error_t open_doca_device_with_pci(const char *pcie_value, struct doca_dev **retval) +static doca_error_t open_doca_device_with_pci(const char* pcie_value, struct doca_dev** retval) { - struct doca_devinfo **dev_list; - uint32_t nb_devs; - doca_error_t res; - size_t i; - uint8_t is_addr_equal = 0; - - /* Set default return value */ - *retval = NULL; - - res = doca_devinfo_create_list(&dev_list, &nb_devs); - if (res != DOCA_SUCCESS) { - MORPHEUS_FAIL("Failed to load doca devices list"); - return res; - } - - /* Search */ - for (i = 0; i < nb_devs; i++) { - res = doca_devinfo_is_equal_pci_addr(dev_list[i], pcie_value, &is_addr_equal); - if (res == DOCA_SUCCESS && is_addr_equal) { - /* if device can be opened */ - res = doca_dev_open(dev_list[i], retval); - if (res == DOCA_SUCCESS) { - doca_devinfo_destroy_list(dev_list); - return res; - } - } - } - - MORPHEUS_FAIL("Matching device not found"); - res = DOCA_ERROR_NOT_FOUND; - - doca_devinfo_destroy_list(dev_list); - return res; + struct doca_devinfo** dev_list; + uint32_t nb_devs; + doca_error_t res; + size_t i; + uint8_t is_addr_equal = 0; + + /* Set default return value */ + *retval = NULL; + + res = doca_devinfo_create_list(&dev_list, &nb_devs); + if (res != DOCA_SUCCESS) + { + MORPHEUS_FAIL("Failed to load doca devices list"); + return res; + } + + /* Search */ + for (i = 0; i < nb_devs; i++) + { + res = doca_devinfo_is_equal_pci_addr(dev_list[i], pcie_value, &is_addr_equal); + if (res == DOCA_SUCCESS && is_addr_equal) + { + /* if device can be opened */ + res = doca_dev_open(dev_list[i], retval); + if (res == DOCA_SUCCESS) + { + doca_devinfo_destroy_list(dev_list); + return res; + } + } + } + + MORPHEUS_FAIL("Matching device not found"); + res = DOCA_ERROR_NOT_FOUND; + + doca_devinfo_destroy_list(dev_list); + return res; } doca_flow_port* init_doca_flow(uint16_t port_id, uint8_t rxq_num) @@ -196,7 +200,7 @@ DocaContext::DocaContext(std::string nic_addr, std::string gpu_addr) : m_max_que DocaContext::~DocaContext() { doca_flow_port_stop(m_flow_port); - doca_flow_destroy(); + doca_flow_destroy(); if (m_gpu != nullptr) { diff --git a/morpheus/_lib/doca/src/doca_rx_pipe.cpp b/morpheus/_lib/doca/src/doca_rx_pipe.cpp index 6f79e0f129..8879da66ad 100644 --- a/morpheus/_lib/doca/src/doca_rx_pipe.cpp +++ b/morpheus/_lib/doca/src/doca_rx_pipe.cpp @@ -15,42 +15,47 @@ * limitations under the License. 
*/ +#include "doca_rx_pipe.hpp" + #include #include -#include "doca_rx_pipe.hpp" - namespace morpheus::doca { /* Create more Queues/Different Flows */ -DocaRxPipe::DocaRxPipe(std::shared_ptr context, std::vector> rxq, enum doca_traffic_type const type) : +DocaRxPipe::DocaRxPipe(std::shared_ptr context, + std::vector> rxq, + enum doca_traffic_type const type) : m_context(context), m_rxq(rxq), m_traffic_type(type), m_pipe(nullptr) { auto rss_queues = std::array(); - for(int idx = 0; idx < m_rxq.size(); idx++) - doca_eth_rxq_get_flow_queue_id(m_rxq[idx]->rxq_info_cpu(), &(rss_queues[idx])); + for (int idx = 0; idx < m_rxq.size(); idx++) + doca_eth_rxq_get_flow_queue_id(m_rxq[idx]->rxq_info_cpu(), &(rss_queues[idx])); doca_flow_match match_mask{0}; doca_flow_match match{}; match.outer.l3_type = DOCA_FLOW_L3_TYPE_IP4; - if (m_traffic_type == DOCA_TRAFFIC_TYPE_TCP) { - match.outer.ip4.next_proto = IPPROTO_TCP; - match.outer.l4_type_ext = DOCA_FLOW_L4_TYPE_EXT_TCP; - } else { - match.outer.ip4.next_proto = IPPROTO_UDP; - match.outer.l4_type_ext = DOCA_FLOW_L4_TYPE_EXT_UDP; + if (m_traffic_type == DOCA_TRAFFIC_TYPE_TCP) + { + match.outer.ip4.next_proto = IPPROTO_TCP; + match.outer.l4_type_ext = DOCA_FLOW_L4_TYPE_EXT_TCP; + } + else + { + match.outer.ip4.next_proto = IPPROTO_UDP; + match.outer.l4_type_ext = DOCA_FLOW_L4_TYPE_EXT_UDP; } doca_flow_fwd fwd{}; fwd.type = DOCA_FLOW_FWD_RSS; if (m_traffic_type == DOCA_TRAFFIC_TYPE_TCP) - fwd.rss_outer_flags = DOCA_FLOW_RSS_IPV4 | DOCA_FLOW_RSS_TCP; + fwd.rss_outer_flags = DOCA_FLOW_RSS_IPV4 | DOCA_FLOW_RSS_TCP; else - fwd.rss_outer_flags = DOCA_FLOW_RSS_IPV4 | DOCA_FLOW_RSS_UDP; + fwd.rss_outer_flags = DOCA_FLOW_RSS_IPV4 | DOCA_FLOW_RSS_UDP; fwd.rss_queues = rss_queues.begin(); fwd.num_of_queues = m_rxq.size(); @@ -61,7 +66,7 @@ DocaRxPipe::DocaRxPipe(std::shared_ptr context, std::vector context, std::vectorflow_port(), 0, 0, 0)); uint32_t priority_high = 1; @@ -82,33 +88,36 @@ DocaRxPipe::DocaRxPipe(std::shared_ptr context, std::vectorflow_port(); + root_pipe_cfg.attr.name = "ROOT_PIPE"; + root_pipe_cfg.attr.enable_strict_matching = true; + root_pipe_cfg.attr.is_root = true; + root_pipe_cfg.attr.type = DOCA_FLOW_PIPE_CONTROL; + root_pipe_cfg.monitor = &root_monitor; + root_pipe_cfg.match_mask = &root_match_mask; + root_pipe_cfg.port = context->flow_port(); DOCA_TRY(doca_flow_pipe_create(&root_pipe_cfg, nullptr, nullptr, &m_root_pipe)); struct doca_flow_match root_match_gpu = {}; - struct doca_flow_fwd root_fwd_gpu = {}; + struct doca_flow_fwd root_fwd_gpu = {}; doca_flow_pipe_entry* root_tcp_entry_gpu; - if (m_traffic_type == DOCA_TRAFFIC_TYPE_TCP) { - root_match_gpu.outer.l3_type = DOCA_FLOW_L3_TYPE_IP4; - root_match_gpu.outer.l4_type_ext = DOCA_FLOW_L4_TYPE_EXT_TCP; - root_fwd_gpu.type = DOCA_FLOW_FWD_PIPE; - root_fwd_gpu.next_pipe = m_pipe; - } else { - root_match_gpu.outer.l3_type = DOCA_FLOW_L3_TYPE_IP4; - root_match_gpu.outer.l4_type_ext = DOCA_FLOW_L4_TYPE_EXT_UDP; - root_fwd_gpu.type = DOCA_FLOW_FWD_PIPE; - root_fwd_gpu.next_pipe = m_pipe; + if (m_traffic_type == DOCA_TRAFFIC_TYPE_TCP) + { + root_match_gpu.outer.l3_type = DOCA_FLOW_L3_TYPE_IP4; + root_match_gpu.outer.l4_type_ext = DOCA_FLOW_L4_TYPE_EXT_TCP; + root_fwd_gpu.type = DOCA_FLOW_FWD_PIPE; + root_fwd_gpu.next_pipe = m_pipe; + } + else + { + root_match_gpu.outer.l3_type = DOCA_FLOW_L3_TYPE_IP4; + root_match_gpu.outer.l4_type_ext = DOCA_FLOW_L4_TYPE_EXT_UDP; + root_fwd_gpu.type = DOCA_FLOW_FWD_PIPE; + root_fwd_gpu.next_pipe = m_pipe; } DOCA_TRY(doca_flow_pipe_control_add_entry(0, diff 
--git a/morpheus/_lib/doca/src/doca_rx_queue.cpp b/morpheus/_lib/doca/src/doca_rx_queue.cpp index d7946ed34b..0e9b9c1dfd 100644 --- a/morpheus/_lib/doca/src/doca_rx_queue.cpp +++ b/morpheus/_lib/doca/src/doca_rx_queue.cpp @@ -36,7 +36,8 @@ DocaRxQueue::DocaRxQueue(std::shared_ptr context) : uint32_t cyclic_buffer_size; DOCA_TRY(doca_eth_rxq_create(context->dev(), MAX_PKT_NUM, MAX_PKT_SIZE, &(m_rxq_info_cpu))); DOCA_TRY(doca_eth_rxq_set_type(m_rxq_info_cpu, DOCA_ETH_RXQ_TYPE_CYCLIC)); - DOCA_TRY(doca_eth_rxq_estimate_packet_buf_size(DOCA_ETH_RXQ_TYPE_CYCLIC, 0, 0, MAX_PKT_SIZE, MAX_PKT_NUM, 0, &cyclic_buffer_size)); + DOCA_TRY(doca_eth_rxq_estimate_packet_buf_size( + DOCA_ETH_RXQ_TYPE_CYCLIC, 0, 0, MAX_PKT_SIZE, MAX_PKT_NUM, 0, &cyclic_buffer_size)); DOCA_TRY(doca_mmap_create(&m_packet_mmap)); DOCA_TRY(doca_mmap_add_dev(m_packet_mmap, context->dev())); @@ -61,7 +62,8 @@ DocaRxQueue::~DocaRxQueue() { doca_error_t doca_ret; - if (m_rxq_info_cpu != nullptr) { + if (m_rxq_info_cpu != nullptr) + { doca_ret = doca_ctx_stop(m_doca_ctx); if (doca_ret != DOCA_SUCCESS) LOG(WARNING) << "doca_eth_rxq_destroy failed (" << doca_ret << ")" << std::endl; @@ -71,7 +73,8 @@ DocaRxQueue::~DocaRxQueue() LOG(WARNING) << "doca_eth_rxq_destroy failed (" << doca_ret << ")" << std::endl; } - if (m_packet_mmap != nullptr) { + if (m_packet_mmap != nullptr) + { doca_ret = doca_mmap_destroy(m_packet_mmap); if (doca_ret != DOCA_SUCCESS) LOG(WARNING) << "doca_mmap_destroy failed (" << doca_ret << ")" << std::endl; diff --git a/morpheus/_lib/doca/src/doca_semaphore.cpp b/morpheus/_lib/doca/src/doca_semaphore.cpp index 4c3dbf0812..71298d2e5b 100644 --- a/morpheus/_lib/doca/src/doca_semaphore.cpp +++ b/morpheus/_lib/doca/src/doca_semaphore.cpp @@ -26,7 +26,8 @@ DocaSemaphore::DocaSemaphore(std::shared_ptr context, uint16_t size DOCA_TRY(doca_gpu_semaphore_create(m_context->gpu(), &m_semaphore)); DOCA_TRY(doca_gpu_semaphore_set_memory_type(m_semaphore, DOCA_GPU_MEM_TYPE_CPU_GPU)); DOCA_TRY(doca_gpu_semaphore_set_items_num(m_semaphore, size)); - DOCA_TRY(doca_gpu_semaphore_set_custom_info(m_semaphore, sizeof(struct packets_info), DOCA_GPU_MEM_TYPE_CPU_GPU)); // GPU_CPU + DOCA_TRY(doca_gpu_semaphore_set_custom_info( + m_semaphore, sizeof(struct packets_info), DOCA_GPU_MEM_TYPE_CPU_GPU)); // GPU_CPU DOCA_TRY(doca_gpu_semaphore_start(m_semaphore)); DOCA_TRY(doca_gpu_semaphore_get_gpu_handle(m_semaphore, &m_semaphore_gpu)); } @@ -46,10 +47,10 @@ uint16_t DocaSemaphore::size() return m_size; } -void * DocaSemaphore::get_info_cpu(uint32_t idx) +void* DocaSemaphore::get_info_cpu(uint32_t idx) { - void *addr; - DOCA_TRY(doca_gpu_semaphore_get_custom_info_addr(m_semaphore, idx, (void **)&(addr))); + void* addr; + DOCA_TRY(doca_gpu_semaphore_get_custom_info_addr(m_semaphore, idx, (void**)&(addr))); return addr; } diff --git a/morpheus/_lib/doca/src/doca_source.cpp b/morpheus/_lib/doca/src/doca_source.cpp index 45d4edd15e..107dc2b69a 100644 --- a/morpheus/_lib/doca/src/doca_source.cpp +++ b/morpheus/_lib/doca/src/doca_source.cpp @@ -38,11 +38,11 @@ #include #include #include +#include #include #include #include -#include #define BE_IPV4_ADDR(a, b, c, d) (RTE_BE32((a << 24) + (b << 16) + (c << 8) + d)) /* Big endian conversion */ @@ -72,7 +72,9 @@ std::optional ip_to_int(std::string const& ip_address) namespace morpheus { -DocaSourceStage::DocaSourceStage(std::string const& nic_pci_address, std::string const& gpu_pci_address, std::string const& traffic_type) : +DocaSourceStage::DocaSourceStage(std::string const& nic_pci_address, + 
std::string const& gpu_pci_address, + std::string const& traffic_type) : PythonSource(build()) { m_context = std::make_shared(nic_pci_address, gpu_pci_address); @@ -83,19 +85,19 @@ DocaSourceStage::DocaSourceStage(std::string const& nic_pci_address, std::string m_rxq.reserve(MAX_QUEUE); m_semaphore.reserve(MAX_QUEUE); - for (int idx = 0; idx < MAX_QUEUE; idx++) { + for (int idx = 0; idx < MAX_QUEUE; idx++) + { m_rxq.push_back(std::make_shared(m_context)); m_semaphore.push_back(std::make_shared(m_context, MAX_SEM_X_QUEUE)); } - m_rxpipe = std::make_shared(m_context, m_rxq, m_traffic_type); - + m_rxpipe = std::make_shared(m_context, m_rxq, m_traffic_type); } static uint64_t now_ns() { struct timespec t; - if(clock_gettime(CLOCK_REALTIME, &t) != 0) + if (clock_gettime(CLOCK_REALTIME, &t) != 0) return 0; return (uint64_t)t.tv_nsec + (uint64_t)t.tv_sec * 1000 * 1000 * 1000; } @@ -103,11 +105,11 @@ static uint64_t now_ns() DocaSourceStage::subscriber_fn_t DocaSourceStage::build() { return [this](rxcpp::subscriber output) { - struct packets_info *pkt_ptr; - int sem_idx = 0; + struct packets_info* pkt_ptr; + int sem_idx = 0; cudaStream_t rstream = nullptr; cudaStream_t pstream = nullptr; - cudf::table_view *fixed_width_inputs_table_view[MAX_SEM_X_QUEUE]; + cudf::table_view* fixed_width_inputs_table_view[MAX_SEM_X_QUEUE]; std::vector> payload_buffer_d; std::vector> payload_sizes_d; @@ -124,7 +126,8 @@ DocaSourceStage::subscriber_fn_t DocaSourceStage::build() int thread_idx = mrc::runnable::Context::get_runtime_context().rank(); - if (thread_idx >= MAX_QUEUE) { + if (thread_idx >= MAX_QUEUE) + { MORPHEUS_LOCAL(MORPHEUS_CONCAT_STR("Thread ID " << thread_idx << " bigger than MAX_QUEUE " << MAX_QUEUE)); return; } @@ -152,8 +155,8 @@ DocaSourceStage::subscriber_fn_t DocaSourceStage::build() cudaStreamDestroy(pstream); }); - - for (int idxs = 0; idxs < MAX_SEM_X_QUEUE; idxs++) { + for (int idxs = 0; idxs < MAX_SEM_X_QUEUE; idxs++) + { payload_buffer_d.push_back(rmm::device_uvector(MAX_PKT_RECEIVE * MAX_PKT_SIZE, pstream_cpp)); payload_sizes_d.push_back(rmm::device_uvector(MAX_PKT_RECEIVE, pstream_cpp)); src_mac_out_d.push_back(rmm::device_uvector(MAX_PKT_RECEIVE, pstream_cpp)); @@ -167,35 +170,36 @@ DocaSourceStage::subscriber_fn_t DocaSourceStage::build() next_proto_id_out_d.push_back(rmm::device_uvector(MAX_PKT_RECEIVE, pstream_cpp)); timestamp_out_d.push_back(rmm::device_uvector(MAX_PKT_RECEIVE, pstream_cpp)); - pkt_ptr = static_cast(m_semaphore[thread_idx]->get_info_cpu(idxs)); - pkt_ptr->payload_buffer_out = payload_buffer_d[idxs].data(); - pkt_ptr->payload_sizes_out = payload_sizes_d[idxs].data(); - pkt_ptr->src_mac_out = src_mac_out_d[idxs].data(); - pkt_ptr->dst_mac_out = dst_mac_out_d[idxs].data(); - pkt_ptr->src_ip_out = src_ip_out_d[idxs].data(); - pkt_ptr->dst_ip_out = dst_ip_out_d[idxs].data(); - pkt_ptr->src_port_out = src_port_out_d[idxs].data(); - pkt_ptr->dst_port_out = dst_port_out_d[idxs].data(); - pkt_ptr->tcp_flags_out = tcp_flags_out_d[idxs].data(); - pkt_ptr->ether_type_out = ether_type_out_d[idxs].data(); - pkt_ptr->next_proto_id_out = next_proto_id_out_d[idxs].data(); - pkt_ptr->timestamp_out = timestamp_out_d[idxs].data(); + pkt_ptr = static_cast(m_semaphore[thread_idx]->get_info_cpu(idxs)); + pkt_ptr->payload_buffer_out = payload_buffer_d[idxs].data(); + pkt_ptr->payload_sizes_out = payload_sizes_d[idxs].data(); + pkt_ptr->src_mac_out = src_mac_out_d[idxs].data(); + pkt_ptr->dst_mac_out = dst_mac_out_d[idxs].data(); + pkt_ptr->src_ip_out = src_ip_out_d[idxs].data(); + 
pkt_ptr->dst_ip_out = dst_ip_out_d[idxs].data(); + pkt_ptr->src_port_out = src_port_out_d[idxs].data(); + pkt_ptr->dst_port_out = dst_port_out_d[idxs].data(); + pkt_ptr->tcp_flags_out = tcp_flags_out_d[idxs].data(); + pkt_ptr->ether_type_out = ether_type_out_d[idxs].data(); + pkt_ptr->next_proto_id_out = next_proto_id_out_d[idxs].data(); + pkt_ptr->timestamp_out = timestamp_out_d[idxs].data(); fixed_width_inputs_table_view[idxs] = new cudf::table_view(std::vector{ - cudf::column_view(cudf::device_span(src_mac_out_d[idxs])), - cudf::column_view(cudf::device_span(dst_mac_out_d[idxs])), - cudf::column_view(cudf::device_span(src_ip_out_d[idxs])), - cudf::column_view(cudf::device_span(dst_ip_out_d[idxs])), - cudf::column_view(cudf::device_span(src_port_out_d[idxs])), - cudf::column_view(cudf::device_span(dst_port_out_d[idxs])), - cudf::column_view(cudf::device_span(tcp_flags_out_d[idxs])), - cudf::column_view(cudf::device_span(ether_type_out_d[idxs])), - cudf::column_view(cudf::device_span(next_proto_id_out_d[idxs])), - cudf::column_view(cudf::device_span(timestamp_out_d[idxs])), - }); + cudf::column_view(cudf::device_span(src_mac_out_d[idxs])), + cudf::column_view(cudf::device_span(dst_mac_out_d[idxs])), + cudf::column_view(cudf::device_span(src_ip_out_d[idxs])), + cudf::column_view(cudf::device_span(dst_ip_out_d[idxs])), + cudf::column_view(cudf::device_span(src_port_out_d[idxs])), + cudf::column_view(cudf::device_span(dst_port_out_d[idxs])), + cudf::column_view(cudf::device_span(tcp_flags_out_d[idxs])), + cudf::column_view(cudf::device_span(ether_type_out_d[idxs])), + cudf::column_view(cudf::device_span(next_proto_id_out_d[idxs])), + cudf::column_view(cudf::device_span(timestamp_out_d[idxs])), + }); } - auto exit_condition = std::make_unique>(m_context, 1, DOCA_GPU_MEM_TYPE_GPU_CPU); + auto exit_condition = + std::make_unique>(m_context, 1, DOCA_GPU_MEM_TYPE_GPU_CPU); DOCA_GPUNETIO_VOLATILE(*(exit_condition->cpu_ptr())) = 0; auto cancel_thread = std::thread([&] { @@ -203,9 +207,10 @@ DocaSourceStage::subscriber_fn_t DocaSourceStage::build() DOCA_GPUNETIO_VOLATILE(*(exit_condition->cpu_ptr())) = 1; }); - while (output.is_subscribed()) { - - if (DOCA_GPUNETIO_VOLATILE(*(exit_condition->cpu_ptr())) == 1) { + while (output.is_subscribed()) + { + if (DOCA_GPUNETIO_VOLATILE(*(exit_condition->cpu_ptr())) == 1) + { output.unsubscribe(); continue; } @@ -213,51 +218,58 @@ DocaSourceStage::subscriber_fn_t DocaSourceStage::build() // printf("Launching kernel with idx0 %d idx1 %d idx2 %d\n", sem_idx[0], sem_idx[1], sem_idx[2]); // const auto start_kernel = now_ns(); morpheus::doca::packet_receive_kernel(m_rxq[thread_idx]->rxq_info_gpu(), - m_semaphore[thread_idx]->gpu_ptr(), - sem_idx, - (m_traffic_type == DOCA_TRAFFIC_TYPE_TCP) ? true : false, - exit_condition->gpu_ptr(), - rstream); + m_semaphore[thread_idx]->gpu_ptr(), + sem_idx, + (m_traffic_type == DOCA_TRAFFIC_TYPE_TCP) ? 
true : false, + exit_condition->gpu_ptr(), + rstream); cudaStreamSynchronize(rstream); - if (m_semaphore[thread_idx]->is_ready(sem_idx)) { + if (m_semaphore[thread_idx]->is_ready(sem_idx)) + { // const auto start = now_ns(); // LOG(WARNING) << "CPU READY sem " << idxs << " queue " << thread_idx << std::endl; - pkt_ptr = static_cast(m_semaphore[thread_idx]->get_info_cpu(sem_idx)); + pkt_ptr = static_cast(m_semaphore[thread_idx]->get_info_cpu(sem_idx)); // const auto table_stop = now_ns(); - auto packet_count = pkt_ptr->packet_count_out; + auto packet_count = pkt_ptr->packet_count_out; auto payload_size_total = pkt_ptr->payload_size_total_out; - // LOG(WARNING) << "CPU packet_count " << packet_count << " payload_size_total " << payload_size_total << std::endl; + // LOG(WARNING) << "CPU packet_count " << packet_count << " payload_size_total " << payload_size_total + // << std::endl; - //Should not be necessary + // Should not be necessary if (packet_count == 0) continue; // gather payload data - auto payload_col = doca::gather_payload(packet_count, pkt_ptr->payload_buffer_out, pkt_ptr->payload_sizes_out, pstream_cpp); + auto payload_col = doca::gather_payload( + packet_count, pkt_ptr->payload_buffer_out, pkt_ptr->payload_sizes_out, pstream_cpp); // const auto gather_payload_stop = now_ns(); auto iota_col = [packet_count]() { using scalar_type_t = cudf::scalar_type_t; - auto zero = cudf::make_numeric_scalar(cudf::data_type(cudf::data_type{cudf::type_to_id()})); + auto zero = + cudf::make_numeric_scalar(cudf::data_type(cudf::data_type{cudf::type_to_id()})); static_cast(zero.get())->set_value(0); zero->set_valid_async(false); return cudf::sequence(packet_count, *zero); }(); // Accept the stream now? - auto gathered_table = cudf::gather(*fixed_width_inputs_table_view[sem_idx], iota_col->view(), cudf::out_of_bounds_policy::DONT_CHECK, pstream_cpp); - auto gathered_columns = gathered_table->release(); + auto gathered_table = cudf::gather(*fixed_width_inputs_table_view[sem_idx], + iota_col->view(), + cudf::out_of_bounds_policy::DONT_CHECK, + pstream_cpp); + auto gathered_columns = gathered_table->release(); // const auto table_create_stop = now_ns(); // post-processing for mac addresses - auto src_mac_col = gathered_columns[0].release(); + auto src_mac_col = gathered_columns[0].release(); // Accept the stream now? auto src_mac_str_col = morpheus::doca::integers_to_mac(src_mac_col->view(), pstream_cpp); gathered_columns[0].reset(src_mac_str_col.release()); @@ -267,12 +279,12 @@ DocaSourceStage::subscriber_fn_t DocaSourceStage::build() gathered_columns[1].reset(dst_mac_str_col.release()); // post-processing for ip addresses - auto src_ip_col = gathered_columns[2].release(); + auto src_ip_col = gathered_columns[2].release(); // Accept the stream now? auto src_ip_str_col = cudf::strings::integers_to_ipv4(src_ip_col->view(), pstream_cpp); gathered_columns[2].reset(src_ip_str_col.release()); - auto dst_ip_col = gathered_columns[3].release(); + auto dst_ip_col = gathered_columns[3].release(); // Accept the stream now? 
auto dst_ip_str_col = cudf::strings::integers_to_ipv4(dst_ip_col->view(), pstream_cpp); gathered_columns[3].reset(dst_ip_str_col.release()); @@ -295,7 +307,7 @@ DocaSourceStage::subscriber_fn_t DocaSourceStage::build() gathered_metadata.schema_info.emplace_back("timestamp"); gathered_metadata.schema_info.emplace_back("data"); - //After this point buffers can be reused -> copies actual packets' data + // After this point buffers can be reused -> copies actual packets' data gathered_table = std::make_unique(std::move(gathered_columns)); // const auto gather_table_meta = now_ns(); @@ -307,14 +319,14 @@ DocaSourceStage::subscriber_fn_t DocaSourceStage::build() auto meta = MessageMeta::create_from_cpp(std::move(gathered_table_w_metadata), 0); - //Do we still need this synchronize? - // const auto gather_meta_stop = now_ns(); + // Do we still need this synchronize? + // const auto gather_meta_stop = now_ns(); cudaStreamSynchronize(pstream_cpp); output.on_next(std::move(meta)); m_semaphore[thread_idx]->set_free(sem_idx); - sem_idx = (sem_idx+1)%MAX_SEM_X_QUEUE; + sem_idx = (sem_idx + 1) % MAX_SEM_X_QUEUE; // const auto end = now_ns(); // LOG(WARNING) << "Queue " << thread_idx diff --git a/morpheus/stages/doca/doca_source_stage.py b/morpheus/stages/doca/doca_source_stage.py index 3844ff897b..56a761f577 100644 --- a/morpheus/stages/doca/doca_source_stage.py +++ b/morpheus/stages/doca/doca_source_stage.py @@ -70,7 +70,8 @@ def __init__( self._gpu_pci_address = gpu_pci_address self._traffic_type = traffic_type if self._traffic_type != 'udp' and self._traffic_type != 'tcp': - raise NotImplementedError("The Morpheus DOCA source stage allows a only udp or tcp types of traffic flow " + self._traffic_type) + raise NotImplementedError("The Morpheus DOCA source stage allows a only udp or tcp types of traffic flow " + + self._traffic_type) @property def name(self) -> str: @@ -91,7 +92,11 @@ def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject: if self._build_cpp_node(): # return self._doca_source_class(builder, self.unique_name, self._nic_pci_address, self._gpu_pci_address, self._traffic_type) - node = self._doca_source_class(builder, self.unique_name, self._nic_pci_address, self._gpu_pci_address, self._traffic_type) + node = self._doca_source_class(builder, + self.unique_name, + self._nic_pci_address, + self._gpu_pci_address, + self._traffic_type) node.launch_options.pe_count = self._max_concurrent return node
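
A minimal usage sketch for the DOCA source stage touched above, mirroring the pipeline wiring in examples/doca/run.py. The class name, keyword arguments, and PCI addresses below are assumptions inferred from the files in this patch (doca_source_stage.py stores nic/gpu PCI addresses and a traffic type), not verified signatures:

    from morpheus.config import Config
    from morpheus.pipeline.linear_pipeline import LinearPipeline
    from morpheus.stages.doca.doca_source_stage import DocaSourceStage

    config = Config()
    pipeline = LinearPipeline(config)

    # traffic_type must be 'udp' or 'tcp' (enforced by the check in doca_source_stage.py);
    # the PCI addresses are placeholders for the local NIC and GPU.
    pipeline.set_source(
        DocaSourceStage(config,
                        nic_pci_address="ca:00.0",
                        gpu_pci_address="17:00.0",
                        traffic_type="udp"))

    pipeline.run()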