Skip to content

Commit

Permalink
libflux: convert reactor to libuv
Browse files Browse the repository at this point in the history
This is a partial conversion with the following gaps:
- no periodic watchers
- no child watchers
- no stat watchers
- flux_reactor_active_incref/decref
- no flux_watcher_set_priority

Memory will leak if watchers are destroyed outside of the reactor
because uv_close() works asynchronously.

Currently there is an unexplained segfault in the composite watcher
unit test.

See issue flux-framework#6492
  • Loading branch information
garlick committed Dec 10, 2024
1 parent 3e96bb4 commit 5e924d2
Show file tree
Hide file tree
Showing 4 changed files with 383 additions and 222 deletions.
2 changes: 2 additions & 0 deletions src/common/libflux/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ AM_CPPFLAGS = \
-DLUADIR=\"$(luadir)\" \
-DLUAEXECDIR=\"$(luaexecdir)\" \
$(JANSSON_CFLAGS) \
$(LIBUV_CFLAGS) \
$(LIBUUID_CFLAGS)

fluxcoreinclude_HEADERS = \
Expand Down Expand Up @@ -137,6 +138,7 @@ test_ldadd = \
$(top_builddir)/src/common/libtap/libtap.la \
$(LIBUUID_LIBS) \
$(JANSSON_LIBS) \
$(LIBUV_LIBS) \
$(LIBPTHREAD) \
$(LIBDL)

Expand Down
63 changes: 28 additions & 35 deletions src/common/libflux/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
#include <errno.h>
#include <stdbool.h>
#include <fcntl.h>
#include <uv.h>
#include <flux/core.h>

#include "src/common/libev/ev.h"

#include "reactor_private.h"

struct flux_reactor {
struct ev_loop *loop;
uv_loop_t loop;
int usecount;
unsigned int errflag:1;
};
Expand All @@ -41,12 +40,7 @@ void flux_reactor_decref (flux_reactor_t *r)
{
if (r && --r->usecount == 0) {
int saved_errno = errno;
if (r->loop) {
if (ev_is_default_loop (r->loop))
ev_default_destroy ();
else
ev_loop_destroy (r->loop);
}
(void)uv_loop_close (&r->loop); // could return -EBUSY
free (r);
errno = saved_errno;
}
Expand All @@ -66,83 +60,82 @@ void flux_reactor_destroy (flux_reactor_t *r)
flux_reactor_t *flux_reactor_create (int flags)
{
flux_reactor_t *r;
int uverr;

if (valid_flags (flags, FLUX_REACTOR_SIGCHLD) < 0)
return NULL;
if (!(r = calloc (1, sizeof (*r))))
return NULL;
if ((flags & FLUX_REACTOR_SIGCHLD))
r->loop = ev_default_loop (EVFLAG_SIGNALFD);
else
r->loop = ev_loop_new (EVFLAG_NOSIGMASK);
if (!r->loop) {
errno = ENOMEM;
flux_reactor_destroy (r);
uverr = uv_loop_init (&r->loop);
if (uverr < 0) {
free (r);
errno = -uverr;
return NULL;
}
ev_set_userdata (r->loop, r);
r->usecount = 1;
return r;
}

int flux_reactor_run (flux_reactor_t *r, int flags)
{
int ev_flags = 0;
uv_run_mode mode;
int count;

if (valid_flags (flags, FLUX_REACTOR_NOWAIT | FLUX_REACTOR_ONCE) < 0)
return -1;
if (flags & FLUX_REACTOR_NOWAIT)
ev_flags |= EVRUN_NOWAIT;
if (flags & FLUX_REACTOR_ONCE)
ev_flags |= EVRUN_ONCE;
if (flags == FLUX_REACTOR_NOWAIT)
mode = UV_RUN_NOWAIT;
else if (flags == FLUX_REACTOR_ONCE)
mode = UV_RUN_ONCE;
else if (flags == 0)
mode = UV_RUN_DEFAULT;
else {
errno = EINVAL;
return-1;
}
r->errflag = 0;
count = ev_run (r->loop, ev_flags);
count = uv_run (&r->loop, mode);
return (r->errflag ? -1 : count);
}

double flux_reactor_time (void)
{
return ev_time ();
return 1E-9 * uv_hrtime ();
}

double flux_reactor_now (flux_reactor_t *r)
{
return ev_now (r->loop);
return 1E-3 * uv_now (&r->loop);
}

void flux_reactor_now_update (flux_reactor_t *r)
{
return ev_now_update (r->loop);
uv_update_time (&r->loop);
}

void flux_reactor_stop (flux_reactor_t *r)
{
r->errflag = 0;
ev_break (r->loop, EVBREAK_ALL);
uv_stop (&r->loop);
}

void flux_reactor_stop_error (flux_reactor_t *r)
{
r->errflag = 1;
ev_break (r->loop, EVBREAK_ALL);
uv_stop (&r->loop);
}

void flux_reactor_active_incref (flux_reactor_t *r)
{
if (r)
ev_ref (r->loop);
// FIXME - see https://docs.libuv.org/en/v1.x/handle.html#refcount
}

void flux_reactor_active_decref (flux_reactor_t *r)
{
if (r)
ev_unref (r->loop);
// FIXME
}

void *reactor_get_loop (flux_reactor_t *r)
{
return r ? r->loop : NULL;
return r ? &r->loop : NULL;
}

/*
Expand Down
29 changes: 25 additions & 4 deletions src/common/libflux/test/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ static void oneshot (flux_reactor_t *r,
static void test_timer (flux_reactor_t *reactor)
{
flux_watcher_t *w;
double elapsed, t0, t[] = { 0.001, 0.010, 0.050, 0.100, 0.200 };
double elapsed, t0, t[] = { 0.005, 0.010, 0.050, 0.100, 0.200 };
int i, rc;

/* in case this test runs a while after last reactor run.
Expand Down Expand Up @@ -229,13 +229,15 @@ static void test_timer (flux_reactor_t *reactor)
oneshot_runs = 0;
rc = flux_reactor_run (reactor, 0);
elapsed = flux_reactor_now (reactor) - t0;
ok (rc == 0 && oneshot_runs == 1 && elapsed >= t[i],
"timer: reactor ran %.3fs oneshot at >= time (%.3fs)", t[i], elapsed);
// libuv timer rez is 1ms so allow event to fire up to 1ms early
ok (rc == 0 && oneshot_runs == 1 && elapsed >= t[i] - 0.001,
"timer: reactor ran %.3fs oneshot punctually", t[i]);
diag ("elapsed time was %.3fs", elapsed);
}
flux_watcher_destroy (w);
}


#if TODO_PERIODIC
/* A reactor callback that immediately stops reactor without error */
static bool do_stop_callback_ran = false;
static void do_stop_reactor (flux_reactor_t *r,
Expand Down Expand Up @@ -362,6 +364,7 @@ static void test_periodic (flux_reactor_t *reactor)
flux_watcher_destroy (w);

}
#endif

static int idle_count = 0;
static void idle_cb (flux_reactor_t *r,
Expand Down Expand Up @@ -515,6 +518,7 @@ static void test_signal (flux_reactor_t *reactor)
flux_watcher_destroy (idle);
}

#if TODO_CHILD
static pid_t child_pid = -1;
static void child_cb (flux_reactor_t *r,
flux_watcher_t *w,
Expand Down Expand Up @@ -563,7 +567,9 @@ static void test_child (flux_reactor_t *reactor)
flux_watcher_destroy (w);
flux_reactor_destroy (r);
}
#endif

#if TODO_STAT
struct stat_ctx {
int fd;
char *path;
Expand Down Expand Up @@ -653,7 +659,9 @@ static void test_stat (flux_reactor_t *reactor)
flux_watcher_destroy (tw);
free (ctx.path);
}
#endif

#if TODO_ACTIVE_REF
static void active_idle_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
Expand Down Expand Up @@ -702,6 +710,7 @@ static void test_active_ref (flux_reactor_t *r)

flux_watcher_destroy (w);
}
#endif

static void reactor_destroy_early (void)
{
Expand All @@ -728,6 +737,7 @@ static void test_reactor_flags (flux_reactor_t *r)
"flux_reactor_create flags=0xffff fails with EINVAL");
}

#if TODO_PRIORITY
static char cblist[6] = {0};
static int cblist_index = 0;
static flux_watcher_t *priority_prep = NULL;
Expand Down Expand Up @@ -800,6 +810,7 @@ static void test_priority (flux_reactor_t *r)
flux_watcher_destroy (priority_prep);
flux_watcher_destroy (priority_idle);
}
#endif

int main (int argc, char *argv[])
{
Expand All @@ -819,16 +830,26 @@ int main (int argc, char *argv[])
"flux_watcher_is_active (NULL) returns false");

test_timer (reactor);
#if TODO_PERIODIC
test_periodic (reactor);
#endif
test_fd (reactor);
test_idle (reactor);
test_prepcheck (reactor);
test_signal (reactor);
#if TODO_CHILD
test_child (reactor);
#endif
#if TODO_STAT
test_stat (reactor);
#endif
#if TODO_ACTIVE_REF
test_active_ref (reactor);
#endif
test_reactor_flags (reactor);
#if TODO_PRIORITY
test_priority (reactor);
#endif

flux_reactor_destroy (reactor);

Expand Down
Loading

0 comments on commit 5e924d2

Please sign in to comment.