Skip to content

Commit

Permalink
set tcp_no_delay option
Browse files Browse the repository at this point in the history
  • Loading branch information
qinzuoyan committed Oct 23, 2015
1 parent 0dbfbbd commit 8acfe52
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 55 deletions.
2 changes: 1 addition & 1 deletion src/sofa/pbrpc/ptime.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ inline std::string ptime_to_string(const PTime& t)
PTime::date_type date = lt.date();
TimeDuration tod = lt.time_of_day();
char buf[64];
snprintf(buf, sizeof(buf), "%04d-%02d-%02d %02d:%02d:%02d.%06ld",
snprintf(buf, sizeof(buf), "%04d-%02d-%02d %02d:%02d:%02d.%06lld",
(int)date.year(),
(int)date.month(),
(int)date.day(),
Expand Down
44 changes: 43 additions & 1 deletion src/sofa/pbrpc/rpc_byte_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@
#include <sofa/pbrpc/common_internal.h>
#include <sofa/pbrpc/rpc_endpoint.h>

// If SOFA_PBRPC_TCP_NO_DELAY == true, means disable the Nagle algorithm.
//
// Nagle algorithm may cause an extra delay in some cases, because if
// the data in a single write spans 2n packets, the last packet will be
// withheld, waiting for the ACK for the previous packet. For more, please
// refer to <https://en.wikipedia.org/wiki/Nagle's_algorithm>.
//
// Disabling the Nagle algorithm would cause these affacts:
// * decrease delay time (positive affact)
// * decrease the qps (negative affact)
#ifndef SOFA_PBRPC_TCP_NO_DELAY
#define SOFA_PBRPC_TCP_NO_DELAY true
#endif

namespace sofa {
namespace pbrpc {

Expand Down Expand Up @@ -89,6 +103,20 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this<RpcByteStream>
_last_rw_ticks = _ticks;

boost::system::error_code ec;
_socket.set_option(tcp::no_delay(SOFA_PBRPC_TCP_NO_DELAY), ec);
if (ec)
{
#if defined( LOG )
LOG(ERROR) << "on_connect(): set no_delay option failed: "
<< ec.message();
#else
SLOG(ERROR, "on_connect(): set no_delay option failed: %s",
ec.message().c_str());
#endif
close("init stream failed: " + ec.message());
return;
}

_local_endpoint = _socket.local_endpoint(ec);
if (ec)
{
Expand Down Expand Up @@ -207,7 +235,7 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this<RpcByteStream>
void async_write_some(const char* data, size_t size)
{
SOFA_PBRPC_FUNCTION_TRACE;

_socket.async_write_some(boost::asio::buffer(data, size),
boost::bind(&RpcByteStream::on_write_some,
shared_from_this(), _1, _2));
Expand Down Expand Up @@ -253,6 +281,20 @@ class RpcByteStream : public sofa::pbrpc::enable_shared_from_this<RpcByteStream>
}

boost::system::error_code ec;
_socket.set_option(tcp::no_delay(SOFA_PBRPC_TCP_NO_DELAY), ec);
if (ec)
{
#if defined( LOG )
LOG(ERROR) << "on_connect(): set no_delay option failed: "
<< ec.message();
#else
SLOG(ERROR, "on_connect(): set no_delay option failed: %s",
ec.message().c_str());
#endif
close("init stream failed: " + ec.message());
return;
}

_local_endpoint = _socket.local_endpoint(ec);
if (ec)
{
Expand Down
8 changes: 4 additions & 4 deletions src/sofa/pbrpc/rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ struct RpcClientOptions {

int keep_alive_time; // keep alive time of idle connections.
// idle connections will be closed if no read/write for this time.
// in seconds, should >= -1, -1 means for ever, default 65.
// in seconds, should >= -1, -1 means for ever, default -1.

int max_pending_buffer_size; // max buffer size of the pending send queue for each connection.
// in MB, should >= 0, 0 means no buffer, default 2.
// in MB, should >= 0, 0 means no buffer, default 100.

// Network throughput limit.
// The network bandwidth is shared by all connections:
Expand All @@ -38,8 +38,8 @@ struct RpcClientOptions {
RpcClientOptions()
: work_thread_num(4)
, callback_thread_num(4)
, keep_alive_time(65)
, max_pending_buffer_size(2)
, keep_alive_time(-1)
, max_pending_buffer_size(100)
, max_throughput_in(-1)
, max_throughput_out(-1)
{}
Expand Down
8 changes: 3 additions & 5 deletions src/sofa/pbrpc/rpc_message_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ class RpcMessageStream : public RpcByteStream
}

if (_role_type == ROLE_TYPE_SERVER
&& pending_buffer_size() > _max_pending_buffer_size)
&& pending_buffer_size() > max_pending_buffer_size())
{
// sending buffer full, should suspend receiving to wait
return false;
Expand Down Expand Up @@ -695,10 +695,8 @@ class RpcMessageStream : public RpcByteStream
TranBufPool::free(_tran_buf);
_tran_buf = NULL;
}

_tran_buf = reinterpret_cast<char*>(TranBufPool::malloc(std::min(
SOFA_PBRPC_TRAN_BUF_BLOCK_MAX_FACTOR,
_receiving_message->BlockCount())));
_tran_buf = reinterpret_cast<char*>(
TranBufPool::malloc(SOFA_PBRPC_TRAN_BUF_BLOCK_MAX_FACTOR));
if(_tran_buf == NULL)
{
#if defined( LOG )
Expand Down
4 changes: 2 additions & 2 deletions src/sofa/pbrpc/rpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct RpcServerOptions {
// in seconds, should >= -1, -1 means forever, default -1.

int max_pending_buffer_size; // max buffer size of the pending send queue for one connection.
// in MB, should >= 0, 0 means no buffer, default 2.
// in MB, should >= 0, 0 means no buffer, default 100.

// Network throughput limit.
// The network bandwidth is shared by all connections:
Expand Down Expand Up @@ -58,7 +58,7 @@ struct RpcServerOptions {
RpcServerOptions()
: work_thread_num(8)
, keep_alive_time(-1)
, max_pending_buffer_size(2)
, max_pending_buffer_size(100)
, max_throughput_in(-1)
, max_throughput_out(-1)
, disable_builtin_services(false)
Expand Down
11 changes: 3 additions & 8 deletions src/sofa/pbrpc/rpc_server_message_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ class RpcServerMessageStream : public RpcByteStream
}

if (_role_type == ROLE_TYPE_SERVER
&& pending_buffer_size() > _max_pending_buffer_size)
&& pending_buffer_size() > max_pending_buffer_size())
{
// sending buffer full, should suspend receiving to wait
return false;
Expand Down Expand Up @@ -669,13 +669,8 @@ class RpcServerMessageStream : public RpcByteStream
_tran_buf = NULL;
}

int factor = 0;
if (_current_rpc_request_parser)
{
factor = std::min(SOFA_PBRPC_TRAN_BUF_BLOCK_MAX_FACTOR,
_current_rpc_request_parser->CurrentBlockCount());
}
_tran_buf = reinterpret_cast<char*>(TranBufPool::malloc(factor));
_tran_buf = reinterpret_cast<char*>(
TranBufPool::malloc(SOFA_PBRPC_TRAN_BUF_BLOCK_MAX_FACTOR));
if(_tran_buf == NULL)
{
#if defined( LOG )
Expand Down
2 changes: 1 addition & 1 deletion src/sofa/pbrpc/tran_buf_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <sofa/pbrpc/common_internal.h>

#ifndef SOFA_PBRPC_TRAN_BUF_BLOCK_BASE_SIZE
// block_base_size = 1K
// base_block_size = 1K
#define SOFA_PBRPC_TRAN_BUF_BLOCK_BASE_SIZE (1024u)
#endif

Expand Down
4 changes: 3 additions & 1 deletion test/perf_test/client_serial.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ int main(int argc, char** argv)
}
if (response->message().size() != request->message().size()
|| response->message().at(0) != request->message().at(0)
|| response->message().at(response->message().size() / 2)
!= request->message().at(request->message().size() / 2)
|| response->message().at(response->message().size() - 1)
!= request->message().at(request->message().size() - 1)) {
!= request->message().at(request->message().size() - 1)) {
SLOG(ERROR, "response not matched");
break;
}
Expand Down
51 changes: 19 additions & 32 deletions test/perf_test/test_delay.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,25 @@ then
at_exit
fi

echo
echo "============= test 1K data =============="
echo "./$CLIENT $HOST $PORT 900 &>$CLIENT_LOG &"
./$CLIENT $HOST $PORT 900 &>$CLIENT_LOG &
echo "sleep 3"
sleep 3
grep -o -a 'elapsed time in us: [0-9]*$' $CLIENT_LOG \
| awk 'BEGIN{sum=0;num=0}{if(NR > 10){sum+=$5;++num;}}END{ \
print "Succeed count: " num; \
print "Average elapsed time per request: " (sum/num) "us";}'

echo
echo "============= test 1M data =============="
echo "./$CLIENT $HOST $PORT 1000000 &>$CLIENT_LOG &"
./$CLIENT $HOST $PORT 1000000 &>$CLIENT_LOG &
echo "sleep 10"
sleep 10
grep -o -a 'elapsed time in us: [0-9]*$' $CLIENT_LOG \
| awk 'BEGIN{sum=0;num=0}{sum+=$5;++num;}END{ \
print "Succeed count: " num; \
print "Average elapsed time per request: " (sum/num) "us";}'

echo
echo "============= test 10M data =============="
echo "./$CLIENT $HOST $PORT 10000000 &>$CLIENT_LOG &"
./$CLIENT $HOST $PORT 10000000 &>$CLIENT_LOG &
echo "sleep 10"
sleep 10
grep -o -a 'elapsed time in us: [0-9]*$' $CLIENT_LOG \
| awk 'BEGIN{sum=0;num=0}{sum+=$5;++num;}END{ \
print "Succeed count: " num; \
print "Average elapsed time per request: " (sum/num) "us";}'
for i in "100 0.1K" "900 1K" "10000 10K" "100000 100K" "1000000 1M" "10000000 10M"; do
arr=($i)
num=${arr[0]}
str=${arr[1]}
echo
echo "============= test $str data =============="
echo "./$CLIENT $HOST $PORT $num &>$CLIENT_LOG &"
./$CLIENT $HOST $PORT $num &>$CLIENT_LOG &
PID=$!
SLEEP=5
echo "sleep $SLEEP"
sleep $SLEEP
grep -o -a 'elapsed time in us: [0-9]*$' $CLIENT_LOG \
| awk 'BEGIN{sum=0;num=0}{if(NR > 10){sum+=$5;++num;}}END{ \
print "Succeed count: " num; \
print "Average elapsed time per request: " (sum/num) "us";}'
kill $PID
sleep 1
done

echo
at_exit
Expand Down

0 comments on commit 8acfe52

Please sign in to comment.