Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
darwin: create fsevents thread on demand
Browse files Browse the repository at this point in the history
* Move CF run loop code to fsevents.c.

* Create the fsevents thread on demand rather than at startup.

* Remove use of ACCESS_ONCE. All accesses to loop->cf_loop are
  protected by full memory barriers so no reordering can take place.

Fixes #872.

Conflicts:
	src/unix/darwin.c
  • Loading branch information
bnoordhuis authored and indutny committed Aug 22, 2013
1 parent 24a42a4 commit 9bae606
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 140 deletions.
131 changes: 2 additions & 129 deletions src/unix/darwin.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,153 +28,26 @@
#include <ifaddrs.h>
#include <net/if.h>

#include <CoreFoundation/CFRunLoop.h>

#include <mach/mach.h>
#include <mach/mach_time.h>
#include <mach-o/dyld.h> /* _NSGetExecutablePath */
#include <sys/resource.h>
#include <sys/sysctl.h>
#include <unistd.h> /* sysconf */

/* Forward declarations */
static void uv__cf_loop_runner(void* arg);
static void uv__cf_loop_cb(void* arg);

typedef struct uv__cf_loop_signal_s uv__cf_loop_signal_t;
struct uv__cf_loop_signal_s {
void* arg;
cf_loop_signal_cb cb;
ngx_queue_t member;
};


int uv__platform_loop_init(uv_loop_t* loop, int default_loop) {
CFRunLoopSourceContext ctx;
int r;
loop->cf_loop = NULL;

if (uv__kqueue_init(loop))
return -1;

loop->cf_loop = NULL;
if ((r = uv_mutex_init(&loop->cf_mutex)))
return r;
if ((r = uv_sem_init(&loop->cf_sem, 0)))
return r;
ngx_queue_init(&loop->cf_signals);

memset(&ctx, 0, sizeof(ctx));
ctx.info = loop;
ctx.perform = uv__cf_loop_cb;
loop->cf_cb = CFRunLoopSourceCreate(NULL, 0, &ctx);

if ((r = uv_thread_create(&loop->cf_thread, uv__cf_loop_runner, loop)))
return r;

/* Synchronize threads */
uv_sem_wait(&loop->cf_sem);
assert(ACCESS_ONCE(CFRunLoopRef, loop->cf_loop) != NULL);

return 0;
}


void uv__platform_loop_delete(uv_loop_t* loop) {
ngx_queue_t* item;
uv__cf_loop_signal_t* s;

assert(loop->cf_loop != NULL);
uv__cf_loop_signal(loop, NULL, NULL);
uv_thread_join(&loop->cf_thread);

uv_sem_destroy(&loop->cf_sem);
uv_mutex_destroy(&loop->cf_mutex);

/* Free any remaining data */
while (!ngx_queue_empty(&loop->cf_signals)) {
item = ngx_queue_head(&loop->cf_signals);

s = ngx_queue_data(item, uv__cf_loop_signal_t, member);

ngx_queue_remove(item);
free(s);
}
}


static void uv__cf_loop_runner(void* arg) {
uv_loop_t* loop;

loop = arg;

/* Get thread's loop */
ACCESS_ONCE(CFRunLoopRef, loop->cf_loop) = CFRunLoopGetCurrent();

CFRunLoopAddSource(loop->cf_loop,
loop->cf_cb,
kCFRunLoopDefaultMode);

uv_sem_post(&loop->cf_sem);

CFRunLoopRun();

CFRunLoopRemoveSource(loop->cf_loop,
loop->cf_cb,
kCFRunLoopDefaultMode);
}


static void uv__cf_loop_cb(void* arg) {
uv_loop_t* loop;
ngx_queue_t* item;
ngx_queue_t split_head;
uv__cf_loop_signal_t* s;

loop = arg;

uv_mutex_lock(&loop->cf_mutex);
ngx_queue_init(&split_head);
if (!ngx_queue_empty(&loop->cf_signals)) {
ngx_queue_t* split_pos = ngx_queue_next(&loop->cf_signals);
ngx_queue_split(&loop->cf_signals, split_pos, &split_head);
}
uv_mutex_unlock(&loop->cf_mutex);

while (!ngx_queue_empty(&split_head)) {
item = ngx_queue_head(&split_head);

s = ngx_queue_data(item, uv__cf_loop_signal_t, member);

/* This was a termination signal */
if (s->cb == NULL)
CFRunLoopStop(loop->cf_loop);
else
s->cb(s->arg);

ngx_queue_remove(item);
free(s);
}
}


void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg) {
uv__cf_loop_signal_t* item;

item = malloc(sizeof(*item));
/* XXX: Fail */
if (item == NULL)
abort();

item->arg = arg;
item->cb = cb;

uv_mutex_lock(&loop->cf_mutex);
ngx_queue_insert_tail(&loop->cf_signals, &item->member);
uv_mutex_unlock(&loop->cf_mutex);

assert(loop->cf_loop != NULL);
CFRunLoopSourceSignal(loop->cf_cb);
CFRunLoopWakeUp(loop->cf_loop);
uv__fsevents_loop_delete(loop);
}


Expand Down
175 changes: 167 additions & 8 deletions src/unix/fsevents.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,40 @@ int uv__fsevents_close(uv_fs_event_t* handle) {
return 0;
}


void uv__fsevents_loop_delete(uv_loop_t* loop) {
return 0;
}

#else /* TARGET_OS_IPHONE */

#include <assert.h>
#include <stdlib.h>
#include <CoreFoundation/CFRunLoop.h>
#include <CoreServices/CoreServices.h>

typedef struct uv__fsevents_event_s uv__fsevents_event_t;
typedef struct uv__cf_loop_signal_s uv__cf_loop_signal_t;
typedef void (*cf_loop_signal_cb)(void* arg);

struct uv__cf_loop_signal_s {
cf_loop_signal_cb cb;
ngx_queue_t member;
void* arg;
};

struct uv__fsevents_event_s {
int events;
ngx_queue_t member;
char path[1];
};

/* Forward declarations */
static void uv__cf_loop_cb(void* arg);
static void uv__cf_loop_runner(void* arg);
static void uv__cf_loop_signal(uv_loop_t* loop,
cf_loop_signal_cb cb,
void* arg);

#define UV__FSEVENTS_WALK(handle, block) \
{ \
Expand Down Expand Up @@ -75,7 +95,7 @@ struct uv__fsevents_event_s {
}


void uv__fsevents_cb(uv_async_t* cb, int status) {
static void uv__fsevents_cb(uv_async_t* cb, int status) {
uv_fs_event_t* handle;

handle = cb->data;
Expand All @@ -92,12 +112,12 @@ void uv__fsevents_cb(uv_async_t* cb, int status) {
}


void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef,
void* info,
size_t numEvents,
void* eventPaths,
const FSEventStreamEventFlags eventFlags[],
const FSEventStreamEventId eventIds[]) {
static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef,
void* info,
size_t numEvents,
void* eventPaths,
const FSEventStreamEventFlags eventFlags[],
const FSEventStreamEventId eventIds[]) {
size_t i;
int len;
char** paths;
Expand Down Expand Up @@ -190,7 +210,7 @@ void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef,
}


void uv__fsevents_schedule(void* arg) {
static void uv__fsevents_schedule(void* arg) {
uv_fs_event_t* handle;

handle = arg;
Expand All @@ -202,13 +222,152 @@ void uv__fsevents_schedule(void* arg) {
}


static int uv__fsevents_loop_init(uv_loop_t* loop) {
CFRunLoopSourceContext ctx;
int err;

if (loop->cf_loop != NULL)
return 0;

err = uv_mutex_init(&loop->cf_mutex);
if (err)
return err;

err = uv_sem_init(&loop->cf_sem, 0);
if (err)
goto fail_sem_init;

ngx_queue_init(&loop->cf_signals);
memset(&ctx, 0, sizeof(ctx));
ctx.info = loop;
ctx.perform = uv__cf_loop_cb;
loop->cf_cb = CFRunLoopSourceCreate(NULL, 0, &ctx);

err = uv_thread_create(&loop->cf_thread, uv__cf_loop_runner, loop);
if (err)
goto fail_thread_create;

/* Synchronize threads */
uv_sem_wait(&loop->cf_sem);
assert(loop->cf_loop != NULL);
return 0;

fail_thread_create:
uv_sem_destroy(&loop->cf_sem);

fail_sem_init:
uv_mutex_destroy(&loop->cf_mutex);
return err;
}


void uv__fsevents_loop_delete(uv_loop_t* loop) {
uv__cf_loop_signal_t* s;
ngx_queue_t* q;

if (loop->cf_loop == NULL)
return;

uv__cf_loop_signal(loop, NULL, NULL);
uv_thread_join(&loop->cf_thread);
uv_sem_destroy(&loop->cf_sem);
uv_mutex_destroy(&loop->cf_mutex);

/* Free any remaining data */
while (!ngx_queue_empty(&loop->cf_signals)) {
q = ngx_queue_head(&loop->cf_signals);
s = ngx_queue_data(q, uv__cf_loop_signal_t, member);
ngx_queue_remove(q);
free(s);
}
}


static void uv__cf_loop_runner(void* arg) {
uv_loop_t* loop;

loop = arg;
loop->cf_loop = CFRunLoopGetCurrent();

CFRunLoopAddSource(loop->cf_loop,
loop->cf_cb,
kCFRunLoopDefaultMode);

uv_sem_post(&loop->cf_sem);

CFRunLoopRun();
CFRunLoopRemoveSource(loop->cf_loop,
loop->cf_cb,
kCFRunLoopDefaultMode);
}


static void uv__cf_loop_cb(void* arg) {
uv_loop_t* loop;
ngx_queue_t* item;
ngx_queue_t split_head;
uv__cf_loop_signal_t* s;

loop = arg;

uv_mutex_lock(&loop->cf_mutex);
ngx_queue_init(&split_head);
if (!ngx_queue_empty(&loop->cf_signals)) {
ngx_queue_t* split_pos = ngx_queue_head(&loop->cf_signals);
ngx_queue_split(&loop->cf_signals, split_pos, &split_head);
}
uv_mutex_unlock(&loop->cf_mutex);

while (!ngx_queue_empty(&split_head)) {
item = ngx_queue_head(&split_head);

s = ngx_queue_data(item, uv__cf_loop_signal_t, member);

/* This was a termination signal */
if (s->cb == NULL)
CFRunLoopStop(loop->cf_loop);
else
s->cb(s->arg);

ngx_queue_remove(item);
free(s);
}
}


void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg) {
uv__cf_loop_signal_t* item;

item = malloc(sizeof(*item));
/* XXX: Fail */
if (item == NULL)
abort();

item->arg = arg;
item->cb = cb;

uv_mutex_lock(&loop->cf_mutex);
ngx_queue_insert_tail(&loop->cf_signals, &item->member);
uv_mutex_unlock(&loop->cf_mutex);

assert(loop->cf_loop != NULL);
CFRunLoopSourceSignal(loop->cf_cb);
CFRunLoopWakeUp(loop->cf_loop);
}


int uv__fsevents_init(uv_fs_event_t* handle) {
FSEventStreamContext ctx;
FSEventStreamRef ref;
CFStringRef path;
CFArrayRef paths;
CFAbsoluteTime latency;
FSEventStreamCreateFlags flags;
int err;

err = uv__fsevents_loop_init(handle->loop);
if (err)
return err;

/* Initialize context */
ctx.version = 0;
Expand Down
Loading

0 comments on commit 9bae606

Please sign in to comment.