Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
76237ba
Add TableRequestNotifications and TableCancelNotifications to Redis m…
stephanie-wang Mar 20, 2018
2f90177
Add RequestNotifications and CancelNotifications to generic GCS Table
stephanie-wang Mar 20, 2018
80e3440
Add tests for subscribing to specific keys
stephanie-wang Mar 20, 2018
b81ad88
Remove TODO!
stephanie-wang Mar 20, 2018
e6b68ff
Return the current value at the key directly from RequestNotification…
stephanie-wang Mar 21, 2018
916dfe3
Add unit test for Lookup failure callback
stephanie-wang Mar 21, 2018
9ae2372
Modify tests to account for empty subscription response
stephanie-wang Mar 21, 2018
a7b9db1
Remove ObjectTable notification methods
stephanie-wang Mar 21, 2018
a3651a4
Clean up message parsing and doc in redis context
stephanie-wang Mar 21, 2018
ce6ed97
Use vectors of DataT in all GCS callbacks
stephanie-wang Mar 21, 2018
f37160d
Clean up SubscriptionCallback
stephanie-wang Mar 21, 2018
dd1e64d
Move Table definitions into tables.cc
stephanie-wang Mar 21, 2018
a787d16
Refactor and document redis modules
stephanie-wang Mar 21, 2018
a8a5e5f
doc
stephanie-wang Mar 21, 2018
6e3e690
Fix new GCS build
stephanie-wang Mar 21, 2018
23aa4a2
Cleanups
stephanie-wang Mar 21, 2018
de54e58
Revert "Fix new GCS build"
stephanie-wang Mar 21, 2018
1add01c
Use vectors for internal callback interface, user-facing interface ta…
stephanie-wang Mar 21, 2018
35c607f
Fix new GCS build
stephanie-wang Mar 21, 2018
5ff6962
Add unit test for Lookup failure callback
stephanie-wang Mar 21, 2018
8ce7d1a
Fix compiler errors
stephanie-wang Mar 21, 2018
b8f80f6
Cleanup
stephanie-wang Mar 22, 2018
c2f4732
Publish the entry ID with the notification
stephanie-wang Mar 22, 2018
57e0653
Check that the ID for a notification matches in client tests
stephanie-wang Mar 22, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
444 changes: 347 additions & 97 deletions src/common/redis_module/ray_redis_module.cc

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions src/plasma/plasma_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1332,10 +1332,10 @@ void log_object_hash_mismatch_error_result_callback(ObjectID object_id,
RAY_CHECK_OK(state->gcs_client.task_table().Lookup(
ray::JobID::nil(), task_id,
[user_context](gcs::AsyncGcsClient *, const TaskID &,
std::shared_ptr<TaskTableDataT> t) {
const TaskTableDataT &t) {
Task *task = Task_alloc(
t->task_info.data(), t->task_info.size(), t->scheduling_state,
DBClientID::from_binary(t->scheduler_id), std::vector<ObjectID>());
t.task_info.data(), t.task_info.size(), t.scheduling_state,
DBClientID::from_binary(t.scheduler_id), std::vector<ObjectID>());
log_object_hash_mismatch_error_task_callback(task, user_context);
Task_free(task);
},
Expand Down
256 changes: 217 additions & 39 deletions src/ray/gcs/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ static inline void flushall_redis(void) {

class TestGcs : public ::testing::Test {
public:
TestGcs() {
TestGcs() : num_callbacks_(0) {
client_ = std::make_shared<gcs::AsyncGcsClient>();
ClientTableDataT client_info;
client_info.client_id = ClientID::from_random().binary();
Expand All @@ -42,7 +42,12 @@ class TestGcs : public ::testing::Test {

virtual void Stop() = 0;

int64_t NumCallbacks() const { return num_callbacks_; }

void IncrementNumCallbacks() { num_callbacks_++; }

protected:
int64_t num_callbacks_;
std::shared_ptr<gcs::AsyncGcsClient> client_;
JobID job_id_;
};
Expand Down Expand Up @@ -87,14 +92,14 @@ class TestGcsWithAsio : public TestGcs {
};

void ObjectAdded(gcs::AsyncGcsClient *client, const UniqueID &id,
std::shared_ptr<ObjectTableDataT> data) {
ASSERT_EQ(data->managers, std::vector<std::string>({"A", "B"}));
const ObjectTableDataT &data) {
ASSERT_EQ(data.managers, std::vector<std::string>({"A", "B"}));
}

void Lookup(gcs::AsyncGcsClient *client, const UniqueID &id,
std::shared_ptr<ObjectTableDataT> data) {
const ObjectTableDataT &data) {
// Check that the object entry was added.
ASSERT_EQ(data->managers, std::vector<std::string>({"A", "B"}));
ASSERT_EQ(data.managers, std::vector<std::string>({"A", "B"}));
test->Stop();
}

Expand Down Expand Up @@ -126,23 +131,46 @@ TEST_F(TestGcsWithAsio, TestObjectTable) {
TestObjectTable(job_id_, client_);
}

void TestLookupFailure(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
auto object_id = ObjectID::from_random();
// Looking up an empty object ID should call the failure callback.
auto failure_callback = [](gcs::AsyncGcsClient *client, const UniqueID &id) {
test->Stop();
};
RAY_CHECK_OK(
client->object_table().Lookup(job_id, object_id, nullptr, failure_callback));
// Run the event loop. The loop will only stop if the failure callback is
// called.
test->Start();
}

TEST_F(TestGcsWithAe, TestLookupFailure) {
test = this;
TestLookupFailure(job_id_, client_);
}

TEST_F(TestGcsWithAsio, TestLookupFailure) {
test = this;
TestLookupFailure(job_id_, client_);
}

void TaskAdded(gcs::AsyncGcsClient *client, const TaskID &id,
std::shared_ptr<TaskTableDataT> data) {
ASSERT_EQ(data->scheduling_state, SchedulingState_SCHEDULED);
const TaskTableDataT &data) {
ASSERT_EQ(data.scheduling_state, SchedulingState_SCHEDULED);
}

void TaskLookup(gcs::AsyncGcsClient *client, const TaskID &id,
std::shared_ptr<TaskTableDataT> data) {
ASSERT_EQ(data->scheduling_state, SchedulingState_SCHEDULED);
const TaskTableDataT &data) {
ASSERT_EQ(data.scheduling_state, SchedulingState_SCHEDULED);
}

void TaskLookupFailure(gcs::AsyncGcsClient *client, const TaskID &id) {
RAY_CHECK(false);
}

void TaskLookupAfterUpdate(gcs::AsyncGcsClient *client, const TaskID &id,
std::shared_ptr<TaskTableDataT> data) {
ASSERT_EQ(data->scheduling_state, SchedulingState_LOST);
const TaskTableDataT &data) {
ASSERT_EQ(data.scheduling_state, SchedulingState_LOST);
test->Stop();
}

Expand All @@ -153,8 +181,8 @@ void TaskLookupAfterUpdateFailure(gcs::AsyncGcsClient *client, const TaskID &id)

void TaskUpdateCallback(gcs::AsyncGcsClient *client, const TaskID &task_id,
const TaskTableDataT &task, bool updated) {
RAY_CHECK_OK(client->task_table().Lookup(
DriverID::nil(), task_id, &TaskLookupAfterUpdate, &TaskLookupAfterUpdateFailure));
RAY_CHECK_OK(client->task_table().Lookup(DriverID::nil(), task_id,
&TaskLookupAfterUpdate, &TaskLookupFailure));
}

void TestTaskTable(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
Expand Down Expand Up @@ -189,28 +217,40 @@ TEST_F(TestGcsWithAsio, TestTaskTable) {
TestTaskTable(job_id_, client_);
}

void ObjectTableSubscribed(gcs::AsyncGcsClient *client, const UniqueID &id,
std::shared_ptr<ObjectTableDataT> data) {
test->Stop();
}

void TestSubscribeAll(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
// Subscribe to all object table notifications. The registered callback for
// notifications will check whether the object below is added.
RAY_CHECK_OK(client->object_table().Subscribe(job_id, ClientID::nil(), &Lookup,
&ObjectTableSubscribed));
// Run the event loop. The loop will only stop if the subscription succeeds.
test->Start();

// We have subscribed. Add an object table entry.
auto data = std::make_shared<ObjectTableDataT>();
data->managers.push_back("A");
data->managers.push_back("B");
ObjectID object_id = ObjectID::from_random();
RAY_CHECK_OK(client->object_table().Add(job_id, object_id, data, &ObjectAdded));
// Callback for a notification.
auto notification_callback = [object_id](
gcs::AsyncGcsClient *client, const UniqueID &id, const ObjectTableDataT &data) {
ASSERT_EQ(id, object_id);
// Check that the object entry was added.
ASSERT_EQ(data.managers, std::vector<std::string>({"A", "B"}));
test->IncrementNumCallbacks();
test->Stop();
};

// Callback for subscription success. This should only be called once.
auto subscribe_callback = [job_id, object_id](gcs::AsyncGcsClient *client) {
test->IncrementNumCallbacks();
// We have subscribed. Add an object table entry.
auto data = std::make_shared<ObjectTableDataT>();
data->managers.push_back("A");
data->managers.push_back("B");
RAY_CHECK_OK(client->object_table().Add(job_id, object_id, data, &ObjectAdded));
};

// Subscribe to all object table notifications. Once we have successfully
// subscribed, we will add an object and check that we get notified of the
// operation.
RAY_CHECK_OK(client->object_table().Subscribe(
job_id, ClientID::nil(), notification_callback, subscribe_callback));

// Run the event loop. The loop will only stop if the registered subscription
// callback is called (or an assertion failure).
test->Start();
// Check that we received one callback for subscription success and one for
// the Add notification.
ASSERT_EQ(test->NumCallbacks(), 2);
}

TEST_F(TestGcsWithAe, TestSubscribeAll) {
Expand All @@ -223,11 +263,152 @@ TEST_F(TestGcsWithAsio, TestSubscribeAll) {
TestSubscribeAll(job_id_, client_);
}

void TestSubscribeId(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
// Add an object table entry.
ObjectID object_id1 = ObjectID::from_random();
auto data1 = std::make_shared<ObjectTableDataT>();
data1->managers.push_back("A");
data1->managers.push_back("B");
RAY_CHECK_OK(client->object_table().Add(job_id, object_id1, data1, nullptr));

// Add a second object table entry.
ObjectID object_id2 = ObjectID::from_random();
auto data2 = std::make_shared<ObjectTableDataT>();
data2->managers.push_back("C");
RAY_CHECK_OK(client->object_table().Add(job_id, object_id2, data2, nullptr));

// The callback for subscription success. Once we've subscribed, request
// notifications for the second object that was added.
auto subscribe_callback = [job_id, object_id2](gcs::AsyncGcsClient *client) {
test->IncrementNumCallbacks();
// Request notifications for the second object. Since we already added the
// entry to the table, we should receive an initial notification for its
// current value.
RAY_CHECK_OK(client->object_table().RequestNotifications(
job_id, object_id2, client->client_table().GetLocalClientId()));
// Overwrite the entry for the object. We should receive a second
// notification for its new value.
auto data = std::make_shared<ObjectTableDataT>();
data->managers.push_back("C");
data->managers.push_back("D");
RAY_CHECK_OK(client->object_table().Add(job_id, object_id2, data, nullptr));
};

// The callback for a notification from the object table. This should only be
// received for the object that we requested notifications for.
auto notification_callback = [data2, object_id2](
gcs::AsyncGcsClient *client, const UniqueID &id, const ObjectTableDataT &data) {
ASSERT_EQ(id, object_id2);
// Check that we got a notification for the correct object.
ASSERT_EQ(data.managers.front(), "C");
test->IncrementNumCallbacks();
// Stop the loop once we've received notifications for both writes to the
// object key.
if (test->NumCallbacks() == 3) {
test->Stop();
}
};

RAY_CHECK_OK(
client->object_table().Subscribe(job_id, client->client_table().GetLocalClientId(),
notification_callback, subscribe_callback));

// Run the event loop. The loop will only stop if the registered subscription
// callback is called for both writes to the object key.
test->Start();
// Check that we received one callback for subscription success and two
// callbacks for the Add notifications.
ASSERT_EQ(test->NumCallbacks(), 3);
}

TEST_F(TestGcsWithAe, TestSubscribeId) {
test = this;
TestSubscribeId(job_id_, client_);
}

TEST_F(TestGcsWithAsio, TestSubscribeId) {
test = this;
TestSubscribeId(job_id_, client_);
}

void TestSubscribeCancel(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
// Write the object table once.
ObjectID object_id = ObjectID::from_random();
auto data = std::make_shared<ObjectTableDataT>();
data->managers.push_back("A");
RAY_CHECK_OK(client->object_table().Add(job_id, object_id, data, nullptr));

// The callback for subscription success. Once we've subscribed, request
// notifications for the second object that was added.
auto subscribe_callback = [job_id, object_id](gcs::AsyncGcsClient *client) {
test->IncrementNumCallbacks();
// Request notifications for the object. We should receive a notification
// for the current value at the key.
RAY_CHECK_OK(client->object_table().RequestNotifications(
job_id, object_id, client->client_table().GetLocalClientId()));
// Cancel notifications.
RAY_CHECK_OK(client->object_table().CancelNotifications(
job_id, object_id, client->client_table().GetLocalClientId()));
// Write the object table entry twice. Since we canceled notifications, we
// should not get notifications for either of these writes.
auto data = std::make_shared<ObjectTableDataT>();
data->managers.push_back("B");
RAY_CHECK_OK(client->object_table().Add(job_id, object_id, data, nullptr));
data = std::make_shared<ObjectTableDataT>();
data->managers.push_back("C");
RAY_CHECK_OK(client->object_table().Add(job_id, object_id, data, nullptr));
// Request notifications for the object again. We should only receive a
// notification for the current value at the key.
RAY_CHECK_OK(client->object_table().RequestNotifications(
job_id, object_id, client->client_table().GetLocalClientId()));
};

// The callback for a notification from the object table.
auto notification_callback = [object_id](
gcs::AsyncGcsClient *client, const UniqueID &id, const ObjectTableDataT &data) {
ASSERT_EQ(id, object_id);
// Check that we only receive notifications for the key when we have
// requested notifications for it. We should not get a notification for the
// entry that began with "B" since we canceled notifications then.
if (test->NumCallbacks() == 1) {
ASSERT_EQ(data.managers.front(), "A");
} else {
ASSERT_EQ(data.managers.front(), "C");
}
test->IncrementNumCallbacks();
if (test->NumCallbacks() == 3) {
test->Stop();
}
};

RAY_CHECK_OK(
client->object_table().Subscribe(job_id, client->client_table().GetLocalClientId(),
notification_callback, subscribe_callback));

// Run the event loop. The loop will only stop if the registered subscription
// callback is called (or an assertion failure).
test->Start();
// Check that we received one callback for subscription success and two
// callbacks for the Add notifications.
ASSERT_EQ(test->NumCallbacks(), 3);
}

TEST_F(TestGcsWithAe, TestSubscribeCancel) {
test = this;
TestSubscribeCancel(job_id_, client_);
}

TEST_F(TestGcsWithAsio, TestSubscribeCancel) {
test = this;
TestSubscribeCancel(job_id_, client_);
}

void ClientTableNotification(gcs::AsyncGcsClient *client, const UniqueID &id,
std::shared_ptr<ClientTableDataT> data, bool is_insertion) {
const ClientTableDataT &data, bool is_insertion) {
ClientID added_id = client->client_table().GetLocalClientId();
ASSERT_EQ(ClientID::from_binary(data->client_id), added_id);
ASSERT_EQ(data->is_insertion, is_insertion);
ASSERT_EQ(ClientID::from_binary(data.client_id), added_id);
ASSERT_EQ(data.is_insertion, is_insertion);

auto cached_client = client->client_table().GetClient(added_id);
ASSERT_EQ(ClientID::from_binary(cached_client.client_id), added_id);
Expand All @@ -239,8 +420,7 @@ void TestClientTableConnect(const JobID &job_id,
// Register callbacks for when a client gets added and removed. The latter
// event will stop the event loop.
client->client_table().RegisterClientAddedCallback(
[](gcs::AsyncGcsClient *client, const UniqueID &id,
std::shared_ptr<ClientTableDataT> data) {
[](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ClientTableNotification(client, id, data, true);
test->Stop();
});
Expand All @@ -260,13 +440,11 @@ void TestClientTableDisconnect(const JobID &job_id,
// Register callbacks for when a client gets added and removed. The latter
// event will stop the event loop.
client->client_table().RegisterClientAddedCallback(
[](gcs::AsyncGcsClient *client, const UniqueID &id,
std::shared_ptr<ClientTableDataT> data) {
[](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ClientTableNotification(client, id, data, true);
});
client->client_table().RegisterClientRemovedCallback(
[](gcs::AsyncGcsClient *client, const UniqueID &id,
std::shared_ptr<ClientTableDataT> data) {
[](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ClientTableNotification(client, id, data, false);
test->Stop();
});
Expand Down
5 changes: 5 additions & 0 deletions src/ray/gcs/format/gcs.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ enum TablePubsub:int {
ACTOR
}

table GcsNotification {
id: string;
data: string;
}

table FunctionTableData {
language: Language;
name: string;
Expand Down
Loading