Skip to content

Commit

Permalink
Merge pull request #490 from cole-miller/context-instant
Browse files Browse the repository at this point in the history
client/protocol: Switch context from budget to deadline
  • Loading branch information
Mathieu Borderé authored Apr 3, 2023
2 parents 63eaabe + 09927c5 commit 1bba922
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 97 deletions.
167 changes: 82 additions & 85 deletions src/client/protocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ static void makeValueOwned(struct value *val)
}

/* Free the owned data of a value, which must have had makeValueOwned called
* on it previously. */
* on it previously. This takes its argument by value because it does *not*
* free the memory that stores the `struct value` itself, only the pointers
* held by `struct value`. */
static void freeOwnedValue(struct value val)
{
switch (val.type) {
Expand Down Expand Up @@ -98,162 +100,145 @@ static int peekUint64(struct cursor cursor, uint64_t *val)
*
* - The full count n of bytes is read.
* - A read returns 0 (EOF).
* - The time budget is exhausted.
* - The context's deadline is reached.
* - An error occurs.
*
* On error, -1 is returned. Otherwise the return value is the count
* of bytes read. This may be less than n if either EOF happened or
* the time budget was exhausted. */
static ssize_t doRead(int fd, void *buf, size_t n, struct client_context *context)
* the deadline kicked in. */
static ssize_t doRead(int fd, void *buf, size_t buf_len, struct client_context *context)
{
bool have_budget = context != NULL && context->budget_millis >= 0;
size_t got = 0;
ssize_t total;
struct pollfd pfd;
struct timespec prev;
struct timespec cur;
int diff_millis;
ssize_t rv;

if (have_budget) {
rv = clock_gettime(CLOCK_MONOTONIC, &prev);
if (rv != 0) {
return -1;
}
}
struct timespec now;
long millis;
ssize_t n;
int rv;

pfd.fd = fd;
pfd.events = POLLIN;
pfd.revents = 0;

while (got < n) {
rv = poll(&pfd, 1, (context == NULL) ? -1 : context->budget_millis);
total = 0;
while ((size_t)total < buf_len) {
rv = clock_gettime(CLOCK_REALTIME, &now);
assert(rv == 0);
if (context != NULL) {
millis = (context->deadline.tv_sec - now.tv_sec) * 1000
+ (context->deadline.tv_nsec - now.tv_nsec) / 1000000;
if (millis < 0) {
/* poll(2) will block indefinitely if the timeout argument is
* negative, and we don't want that here. Signal a timeout. */
break;
}
} else {
/* The caller has explicitly asked us to block indefinitely. */
millis = -1;
}
rv = poll(&pfd, 1, (millis > INT_MAX) ? INT_MAX : (int)millis);
if (rv < 0) {
if (errno == EINTR) {
continue;
} else {
return rv;
return -1;
}
} else if (rv == 0) {
/* Timeout */
break;
}
assert(rv == 1);
if (pfd.revents != POLLIN) {
/* If some other bits are set in the out parameter, an error occurred. */
return -1;
}

/* Update the time budget */
if (have_budget) {
rv = clock_gettime(CLOCK_MONOTONIC, &cur);
if (rv != 0) {
return -1;
}
diff_millis = (int)(cur.tv_sec - prev.tv_sec) * 1000 +
(int)((cur.tv_nsec - prev.tv_nsec) / 1000000);
assert(diff_millis >= 0);
context->budget_millis -= diff_millis;
if (context->budget_millis <= 0) {
/* Timeout */
break;
}
prev = cur;
}

rv = read(fd, (char *)buf + got, n - got);
if (rv < 0) {
n = read(fd, (char *)buf + (size_t)total, buf_len - (size_t)total);
if (n < 0) {
if (errno == EINTR) {
continue;
} else {
return rv;
return -1;
}
} else if (rv == 0) {
} else if (n == 0) {
/* EOF */
break;
}
got += (size_t)rv;
total += n;
}
return (ssize_t)got;
return total;
}

/* Write data into fd from buf until one of the following occurs:
*
* - The full count n of bytes is written.
* - A write returns 0 (EOF).
* - The time budget is exhausted.
* - The context's deadline is reached.
* - An error occurs.
*
* On error, -1 is returned. Otherwise the return value is the count
* of bytes written. This may be less than n if either EOF happened or
* the time budget was exhausted. */
static ssize_t doWrite(int fd, void *buf, size_t n, struct client_context *context)
* the deadline kicked in. */
static ssize_t doWrite(int fd, void *buf, size_t buf_len, struct client_context *context)
{
bool have_budget = context != NULL && context->budget_millis >= 0;
size_t wrote = 0;
ssize_t total;
struct pollfd pfd;
struct timespec prev;
struct timespec cur;
int diff_millis;
ssize_t rv;

if (have_budget) {
rv = clock_gettime(CLOCK_MONOTONIC, &prev);
if (rv != 0) {
return -1;
}
}
struct timespec now;
long millis;
ssize_t n;
int rv;

pfd.fd = fd;
pfd.events = POLLOUT;
pfd.revents = 0;

while (wrote < n) {
rv = poll(&pfd, 1, (context == NULL) ? -1 : context->budget_millis);
total = 0;
while ((size_t)total < buf_len) {
rv = clock_gettime(CLOCK_REALTIME, &now);
assert(rv == 0);
if (context != NULL) {
millis = (context->deadline.tv_sec - now.tv_sec) * 1000
+ (context->deadline.tv_nsec - now.tv_nsec) / 1000000;
if (millis < 0) {
/* poll(2) will block indefinitely if the timeout argument is
* negative, and we don't want that here. Signal a timeout. */
break;
}
} else {
/* The caller has explicitly asked us to block indefinitely. */
millis = -1;
}
rv = poll(&pfd, 1, (millis > INT_MAX) ? INT_MAX : (int)millis);
if (rv < 0) {
if (errno == EINTR) {
continue;
} else {
return rv;
return -1;
}
} else if (rv == 0) {
/* Timeout */
break;
}
assert(rv == 1);
if (pfd.revents != POLLOUT) {
/* If some other bits are set in the out parameter, an error
* occurred. */
return -1;
}

/* Update the time budget */
if (have_budget) {
rv = clock_gettime(CLOCK_MONOTONIC, &cur);
if (rv != 0) {
return -1;
}
diff_millis = (int)(cur.tv_sec - prev.tv_sec) * 1000 +
(int)((cur.tv_nsec - prev.tv_nsec) / 1000000);
assert(diff_millis >= 0);
context->budget_millis -= diff_millis;
if (context->budget_millis <= 0) {
/* Timeout */
break;
}
prev = cur;
}

rv = write(fd, (char *)buf + wrote, n - wrote);
if (rv < 0) {
n = write(fd, (char *)buf + (size_t)total, buf_len - (size_t)total);
if (n < 0) {
if (errno == EINTR) {
continue;
} else {
return rv;
return -1;
}
} else if (rv == 0) {
} else if (n == 0) {
/* EOF */
break;
}
wrote += (size_t)rv;
total += n;
}
return (ssize_t)wrote;
return total;
}

static int handleFailure(struct client_proto *c)
Expand All @@ -280,6 +265,18 @@ static int handleFailure(struct client_proto *c)
return DQLITE_CLIENT_PROTO_RECEIVED_FAILURE;
}

void clientContextMillis(struct client_context *context, long millis)
{
int rv;
rv = clock_gettime(CLOCK_REALTIME, &context->deadline);
assert(rv == 0);
context->deadline.tv_nsec += millis * 1000000;
while (context->deadline.tv_nsec >= 1000000000) {
context->deadline.tv_nsec -= 1000000000;
context->deadline.tv_sec += 1;
}
}

int clientInit(struct client_proto *c, int fd)
{
tracef("init client");
Expand Down
24 changes: 12 additions & 12 deletions src/client/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ enum {
* It is not generally safe to continue using the client_proto object
* after receiving this error code. */
DQLITE_CLIENT_PROTO_SHORT,
/* Another kind of error occurred, like a syscall or malloc failure.
/* Another kind of error occurred, like a syscall failure.
*
* It is not generally safe to continue using the client_proto object
* after receiving this error code. */
Expand All @@ -50,19 +50,15 @@ struct client_proto
* Passing NULL for the context argument is permitted and disables all timeouts. */
struct client_context
{
/* If budget_millis is negative when a Send or Recv function is called, reading
* or writing may block indefinitely and the value is not modified. Otherwise,
* the initial value caps the number of milliseconds that will be spent attempting
* the send or receive operation (potentially split between multiple read/write syscalls).
* If it's not negative initially, budget_millis is modified by subtracting the
* number of milliseconds actually spent. If the time budget runs out without
* completing the operation, DQLITE_CLIENT_PROTO_SHORT is returned and the values
* of any output parameters are undefined. Because this implies that we failed to
* read/write a complete message from/to the fd, it's important to call clientClose
* immediately and not keep using the client_proto object. */
int budget_millis;
/* An absolute CLOCK_REALTIME timestamp that limits how long will be spent
* trying to complete the requested send or receive operation. Whenever we
* are about to make a blocking syscall (read or write), we first poll(2)
* using a timeout computed based on how much time remains before the deadline.
* If the poll times out, we return early instead of completing the operation. */
struct timespec deadline;
};

/* TODO Consider using a dynamic array instead of a linked list here? */
struct row
{
struct value *values;
Expand Down Expand Up @@ -90,6 +86,10 @@ struct client_file
void *blob;
};

/* Initialize a context whose deadline will fall after the given duration
* in milliseconds. */
void clientContextMillis(struct client_context *context, long millis);

/* Initialize a new client, writing requests to fd. */
int clientInit(struct client_proto *c, int fd);

Expand Down

0 comments on commit 1bba922

Please sign in to comment.