Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement plasma.wait #12

Merged
merged 6 commits into from
Oct 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions src/plasma/lib/python/plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ class PlasmaID(ctypes.Structure):
def make_plasma_id(string):
if len(string) != PLASMA_ID_SIZE:
raise Exception("PlasmaIDs must be {} characters long".format(PLASMA_ID_SIZE))
object_id = map(ord, string)
return PlasmaID(plasma_id=ID(*object_id))
return PlasmaID(plasma_id=ID.from_buffer_copy(string))

def plasma_id_to_str(plasma_id):
return str(bytearray(plasma_id.plasma_id))

class PlasmaBuffer(object):
"""This is the type of objects returned by calls to get with a PlasmaClient.
Expand Down Expand Up @@ -217,6 +219,31 @@ def fetch(self, object_ids):
success_array);
return [bool(success) for success in success_array]

def wait(self, object_ids, timeout, num_returns):
"""Wait until num_returns objects in object_ids are ready.

Args:
object_ids (List[str]): List of object IDs to wait for.
timeout (int): Return to the caller after timeout milliseconds.
num_returns (int): We are waiting for this number of objects to be ready.

Returns:
ready_ids, waiting_ids (List[str], List[str]): List of object IDs that
are ready and list of object IDs we might still wait on respectively.
"""
if not self.has_manager_conn:
raise Exception("Not connected to the plasma manager socket")
object_id_array = (len(object_ids) * PlasmaID)()
for i, object_id in enumerate(object_ids):
object_id_array[i] = make_plasma_id(object_id)
return_id_array = (num_returns * PlasmaID)()
num_return_objects = self.client.plasma_wait(self.plasma_conn,
object_id_array._length_,
object_id_array,
timeout, num_returns, return_id_array)
ready_ids = map(plasma_id_to_str, return_id_array[num_returns-num_return_objects:])
return ready_ids, list(set(object_ids) - set(ready_ids))

def subscribe(self):
"""Subscribe to notifications about sealed objects."""
fd = self.client.plasma_subscribe(self.plasma_conn)
Expand Down
14 changes: 12 additions & 2 deletions src/plasma/plasma.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,19 @@ enum plasma_message_type {
PLASMA_DATA,
/** Request a fetch of an object in another store. */
PLASMA_FETCH,
/** Wait until an object becomes available. */
PLASMA_WAIT
};

typedef struct {
/** The size of the object's data. */
int64_t data_size;
/** The size of the object's metadata. */
int64_t metadata_size;
/** The timeout of the request. */
uint64_t timeout;
/** The number of objects we wait for for wait. */
int num_returns;
/** In a transfer request, this is the IP address of the Plasma Manager to
* transfer the object to. */
uint8_t addr[4];
Expand All @@ -82,15 +88,19 @@ typedef struct {
} plasma_request;

typedef struct {
/** The object ID that this reply refers to. */
object_id object_id;
/** The object that is returned with this reply. */
plasma_object object;
/** This is used only to respond to requests of type
* PLASMA_CONTAINS or PLASMA_FETCH. It is 1 if the object is
* present and 0 otherwise. Used for plasma_contains and
* plasma_fetch. */
int has_object;
/** Number of object IDs a wait is returning. */
int num_objects_returned;
/** The number of object IDs that will be included in this reply. */
int num_object_ids;
/** The IDs of the objects that this reply refers to. */
object_id object_ids[1];
} plasma_reply;

#endif
29 changes: 28 additions & 1 deletion src/plasma/plasma_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ int plasma_subscribe(plasma_connection *conn) {
* message because otherwise it seems to hang on Linux. */
char dummy = '\0';
send_fd(conn->store_conn, fd[1], &dummy, 1);
close(fd[1]);
/* Return the file descriptor that the client should use to read notifications
* about sealed objects. */
return fd[0];
Expand Down Expand Up @@ -440,10 +441,12 @@ void plasma_fetch(plasma_connection *conn,
CHECK(nbytes == sizeof(reply));
success = reply.has_object;
}
CHECK(reply.num_object_ids == 1);
/* Update the correct index in is_fetched. */
int i = 0;
for (; i < num_object_ids; i++) {
if (memcmp(&object_ids[i], &reply.object_id, sizeof(object_id)) == 0) {
if (memcmp(&object_ids[i], &reply.object_ids[0], sizeof(object_id)) ==
0) {
/* Check that this isn't a duplicate response. */
CHECK(!is_fetched[i]);
is_fetched[i] = success;
Expand All @@ -455,6 +458,30 @@ void plasma_fetch(plasma_connection *conn,
}
}

int plasma_wait(plasma_connection *conn,
int num_object_ids,
object_id object_ids[],
uint64_t timeout,
int num_returns,
object_id return_object_ids[]) {
CHECK(conn->manager_conn >= 0);
plasma_request *req =
make_plasma_multiple_request(num_object_ids, object_ids);
req->num_returns = num_returns;
req->timeout = timeout;
plasma_send_request(conn->manager_conn, PLASMA_WAIT, req);
free(req);
int64_t return_size =
sizeof(plasma_reply) + (num_returns - 1) * sizeof(object_id);
plasma_reply *reply = malloc(return_size);
int nbytes = recv(conn->manager_conn, (uint8_t *) reply, return_size, 0);
CHECK(nbytes == return_size);
memcpy(return_object_ids, reply->object_ids, num_returns * sizeof(object_id));
int num_objects_returned = reply->num_objects_returned;
free(reply);
return num_objects_returned;
}

int get_manager_fd(plasma_connection *conn) {
return conn->manager_conn;
}
22 changes: 22 additions & 0 deletions src/plasma/plasma_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,28 @@ void plasma_fetch(plasma_connection *conn,
object_id object_ids[],
int is_fetched[]);

/**
* Wait for objects to be created (right now, wait for local objects).
*
* @param conn The object containing the connection state.
* @param num_object_ids Number of object IDs wait is called on.
* @param object_ids Object IDs wait is called on.
* @param timeout Wait will time out and return after this number of ms.
* @param num_returns Number of object IDs wait will return if it doesn't time
* out.
* @param return_object_ids Out parameter for the object IDs returned by wait.
* This is an array of size num_returns. If the number of objects that
* are ready when we time out, the objects will be stored in the last
* slots of the array and the number of objects is returned.
* @return Number of objects that are actually ready.
*/
int plasma_wait(plasma_connection *conn,
int num_object_ids,
object_id object_ids[],
uint64_t timeout,
int num_returns,
object_id return_object_ids[]);

/**
* Subscribe to notifications when objects are sealed in the object store.
* Whenever an object is sealed, a message will be written to the client socket
Expand Down
Loading