Skip to content

Commit

Permalink
Merge pull request #2 from cwharris/branch-24.03-doca
Browse files Browse the repository at this point in the history
Run `./ci/scripts/fix_all.sh`
  • Loading branch information
e-ago authored Mar 13, 2024
2 parents faa3d1e + f081ceb commit 870b066
Show file tree
Hide file tree
Showing 14 changed files with 228 additions and 186 deletions.
12 changes: 9 additions & 3 deletions examples/doca/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,13 @@
help="UDP or TCP traffic",
required=True,
)
def run_pipeline(pipeline_batch_size, model_max_batch_size, model_fea_length, out_file, nic_addr,
gpu_addr, traffic_type):
def run_pipeline(pipeline_batch_size,
model_max_batch_size,
model_fea_length,
out_file,
nic_addr,
gpu_addr,
traffic_type):
# Enable the default logger
configure_logging(log_level=logging.DEBUG)

Expand Down Expand Up @@ -118,7 +123,8 @@ def run_pipeline(pipeline_batch_size, model_max_batch_size, model_fea_length, ou
pipeline.add_stage(
PreprocessNLPStage(
config,
vocab_hash_file='/workspace/models/training-tuning-scripts/sid-models/resources/bert-base-uncased-hash.txt',
vocab_hash_file=
'/workspace/models/training-tuning-scripts/sid-models/resources/bert-base-uncased-hash.txt',
do_lower_case=True,
truncation=True,
add_special_tokens=False,
Expand Down
48 changes: 25 additions & 23 deletions morpheus/_lib/doca/include/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,43 @@

#include <cstdint>
#include <memory>
#include <vector>
#include <string>
#include <type_traits>
#include <vector>

uint32_t const PACKETS_PER_THREAD = 4;
uint32_t const THREADS_PER_BLOCK = 1024; //512
uint32_t const THREADS_PER_BLOCK = 1024; // 512
uint32_t const PACKETS_PER_BLOCK = PACKETS_PER_THREAD * THREADS_PER_BLOCK;
uint32_t const PACKET_RX_TIMEOUT_NS = 1000000; //1ms //500us
uint32_t const PACKET_RX_TIMEOUT_NS = 1000000; // 1ms //500us

uint32_t const MAX_PKT_RECEIVE = PACKETS_PER_BLOCK;
uint32_t const MAX_PKT_SIZE = 4096;
uint32_t const MAX_PKT_NUM = 65536;
uint32_t const MAX_QUEUE = 4;
uint32_t const MAX_SEM_X_QUEUE = 32;

enum doca_traffic_type {
DOCA_TRAFFIC_TYPE_UDP = 0,
DOCA_TRAFFIC_TYPE_TCP = 1,
enum doca_traffic_type
{
DOCA_TRAFFIC_TYPE_UDP = 0,
DOCA_TRAFFIC_TYPE_TCP = 1,
};

struct packets_info {
int32_t packet_count_out;
int32_t payload_size_total_out;

char *payload_buffer_out;
int32_t *payload_sizes_out;

int64_t *src_mac_out;
int64_t *dst_mac_out;
int64_t *src_ip_out;
int64_t *dst_ip_out;
uint16_t *src_port_out;
uint16_t *dst_port_out;
int32_t *tcp_flags_out;
int32_t *ether_type_out;
int32_t *next_proto_id_out;
uint32_t *timestamp_out;
struct packets_info
{
int32_t packet_count_out;
int32_t payload_size_total_out;

char* payload_buffer_out;
int32_t* payload_sizes_out;

int64_t* src_mac_out;
int64_t* dst_mac_out;
int64_t* src_ip_out;
int64_t* dst_ip_out;
uint16_t* src_port_out;
uint16_t* dst_port_out;
int32_t* tcp_flags_out;
int32_t* ether_type_out;
int32_t* next_proto_id_out;
uint32_t* timestamp_out;
};
4 changes: 3 additions & 1 deletion morpheus/_lib/doca/include/doca_rx_pipe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ struct DocaRxPipe
doca_flow_pipe* m_root_pipe;

public:
DocaRxPipe(std::shared_ptr<DocaContext> context, std::vector<std::shared_ptr<morpheus::doca::DocaRxQueue>> rxq, doca_traffic_type const type);
DocaRxPipe(std::shared_ptr<DocaContext> context,
std::vector<std::shared_ptr<morpheus::doca::DocaRxQueue>> rxq,
doca_traffic_type const type);
~DocaRxPipe();
};

Expand Down
2 changes: 1 addition & 1 deletion morpheus/_lib/doca/include/doca_semaphore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ struct DocaSemaphore

doca_gpu_semaphore_gpu* gpu_ptr();
uint16_t size();
void * get_info_cpu(uint32_t idx);
void* get_info_cpu(uint32_t idx);
bool is_ready(uint32_t idx);
void set_free(uint32_t idx);
};
Expand Down
10 changes: 6 additions & 4 deletions morpheus/_lib/doca/include/doca_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

#pragma once

#include "morpheus/messages/meta.hpp"
#include "common.hpp"

#include "morpheus/messages/meta.hpp"

#include <mrc/segment/builder.hpp>
#include <pymrc/node.hpp>

Expand Down Expand Up @@ -49,7 +50,9 @@ class DocaSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<MessageM
using typename base_t::source_type_t;
using typename base_t::subscriber_fn_t;

DocaSourceStage(std::string const& nic_pci_address, std::string const& gpu_pci_address, std::string const& traffic_type);
DocaSourceStage(std::string const& nic_pci_address,
std::string const& gpu_pci_address,
std::string const& traffic_type);

private:
subscriber_fn_t build();
Expand All @@ -75,8 +78,7 @@ struct DocaSourceStageInterfaceProxy
std::string const& name,
std::string const& nic_pci_address,
std::string const& gpu_pci_address,
std::string const& traffic_type
);
std::string const& traffic_type);
};

#pragma GCC visibility pop
Expand Down
14 changes: 6 additions & 8 deletions morpheus/_lib/doca/include/doca_source_kernels.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,12 @@ std::unique_ptr<cudf::column> integers_to_mac(
rmm::cuda_stream_view stream = cudf::detail::default_stream_value,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

void packet_receive_kernel(
doca_gpu_eth_rxq* rxq,
doca_gpu_semaphore_gpu* sem,
uint16_t sem_idx,
bool is_tcp,
uint32_t* exit_condition,
cudaStream_t stream
);
void packet_receive_kernel(doca_gpu_eth_rxq* rxq,
doca_gpu_semaphore_gpu* sem,
uint16_t sem_idx,
bool is_tcp,
uint32_t* exit_condition,
cudaStream_t stream);

void packet_gather_kernel(
int32_t packet_count, char* packet_buffer, int32_t* payload_sizes, char* payload_chars_out, cudaStream_t stream);
Expand Down
4 changes: 2 additions & 2 deletions morpheus/_lib/doca/include/error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ namespace detail {

inline void throw_doca_error(doca_error_t error, const char* file, unsigned int line)
{
throw morpheus::DocaError(MORPHEUS_CONCAT_STR(
"DOCA error encountered at: " << file << ":" << line << ": " << error << " " << doca_error_get_descr(error)));
throw morpheus::DocaError(MORPHEUS_CONCAT_STR("DOCA error encountered at: " << file << ":" << line << ": " << error
<< " " << doca_error_get_descr(error)));
}

inline void throw_rte_error(int error, const char* file, unsigned int line)
Expand Down
3 changes: 1 addition & 2 deletions morpheus/_lib/doca/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ PYBIND11_MODULE(doca, m)
py::arg("name"),
py::arg("nic_pci_address"),
py::arg("gpu_pci_address"),
py::arg("traffic_type")
);
py::arg("traffic_type"));
}

} // namespace morpheus
74 changes: 39 additions & 35 deletions morpheus/_lib/doca/src/doca_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,41 +47,45 @@ namespace {

using jobs_check_t = doca_error_t (*)(doca_devinfo*);

static doca_error_t open_doca_device_with_pci(const char *pcie_value, struct doca_dev **retval)
static doca_error_t open_doca_device_with_pci(const char* pcie_value, struct doca_dev** retval)
{
struct doca_devinfo **dev_list;
uint32_t nb_devs;
doca_error_t res;
size_t i;
uint8_t is_addr_equal = 0;

/* Set default return value */
*retval = NULL;

res = doca_devinfo_create_list(&dev_list, &nb_devs);
if (res != DOCA_SUCCESS) {
LOG(ERROR) << "Failed to load doca devices list";
return res;
}

/* Search */
for (i = 0; i < nb_devs; i++) {
res = doca_devinfo_is_equal_pci_addr(dev_list[i], pcie_value, &is_addr_equal);
if (res == DOCA_SUCCESS && is_addr_equal) {
/* if device can be opened */
res = doca_dev_open(dev_list[i], retval);
if (res == DOCA_SUCCESS) {
doca_devinfo_destroy_list(dev_list);
return res;
}
}
}

MORPHEUS_FAIL("Matching device not found");
res = DOCA_ERROR_NOT_FOUND;

doca_devinfo_destroy_list(dev_list);
return res;
struct doca_devinfo** dev_list;
uint32_t nb_devs;
doca_error_t res;
size_t i;
uint8_t is_addr_equal = 0;

/* Set default return value */
*retval = NULL;

res = doca_devinfo_create_list(&dev_list, &nb_devs);
if (res != DOCA_SUCCESS)
{
LOG(ERROR) << "Failed to load doca devices list";
return res;
}

/* Search */
for (i = 0; i < nb_devs; i++)
{
res = doca_devinfo_is_equal_pci_addr(dev_list[i], pcie_value, &is_addr_equal);
if (res == DOCA_SUCCESS && is_addr_equal)
{
/* if device can be opened */
res = doca_dev_open(dev_list[i], retval);
if (res == DOCA_SUCCESS)
{
doca_devinfo_destroy_list(dev_list);
return res;
}
}
}

MORPHEUS_FAIL("Matching device not found");
res = DOCA_ERROR_NOT_FOUND;

doca_devinfo_destroy_list(dev_list);
return res;
}

doca_flow_port* init_doca_flow(uint16_t port_id, uint8_t rxq_num)
Expand Down Expand Up @@ -196,7 +200,7 @@ DocaContext::DocaContext(std::string nic_addr, std::string gpu_addr) : m_max_que
DocaContext::~DocaContext()
{
doca_flow_port_stop(m_flow_port);
doca_flow_destroy();
doca_flow_destroy();

if (m_gpu != nullptr)
{
Expand Down
Loading

0 comments on commit 870b066

Please sign in to comment.