From cd2794c01fc84a4118f79e31071cb1bca78918f5 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Wed, 21 Aug 2013 01:43:09 +0400 Subject: [PATCH] fsevents: use shared FSEventStream It seems that number of simultaneously opened FSEventStreams is limited on OSX (i.e. you can have only fixed number of them on one running system), getting past through this limit will cause `FSEventStreamCreate` to return false and write following message to stderr: (CarbonCore.framework) FSEventStreamStart: register_with_server: ERROR: f2d_register_rpc() => (null) (-21) To prevent this, we must use only one shared FSEventStream with a paths for all uv_fsevent_t handles, and then filter out events for each handle using this paths again. See https://github.com/joyent/node/issues/5463 --- include/uv-darwin.h | 10 +- src/unix/darwin.c | 2 +- src/unix/fsevents.c | 490 ++++++++++++++++++++++++++++++-------------- src/unix/kqueue.c | 1 - 4 files changed, 345 insertions(+), 158 deletions(-) diff --git a/include/uv-darwin.h b/include/uv-darwin.h index 43b261f5fb..dcdd42ba6d 100644 --- a/include/uv-darwin.h +++ b/include/uv-darwin.h @@ -36,8 +36,8 @@ #define UV_PLATFORM_LOOP_FIELDS \ uv_thread_t cf_thread; \ - void* cf_cb; \ - void* cf_loop; \ + void* _cf_reserved; \ + void* cf_state; \ uv_mutex_t cf_mutex; \ uv_sem_t cf_sem; \ void* cf_signals[2]; \ @@ -47,10 +47,10 @@ char* realpath; \ int realpath_len; \ int cf_flags; \ - void* cf_eventstream; \ + void* cf_event; \ uv_async_t* cf_cb; \ - void* cf_events[2]; \ - uv_sem_t cf_sem; \ + void* cf_member[2]; \ + uv_sem_t _cf_reserved; \ uv_mutex_t cf_mutex; \ #define UV_STREAM_PRIVATE_PLATFORM_FIELDS \ diff --git a/src/unix/darwin.c b/src/unix/darwin.c index 8a9b4bab63..a03ef2a9e0 100644 --- a/src/unix/darwin.c +++ b/src/unix/darwin.c @@ -38,7 +38,7 @@ int uv__platform_loop_init(uv_loop_t* loop, int default_loop) { - loop->cf_loop = NULL; + loop->cf_state = NULL; if (uv__kqueue_init(loop)) return -errno; diff --git a/src/unix/fsevents.c b/src/unix/fsevents.c index 79ad198bae..4d5e87fe52 100644 --- a/src/unix/fsevents.c +++ b/src/unix/fsevents.c @@ -49,70 +49,89 @@ void uv__fsevents_loop_delete(uv_loop_t* loop) { typedef struct uv__fsevents_event_s uv__fsevents_event_t; typedef struct uv__cf_loop_signal_s uv__cf_loop_signal_t; -typedef void (*cf_loop_signal_cb)(void* arg); +typedef struct uv__cf_loop_state_s uv__cf_loop_state_t; + +struct uv__cf_loop_state_s { + CFRunLoopRef loop; + CFRunLoopSourceRef signal_source; + volatile int fsevent_need_reschedule; + FSEventStreamRef fsevent_stream; + uv_sem_t fsevent_sem; + uv_mutex_t fsevent_mutex; + void* fsevent_handles[2]; + int fsevent_handle_count; +}; struct uv__cf_loop_signal_s { - cf_loop_signal_cb cb; QUEUE member; - void* arg; + uv_fs_event_t* handle; }; struct uv__fsevents_event_s { int events; - QUEUE member; + void* next; char path[1]; }; +static const int kFSEventsModified = kFSEventStreamEventFlagItemFinderInfoMod | + kFSEventStreamEventFlagItemModified | + kFSEventStreamEventFlagItemInodeMetaMod | + kFSEventStreamEventFlagItemChangeOwner | + kFSEventStreamEventFlagItemXattrMod; +static const int kFSEventsRenamed = kFSEventStreamEventFlagItemCreated | + kFSEventStreamEventFlagItemRemoved | + kFSEventStreamEventFlagItemRenamed; +static const int kFSEventsSystem = kFSEventStreamEventFlagUserDropped | + kFSEventStreamEventFlagKernelDropped | + kFSEventStreamEventFlagEventIdsWrapped | + kFSEventStreamEventFlagHistoryDone | + kFSEventStreamEventFlagMount | + kFSEventStreamEventFlagUnmount | + kFSEventStreamEventFlagRootChanged; + /* Forward declarations */ static void uv__cf_loop_cb(void* arg); static void* uv__cf_loop_runner(void* arg); -static void uv__cf_loop_signal(uv_loop_t* loop, - cf_loop_signal_cb cb, - void* arg); - -#define UV__FSEVENTS_WALK(handle, block) \ - { \ - QUEUE* curr; \ - QUEUE split_head; \ +static int uv__cf_loop_signal(uv_loop_t* loop, uv_fs_event_t* handle); + +#define UV__FSEVENTS_PROCESS(handle, block) \ + do { \ uv__fsevents_event_t* event; \ + uv__fsevents_event_t* next; \ uv_mutex_lock(&(handle)->cf_mutex); \ - QUEUE_INIT(&split_head); \ - if (!QUEUE_EMPTY(&(handle)->cf_events)) { \ - QUEUE* split_pos = QUEUE_HEAD(&(handle)->cf_events); \ - QUEUE_SPLIT(&(handle)->cf_events, split_pos, &split_head); \ - } \ + event = (handle)->cf_event; \ + (handle)->cf_event = NULL; \ uv_mutex_unlock(&(handle)->cf_mutex); \ - while (!QUEUE_EMPTY(&split_head)) { \ - curr = QUEUE_HEAD(&split_head); \ + while (event != NULL) { \ /* Invoke callback */ \ - event = QUEUE_DATA(curr, uv__fsevents_event_t, member); \ - QUEUE_REMOVE(curr); \ /* Invoke block code, but only if handle wasn't closed */ \ - if (((handle)->flags & (UV_CLOSING | UV_CLOSED)) == 0) \ + if (!uv__is_closing((handle))) \ block \ /* Free allocated data */ \ + next = event->next; \ free(event); \ + event = next; \ } \ - } + } while (0) +/* Runs in UV loop's thread, when there're events to report to handle */ static void uv__fsevents_cb(uv_async_t* cb, int status) { uv_fs_event_t* handle; handle = cb->data; - UV__FSEVENTS_WALK(handle, { + UV__FSEVENTS_PROCESS(handle, { if (handle->event_watcher.fd != -1) handle->cb(handle, event->path[0] ? event->path : NULL, event->events, 0); }); - if ((handle->flags & (UV_CLOSING | UV_CLOSED)) == 0 && - handle->event_watcher.fd == -1) { + if (!uv__is_closing(handle) && handle->event_watcher.fd == -1) uv__fsevents_close(handle); - } } +/* Runs in CF thread, when there're events in FSEventStream */ static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef, void* info, size_t numEvents, @@ -125,42 +144,35 @@ static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef, char* path; char* pos; uv_fs_event_t* handle; + QUEUE* q; + uv_loop_t* loop; + uv__cf_loop_state_t* state; uv__fsevents_event_t* event; - QUEUE add_list; - int kFSEventsModified; - int kFSEventsRenamed; - - kFSEventsModified = kFSEventStreamEventFlagItemFinderInfoMod | - kFSEventStreamEventFlagItemModified | - kFSEventStreamEventFlagItemInodeMetaMod | - kFSEventStreamEventFlagItemChangeOwner | - kFSEventStreamEventFlagItemXattrMod; - kFSEventsRenamed = kFSEventStreamEventFlagItemCreated | - kFSEventStreamEventFlagItemRemoved | - kFSEventStreamEventFlagItemRenamed; - - handle = info; + uv__fsevents_event_t* tail; + + loop = info; + state = loop->cf_state; + assert(state != NULL); paths = eventPaths; - QUEUE_INIT(&add_list); - - for (i = 0; i < numEvents; i++) { - /* Ignore system events */ - if (eventFlags[i] & (kFSEventStreamEventFlagUserDropped | - kFSEventStreamEventFlagKernelDropped | - kFSEventStreamEventFlagEventIdsWrapped | - kFSEventStreamEventFlagHistoryDone | - kFSEventStreamEventFlagMount | - kFSEventStreamEventFlagUnmount | - kFSEventStreamEventFlagRootChanged)) { - continue; - } - /* TODO: Report errors */ - path = paths[i]; - len = strlen(path); + /* For each handle */ + QUEUE_FOREACH(q, &state->fsevent_handles) { + handle = QUEUE_DATA(q, uv_fs_event_t, cf_member); + tail = NULL; + + /* Process and filter out events */ + for (i = 0; i < numEvents; i++) { + /* Ignore system events */ + if (eventFlags[i] & kFSEventsSystem) + continue; + + path = paths[i]; + len = strlen(path); + + /* Filter out paths that are outside handle's request */ + if (strncmp(path, handle->realpath, handle->realpath_len) != 0) + continue; - /* Remove absolute path prefix */ - if (strstr(path, handle->realpath) == path) { path += handle->realpath_len; len -= handle->realpath_len; @@ -169,79 +181,81 @@ static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef, path++; len--; } - } #ifdef MAC_OS_X_VERSION_10_7 - /* Ignore events with path equal to directory itself */ - if (len == 0) - continue; + /* Ignore events with path equal to directory itself */ + if (len == 0) + continue; #endif /* MAC_OS_X_VERSION_10_7 */ - /* Do not emit events from subdirectories (without option set) */ - pos = strchr(path, '/'); - if ((handle->cf_flags & UV_FS_EVENT_RECURSIVE) == 0 && - pos != NULL && - pos != path + 1) - continue; + /* Do not emit events from subdirectories (without option set) */ + if ((handle->cf_flags & UV_FS_EVENT_RECURSIVE) == 0) { + pos = strchr(path, '/'); + if (pos != NULL && pos != path + 1) + continue; + } #ifndef MAC_OS_X_VERSION_10_7 - path = ""; - len = 0; + path = ""; + len = 0; #endif /* MAC_OS_X_VERSION_10_7 */ - event = malloc(sizeof(*event) + len); - if (event == NULL) - break; + event = malloc(sizeof(*event) + len); + if (event == NULL) + break; - memcpy(event->path, path, len + 1); + memset(event, 0, sizeof(*event)); + memcpy(event->path, path, len + 1); - if ((eventFlags[i] & kFSEventsModified) != 0 && - (eventFlags[i] & kFSEventsRenamed) == 0) - event->events = UV_CHANGE; - else - event->events = UV_RENAME; + if ((eventFlags[i] & kFSEventsModified) != 0 && + (eventFlags[i] & kFSEventsRenamed) == 0) + event->events = UV_CHANGE; + else + event->events = UV_RENAME; - QUEUE_INSERT_TAIL(&add_list, &event->member); - } - uv_mutex_lock(&handle->cf_mutex); - QUEUE_ADD(&handle->cf_events, &add_list); - uv_mutex_unlock(&handle->cf_mutex); + if (tail != NULL) + tail->next = event; + tail = event; + } + + if (tail != NULL) { + uv_mutex_lock(&handle->cf_mutex); + tail->next = handle->cf_event; + handle->cf_event = tail; + uv_mutex_unlock(&handle->cf_mutex); - uv_async_send(handle->cf_cb); + uv_async_send(handle->cf_cb); + } + } } -static void uv__fsevents_schedule(void* arg) { - uv_fs_event_t* handle; +/* Runs in CF thread */ +static void uv__fsevents_create_stream(uv_loop_t* loop, CFArrayRef paths) { + uv__cf_loop_state_t* state; FSEventStreamContext ctx; FSEventStreamRef ref; - CFStringRef path; - CFArrayRef paths; CFAbsoluteTime latency; FSEventStreamCreateFlags flags; - handle = arg; - /* Initialize context */ ctx.version = 0; - ctx.info = handle; + ctx.info = loop; ctx.retain = NULL; ctx.release = NULL; ctx.copyDescription = NULL; - /* Initialize paths array */ - path = CFStringCreateWithCString(NULL, - handle->filename, - CFStringGetSystemEncoding()); - assert(path != NULL); - paths = CFArrayCreate(NULL, (const void**)&path, 1, NULL); - assert(paths != NULL); - latency = 0.15; /* Set appropriate flags */ flags = kFSEventStreamCreateFlagFileEvents; + /* + * NOTE: It might sound like a good idea to remember last seen StreamEventId, + * but in reality one dir might have last StreamEventId less than, the other, + * that is being watched now. Which will cause FSEventStream API to report + * changes to files from the past. + */ ref = FSEventStreamCreate(NULL, &uv__fsevents_event_cb, &ctx, @@ -250,43 +264,120 @@ static void uv__fsevents_schedule(void* arg) { latency, flags); assert(ref != NULL); - handle->cf_eventstream = ref; - FSEventStreamScheduleWithRunLoop(handle->cf_eventstream, - handle->loop->cf_loop, + state = loop->cf_state; + FSEventStreamScheduleWithRunLoop(ref, + state->loop, kCFRunLoopDefaultMode); - if (!FSEventStreamStart(handle->cf_eventstream)) + if (!FSEventStreamStart(ref)) abort(); + + state->fsevent_stream = ref; } -static void uv__fsevents_unschedule(void* arg) { - uv_fs_event_t* handle; +/* Runs in CF thread */ +static void uv__fsevents_destroy_stream(uv_loop_t* loop) { + uv__cf_loop_state_t* state; + + state = loop->cf_state; + + if (state->fsevent_stream == NULL) + return; - handle = arg; + /* Flush all accumulated events */ + FSEventStreamFlushSync(state->fsevent_stream); /* Stop emitting events */ - FSEventStreamStop(handle->cf_eventstream); + FSEventStreamStop(state->fsevent_stream); /* Release stream */ - FSEventStreamInvalidate(handle->cf_eventstream); - FSEventStreamRelease(handle->cf_eventstream); - handle->cf_eventstream = NULL; + FSEventStreamInvalidate(state->fsevent_stream); + FSEventStreamRelease(state->fsevent_stream); + state->fsevent_stream = NULL; +} + + +/* Runs in CF thread, when there're new fsevent handles to add to stream */ +static void uv__fsevents_reschedule(uv_fs_event_t* handle) { + uv__cf_loop_state_t* state; + QUEUE* q; + uv_fs_event_t* curr; + CFArrayRef cf_paths; + CFStringRef* paths; + int i; + int path_count; + + state = handle->loop->cf_state; + + /* Optimization to prevent O(n^2) time spent when starting to watch + * many files simultaneously + */ + if (!state->fsevent_need_reschedule) + return; + state->fsevent_need_reschedule = 0; + + /* Destroy previous FSEventStream */ + uv__fsevents_destroy_stream(handle->loop); + + /* Create list of all watched paths */ + uv_mutex_lock(&state->fsevent_mutex); + path_count = state->fsevent_handle_count; + if (path_count != 0) { + paths = malloc(sizeof(*paths) * path_count); + if (paths == NULL) + abort(); + + q = &state->fsevent_handles; + for (i = 0; i < path_count; i++) { + q = QUEUE_NEXT(q); + assert(q != &state->fsevent_handles); + curr = QUEUE_DATA(q, uv_fs_event_t, cf_member); + + assert(curr->realpath != NULL); + paths[i] = CFStringCreateWithCString(NULL, + curr->realpath, + CFStringGetSystemEncoding()); + if (paths[i] == NULL) + abort(); + } + } + uv_mutex_unlock(&state->fsevent_mutex); - /* Notify main thread that we're done here */ - uv_sem_post(&handle->cf_sem); + if (path_count != 0) { + /* Create new FSEventStream */ + cf_paths = CFArrayCreate(NULL, (const void**) paths, path_count, NULL); + if (cf_paths == NULL) + abort(); + uv__fsevents_create_stream(handle->loop, cf_paths); + } + + /* + * Main thread will block until the removal of handle from the list, + * we must tell it when we're ready. + * + * NOTE: This is coupled with `uv_sem_wait()` in `uv__fsevents_close` + */ + if (uv__is_closing(handle)) + uv_sem_post(&state->fsevent_sem); } +/* Runs in UV loop */ static int uv__fsevents_loop_init(uv_loop_t* loop) { CFRunLoopSourceContext ctx; + uv__cf_loop_state_t* state; pthread_attr_t attr_storage; pthread_attr_t* attr; int err; - if (loop->cf_loop != NULL) + if (loop->cf_state != NULL) return 0; + state = calloc(1, sizeof(*state)); + if (state == NULL) + return -ENOMEM; + err = uv_mutex_init(&loop->cf_mutex); if (err) return err; @@ -296,10 +387,27 @@ static int uv__fsevents_loop_init(uv_loop_t* loop) { goto fail_sem_init; QUEUE_INIT(&loop->cf_signals); + + err = uv_sem_init(&state->fsevent_sem, 0); + if (err) + goto fail_fsevent_sem_init; + + err = uv_mutex_init(&state->fsevent_mutex); + if (err) + goto fail_fsevent_mutex_init; + + QUEUE_INIT(&state->fsevent_handles); + state->fsevent_need_reschedule = 0; + state->fsevent_handle_count = 0; + memset(&ctx, 0, sizeof(ctx)); ctx.info = loop; ctx.perform = uv__cf_loop_cb; - loop->cf_cb = CFRunLoopSourceCreate(NULL, 0, &ctx); + state->signal_source = CFRunLoopSourceCreate(NULL, 0, &ctx); + if (state->signal_source == NULL) { + err = -ENOMEM; + goto fail_signal_source_create; + } /* In the unlikely event that pthread_attr_init() fails, create the thread * with the default stack size. We'll use a little more address space but @@ -313,6 +421,8 @@ static int uv__fsevents_loop_init(uv_loop_t* loop) { if (pthread_attr_setstacksize(attr, 3 * PTHREAD_STACK_MIN)) abort(); + loop->cf_state = state; + /* uv_thread_t is an alias for pthread_t. */ err = -pthread_create(&loop->cf_thread, attr, uv__cf_loop_runner, loop); @@ -324,26 +434,39 @@ static int uv__fsevents_loop_init(uv_loop_t* loop) { /* Synchronize threads */ uv_sem_wait(&loop->cf_sem); - assert(loop->cf_loop != NULL); return 0; fail_thread_create: + loop->cf_state = NULL; + +fail_signal_source_create: + uv_mutex_destroy(&state->fsevent_mutex); + +fail_fsevent_mutex_init: + uv_sem_destroy(&state->fsevent_sem); + +fail_fsevent_sem_init: uv_sem_destroy(&loop->cf_sem); fail_sem_init: uv_mutex_destroy(&loop->cf_mutex); + free(state); return err; } +/* Runs in UV loop */ void uv__fsevents_loop_delete(uv_loop_t* loop) { uv__cf_loop_signal_t* s; + uv__cf_loop_state_t* state; QUEUE* q; - if (loop->cf_loop == NULL) + if (loop->cf_state == NULL) return; - uv__cf_loop_signal(loop, NULL, NULL); + if (uv__cf_loop_signal(loop, NULL) != 0) + abort(); + uv_thread_join(&loop->cf_thread); uv_sem_destroy(&loop->cf_sem); uv_mutex_destroy(&loop->cf_mutex); @@ -355,40 +478,54 @@ void uv__fsevents_loop_delete(uv_loop_t* loop) { QUEUE_REMOVE(q); free(s); } + + /* Destroy state */ + state = loop->cf_state; + uv_sem_destroy(&state->fsevent_sem); + uv_mutex_destroy(&state->fsevent_mutex); + CFRelease(state->signal_source); + free(state); + loop->cf_state = NULL; } +/* Runs in CF thread. This is the CF loop's body */ static void* uv__cf_loop_runner(void* arg) { uv_loop_t* loop; + uv__cf_loop_state_t* state; loop = arg; - loop->cf_loop = CFRunLoopGetCurrent(); + state = loop->cf_state; + state->loop = CFRunLoopGetCurrent(); - CFRunLoopAddSource(loop->cf_loop, - loop->cf_cb, + CFRunLoopAddSource(state->loop, + state->signal_source, kCFRunLoopDefaultMode); uv_sem_post(&loop->cf_sem); CFRunLoopRun(); - CFRunLoopRemoveSource(loop->cf_loop, - loop->cf_cb, + CFRunLoopRemoveSource(state->loop, + state->signal_source, kCFRunLoopDefaultMode); return NULL; } +/* Runs in CF thread, executed after `uv__cf_loop_signal()` */ static void uv__cf_loop_cb(void* arg) { uv_loop_t* loop; + uv__cf_loop_state_t* state; QUEUE* item; QUEUE split_head; uv__cf_loop_signal_t* s; loop = arg; + state = loop->cf_state; + QUEUE_INIT(&split_head); uv_mutex_lock(&loop->cf_mutex); - QUEUE_INIT(&split_head); if (!QUEUE_EMPTY(&loop->cf_signals)) { QUEUE* split_pos = QUEUE_HEAD(&loop->cf_signals); QUEUE_SPLIT(&loop->cf_signals, split_pos, &split_head); @@ -401,10 +538,10 @@ static void uv__cf_loop_cb(void* arg) { s = QUEUE_DATA(item, uv__cf_loop_signal_t, member); /* This was a termination signal */ - if (s->cb == NULL) - CFRunLoopStop(loop->cf_loop); + if (s->handle == NULL) + CFRunLoopStop(state->loop); else - s->cb(s->arg); + uv__fsevents_reschedule(s->handle); QUEUE_REMOVE(item); free(s); @@ -412,29 +549,34 @@ static void uv__cf_loop_cb(void* arg) { } -void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg) { +/* Runs in UV loop to notify CF thread */ +int uv__cf_loop_signal(uv_loop_t* loop, uv_fs_event_t* handle) { uv__cf_loop_signal_t* item; + uv__cf_loop_state_t* state; item = malloc(sizeof(*item)); - /* XXX: Fail */ if (item == NULL) - abort(); + return -ENOMEM; - item->arg = arg; - item->cb = cb; + item->handle = handle; uv_mutex_lock(&loop->cf_mutex); QUEUE_INSERT_TAIL(&loop->cf_signals, &item->member); uv_mutex_unlock(&loop->cf_mutex); - assert(loop->cf_loop != NULL); - CFRunLoopSourceSignal(loop->cf_cb); - CFRunLoopWakeUp(loop->cf_loop); + state = loop->cf_state; + assert(state != NULL); + CFRunLoopSourceSignal(state->signal_source); + CFRunLoopWakeUp(state->loop); + + return 0; } +/* Runs in UV loop to initialize handle */ int uv__fsevents_init(uv_fs_event_t* handle) { int err; + uv__cf_loop_state_t* state; err = uv__fsevents_loop_init(handle->loop); if (err) @@ -442,52 +584,98 @@ int uv__fsevents_init(uv_fs_event_t* handle) { /* Get absolute path to file */ handle->realpath = realpath(handle->filename, NULL); - if (handle->realpath != NULL) - handle->realpath_len = strlen(handle->realpath); + if (handle->realpath == NULL) + return -errno; + handle->realpath_len = strlen(handle->realpath); + + /* Initialize singly-linked list */ + handle->cf_event = NULL; - handle->cf_eventstream = NULL; /* * Events will occur in other thread. * Initialize callback for getting them back into event loop's thread */ handle->cf_cb = malloc(sizeof(*handle->cf_cb)); - if (handle->cf_cb == NULL) - return -ENOMEM; + if (handle->cf_cb == NULL) { + err = -ENOMEM; + goto fail_cf_cb_malloc; + } handle->cf_cb->data = handle; uv_async_init(handle->loop, handle->cf_cb, uv__fsevents_cb); handle->cf_cb->flags |= UV__HANDLE_INTERNAL; uv_unref((uv_handle_t*) handle->cf_cb); - uv_mutex_init(&handle->cf_mutex); - uv_sem_init(&handle->cf_sem, 0); - QUEUE_INIT(&handle->cf_events); - - uv__cf_loop_signal(handle->loop, uv__fsevents_schedule, handle); + err = uv_mutex_init(&handle->cf_mutex); + if (err) + goto fail_cf_mutex_init; + + /* Insert handle into the list */ + state = handle->loop->cf_state; + uv_mutex_lock(&state->fsevent_mutex); + QUEUE_INSERT_TAIL(&state->fsevent_handles, &handle->cf_member); + state->fsevent_handle_count++; + state->fsevent_need_reschedule = 1; + uv_mutex_unlock(&state->fsevent_mutex); + + /* Reschedule FSEventStream */ + assert(handle != NULL); + err = uv__cf_loop_signal(handle->loop, handle); + if (err) + goto fail_loop_signal; return 0; + +fail_loop_signal: + uv_mutex_destroy(&handle->cf_mutex); + +fail_cf_mutex_init: + free(handle->cf_cb); + handle->cf_cb = NULL; + +fail_cf_cb_malloc: + free(handle->realpath); + handle->realpath = NULL; + handle->realpath_len = 0; + + return err; } +/* Runs in UV loop to de-initialize handle */ int uv__fsevents_close(uv_fs_event_t* handle) { + int err; + uv__cf_loop_state_t* state; + if (handle->cf_cb == NULL) return -EINVAL; - uv__cf_loop_signal(handle->loop, uv__fsevents_unschedule, handle); + /* Remove handle from the list */ + state = handle->loop->cf_state; + uv_mutex_lock(&state->fsevent_mutex); + QUEUE_REMOVE(&handle->cf_member); + state->fsevent_handle_count--; + state->fsevent_need_reschedule = 1; + uv_mutex_unlock(&state->fsevent_mutex); + + /* Reschedule FSEventStream */ + assert(handle != NULL); + err = uv__cf_loop_signal(handle->loop, handle); + if (err) + return -err; /* Wait for deinitialization */ - uv_sem_wait(&handle->cf_sem); + uv_sem_wait(&state->fsevent_sem); uv_close((uv_handle_t*) handle->cf_cb, (uv_close_cb) free); handle->cf_cb = NULL; /* Free data in queue */ - UV__FSEVENTS_WALK(handle, { + UV__FSEVENTS_PROCESS(handle, { /* NOP */ - }) + }); uv_mutex_destroy(&handle->cf_mutex); - uv_sem_destroy(&handle->cf_sem); free(handle->realpath); handle->realpath = NULL; handle->realpath_len = 0; diff --git a/src/unix/kqueue.c b/src/unix/kqueue.c index 2c68bf36f2..391ab615de 100644 --- a/src/unix/kqueue.c +++ b/src/unix/kqueue.c @@ -320,7 +320,6 @@ int uv_fs_event_init(uv_loop_t* loop, #if defined(__APPLE__) /* Nullify field to perform checks later */ handle->cf_cb = NULL; - handle->cf_eventstream = NULL; handle->realpath = NULL; handle->realpath_len = 0; handle->cf_flags = flags;