diff --git a/docs/cn/rdma.md b/docs/cn/rdma.md index 57a1396493..86c22e1e84 100644 --- a/docs/cn/rdma.md +++ b/docs/cn/rdma.md @@ -43,6 +43,8 @@ RdmaEndpoint数据传输逻辑的第三个重要特性是事件聚合。每个 RDMA要求数据收发所使用的内存空间必须被注册(memory register),把对应的页表映射注册给网卡,这一操作非常耗时,所以通常都会使用内存池方案来加速。brpc内部的数据收发都使用IOBuf,为了在兼容IOBuf的情况下实现完全零拷贝,整个IOBuf所使用的内存空间整体由统一内存池接管(参见src/brpc/rdma/block_pool.cpp)。注意,由于IOBuf内存池不由用户直接控制,因此实际使用中需要注意IOBuf所消耗的总内存,建议根据实际业务需求,一次性注册足够的内存池以实现性能最大化。 +应用程序可以自己管理内存,然后通过IOBuf::append_user_data_with_meta把数据发送出去。在这种情况下,应用程序应该自己使用rdma::RegisterMemoryForRdma注册内存(参见src/brpc/rdma/rdma_helper.h)。注意,RegisterMemoryForRdma会返回注册内存对应的lkey,请在append_user_data_with_meta时以meta形式提供给brpc。 + RDMA是硬件相关的通信技术,有很多独特的概念,比如device、port、GID、LID、MaxSge等。这些参数在初始化时会从对应的网卡中读取出来,并且做出默认的选择(参见src/brpc/rdma/rdma_helper.cpp)。有时默认的选择并非用户的期望,则可以通过flag参数方式指定。 # 参数 diff --git a/docs/en/rdma.md b/docs/en/rdma.md index e187807fbd..c0e88ce9b2 100644 --- a/docs/en/rdma.md +++ b/docs/en/rdma.md @@ -43,6 +43,8 @@ The third key feature in RdmaEndpoint data transmission is event suppression. Th All the memory used for data transmission in RDMA must be registered, which is very inefficient. Generally, a memory pool is employed to avoid frequent memory registration. In fact, brpc uses IOBuf for data transmission. In order to realize total zerocopy and compatibility with IOBuf, the memory used by IOBuf is taken over by the RDMA memory pool (see src/brpc/rdma/block_pool.cpp). Since IOBuf buffer cannot be controlled by user directly, the total memory consumption in IOBuf should be carefully managed. It is suggested that the application registers enough memory at one time according to its requirement. +The application can manage memory by itself and send data with IOBuf::append_user_data_with_meta. In this case, the application should register memory by itself with rdma::RegisterMemoryForRdma (see src/brpc/rdma/rdma_helper.h). Note that RegisterMemoryForRdma returns the lkey for registered memory. 
Please pass this lkey as the meta argument, together with the data, when calling append_user_data_with_meta. + RDMA is hardware-related. It has some different concepts such as device, port, GID, LID, MaxSge and so on. These parameters can be read from NICs at initialization, and brpc will make the default choice (see src/brpc/rdma/rdma_helper.cpp). Sometimes the default choice is not the expectation, then it can be changed in the flag way. # Parameters diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 299443525f..49daa8f8c9 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -516,7 +516,8 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { if (s->_rdma_state != Socket::RDMA_OFF) { flags |= ACK_MSG_RDMA_OK; } - *(uint32_t*)data = butil::HostToNet32(flags); + uint32_t* tmp = (uint32_t*)data; // avoid GCC warning on strict-aliasing + *tmp = butil::HostToNet32(flags); if (ep->WriteToFd(data, ACK_MSG_LEN) < 0) { const int saved_errno = errno; PLOG(WARNING) << "Fail to send Ack Message to server:" << s->description(); @@ -668,7 +669,8 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { } // Check RDMA enable flag - uint32_t flags = butil::NetToHost32(*(uint32_t*)data); + uint32_t* tmp = (uint32_t*)data; // avoid GCC warning on strict-aliasing + uint32_t flags = butil::NetToHost32(*tmp); if (flags & ACK_MSG_RDMA_OK) { if (s->_rdma_state == Socket::RDMA_OFF) { LOG(WARNING) << "Fail to parse Hello Message length from client:" @@ -722,7 +724,16 @@ friend class RdmaEndpoint; butil::IOBuf::BlockRef const& r = _ref_at(0); CHECK(r.length > 0); const void* start = fetch1(); - uint32_t lkey = GetLKey((char*)start - r.offset); + uint32_t lkey = GetRegionId(start); + if (lkey == 0) { // get lkey for user registered memory + uint64_t meta = get_first_data_meta(); + if (meta <= UINT_MAX) { + lkey = (uint32_t)meta; + } + } + if (BAIDU_UNLIKELY(lkey == 0)) { // only happens when meta is not specified + lkey = 
GetLKey((char*)start - r.offset); + } if (lkey == 0) { LOG(WARNING) << "Memory not registered for rdma. " << "Is this iobuf allocated before calling " @@ -975,7 +986,7 @@ int RdmaEndpoint::DoPostRecv(void* block, size_t block_size) { ibv_sge sge; sge.addr = (uint64_t)block; sge.length = block_size; - sge.lkey = GetLKey((char*)block - IOBUF_BLOCK_HEADER_LEN); + sge.lkey = GetRegionId(block); wr.num_sge = 1; wr.sg_list = &sge; diff --git a/src/brpc/rdma/rdma_helper.cpp b/src/brpc/rdma/rdma_helper.cpp index 536398c9ac..a31b2a2456 100644 --- a/src/brpc/rdma/rdma_helper.cpp +++ b/src/brpc/rdma/rdma_helper.cpp @@ -99,16 +99,15 @@ static ibv_device** g_devices = NULL; static ibv_context* g_context = NULL; static SocketId g_async_socket; static ibv_pd* g_pd = NULL; -static std::vector* g_mrs = NULL; +static std::vector* g_mrs = NULL; // mr registered by brpc + +static butil::FlatMap* g_user_mrs; // mr registered by user +static butil::Mutex* g_user_mrs_lock = NULL; // Store the original IOBuf memalloc and memdealloc functions static void* (*g_mem_alloc)(size_t) = NULL; static void (*g_mem_dealloc)(void*) = NULL; -butil::Mutex* g_addr_map_lock; -typedef butil::FlatMap AddrMap; -static AddrMap* g_addr_map = NULL; // for mr not in memory pool - static void GlobalRelease() { g_rdma_available.store(false, butil::memory_order_release); usleep(100000); // to avoid unload library too early @@ -116,20 +115,20 @@ static void GlobalRelease() { // We do not set `g_async_socket' to failed explicitly to avoid // close async_fd twice. 
- if (g_addr_map_lock) { - BAIDU_SCOPED_LOCK(*g_addr_map_lock); - if (g_addr_map) { - for (AddrMap::iterator it = g_addr_map->begin(); - it != g_addr_map->end(); ++it) { - IbvDeregMr(it->second); - } - delete g_addr_map; - g_addr_map = NULL; // must set it to NULL + RdmaEndpoint::GlobalRelease(); + + if (g_user_mrs_lock) { + BAIDU_SCOPED_LOCK(*g_user_mrs_lock); + for (butil::FlatMap::iterator it = g_user_mrs->begin(); + it != g_user_mrs->end(); ++it) { + IbvDeregMr(it->second); } + g_user_mrs->clear(); + delete g_user_mrs; + g_user_mrs = NULL; } - delete g_addr_map_lock; - - RdmaEndpoint::GlobalRelease(); + delete g_user_mrs_lock; + g_user_mrs_lock = NULL; if (g_mrs) { for (size_t i = 0; i < g_mrs->size(); ++i) { @@ -463,6 +462,23 @@ static void GlobalRdmaInitializeOrDieImpl() { ExitWithError(); } + g_user_mrs_lock = new (std::nothrow) butil::Mutex; + if (!g_user_mrs_lock) { + PLOG(WARNING) << "Fail to construct g_user_mrs_lock"; + ExitWithError(); + } + + g_user_mrs = new (std::nothrow) butil::FlatMap(); + if (!g_user_mrs) { + PLOG(WARNING) << "Fail to construct g_user_mrs"; + ExitWithError(); + } + + if (g_user_mrs->init(65536) < 0) { + PLOG(WARNING) << "Fail to initialize g_user_mrs"; + ExitWithError(); + } + g_mrs = new (std::nothrow) std::vector; if (!g_mrs) { PLOG(ERROR) << "Fail to allocate a RDMA MR list"; @@ -494,23 +510,6 @@ static void GlobalRdmaInitializeOrDieImpl() { ExitWithError(); } - g_addr_map_lock = new (std::nothrow) butil::Mutex; - if (!g_addr_map_lock) { - PLOG(WARNING) << "Fail to construct g_addr_map_lock"; - ExitWithError(); - } - - g_addr_map = new (std::nothrow) AddrMap; - if (!g_addr_map) { - PLOG(WARNING) << "Fail to construct g_addr_map"; - ExitWithError(); - } - - if (g_addr_map->init(65536) < 0) { - PLOG(WARNING) << "Fail to initialize g_addr_map"; - ExitWithError(); - } - SocketOptions opt; opt.fd = g_context->async_fd; butil::make_close_on_exec(opt.fd); @@ -543,25 +542,38 @@ void GlobalRdmaInitializeOrDie() { } } -int 
RegisterMemoryForRdma(void* buf, size_t len) { +uint32_t RegisterMemoryForRdma(void* buf, size_t len) { ibv_mr* mr = IbvRegMr(g_pd, buf, len, IBV_ACCESS_LOCAL_WRITE); if (!mr) { - return -1; + return 0; } - BAIDU_SCOPED_LOCK(*g_addr_map_lock); - if (!g_addr_map->insert(buf, mr)) { - IbvDeregMr(mr); - return -1; + { + BAIDU_SCOPED_LOCK(*g_user_mrs_lock); + if (!g_user_mrs->insert(buf, mr)) { + LOG(WARNING) << "Fail to insert to user mr maps (now there are " + << g_user_mrs->size() << " mrs already"; + } else { + return mr->lkey; + } } + IbvDeregMr(mr); return 0; } void DeregisterMemoryForRdma(void* buf) { - BAIDU_SCOPED_LOCK(*g_addr_map_lock); - ibv_mr** mr = g_addr_map->seek(buf); - if (mr && *mr) { - IbvDeregMr(*mr); - g_addr_map->erase(buf); + ibv_mr* mr = NULL; + { + BAIDU_SCOPED_LOCK(*g_user_mrs_lock); + ibv_mr** mr_ptr = g_user_mrs->seek(buf); + if (mr_ptr) { + mr = *mr_ptr; + g_user_mrs->erase(buf); + } + } + if (mr) { + IbvDeregMr(mr); + } else { + LOG(WARNING) << "Try to deregister a buffer which is not registered"; } } @@ -575,7 +587,7 @@ int GetRdmaCompVector() { } // g_comp_vector_index is not an atomic variable. If more than // one CQ is created at the same time, some CQs will share the - // same index. However, this vector is only used to assign a + // same index. However, this vector is only used to assign an // event queue for the CQ. Sharing the same event queue is not // a problem. 
return (g_comp_vector_index++) % g_context->num_comp_vectors; @@ -589,16 +601,13 @@ ibv_pd* GetRdmaPd() { return g_pd; } -uint32_t GetLKey(const void* buf) { - uint32_t lkey = GetRegionId(buf); - if (lkey == 0) { - BAIDU_SCOPED_LOCK(*g_addr_map_lock); - ibv_mr** mr = g_addr_map->seek(buf); - if (mr && *mr) { - lkey = (*mr)->lkey; - } +uint32_t GetLKey(void* buf) { + BAIDU_SCOPED_LOCK(*g_user_mrs_lock); + ibv_mr** mr_ptr = g_user_mrs->seek(buf); + if (mr_ptr) { + return (*mr_ptr)->lkey; } - return lkey; + return 0; } ibv_gid GetRdmaGid() { diff --git a/src/brpc/rdma/rdma_helper.h b/src/brpc/rdma/rdma_helper.h index 72acd52478..5d176b4e7c 100644 --- a/src/brpc/rdma/rdma_helper.h +++ b/src/brpc/rdma/rdma_helper.h @@ -32,8 +32,10 @@ namespace rdma { void GlobalRdmaInitializeOrDie(); // Register the given memory -// Return 0 if success, -1 if failed and errno set -int RegisterMemoryForRdma(void* buf, size_t len); +// Return the memory lkey for the given memory, Return 0 when fails +// To use the memory in IOBuf, append_user_data_with_meta must be called +// and take the lkey as the data meta +uint32_t RegisterMemoryForRdma(void* buf, size_t len); // Deregister the given memory void DeregisterMemoryForRdma(void* buf); @@ -45,7 +47,7 @@ ibv_context* GetRdmaContext(); ibv_pd* GetRdmaPd(); // Return lkey of the given address -uint32_t GetLKey(const void* buf); +uint32_t GetLKey(void* buf); // Return GID Index uint8_t GetRdmaGidIndex(); diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp index b637700aeb..cb98e71943 100644 --- a/src/butil/iobuf.cpp +++ b/src/butil/iobuf.cpp @@ -199,7 +199,12 @@ struct IOBuf::Block { uint16_t abi_check; // original cap, never be zero. uint32_t size; uint32_t cap; - Block* portal_next; + // When flag is 0, portal_next is valid. + // When flag & IOBUF_BLOCK_FLAGS_USER_DATA is non-0, data_meta is valid. 
+ union { + Block* portal_next; + uint64_t data_meta; + } u; // When flag is 0, data points to `size` bytes starting at `(char*)this+sizeof(Block)' // When flag & IOBUF_BLOCK_FLAGS_USER_DATA is non-0, data points to the user data and // the deleter is put in UserDataExtension at `(char*)this+sizeof(Block)' @@ -211,7 +216,7 @@ struct IOBuf::Block { , abi_check(0) , size(0) , cap(data_size) - , portal_next(NULL) + , u({NULL}) , data(data_in) { iobuf::g_nblock.fetch_add(1, butil::memory_order_relaxed); iobuf::g_blockmem.fetch_add(data_size + sizeof(Block), @@ -224,7 +229,7 @@ struct IOBuf::Block { , abi_check(0) , size(data_size) , cap(data_size) - , portal_next(NULL) + , u({0}) , data(data_in) { get_user_data_extension()->deleter = deleter; } @@ -281,7 +286,7 @@ namespace iobuf { int block_shared_count(IOBuf::Block const* b) { return b->ref_count(); } IOBuf::Block* get_portal_next(IOBuf::Block const* b) { - return b->portal_next; + return b->u.portal_next; } uint32_t block_cap(IOBuf::Block const* b) { @@ -346,7 +351,7 @@ void remove_tls_block_chain() { tls_data.block_head = NULL; int n = 0; do { - IOBuf::Block* const saved_next = b->portal_next; + IOBuf::Block* const saved_next = b->u.portal_next; b->dec_ref(); b = saved_next; ++n; @@ -367,7 +372,7 @@ IOBuf::Block* share_tls_block() { if (b) { new_block = b; while (new_block && new_block->full()) { - IOBuf::Block* const saved_next = new_block->portal_next; + IOBuf::Block* const saved_next = new_block->u.portal_next; new_block->dec_ref(); --tls_data.num_blocks; new_block = saved_next; @@ -399,7 +404,7 @@ inline void release_tls_block(IOBuf::Block *b) { b->dec_ref(); g_num_hit_tls_threshold.fetch_add(1, butil::memory_order_relaxed); } else { - b->portal_next = tls_data.block_head; + b->u.portal_next = tls_data.block_head; tls_data.block_head = b; ++tls_data.num_blocks; if (!tls_data.registered) { @@ -417,7 +422,7 @@ void release_tls_block_chain(IOBuf::Block* b) { if (tls_data.num_blocks >= MAX_BLOCKS_PER_THREAD) { do { 
++n; - IOBuf::Block* const saved_next = b->portal_next; + IOBuf::Block* const saved_next = b->u.portal_next; b->dec_ref(); b = saved_next; } while (b); @@ -429,13 +434,13 @@ void release_tls_block_chain(IOBuf::Block* b) { do { ++n; CHECK(!b->full()); - if (b->portal_next == NULL) { + if (b->u.portal_next == NULL) { last_b = b; break; } - b = b->portal_next; + b = b->u.portal_next; } while (true); - last_b->portal_next = tls_data.block_head; + last_b->u.portal_next = tls_data.block_head; tls_data.block_head = first_b; tls_data.num_blocks += n; if (!tls_data.registered) { @@ -452,7 +457,7 @@ IOBuf::Block* acquire_tls_block() { return create_block(); } while (b->full()) { - IOBuf::Block* const saved_next = b->portal_next; + IOBuf::Block* const saved_next = b->u.portal_next; b->dec_ref(); tls_data.block_head = saved_next; --tls_data.num_blocks; @@ -461,9 +466,9 @@ IOBuf::Block* acquire_tls_block() { return create_block(); } } - tls_data.block_head = b->portal_next; + tls_data.block_head = b->u.portal_next; --tls_data.num_blocks; - b->portal_next = NULL; + b->u.portal_next = NULL; return b; } @@ -1206,7 +1211,10 @@ int IOBuf::appendv(const const_iovec* vec, size_t n) { return 0; } -int IOBuf::append_user_data(void* data, size_t size, void (*deleter)(void*)) { +int IOBuf::append_user_data_with_meta(void* data, + size_t size, + void (*deleter)(void*), + uint64_t meta) { if (size > 0xFFFFFFFFULL - 100) { LOG(FATAL) << "data_size=" << size << " is too large"; return -1; @@ -1219,11 +1227,23 @@ int IOBuf::append_user_data(void* data, size_t size, void (*deleter)(void*)) { deleter = ::free; } IOBuf::Block* b = new (mem) IOBuf::Block((char*)data, size, deleter); + b->u.data_meta = meta; const IOBuf::BlockRef r = { 0, b->cap, b }; _move_back_ref(r); return 0; } +uint64_t IOBuf::get_first_data_meta() { + if (_ref_num() == 0) { + return 0; + } + IOBuf::BlockRef const& r = _ref_at(0); + if (!(r.block->flags & IOBUF_BLOCK_FLAGS_USER_DATA)) { + return 0; + } + return 
r.block->u.data_meta; +} + int IOBuf::resize(size_t n, char c) { const size_t saved_len = length(); if (n < saved_len) { @@ -1563,7 +1583,7 @@ ssize_t IOPortal::pappend_from_file_descriptor( return -1; } if (prev_p != NULL) { - prev_p->portal_next = p; + prev_p->u.portal_next = p; } else { _block = p; } @@ -1576,7 +1596,7 @@ ssize_t IOPortal::pappend_from_file_descriptor( break; } prev_p = p; - p = p->portal_next; + p = p->u.portal_next; } while (1); ssize_t nr = 0; @@ -1601,7 +1621,7 @@ ssize_t IOPortal::pappend_from_file_descriptor( _push_back_ref(r); _block->size += len; if (_block->full()) { - Block* const saved_next = _block->portal_next; + Block* const saved_next = _block->u.portal_next; _block->dec_ref(); // _block may be deleted _block = saved_next; } @@ -1624,7 +1644,7 @@ ssize_t IOPortal::append_from_reader(IReader* reader, size_t max_count) { return -1; } if (prev_p != NULL) { - prev_p->portal_next = p; + prev_p->u.portal_next = p; } else { _block = p; } @@ -1637,7 +1657,7 @@ ssize_t IOPortal::append_from_reader(IReader* reader, size_t max_count) { break; } prev_p = p; - p = p->portal_next; + p = p->u.portal_next; } while (1); const ssize_t nr = reader->ReadV(vec, nvec); @@ -1656,7 +1676,7 @@ ssize_t IOPortal::append_from_reader(IReader* reader, size_t max_count) { _push_back_ref(r); _block->size += len; if (_block->full()) { - Block* const saved_next = _block->portal_next; + Block* const saved_next = _block->u.portal_next; _block->dec_ref(); // _block may be deleted _block = saved_next; } @@ -1686,7 +1706,7 @@ ssize_t IOPortal::append_from_SSL_channel( _push_back_ref(r); _block->size += rc; if (_block->full()) { - Block* const saved_next = _block->portal_next; + Block* const saved_next = _block->u.portal_next; _block->dec_ref(); // _block may be deleted _block = saved_next; } diff --git a/src/butil/iobuf.h b/src/butil/iobuf.h index 61b3e31a5f..8f4f823aa9 100644 --- a/src/butil/iobuf.h +++ b/src/butil/iobuf.h @@ -248,6 +248,15 @@ friend class 
IOBufCutter; // deleted using the deleter func when no IOBuf references it anymore. int append_user_data(void* data, size_t size, void (*deleter)(void*)); + // Append the user-data to back side WITHOUT copying. + // The meta is associated with this piece of user-data. + int append_user_data_with_meta(void* data, size_t size, void (*deleter)(void*), uint64_t meta); + + // Get the data meta of the first byte in this IOBuf. + // The meta is specified with append_user_data_with_meta before. + // 0 means the meta is invalid. + uint64_t get_first_data_meta(); + // Resizes the buf to a length of n characters. // If n is smaller than the current length, all bytes after n will be // truncated. diff --git a/src/butil/iobuf_inl.h b/src/butil/iobuf_inl.h index 6a3d215a76..f900b82dbb 100644 --- a/src/butil/iobuf_inl.h +++ b/src/butil/iobuf_inl.h @@ -37,6 +37,10 @@ inline ssize_t IOBuf::cut_multiple_into_file_descriptor( return pcut_multiple_into_file_descriptor(fd, -1, pieces, count); } +inline int IOBuf::append_user_data(void* data, size_t size, void (*deleter)(void*)) { + return append_user_data_with_meta(data, size, deleter, 0); +} + inline ssize_t IOPortal::append_from_file_descriptor(int fd, size_t max_count) { return pappend_from_file_descriptor(fd, -1, max_count); } diff --git a/test/brpc_rdma_unittest.cpp b/test/brpc_rdma_unittest.cpp index 158ba4107e..5c52bd639e 100644 --- a/test/brpc_rdma_unittest.cpp +++ b/test/brpc_rdma_unittest.cpp @@ -959,7 +959,8 @@ TEST_F(RdmaTest, server_hello_invalid_version) { usleep(100000); ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state); ASSERT_EQ(4, read(acc_fd, data, 4)); - ASSERT_EQ(0, butil::NetToHost32(*(uint32_t*)data)); + uint32_t* tmp = (uint32_t*)data; + ASSERT_EQ(0, butil::NetToHost32(*tmp)); bthread_id_join(cntl.call_id()); ASSERT_EQ(ERPCTIMEDOUT, cntl.ErrorCode()); @@ -1010,7 +1011,8 @@ TEST_F(RdmaTest, server_hello_invalid_sq_rq_size) { usleep(100000); ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, 
s->_rdma_ep->_state); ASSERT_EQ(4, read(acc_fd, data, 4)); - ASSERT_EQ(0, butil::NetToHost32(*(uint32_t*)data)); + uint32_t* tmp = (uint32_t*)data; + ASSERT_EQ(0, butil::NetToHost32(*tmp)); bthread_id_join(cntl.call_id()); ASSERT_EQ(ERPCTIMEDOUT, cntl.ErrorCode()); @@ -1061,7 +1063,8 @@ TEST_F(RdmaTest, server_miss_after_ack) { usleep(100000); ASSERT_EQ(rdma::RdmaEndpoint::ESTABLISHED, s->_rdma_ep->_state); ASSERT_EQ(4, read(acc_fd, data, 4)); - ASSERT_EQ(1, butil::NetToHost32(*(uint32_t*)data)); + uint32_t* tmp = (uint32_t*)data; + ASSERT_EQ(1, butil::NetToHost32(*tmp)); bthread_id_join(cntl.call_id()); ASSERT_EQ(ERPCTIMEDOUT, cntl.ErrorCode()); @@ -1112,7 +1115,8 @@ TEST_F(RdmaTest, server_close_after_ack) { usleep(100000); ASSERT_EQ(rdma::RdmaEndpoint::ESTABLISHED, s->_rdma_ep->_state); ASSERT_EQ(4, read(acc_fd, data, 4)); - ASSERT_EQ(1, butil::NetToHost32(*(uint32_t*)data)); + uint32_t* tmp = (uint32_t*)data; + ASSERT_EQ(1, butil::NetToHost32(*tmp)); close(acc_fd); bthread_id_join(cntl.call_id()); @@ -1841,6 +1845,8 @@ TEST_F(RdmaTest, rdma_use_selective_channel) { StopServer(); } +static void MockFree(void* buf) { } + TEST_F(RdmaTest, send_rpcs_with_user_defined_iobuf) { if (!FLAGS_rdma_test_enable) { return; @@ -1870,27 +1876,36 @@ TEST_F(RdmaTest, send_rpcs_with_user_defined_iobuf) { ASSERT_EQ(ERDMAMEM, cntl[0].ErrorCode()); attach.clear(); sleep(2); // wait for client recover from EHOSTDOWN - - void* mr[RPC_NUM]; - butil::IOBuf attachment[RPC_NUM]; - for (int i = 1; i < RPC_NUM; ++i) { - mr[i] = malloc(4096); - memset(mr[i], i % 100, 4096); - ASSERT_EQ(0, rdma::RegisterMemoryForRdma(mr[i], 4096)); - attachment[i].append_user_data(mr[i], 4096, NULL); + cntl[0].Reset(); + + char* mr[2 * RPC_NUM]; + uint32_t lkey[2 * RPC_NUM]; + for (size_t i = 0; i < RPC_NUM; ++i) { + mr[2 * i] = (char*)malloc(4096); + memset(mr[2 * i], i % 100, 4096); + lkey[2 * i] = rdma::RegisterMemoryForRdma(mr[2 * i], 4096); + ASSERT_TRUE(lkey[2 * i] != 0); + 
cntl[i].request_attachment().append_user_data_with_meta(mr[2 * i] + i, 4096 - i, MockFree, lkey[2 * i]); + mr[2 * i + 1] = (char*)malloc(4096); + memset(mr[2 * i + 1], i % 100, 4096); + lkey[2 * i + 1] = rdma::RegisterMemoryForRdma(mr[2 * i + 1], 4096); + ASSERT_TRUE(lkey[2 * i + 1] != 0); + cntl[i].request_attachment().append_user_data_with_meta(mr[2 * i + 1] + i, 4096 - i, MockFree, lkey[2 * i + 1]); req[i].set_message(__FUNCTION__); - cntl[i].request_attachment().append(attachment[i]); google::protobuf::Closure* done = DoNothing(); ::test::EchoService::Stub(&channel).Echo(&cntl[i], &req[i], &res[i], done); } - for (int i = 1; i < RPC_NUM; ++i) { + for (size_t i = 0; i < RPC_NUM; ++i) { bthread_id_join(cntl[i].call_id()); ASSERT_EQ(0, cntl[i].ErrorCode()) << "req[" << i << "]"; rdma::DeregisterMemoryForRdma(mr[i]); - ASSERT_EQ(4096, cntl[i].response_attachment().size()); - char tmp[4096]; - cntl[i].response_attachment().copy_to(tmp, 4096); - ASSERT_EQ(0, memcmp(mr[i], tmp, 4096)); + ASSERT_EQ(2 * (4096 - i), cntl[i].response_attachment().size()); + char tmp[8192]; + cntl[i].response_attachment().copy_to(tmp, 2 * (4096 - i)); + ASSERT_EQ(0, memcmp(mr[2 * i] + i, tmp, 4096 - i)); + ASSERT_EQ(0, memcmp(mr[2 * i + 1] + i, tmp + 4096 - i, 4096 - i)); + free(mr[2 * i]); + free(mr[2 * i + 1]); } StopServer(); diff --git a/test/iobuf_unittest.cpp b/test/iobuf_unittest.cpp index 7691d7977d..5add0104b6 100644 --- a/test/iobuf_unittest.cpp +++ b/test/iobuf_unittest.cpp @@ -1657,6 +1657,24 @@ TEST_F(IOBufTest, append_user_data_and_share) { ASSERT_EQ(data, my_free_params); } +TEST_F(IOBufTest, append_user_data_with_meta) { + butil::IOBuf b0; + const int REP = 16; + const size_t len = 256; + char* data[REP]; + for (int i = 0; i < REP; ++i) { + data[i] = (char*)malloc(len); + ASSERT_EQ(0, b0.append_user_data_with_meta(data[i], len, my_free, i)); + } + for (int i = 0; i < REP; ++i) { + ASSERT_EQ(i, b0.get_first_data_meta()); + butil::IOBuf out; + ASSERT_EQ(len / 2, b0.cutn(&out, 
len / 2)); + ASSERT_EQ(i, b0.get_first_data_meta()); + ASSERT_EQ(len / 2, b0.cutn(&out, len / 2)); + } +} + TEST_F(IOBufTest, share_tls_block) { butil::iobuf::remove_tls_block_chain(); butil::IOBuf::Block* b = butil::iobuf::acquire_tls_block();