Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/photon/photon_algorithm.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ void free_scheduler_state(scheduler_state *s) {
}
utarray_free(s->task_queue);
utarray_free(s->available_workers);
available_object *available_obj, *tmp;
HASH_ITER(handle, s->local_objects, available_obj, tmp) {
HASH_DELETE(handle, s->local_objects, available_obj);
free(available_obj);
}
free(s);
}

Expand Down
14 changes: 10 additions & 4 deletions src/plasma/plasma_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,12 @@ void send_notifications(event_loop *loop,
LOG_DEBUG(
"The socket's send buffer is full, so we are caching this "
"notification and will send it later.");
/* Add a callback to the event loop to send queued notifications whenever
* there is room in the socket's send buffer. Callbacks can be added
* more than once here and will be overwritten. The callback is removed
* at the end of the method. */
event_loop_add_file(plasma_state->loop, client_sock, EVENT_LOOP_WRITE,
send_notifications, plasma_state);
break;
} else {
CHECKM(0, "This code should be unreachable.");
Expand All @@ -371,6 +377,10 @@ void send_notifications(event_loop *loop,
}
/* Remove the sent notifications from the array. */
utarray_erase(queue->object_ids, 0, num_processed);
/* If we have sent all notifications, remove the fd from the event loop. */
if (utarray_len(queue->object_ids) == 0) {
event_loop_remove_file(loop, client_sock);
}
}

/* Subscribe to notifications about sealed objects. */
Expand All @@ -391,10 +401,6 @@ void subscribe_to_updates(client *client_context, int conn) {
queue->subscriber_fd = fd;
utarray_new(queue->object_ids, &object_table_entry_icd);
HASH_ADD_INT(plasma_state->pending_notifications, subscriber_fd, queue);
/* Add a callback to the event loop to send queued notifications whenever
* there is room in the socket's send buffer. */
event_loop_add_file(plasma_state->loop, fd, EVENT_LOOP_WRITE,
send_notifications, plasma_state);
}

void process_message(event_loop *loop,
Expand Down