Skip to content

Commit

Permalink
Merge pull request sahlberg#428 from dgaleano-ilm/awaiting_limit
Browse files Browse the repository at this point in the history
Allow setting a limit on the number of commands in-flight.
  • Loading branch information
sahlberg authored Jul 12, 2023
2 parents 9798380 + 4eb85e9 commit 133ba79
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 2 deletions.
2 changes: 2 additions & 0 deletions include/libnfs-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ struct rpc_context {
uint32_t num_hashes;
struct rpc_queue *waitpdu;
uint32_t waitpdu_len;
uint32_t max_waitpdu_len;

#ifdef HAVE_MULTITHREADING
int multithreading_enabled;
libnfs_mutex_t rpc_mutex;
Expand Down
19 changes: 19 additions & 0 deletions include/nfsc/libnfs-raw.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,28 @@ EXTERN int rpc_service(struct rpc_context *rpc, int revents);
* Returns the number of commands in-flight. Can be used by the application
* to check if there are any more responses we are awaiting from the server
* or if the connection is completely idle.
* The number returned includes the commands on the output queue and the
* commands waiting from a response from the server.
*/
EXTERN int rpc_queue_length(struct rpc_context *rpc);

/*
* Returns the number of commands awaiting from the server.
* Can be used by the application to check if there are any
* more responses we are awaiting from the server
* or if the connection is completely idle.
*/
EXTERN int rpc_get_num_awaiting(struct rpc_context *rpc);

/*
* Used to limit the total number of commands awaiting from the server.
* By default there is no limit, all commands will be sent as soon as possible.
* If a limit is set and it is reached then new commands will be kept on
* the output queue until the total number of commands in-flight goes below
* the limit again.
*/
EXTERN void rpc_set_awaiting_limit(struct rpc_context *rpc, int limit);

/*
* Set which UID/GIDs to use in the authenticator.
* By default libnfs will use getuid()/getgid() where available
Expand Down
2 changes: 2 additions & 0 deletions lib/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ struct rpc_context *rpc_init_context(void)
rpc->gid = getgid();
#endif
rpc_reset_queue(&rpc->outqueue);
/* Default is no limit */
rpc->max_waitpdu_len = 0;

/* Default is no timeout */
rpc->timeout = -1;
Expand Down
22 changes: 20 additions & 2 deletions lib/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,11 @@ rpc_write_to_socket(struct rpc_context *rpc)
#endif /* HAVE_MULTITHREADING */

/* Write several pdus at once */
while ((pdu = rpc->outqueue.head) != NULL) {
while ((rpc->max_waitpdu_len == 0 ||
rpc->max_waitpdu_len > rpc->waitpdu_len) &&
(pdu = rpc->outqueue.head) != NULL) {
int niov = 0;
uint32_t num_pdus = 0;
char *last_buf = NULL;
ssize_t count;

Expand Down Expand Up @@ -289,8 +292,11 @@ rpc_write_to_socket(struct rpc_context *rpc)
}
}

num_pdus++;
pdu = pdu->next;
} while (pdu != NULL && niov < RPC_MAX_VECTORS);
} while ((rpc->max_waitpdu_len == 0 ||
rpc->max_waitpdu_len > (rpc->waitpdu_len + num_pdus)) &&
pdu != NULL && niov < RPC_MAX_VECTORS);

count = writev(rpc->fd, iov, niov);
if (count == -1) {
Expand Down Expand Up @@ -1200,6 +1206,18 @@ rpc_queue_length(struct rpc_context *rpc)
return i;
}

int rpc_get_num_awaiting(struct rpc_context *rpc)
{
return rpc->waitpdu_len;
}

void rpc_set_awaiting_limit(struct rpc_context *rpc, int limit)
{
assert(rpc->magic == RPC_CONTEXT_MAGIC);

rpc->max_waitpdu_len = limit;
}

void
rpc_set_fd(struct rpc_context *rpc, int fd)
{
Expand Down

0 comments on commit 133ba79

Please sign in to comment.