Skip to content

Commit

Permalink
Merge pull request #9 from ray-project/port
Browse files Browse the repository at this point in the history
Unit testing for the plasma manager.
  • Loading branch information
robertnishihara authored Oct 28, 2016
2 parents 5c50d97 + f3a4317 commit c94f961
Show file tree
Hide file tree
Showing 9 changed files with 609 additions and 110 deletions.
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ matrix:
- sudo apt-get update -qq
- sudo apt-get install -qq valgrind
script:
- cd src/plasma
- make valgrind
- cd ../..

- python src/plasma/test/test.py valgrind

- python src/photon/test/test.py valgrind

install:
Expand Down
17 changes: 14 additions & 3 deletions src/plasma/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
CC = gcc
CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L -I. -I../common -I../common/thirdparty
TEST_CFLAGS = -DPLASMA_TEST=1 -I.
BUILD = build

all: $(BUILD)/plasma_store $(BUILD)/plasma_manager $(BUILD)/plasma_client.so $(BUILD)/example $(BUILD)/libplasma_client.a
Expand All @@ -12,6 +13,9 @@ clean:
cd ../common; make clean
rm -r $(BUILD)/*

$(BUILD)/manager_tests: test/manager_tests.c plasma.h plasma_client.h plasma_client.c plasma_manager.h plasma_manager.c fling.h fling.c common
$(CC) $(CFLAGS) $(TEST_CFLAGS) -o $@ test/manager_tests.c plasma_manager.c plasma_client.c fling.c ../common/build/libcommon.a ../common/thirdparty/hiredis/libhiredis.a

$(BUILD)/plasma_store: plasma_store.c plasma.h fling.h fling.c malloc.c malloc.h thirdparty/dlmalloc.c common
$(CC) $(CFLAGS) plasma_store.c fling.c malloc.c ../common/build/libcommon.a -o $(BUILD)/plasma_store

Expand All @@ -28,12 +32,19 @@ $(BUILD)/example: plasma_client.c plasma.h example.c fling.h fling.c common
$(CC) $(CFLAGS) plasma_client.c example.c fling.c ../common/build/libcommon.a -o $(BUILD)/example

common: FORCE
cd ../common; make
git submodule update --init --recursive
cd ../common; make

# Set the request timeout low for testing purposes.
test: CFLAGS += -DRAY_TIMEOUT=50
test: FORCE
cd ../common; make redis
# First, build and run all the unit tests.
test: $(BUILD)/manager_tests FORCE
./build/manager_tests
cd ../common; make redis
# Next, build all the executables for Python testing.
test: all

valgrind: test
valgrind --leak-check=full --error-exitcode=1 ./build/manager_tests

FORCE:
41 changes: 34 additions & 7 deletions src/plasma/plasma_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
#include "fling.h"
#include "uthash.h"

/* Number of times we try connecting to a socket. */
#define NUM_CONNECT_ATTEMPTS 50

typedef struct {
/** Key that uniquely identifies the memory mapped file. In practice, we
* take the numerical value of the file descriptor in the object store. */
Expand Down Expand Up @@ -311,7 +314,8 @@ plasma_connection *plasma_connect(const char *store_socket_name,
*/
int fd = -1;
int connected_successfully = 0;
for (int num_attempts = 0; num_attempts < 50; ++num_attempts) {
for (int num_attempts = 0; num_attempts < NUM_CONNECT_ATTEMPTS;
++num_attempts) {
fd = connect_ipc_sock(store_socket_name);
if (fd >= 0) {
connected_successfully = 1;
Expand All @@ -330,6 +334,10 @@ plasma_connection *plasma_connect(const char *store_socket_name,
result->store_conn = fd;
if (manager_addr != NULL) {
result->manager_conn = plasma_manager_connect(manager_addr, manager_port);
if (result->manager_conn < 0) {
LOG_ERR("Could not connect to Plasma manager %s:%d", manager_addr,
manager_port);
}
} else {
result->manager_conn = -1;
}
Expand All @@ -348,18 +356,17 @@ void plasma_disconnect(plasma_connection *conn) {

#define h_addr h_addr_list[0]

/* TODO(swang): Return the error to the caller. */
int plasma_manager_connect(const char *ip_addr, int port) {
int plasma_manager_try_connect(const char *ip_addr, int port) {
int fd = socket(PF_INET, SOCK_STREAM, 0);
if (fd < 0) {
LOG_ERR("could not create socket");
exit(-1);
return -1;
}

struct hostent *manager = gethostbyname(ip_addr); /* TODO(pcm): cache this */
if (!manager) {
LOG_ERR("plasma manager %s not found", ip_addr);
exit(-1);
return -1;
}

struct sockaddr_in addr;
Expand All @@ -370,10 +377,26 @@ int plasma_manager_connect(const char *ip_addr, int port) {
int r = connect(fd, (struct sockaddr *) &addr, sizeof(addr));
if (r < 0) {
LOG_ERR(
"could not establish connection to manager with id %s:%d (probably ran "
"could not establish connection to manager with id %s:%d (may have run "
"out of ports)",
&ip_addr[0], port);
exit(-1);
return -1;
}
return fd;
}

int plasma_manager_connect(const char *ip_addr, int port) {
/* Try to connect to the Plasma manager. If unsuccessful, retry several times.
*/
int fd = -1;
for (int num_attempts = 0; num_attempts < NUM_CONNECT_ATTEMPTS;
++num_attempts) {
fd = plasma_manager_try_connect(ip_addr, port);
if (fd >= 0) {
break;
}
/* Sleep for 100 milliseconds. */
usleep(100000);
}
return fd;
}
Expand Down Expand Up @@ -432,3 +455,7 @@ void plasma_fetch(plasma_connection *conn,
"Received unexpected object ID from manager during fetch.");
}
}

int get_manager_fd(plasma_connection *conn) {
return conn->manager_conn;
}
15 changes: 13 additions & 2 deletions src/plasma/plasma_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ plasma_connection *plasma_connect(const char *store_socket_name,
void plasma_disconnect(plasma_connection *conn);

/**
* Connect to a possibly remote Plasma Manager.
* Try to connect to a possibly remote Plasma Manager.
*
* @param addr The IP address of the Plasma Manager to connect to.
* @param port The port of the Plasma Manager to connect to.
* @return The file descriptor to use to send messages to the Plasma Manager.
* @return The file descriptor to use to send messages to the
* Plasma Manager. If connection was unsuccessful, this
* value is -1.
*/
int plasma_manager_connect(const char *addr, int port);

Expand Down Expand Up @@ -195,4 +197,13 @@ void plasma_fetch(plasma_connection *conn,
*/
int plasma_subscribe(plasma_connection *conn);

/**
* Get the file descriptor for the socket connection to the plasma manager.
*
* @param conn The plasma connection.
* @return The file descriptor for the manager connection. If there is no
* connection to the manager, this is -1.
*/
int get_manager_fd(plasma_connection *conn);

#endif
Loading

0 comments on commit c94f961

Please sign in to comment.