Skip to content

Commit

Permalink
allow IOBuf::attach_user_data specify any valid address in registered…
Browse files Browse the repository at this point in the history
… rdma memory region
  • Loading branch information
Tuvie committed Nov 22, 2022
1 parent 3d90221 commit b723184
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 102 deletions.
2 changes: 2 additions & 0 deletions docs/cn/rdma.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ RdmaEndpoint数据传输逻辑的第三个重要特性是事件聚合。每个

RDMA要求数据收发所使用的内存空间必须被注册(memory register),把对应的页表映射注册给网卡,这一操作非常耗时,所以通常都会使用内存池方案来加速。brpc内部的数据收发都使用IOBuf,为了在兼容IOBuf的情况下实现完全零拷贝,整个IOBuf所使用的内存空间整体由统一内存池接管(参见src/brpc/rdma/block_pool.cpp)。注意,由于IOBuf内存池不由用户直接控制,因此实际使用中需要注意IOBuf所消耗的总内存,建议根据实际业务需求,一次性注册足够的内存池以实现性能最大化。

应用程序可以自己管理内存,然后通过IOBuf::append_user_data_with_meta把数据发送出去。在这种情况下,应用程序应该自己使用rdma::RegisterMemoryForRdma注册内存(参见src/brpc/rdma/rdma_helper.h)。注意,RegisterMemoryForRdma会返回注册内存对应的lkey,请在append_user_data_with_meta时以meta形式提供给brpc。

RDMA是硬件相关的通信技术,有很多独特的概念,比如device、port、GID、LID、MaxSge等。这些参数在初始化时会从对应的网卡中读取出来,并且做出默认的选择(参见src/brpc/rdma/rdma_helper.cpp)。有时默认的选择并非用户的期望,则可以通过flag参数方式指定。

# 参数
Expand Down
2 changes: 2 additions & 0 deletions docs/en/rdma.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ The third key feature in RdmaEndpoint data transmission is event suppression. Th

All the memory used for data transmission in RDMA must be registered, which is very inefficient. Generally, a memory pool is employed to avoid frequent memory registration. In fact, brpc uses IOBuf for data transmission. In order to realize total zerocopy and compatibility with IOBuf, the memory used by IOBuf is taken over by the RDMA memory pool (see src/brpc/rdma/block_pool.cpp). Since IOBuf buffer cannot be controlled by user directly, the total memory consumption in IOBuf should be carefully managed. It is suggested that the application registers enough memory at one time according to its requirement.

The application can manage memory by itself and send data with IOBuf::append_user_data_with_meta. In this case, the application should register memory by itself with rdma::RegisterMemoryForRdma (see src/brpc/rdma/rdma_helper.h). Note that RegisterMemoryForRdma returns the lkey for registered memory. Please provide this lkey with data together when calling append_user_data_with_meta.

RDMA is hardware-related. It has some different concepts such as device, port, GID, LID, MaxSge and so on. These parameters can be read from NICs at initialization, and brpc will make the default choice (see src/brpc/rdma/rdma_helper.cpp). Sometimes the default choice is not the expectation, then it can be changed in the flag way.

# Parameters
Expand Down
19 changes: 15 additions & 4 deletions src/brpc/rdma/rdma_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,8 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
if (s->_rdma_state != Socket::RDMA_OFF) {
flags |= ACK_MSG_RDMA_OK;
}
*(uint32_t*)data = butil::HostToNet32(flags);
uint32_t* tmp = (uint32_t*)data; // avoid GCC warning on strict-aliasing
*tmp = butil::HostToNet32(flags);
if (ep->WriteToFd(data, ACK_MSG_LEN) < 0) {
const int saved_errno = errno;
PLOG(WARNING) << "Fail to send Ack Message to server:" << s->description();
Expand Down Expand Up @@ -668,7 +669,8 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
}

// Check RDMA enable flag
uint32_t flags = butil::NetToHost32(*(uint32_t*)data);
uint32_t* tmp = (uint32_t*)data; // avoid GCC warning on strict-aliasing
uint32_t flags = butil::NetToHost32(*tmp);
if (flags & ACK_MSG_RDMA_OK) {
if (s->_rdma_state == Socket::RDMA_OFF) {
LOG(WARNING) << "Fail to parse Hello Message length from client:"
Expand Down Expand Up @@ -722,7 +724,16 @@ friend class RdmaEndpoint;
butil::IOBuf::BlockRef const& r = _ref_at(0);
CHECK(r.length > 0);
const void* start = fetch1();
uint32_t lkey = GetLKey((char*)start - r.offset);
uint32_t lkey = GetRegionId(start);
if (lkey == 0) { // get lkey for user registered memory
uint64_t meta = get_first_data_meta();
if (meta <= UINT_MAX) {
lkey = (uint32_t)meta;
}
}
if (BAIDU_UNLIKELY(lkey == 0)) { // only happens when meta is not specified
lkey = GetLKey((char*)start - r.offset);
}
if (lkey == 0) {
LOG(WARNING) << "Memory not registered for rdma. "
<< "Is this iobuf allocated before calling "
Expand Down Expand Up @@ -975,7 +986,7 @@ int RdmaEndpoint::DoPostRecv(void* block, size_t block_size) {
ibv_sge sge;
sge.addr = (uint64_t)block;
sge.length = block_size;
sge.lkey = GetLKey((char*)block - IOBUF_BLOCK_HEADER_LEN);
sge.lkey = GetRegionId(block);
wr.num_sge = 1;
wr.sg_list = &sge;

Expand Down
119 changes: 64 additions & 55 deletions src/brpc/rdma/rdma_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,37 +99,36 @@ static ibv_device** g_devices = NULL;
static ibv_context* g_context = NULL;
static SocketId g_async_socket;
static ibv_pd* g_pd = NULL;
static std::vector<ibv_mr*>* g_mrs = NULL;
static std::vector<ibv_mr*>* g_mrs = NULL; // mr registered by brpc

static butil::FlatMap<void*, ibv_mr*>* g_user_mrs; // mr registered by user
static butil::Mutex* g_user_mrs_lock = NULL;

// Store the original IOBuf memalloc and memdealloc functions
static void* (*g_mem_alloc)(size_t) = NULL;
static void (*g_mem_dealloc)(void*) = NULL;

butil::Mutex* g_addr_map_lock;
typedef butil::FlatMap<const void*, ibv_mr*> AddrMap;
static AddrMap* g_addr_map = NULL; // for mr not in memory pool

static void GlobalRelease() {
g_rdma_available.store(false, butil::memory_order_release);
usleep(100000); // to avoid unload library too early

// We do not set `g_async_socket' to failed explicitly to avoid
// close async_fd twice.

if (g_addr_map_lock) {
BAIDU_SCOPED_LOCK(*g_addr_map_lock);
if (g_addr_map) {
for (AddrMap::iterator it = g_addr_map->begin();
it != g_addr_map->end(); ++it) {
IbvDeregMr(it->second);
}
delete g_addr_map;
g_addr_map = NULL; // must set it to NULL
RdmaEndpoint::GlobalRelease();

if (g_user_mrs_lock) {
BAIDU_SCOPED_LOCK(*g_user_mrs_lock);
for (butil::FlatMap<void*, ibv_mr*>::iterator it = g_user_mrs->begin();
it != g_user_mrs->end(); ++it) {
IbvDeregMr(it->second);
}
g_user_mrs->clear();
delete g_user_mrs;
g_user_mrs = NULL;
}
delete g_addr_map_lock;

RdmaEndpoint::GlobalRelease();
delete g_user_mrs_lock;
g_user_mrs_lock = NULL;

if (g_mrs) {
for (size_t i = 0; i < g_mrs->size(); ++i) {
Expand Down Expand Up @@ -463,6 +462,23 @@ static void GlobalRdmaInitializeOrDieImpl() {
ExitWithError();
}

g_user_mrs_lock = new (std::nothrow) butil::Mutex;
if (!g_user_mrs_lock) {
PLOG(WARNING) << "Fail to construct g_user_mrs_lock";
ExitWithError();
}

g_user_mrs = new (std::nothrow) butil::FlatMap<void*, ibv_mr*>();
if (!g_user_mrs) {
PLOG(WARNING) << "Fail to construct g_user_mrs";
ExitWithError();
}

if (g_user_mrs->init(65536) < 0) {
PLOG(WARNING) << "Fail to initialize g_user_mrs";
ExitWithError();
}

g_mrs = new (std::nothrow) std::vector<ibv_mr*>;
if (!g_mrs) {
PLOG(ERROR) << "Fail to allocate a RDMA MR list";
Expand Down Expand Up @@ -494,23 +510,6 @@ static void GlobalRdmaInitializeOrDieImpl() {
ExitWithError();
}

g_addr_map_lock = new (std::nothrow) butil::Mutex;
if (!g_addr_map_lock) {
PLOG(WARNING) << "Fail to construct g_addr_map_lock";
ExitWithError();
}

g_addr_map = new (std::nothrow) AddrMap;
if (!g_addr_map) {
PLOG(WARNING) << "Fail to construct g_addr_map";
ExitWithError();
}

if (g_addr_map->init(65536) < 0) {
PLOG(WARNING) << "Fail to initialize g_addr_map";
ExitWithError();
}

SocketOptions opt;
opt.fd = g_context->async_fd;
butil::make_close_on_exec(opt.fd);
Expand Down Expand Up @@ -543,25 +542,38 @@ void GlobalRdmaInitializeOrDie() {
}
}

int RegisterMemoryForRdma(void* buf, size_t len) {
uint32_t RegisterMemoryForRdma(void* buf, size_t len) {
ibv_mr* mr = IbvRegMr(g_pd, buf, len, IBV_ACCESS_LOCAL_WRITE);
if (!mr) {
return -1;
return 0;
}
BAIDU_SCOPED_LOCK(*g_addr_map_lock);
if (!g_addr_map->insert(buf, mr)) {
IbvDeregMr(mr);
return -1;
{
BAIDU_SCOPED_LOCK(*g_user_mrs_lock);
if (!g_user_mrs->insert(buf, mr)) {
LOG(WARNING) << "Fail to insert to user mr maps (now there are "
<< g_user_mrs->size() << " mrs already";
} else {
return mr->lkey;
}
}
IbvDeregMr(mr);
return 0;
}

void DeregisterMemoryForRdma(void* buf) {
BAIDU_SCOPED_LOCK(*g_addr_map_lock);
ibv_mr** mr = g_addr_map->seek(buf);
if (mr && *mr) {
IbvDeregMr(*mr);
g_addr_map->erase(buf);
ibv_mr* mr = NULL;
{
BAIDU_SCOPED_LOCK(*g_user_mrs_lock);
ibv_mr** mr_ptr = g_user_mrs->seek(buf);
if (mr_ptr) {
mr = *mr_ptr;
g_user_mrs->erase(buf);
}
}
if (mr) {
IbvDeregMr(mr);
} else {
LOG(WARNING) << "Try to deregister a buffer which is not registered";
}
}

Expand All @@ -575,7 +587,7 @@ int GetRdmaCompVector() {
}
// g_comp_vector_index is not an atomic variable. If more than
// one CQ is created at the same time, some CQs will share the
// same index. However, this vector is only used to assign a
// same index. However, this vector is only used to assign an
// event queue for the CQ. Sharing the same event queue is not
// a problem.
return (g_comp_vector_index++) % g_context->num_comp_vectors;
Expand All @@ -589,16 +601,13 @@ ibv_pd* GetRdmaPd() {
return g_pd;
}

uint32_t GetLKey(const void* buf) {
uint32_t lkey = GetRegionId(buf);
if (lkey == 0) {
BAIDU_SCOPED_LOCK(*g_addr_map_lock);
ibv_mr** mr = g_addr_map->seek(buf);
if (mr && *mr) {
lkey = (*mr)->lkey;
}
uint32_t GetLKey(void* buf) {
BAIDU_SCOPED_LOCK(*g_user_mrs_lock);
ibv_mr** mr_ptr = g_user_mrs->seek(buf);
if (mr_ptr) {
return (*mr_ptr)->lkey;
}
return lkey;
return 0;
}

ibv_gid GetRdmaGid() {
Expand Down
8 changes: 5 additions & 3 deletions src/brpc/rdma/rdma_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ namespace rdma {
void GlobalRdmaInitializeOrDie();

// Register the given memory
// Return 0 if success, -1 if failed and errno set
int RegisterMemoryForRdma(void* buf, size_t len);
// Return the memory lkey for the given memory, Return 0 when fails
// To use the memory in IOBuf, append_user_data_with_meta must be called
// and take the lkey as the data meta
uint32_t RegisterMemoryForRdma(void* buf, size_t len);

// Deregister the given memory
void DeregisterMemoryForRdma(void* buf);
Expand All @@ -45,7 +47,7 @@ ibv_context* GetRdmaContext();
ibv_pd* GetRdmaPd();

// Return lkey of the given address
uint32_t GetLKey(const void* buf);
uint32_t GetLKey(void* buf);

// Return GID Index
uint8_t GetRdmaGidIndex();
Expand Down
Loading

0 comments on commit b723184

Please sign in to comment.