diff --git a/src/photon/photon_algorithm.c b/src/photon/photon_algorithm.c index d07785146726..cfb92b5b2034 100644 --- a/src/photon/photon_algorithm.c +++ b/src/photon/photon_algorithm.c @@ -44,6 +44,11 @@ void free_scheduler_state(scheduler_state *s) { } utarray_free(s->task_queue); utarray_free(s->available_workers); + available_object *available_obj, *tmp; + HASH_ITER(handle, s->local_objects, available_obj, tmp) { + HASH_DELETE(handle, s->local_objects, available_obj); + free(available_obj); + } free(s); } diff --git a/src/plasma/plasma_store.c b/src/plasma/plasma_store.c index 1e86c55d733b..cfcbbcfaa052 100644 --- a/src/plasma/plasma_store.c +++ b/src/plasma/plasma_store.c @@ -363,6 +363,12 @@ void send_notifications(event_loop *loop, LOG_DEBUG( "The socket's send buffer is full, so we are caching this " "notification and will send it later."); + /* Add a callback to the event loop to send queued notifications whenever + * there is room in the socket's send buffer. Callbacks can be added + * more than once here and will be overwritten. The callback is removed + * at the end of the method. */ + event_loop_add_file(plasma_state->loop, client_sock, EVENT_LOOP_WRITE, + send_notifications, plasma_state); break; } else { CHECKM(0, "This code should be unreachable."); @@ -371,6 +377,10 @@ void send_notifications(event_loop *loop, } /* Remove the sent notifications from the array. */ utarray_erase(queue->object_ids, 0, num_processed); + /* If we have sent all notifications, remove the fd from the event loop. */ + if (utarray_len(queue->object_ids) == 0) { + event_loop_remove_file(loop, client_sock); + } } /* Subscribe to notifications about sealed objects. */ @@ -391,10 +401,6 @@ void subscribe_to_updates(client *client_context, int conn) { queue->subscriber_fd = fd; utarray_new(queue->object_ids, &object_table_entry_icd); HASH_ADD_INT(plasma_state->pending_notifications, subscriber_fd, queue); - /* Add a callback to the event loop to send queued notifications whenever - * there is room in the socket's send buffer. */ - event_loop_add_file(plasma_state->loop, fd, EVENT_LOOP_WRITE, - send_notifications, plasma_state); } void process_message(event_loop *loop,