Skip to content

Commit

Permalink
Merge branch 'job-rsync_timeout_poll'
Browse files Browse the repository at this point in the history
  • Loading branch information
ydahhrk committed Sep 3, 2024
2 parents 2c29c59 + a38a95a commit 4ee88d1
Show file tree
Hide file tree
Showing 8 changed files with 453 additions and 52 deletions.
14 changes: 14 additions & 0 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ description: Guide to use arguments of FORT Validator.
51. [`--rsync.priority`](#--rsyncpriority)
53. [`--rsync.retry.count`](#--rsyncretrycount)
54. [`--rsync.retry.interval`](#--rsyncretryinterval)
40. [`--rsync.transfer-timeout`](#--rsynctransfer-timeout)
55. [`--configuration-file`](#--configuration-file)
56. [`rsync.program`](#rsyncprogram)
57. [`rsync.arguments-recursive`](#rsyncarguments-recursive)
Expand Down Expand Up @@ -103,6 +104,7 @@ description: Guide to use arguments of FORT Validator.
[--rsync.priority=<unsigned integer>]
[--rsync.retry.count=<unsigned integer>]
[--rsync.retry.interval=<unsigned integer>]
[--rsync.transfer-timeout=<unsigned integer>]
[--http.enabled=true|false]
[--http.priority=<unsigned integer>]
[--http.retry.count=<unsigned integer>]
Expand Down Expand Up @@ -931,6 +933,17 @@ Whenever is necessary to execute an RSYNC, the validator will try at least one t

Period of time (in seconds) to wait between each retry to execute an RSYNC.

### `--rsync.transfer-timeout`

- **Type:** Integer
- **Availability:** `argv` and JSON
- **Default:** 900
- **Range:** [0, [`UINT_MAX`](http://pubs.opengroup.org/onlinepubs/9699919799/basedefs/limits.h.html)]

Maximum time in seconds that the rsync transfer can last.

Once the connection is established with the server, the request will last a maximum of `rsync.transfer-timeout` seconds. A value of 0 means unlimited time.

### `--configuration-file`

- **Type:** String (Path to file)
Expand Down Expand Up @@ -974,6 +987,7 @@ The configuration options are mostly the same as the ones from the `argv` interf
"<a href="#--rsyncretrycount">count</a>": 1,
"<a href="#--rsyncretryinterval">interval</a>": 4
},
"<a href="#--rsynctransfer-timeout">transfer-timeout</a>": 0,
"<a href="#rsyncprogram">program</a>": "rsync",
"<a href="#rsyncarguments-recursive">arguments-recursive</a>": [
"-rtz",
Expand Down
1 change: 1 addition & 0 deletions examples/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"count": 2,
"interval": 5
},
"transfer-timeout": 900,
"program": "rsync",
"arguments-recursive": [
"--recursive",
Expand Down
12 changes: 12 additions & 0 deletions man/fort.8
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,18 @@ By default, the value is \fI5\fR.
.RE
.P

.B \-\-rsync.transfer\-timeout=\fIUNSIGNED_INTEGER\fR
.RS 4
Maximum time in seconds that the rsync process can last.
.P
Once the connection is established with the server, the request will last a
maximum of \fBrsync.transfer\-timeout\fR seconds. A value of \fI0\fR means
unlimited time.
.P
By default, it has a value of \fI900\fR.
.RE
.P

.B \-\-output.roa=\fIFILE\fR
.RS 4
File where the ROAs will be printed in the configured format (see
Expand Down
16 changes: 16 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ struct rpki_config {
/* Interval (in seconds) between each retry */
unsigned int interval;
} retry;
unsigned int transfer_timeout;
char *program;
struct {
struct string_array flat; /* Deprecated */
Expand Down Expand Up @@ -486,6 +487,14 @@ static const struct option_field options[] = {
/* Unlimited */
.max = 0,
.deprecated = true,
}, {
.id = 3008,
.name = "rsync.transfer-timeout",
.type = &gt_uint,
.offset = offsetof(struct rpki_config, rsync.transfer_timeout),
.doc = "Maximum transfer time before killing the rsync process",
.min = 0,
.max = UINT_MAX,
},

/* HTTP requests parameters */
Expand Down Expand Up @@ -946,6 +955,7 @@ set_default_values(void)
rpki_config.rsync.strategy = pstrdup("<deprecated>");
rpki_config.rsync.retry.count = 1;
rpki_config.rsync.retry.interval = 4;
rpki_config.rsync.transfer_timeout = 900;
rpki_config.rsync.program = pstrdup("rsync");
string_array_init(&rpki_config.rsync.args.flat,
flat_rsync_args, ARRAY_LEN(flat_rsync_args));
Expand Down Expand Up @@ -1342,6 +1352,12 @@ config_get_rsync_retry_interval(void)
return rpki_config.rsync.retry.interval;
}

/*
 * Returns the configured --rsync.transfer-timeout, in seconds.
 *
 * The option is stored as an unsigned int. Values that do not fit in a long
 * (possible on platforms where long is 32 bits wide) saturate at LONG_MAX
 * instead of going through an implementation-defined narrowing conversion.
 */
long
config_get_rsync_transfer_timeout(void)
{
	unsigned int seconds = rpki_config.rsync.transfer_timeout;

	if (seconds > LONG_MAX)
		return LONG_MAX;
	return (long)seconds;
}

char *
config_get_rsync_program(void)
{
Expand Down
1 change: 1 addition & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ bool config_get_rsync_enabled(void);
unsigned int config_get_rsync_priority(void);
unsigned int config_get_rsync_retry_count(void);
unsigned int config_get_rsync_retry_interval(void);
long config_get_rsync_transfer_timeout(void);
char *config_get_rsync_program(void);
struct string_array const *config_get_rsync_args(void);
bool config_get_http_enabled(void);
Expand Down
181 changes: 129 additions & 52 deletions src/rsync/rsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <signal.h>
#include <sys/wait.h>
#include <syslog.h>
Expand All @@ -11,6 +12,11 @@
#include "config.h"
#include "log.h"

/*
 * Accessors for the int[2][2] pipe array used to capture rsync's output:
 * fds[0] is the stderr pipe and fds[1] the stdout pipe; within each pipe,
 * index 0 is the read end and index 1 the write end.
 *
 * The argument and the expansion are parenthesized so that any pointer
 * expression (not just a plain identifier) can be passed safely.
 */
#define STDERR_WRITE(fds) ((fds)[0][1])
#define STDOUT_WRITE(fds) ((fds)[1][1])
#define STDERR_READ(fds) ((fds)[0][0])
#define STDOUT_READ(fds) ((fds)[1][0])

/*
* Duplicate parent FDs, to pipe rsync output:
* - fds[0] = stderr
Expand All @@ -20,15 +26,15 @@ static void
duplicate_fds(int fds[2][2])
{
/* Use the loop to catch interruptions */
while ((dup2(fds[0][1], STDERR_FILENO) == -1)
while ((dup2(STDERR_WRITE(fds), STDERR_FILENO) == -1)
&& (errno == EINTR)) {}
close(fds[0][1]);
close(fds[0][0]);
close(STDERR_WRITE(fds));
close(STDERR_READ(fds));

while ((dup2(fds[1][1], STDOUT_FILENO) == -1)
while ((dup2(STDOUT_WRITE(fds), STDOUT_FILENO) == -1)
&& (errno == EINTR)) {}
close(fds[1][1]);
close(fds[1][0]);
close(STDOUT_WRITE(fds));
close(STDOUT_READ(fds));
}

static void
Expand Down Expand Up @@ -109,8 +115,8 @@ create_pipes(int fds[2][2])
error = errno;

/* Close pipe previously created */
close(fds[0][0]);
close(fds[0][1]);
close(STDERR_READ(fds));
close(STDERR_WRITE(fds));

pr_op_err_st("Piping rsync stdout: %s", strerror(error));
return -error;
Expand All @@ -119,8 +125,17 @@ create_pipes(int fds[2][2])
return 0;
}

/*
 * Reads CLOCK_MONOTONIC and returns the reading converted to milliseconds.
 * If clock_gettime() fails, the failure is reported through pr_crit(), along
 * with the errno it left behind.
 */
static long
get_current_millis(void)
{
	struct timespec ts;
	int status;

	status = clock_gettime(CLOCK_MONOTONIC, &ts);
	if (status < 0)
		pr_crit("clock_gettime() returned %d", errno);

	/* Whole seconds plus the sub-second remainder, both in milliseconds */
	return (ts.tv_nsec / 1000000L) + (1000L * ts.tv_sec);
}

static void
log_buffer(char const *buffer, ssize_t read, int type)
log_buffer(char const *buffer, ssize_t read, bool is_error)
{
#define PRE_RSYNC "[RSYNC exec]: "
char *cpy, *cur, *tmp;
Expand All @@ -138,68 +153,130 @@ log_buffer(char const *buffer, ssize_t read, int type)
cur = tmp + 1;
continue;
}
if (type == 0) {
if (is_error)
pr_val_err(PRE_RSYNC "%s", cur);
} else {
else
pr_val_debug(PRE_RSYNC "%s", cur);
}
cur = tmp + 1;
}
free(cpy);
#undef PRE_RSYNC
}

/*
 * Bookkeeping helpers for a poll() loop. Both expand in the caller's scope
 * and rely on two locals existing there:
 * - `pfd`: the array of descriptors being polled.
 * - `error`: an integer status accumulator.
 *
 * DROP_FD(f, fail): marks slot @f as no longer polled (fd = -1) without
 * closing it, and ORs @fail into `error`.
 * CLOSE_FD(f, fail): closes the descriptor in slot @f first, then does the
 * same bookkeeping as DROP_FD.
 */
#define DROP_FD(f, fail)						\
	do {								\
		pfd[(f)].fd = -1;					\
		error |= (fail);					\
	} while (0)
#define CLOSE_FD(f, fail)						\
	do {								\
		close(pfd[(f)].fd);					\
		pfd[(f)].fd = -1;					\
		error |= (fail);					\
	} while (0)

/*
 * Drains read streams @fderr and @fdout at the same time, handing whatever
 * arrives to log_buffer(), and closes each stream once it reaches end of
 * stream or fails.
 *
 * The whole operation is bounded by config_get_rsync_transfer_timeout()
 * seconds. A non-positive value disables the deadline; this is what makes
 * `--rsync.transfer-timeout=0` mean "unlimited", as documented.
 *
 * By the time this function returns, both descriptors have been closed (or
 * dropped, when poll() reported them as invalid).
 *
 * Returns: ok -> 0, error -> 1, timeout -> 2.
 */
static int
exhaust_read_fds(int fderr, int fdout)
{
	struct pollfd pfd[2];
	char buffer[4096];
	ssize_t count;
	long epoch, delta, timeout, remaining;
	int error; /* Status accumulator; DROP_FD/CLOSE_FD also write to it */
	int nready, wait_ms, saved_errno, f;

	memset(&pfd, 0, sizeof(pfd));
	pfd[0].fd = fderr;
	pfd[0].events = POLLIN;
	pfd[1].fd = fdout;
	pfd[1].events = POLLIN;

	error = 0;
	delta = 0;
	epoch = get_current_millis();

	/* Seconds -> milliseconds, saturating instead of overflowing. */
	timeout = config_get_rsync_transfer_timeout();
	if (timeout > LONG_MAX / 1000)
		timeout = LONG_MAX;
	else if (timeout > 0)
		timeout *= 1000;
	/* From here on, timeout <= 0 means "no deadline". */

	while (1) {
		if (timeout > 0) {
			/*
			 * poll() takes an int; clamp the remaining time. If
			 * the clamped wait expires early, the deadline check
			 * below decides whether to poll again.
			 */
			remaining = timeout - delta;
			wait_ms = (remaining > INT_MAX) ? INT_MAX
			    : (int)remaining;
		} else {
			wait_ms = -1; /* Block until something happens */
		}

		nready = poll(pfd, 2, wait_ms);
		if (nready == -1) {
			/* Keep errno away from the 0/1/2 status in `error`. */
			saved_errno = errno;
			if (saved_errno != EINTR) {
				pr_val_err("rsync bad poll: %s",
				    strerror(saved_errno));
				error = 1;
				goto fail;
			}
			/*
			 * Interrupted: revents holds nothing useful. Skip the
			 * descriptors, but still refresh the deadline so a
			 * stream of signals cannot postpone it forever.
			 */
		}

		for (f = 0; nready > 0 && f < 2; f++) {
			if (pfd[f].revents & POLLNVAL) {
				pr_val_err("rsync bad fd: %i", pfd[f].fd);
				DROP_FD(f, 1);

			} else if (pfd[f].revents & POLLERR) {
				pr_val_err("Generic error during rsync poll.");
				CLOSE_FD(f, 1);

			} else if (pfd[f].revents & (POLLIN|POLLHUP)) {
				count = read(pfd[f].fd, buffer, sizeof(buffer));
				if (count == -1) {
					saved_errno = errno;
					if (saved_errno == EINTR)
						continue; /* Still open; retried on next poll */
					pr_val_err("rsync buffer read error: %s",
					    strerror(saved_errno));
					CLOSE_FD(f, 1);
				} else if (count == 0) {
					CLOSE_FD(f, 0); /* End of stream */
				} else {
					log_buffer(buffer, count,
					    pfd[f].fd == fderr);
				}
			}
		}

		if (pfd[0].fd == -1 && pfd[1].fd == -1)
			return error; /* Happy path! */

		if (timeout > 0) {
			delta = get_current_millis() - epoch;
			if (delta < 0) {
				pr_val_err("This clock does not seem monotonic. "
				    "I'm going to have to give up this rsync.");
				error = 1;
				goto fail;
			}
			if (delta >= timeout)
				goto timed_out; /* Read took too long */
		}
	}

timed_out:
	pr_val_err("rsync transfer timeout reached");
	error = 2;
fail:
	for (f = 0; f < 2; f++)
		if (pfd[f].fd != -1)
			close(pfd[f].fd);
	return error;
}

/*
* Read the piped output from the child, assures that all pipes are closed on
* success and on error.
* Completely consumes @fds' streams, and closes them.
*
* Allegedly, this is a portable way to wait for the child process to finish.
* (IIRC, waitpid() doesn't do this reliably.)
*/
static int
read_pipes(int fds[2][2])
exhaust_pipes(int fds[2][2])
{
int error;

/* Won't be needed (sterr/stdout write ends) */
close(fds[0][1]);
close(fds[1][1]);

/* stderr pipe */
error = read_pipe(fds, 0);
if (error) {
/* Close the other pipe pending to read */
close(fds[1][0]);
return error;
}

/* stdout pipe, always logs to info */
return read_pipe(fds, 1);
close(STDERR_WRITE(fds));
close(STDOUT_WRITE(fds));
return exhaust_read_fds(STDERR_READ(fds), STDOUT_READ(fds));
}

/*
Expand Down Expand Up @@ -266,17 +343,17 @@ rsync_download(char const *src, char const *dst, bool is_directory)
pr_op_err_st("Couldn't fork to execute rsync: %s",
strerror(error));
/* Close all ends from the created pipes */
close(fork_fds[0][0]);
close(fork_fds[1][0]);
close(fork_fds[0][1]);
close(fork_fds[1][1]);
close(STDERR_READ(fork_fds));
close(STDOUT_READ(fork_fds));
close(STDERR_WRITE(fork_fds));
close(STDOUT_WRITE(fork_fds));
goto release_args;
}

/* This code is run by us. */
error = read_pipes(fork_fds);
error = exhaust_pipes(fork_fds);
if (error)
kill(child_pid, SIGCHLD); /* Stop the child */
kill(child_pid, SIGTERM); /* Stop the child */

error = waitpid(child_pid, &child_status, 0);
do {
Expand Down
Loading

0 comments on commit 4ee88d1

Please sign in to comment.