diff --git a/src/ray/gcs/client_test.cc b/src/ray/gcs/client_test.cc index a58d09bea01d..8077376838f5 100644 --- a/src/ray/gcs/client_test.cc +++ b/src/ray/gcs/client_test.cc @@ -42,12 +42,12 @@ class TestGcs : public ::testing::Test { virtual void Stop() = 0; - int64_t NumCallbacks() const { return num_callbacks_; } + uint64_t NumCallbacks() const { return num_callbacks_; } void IncrementNumCallbacks() { num_callbacks_++; } protected: - int64_t num_callbacks_; + uint64_t num_callbacks_; std::shared_ptr client_; JobID job_id_; }; @@ -91,44 +91,32 @@ class TestGcsWithAsio : public TestGcs { boost::asio::io_service::work work_; }; -void ObjectAdded(gcs::AsyncGcsClient *client, const UniqueID &id, - const ObjectTableDataT &data) { - ASSERT_EQ(data.managers, std::vector({"A", "B"})); -} - -void Lookup(gcs::AsyncGcsClient *client, const UniqueID &id, - const ObjectTableDataT &data) { - // Check that the object entry was added. - ASSERT_EQ(data.managers, std::vector({"A", "B"})); - test->Stop(); -} - -void LookupFailed(gcs::AsyncGcsClient *client, const UniqueID &id) { - // Object entry failed. - RAY_CHECK(false); - test->Stop(); -} - void TestTableLookup(const JobID &job_id, std::shared_ptr client) { TaskID task_id = TaskID::from_random(); auto data = std::make_shared(); data->task_specification = "123"; - auto add_callback = [data](gcs::AsyncGcsClient *client, const UniqueID &id, - const protocol::TaskT &d) { - ASSERT_EQ(data->task_specification, d.task_specification); + // Check that we added the correct task. + auto add_callback = [task_id, data](gcs::AsyncGcsClient *client, const UniqueID &id, + const std::shared_ptr d) { + ASSERT_EQ(id, task_id); + ASSERT_EQ(data->task_specification, d->task_specification); }; - auto lookup_callback = [data](gcs::AsyncGcsClient *client, const UniqueID &id, - const protocol::TaskT &d) { + // Check that the lookup returns the added task. + auto lookup_callback = [task_id, data](gcs::AsyncGcsClient *client, const TaskID &id, + const protocol::TaskT &d) { + ASSERT_EQ(id, task_id); ASSERT_EQ(data->task_specification, d.task_specification); test->Stop(); }; + // Check that the lookup does not return an empty entry. auto failure_callback = [](gcs::AsyncGcsClient *client, const UniqueID &id) { RAY_CHECK(false); }; + // Add the task, then do a lookup. RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, add_callback)); RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, lookup_callback, failure_callback)); @@ -147,32 +135,89 @@ TEST_F(TestGcsWithAsio, TestTableLookup) { TestTableLookup(job_id_, client_); } -void TestLookupFailure(const JobID &job_id, std::shared_ptr client) { - auto object_id = ObjectID::from_random(); - // Looking up an empty object ID should call the failure callback. - auto failure_callback = [](gcs::AsyncGcsClient *client, const UniqueID &id) { +void TestLogLookup(const JobID &job_id, std::shared_ptr client) { + // Append some entries to the log at an object ID. + ObjectID object_id = ObjectID::from_random(); + std::vector managers = {"abc", "def", "ghi"}; + for (auto &manager : managers) { + auto data = std::make_shared(); + data->manager = manager; + // Check that we added the correct object entries. + auto add_callback = [object_id, data](gcs::AsyncGcsClient *client, const UniqueID &id, + const std::shared_ptr d) { + ASSERT_EQ(id, object_id); + ASSERT_EQ(data->manager, d->manager); + }; + RAY_CHECK_OK(client->object_table().Append(job_id, object_id, data, add_callback)); + } + + // Check that lookup returns the added object entries. + auto lookup_callback = [object_id, managers]( + gcs::AsyncGcsClient *client, const ObjectID &id, + const std::vector &data) { + ASSERT_EQ(id, object_id); + for (const auto &entry : data) { + ASSERT_EQ(entry.manager, managers[test->NumCallbacks()]); + test->IncrementNumCallbacks(); + } + if (test->NumCallbacks() == managers.size()) { + test->Stop(); + } + }; + + // Do a lookup at the object ID. + RAY_CHECK_OK(client->object_table().Lookup(job_id, object_id, lookup_callback)); + // Run the event loop. The loop will only stop if the Lookup callback is + // called (or an assertion failure). + test->Start(); + ASSERT_EQ(test->NumCallbacks(), managers.size()); +} + +TEST_F(TestGcsWithAe, TestLogLookup) { + test = this; + TestLogLookup(job_id_, client_); +} + +TEST_F(TestGcsWithAsio, TestLogLookup) { + test = this; + TestLogLookup(job_id_, client_); +} + +void TestTableLookupFailure(const JobID &job_id, + std::shared_ptr client) { + TaskID task_id = TaskID::from_random(); + + // Check that the lookup does not return data. + auto lookup_callback = [](gcs::AsyncGcsClient *client, const UniqueID &id, + const protocol::TaskT &d) { RAY_CHECK(false); }; + + // Check that the lookup returns an empty entry. + auto failure_callback = [task_id](gcs::AsyncGcsClient *client, const UniqueID &id) { + ASSERT_EQ(id, task_id); test->Stop(); }; - RAY_CHECK_OK( - client->object_table().Lookup(job_id, object_id, nullptr, failure_callback)); + + // Lookup the task. We have not done any writes, so the key should be empty. + RAY_CHECK_OK(client->raylet_task_table().Lookup(job_id, task_id, lookup_callback, + failure_callback)); // Run the event loop. The loop will only stop if the failure callback is - // called. + // called (or an assertion failure). test->Start(); } -TEST_F(TestGcsWithAe, TestLookupFailure) { +TEST_F(TestGcsWithAe, TestTableLookupFailure) { test = this; - TestLookupFailure(job_id_, client_); + TestTableLookupFailure(job_id_, client_); } -TEST_F(TestGcsWithAsio, TestLookupFailure) { +TEST_F(TestGcsWithAsio, TestTableLookupFailure) { test = this; - TestLookupFailure(job_id_, client_); + TestTableLookupFailure(job_id_, client_); } void TaskAdded(gcs::AsyncGcsClient *client, const TaskID &id, - const TaskTableDataT &data) { - ASSERT_EQ(data.scheduling_state, SchedulingState_SCHEDULED); + const std::shared_ptr data) { + ASSERT_EQ(data->scheduling_state, SchedulingState_SCHEDULED); } void TaskLookup(gcs::AsyncGcsClient *client, const TaskID &id, @@ -233,191 +278,413 @@ TEST_F(TestGcsWithAsio, TestTaskTable) { TestTaskTable(job_id_, client_); } -void TestSubscribeAll(const JobID &job_id, std::shared_ptr client) { - ObjectID object_id = ObjectID::from_random(); +void TestTableSubscribeAll(const JobID &job_id, + std::shared_ptr client) { + TaskID task_id = TaskID::from_random(); + std::vector task_specs = {"abc", "def", "ghi"}; // Callback for a notification. - auto notification_callback = [object_id]( - gcs::AsyncGcsClient *client, const UniqueID &id, const ObjectTableDataT &data) { - ASSERT_EQ(id, object_id); - // Check that the object entry was added. - ASSERT_EQ(data.managers, std::vector({"A", "B"})); + auto notification_callback = [task_id, task_specs]( + gcs::AsyncGcsClient *client, const UniqueID &id, const protocol::TaskT &data) { + ASSERT_EQ(id, task_id); + // Check that we get notifications in the same order as the writes. + ASSERT_EQ(data.task_specification, task_specs[test->NumCallbacks()]); test->IncrementNumCallbacks(); - test->Stop(); + if (test->NumCallbacks() == task_specs.size()) { + test->Stop(); + } }; - // Callback for subscription success. This should only be called once. - auto subscribe_callback = [job_id, object_id](gcs::AsyncGcsClient *client) { - test->IncrementNumCallbacks(); - // We have subscribed. Add an object table entry. - auto data = std::make_shared(); - data->managers.push_back("A"); - data->managers.push_back("B"); - RAY_CHECK_OK(client->object_table().Add(job_id, object_id, data, &ObjectAdded)); + // Callback for subscription success. We are guaranteed to receive + // notifications after this is called. + auto subscribe_callback = [job_id, task_id, task_specs](gcs::AsyncGcsClient *client) { + // We have subscribed. Do the writes to the table. + for (const auto &task_spec : task_specs) { + auto data = std::make_shared(); + data->task_specification = task_spec; + RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, nullptr)); + } }; - // Subscribe to all object table notifications. Once we have successfully - // subscribed, we will add an object and check that we get notified of the - // operation. + // Subscribe to all task table notifications. Once we have successfully + // subscribed, we will write the key several times and check that we get + // notified for each. + RAY_CHECK_OK(client->raylet_task_table().Subscribe( + job_id, ClientID::nil(), notification_callback, subscribe_callback)); + // Run the event loop. The loop will only stop if the registered subscription + // callback is called (or an assertion failure). + test->Start(); + // Check that we received one notification callback for each write. + ASSERT_EQ(test->NumCallbacks(), task_specs.size()); +} + +TEST_F(TestGcsWithAe, TestTableSubscribeAll) { + test = this; + TestTableSubscribeAll(job_id_, client_); +} + +TEST_F(TestGcsWithAsio, TestTableSubscribeAll) { + test = this; + TestTableSubscribeAll(job_id_, client_); +} + +void TestLogSubscribeAll(const JobID &job_id, + std::shared_ptr client) { + std::vector managers = {"abc", "def", "ghi"}; + std::vector object_ids; + for (size_t i = 0; i < managers.size(); i++) { + object_ids.push_back(ObjectID::from_random()); + } + // Callback for a notification. + auto notification_callback = [object_ids, managers]( + gcs::AsyncGcsClient *client, const UniqueID &id, + const std::vector data) { + ASSERT_EQ(id, object_ids[test->NumCallbacks()]); + // Check that we get notifications in the same order as the writes. + for (const auto &entry : data) { + ASSERT_EQ(entry.manager, managers[test->NumCallbacks()]); + test->IncrementNumCallbacks(); + } + if (test->NumCallbacks() == managers.size()) { + test->Stop(); + } + }; + + // Callback for subscription success. We are guaranteed to receive + // notifications after this is called. + auto subscribe_callback = [job_id, object_ids, managers](gcs::AsyncGcsClient *client) { + // We have subscribed. Do the writes to the table. + for (size_t i = 0; i < object_ids.size(); i++) { + auto data = std::make_shared(); + data->manager = managers[i]; + RAY_CHECK_OK(client->object_table().Append(job_id, object_ids[i], data, nullptr)); + } + }; + + // Subscribe to all task table notifications. Once we have successfully + // subscribed, we will append to the key several times and check that we get + // notified for each. RAY_CHECK_OK(client->object_table().Subscribe( job_id, ClientID::nil(), notification_callback, subscribe_callback)); // Run the event loop. The loop will only stop if the registered subscription // callback is called (or an assertion failure). test->Start(); - // Check that we received one callback for subscription success and one for - // the Add notification. - ASSERT_EQ(test->NumCallbacks(), 2); + // Check that we received one notification callback for each write. + ASSERT_EQ(test->NumCallbacks(), managers.size()); +} + +TEST_F(TestGcsWithAe, TestLogSubscribeAll) { + test = this; + TestLogSubscribeAll(job_id_, client_); +} + +TEST_F(TestGcsWithAsio, TestLogSubscribeAll) { + test = this; + TestLogSubscribeAll(job_id_, client_); +} + +void TestTableSubscribeId(const JobID &job_id, + std::shared_ptr client) { + // Add a table entry. + TaskID task_id1 = TaskID::from_random(); + std::vector task_specs1 = {"abc", "def", "ghi"}; + auto data1 = std::make_shared(); + data1->task_specification = task_specs1[0]; + RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id1, data1, nullptr)); + + // Add a table entry at a second key. + TaskID task_id2 = TaskID::from_random(); + std::vector task_specs2 = {"jkl", "mno", "pqr"}; + auto data2 = std::make_shared(); + data2->task_specification = task_specs2[0]; + RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id2, data2, nullptr)); + + // The callback for a notification from the table. This should only be + // received for keys that we requested notifications for. + auto notification_callback = [task_id2, task_specs2]( + gcs::AsyncGcsClient *client, const TaskID &id, const protocol::TaskT &data) { + // Check that we only get notifications for the requested key. + ASSERT_EQ(id, task_id2); + // Check that we get notifications in the same order as the writes. + ASSERT_EQ(data.task_specification, task_specs2[test->NumCallbacks()]); + test->IncrementNumCallbacks(); + if (test->NumCallbacks() == task_specs2.size()) { + test->Stop(); + } + }; + + // The callback for subscription success. Once we've subscribed, request + // notifications for only one of the keys, then write to both keys. + auto subscribe_callback = [job_id, task_id1, task_id2, task_specs1, + task_specs2](gcs::AsyncGcsClient *client) { + // Request notifications for one of the keys. + RAY_CHECK_OK(client->raylet_task_table().RequestNotifications( + job_id, task_id2, client->client_table().GetLocalClientId())); + // Write both keys. We should only receive notifications for the key that + // we requested them for. + auto remaining = std::vector(++task_specs1.begin(), task_specs1.end()); + for (const auto &task_spec : remaining) { + auto data = std::make_shared(); + data->task_specification = task_spec; + RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id1, data, nullptr)); + } + remaining = std::vector(++task_specs2.begin(), task_specs2.end()); + for (const auto &task_spec : remaining) { + auto data = std::make_shared(); + data->task_specification = task_spec; + RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id2, data, nullptr)); + } + }; + + // Subscribe to notifications for this client. This allows us to request and + // receive notifications for specific keys. + RAY_CHECK_OK(client->raylet_task_table().Subscribe( + job_id, client->client_table().GetLocalClientId(), notification_callback, + subscribe_callback)); + // Run the event loop. The loop will only stop if the registered subscription + // callback is called for the requested key. + test->Start(); + // Check that we received one notification callback for each write to the + // requested key. + ASSERT_EQ(test->NumCallbacks(), task_specs2.size()); } -TEST_F(TestGcsWithAe, TestSubscribeAll) { +TEST_F(TestGcsWithAe, TestTableSubscribeId) { test = this; - TestSubscribeAll(job_id_, client_); + TestTableSubscribeId(job_id_, client_); } -TEST_F(TestGcsWithAsio, TestSubscribeAll) { +TEST_F(TestGcsWithAsio, TestTableSubscribeId) { test = this; - TestSubscribeAll(job_id_, client_); + TestTableSubscribeId(job_id_, client_); } -void TestSubscribeId(const JobID &job_id, std::shared_ptr client) { - // Add an object table entry. +void TestLogSubscribeId(const JobID &job_id, + std::shared_ptr client) { + // Add a log entry. ObjectID object_id1 = ObjectID::from_random(); + std::vector managers1 = {"abc", "def", "ghi"}; auto data1 = std::make_shared(); - data1->managers.push_back("A"); - data1->managers.push_back("B"); - RAY_CHECK_OK(client->object_table().Add(job_id, object_id1, data1, nullptr)); + data1->manager = managers1[0]; + RAY_CHECK_OK(client->object_table().Append(job_id, object_id1, data1, nullptr)); - // Add a second object table entry. + // Add a log entry at a second key. ObjectID object_id2 = ObjectID::from_random(); + std::vector managers2 = {"jkl", "mno", "pqr"}; auto data2 = std::make_shared(); - data2->managers.push_back("C"); - RAY_CHECK_OK(client->object_table().Add(job_id, object_id2, data2, nullptr)); + data2->manager = managers2[0]; + RAY_CHECK_OK(client->object_table().Append(job_id, object_id2, data2, nullptr)); + + // The callback for a notification from the table. This should only be + // received for keys that we requested notifications for. + auto notification_callback = [object_id2, managers2]( + gcs::AsyncGcsClient *client, const ObjectID &id, + const std::vector &data) { + // Check that we only get notifications for the requested key. + ASSERT_EQ(id, object_id2); + // Check that we get notifications in the same order as the writes. + for (const auto &entry : data) { + ASSERT_EQ(entry.manager, managers2[test->NumCallbacks()]); + test->IncrementNumCallbacks(); + } + if (test->NumCallbacks() == managers2.size()) { + test->Stop(); + } + }; // The callback for subscription success. Once we've subscribed, request - // notifications for the second object that was added. - auto subscribe_callback = [job_id, object_id2](gcs::AsyncGcsClient *client) { - test->IncrementNumCallbacks(); - // Request notifications for the second object. Since we already added the - // entry to the table, we should receive an initial notification for its - // current value. + // notifications for only one of the keys, then write to both keys. + auto subscribe_callback = [job_id, object_id1, object_id2, managers1, + managers2](gcs::AsyncGcsClient *client) { + // Request notifications for one of the keys. RAY_CHECK_OK(client->object_table().RequestNotifications( job_id, object_id2, client->client_table().GetLocalClientId())); - // Overwrite the entry for the object. We should receive a second - // notification for its new value. - auto data = std::make_shared(); - data->managers.push_back("C"); - data->managers.push_back("D"); - RAY_CHECK_OK(client->object_table().Add(job_id, object_id2, data, nullptr)); + // Write both keys. We should only receive notifications for the key that + // we requested them for. + auto remaining = std::vector(++managers1.begin(), managers1.end()); + for (const auto &manager : remaining) { + auto data = std::make_shared(); + data->manager = manager; + RAY_CHECK_OK(client->object_table().Append(job_id, object_id1, data, nullptr)); + } + remaining = std::vector(++managers2.begin(), managers2.end()); + for (const auto &manager : remaining) { + auto data = std::make_shared(); + data->manager = manager; + RAY_CHECK_OK(client->object_table().Append(job_id, object_id2, data, nullptr)); + } }; - // The callback for a notification from the object table. This should only be - // received for the object that we requested notifications for. - auto notification_callback = [data2, object_id2]( - gcs::AsyncGcsClient *client, const UniqueID &id, const ObjectTableDataT &data) { - ASSERT_EQ(id, object_id2); - // Check that we got a notification for the correct object. - ASSERT_EQ(data.managers.front(), "C"); + // Subscribe to notifications for this client. This allows us to request and + // receive notifications for specific keys. + RAY_CHECK_OK( + client->object_table().Subscribe(job_id, client->client_table().GetLocalClientId(), + notification_callback, subscribe_callback)); + // Run the event loop. The loop will only stop if the registered subscription + // callback is called for the requested key. + test->Start(); + // Check that we received one notification callback for each write to the + // requested key. + ASSERT_EQ(test->NumCallbacks(), managers2.size()); +} + +TEST_F(TestGcsWithAe, TestLogSubscribeId) { + test = this; + TestLogSubscribeId(job_id_, client_); +} + +TEST_F(TestGcsWithAsio, TestLogSubscribeId) { + test = this; + TestLogSubscribeId(job_id_, client_); +} + +void TestTableSubscribeCancel(const JobID &job_id, + std::shared_ptr client) { + // Add a table entry. + TaskID task_id = TaskID::from_random(); + std::vector task_specs = {"jkl", "mno", "pqr"}; + auto data = std::make_shared(); + data->task_specification = task_specs[0]; + RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, nullptr)); + + // The callback for a notification from the table. This should only be + // received for keys that we requested notifications for. + auto notification_callback = [task_id, task_specs]( + gcs::AsyncGcsClient *client, const TaskID &id, const protocol::TaskT &data) { + ASSERT_EQ(id, task_id); + // Check that we only get notifications for the first and last writes, + // since notifications are canceled in between. + if (test->NumCallbacks() == 0) { + ASSERT_EQ(data.task_specification, task_specs.front()); + } else { + ASSERT_EQ(data.task_specification, task_specs.back()); + } test->IncrementNumCallbacks(); - // Stop the loop once we've received notifications for both writes to the - // object key. - if (test->NumCallbacks() == 3) { + if (test->NumCallbacks() == 2) { test->Stop(); } }; - RAY_CHECK_OK( - client->object_table().Subscribe(job_id, client->client_table().GetLocalClientId(), - notification_callback, subscribe_callback)); + // The callback for a notification from the table. This should only be + // received for keys that we requested notifications for. + auto subscribe_callback = [job_id, task_id, task_specs](gcs::AsyncGcsClient *client) { + // Request notifications, then cancel immediately. We should receive a + // notification for the current value at the key. + RAY_CHECK_OK(client->raylet_task_table().RequestNotifications( + job_id, task_id, client->client_table().GetLocalClientId())); + RAY_CHECK_OK(client->raylet_task_table().CancelNotifications( + job_id, task_id, client->client_table().GetLocalClientId())); + // Write to the key. Since we canceled notifications, we should not receive + // a notification for these writes. + auto remaining = std::vector(++task_specs.begin(), task_specs.end()); + for (const auto &task_spec : remaining) { + auto data = std::make_shared(); + data->task_specification = task_spec; + RAY_CHECK_OK(client->raylet_task_table().Add(job_id, task_id, data, nullptr)); + } + // Request notifications again. We should receive a notification for the + // current value at the key. + RAY_CHECK_OK(client->raylet_task_table().RequestNotifications( + job_id, task_id, client->client_table().GetLocalClientId())); + }; + // Subscribe to notifications for this client. This allows us to request and + // receive notifications for specific keys. + RAY_CHECK_OK(client->raylet_task_table().Subscribe( + job_id, client->client_table().GetLocalClientId(), notification_callback, + subscribe_callback)); // Run the event loop. The loop will only stop if the registered subscription - // callback is called for both writes to the object key. + // callback is called for the requested key. test->Start(); - // Check that we received one callback for subscription success and two - // callbacks for the Add notifications. - ASSERT_EQ(test->NumCallbacks(), 3); + // Check that we received a notification callback for the first and least + // writes to the key, since notifications are canceled in between. + ASSERT_EQ(test->NumCallbacks(), 2); } -TEST_F(TestGcsWithAe, TestSubscribeId) { +TEST_F(TestGcsWithAe, TestTableSubscribeCancel) { test = this; - TestSubscribeId(job_id_, client_); + TestTableSubscribeCancel(job_id_, client_); } -TEST_F(TestGcsWithAsio, TestSubscribeId) { +TEST_F(TestGcsWithAsio, TestTableSubscribeCancel) { test = this; - TestSubscribeId(job_id_, client_); + TestTableSubscribeCancel(job_id_, client_); } -void TestSubscribeCancel(const JobID &job_id, - std::shared_ptr client) { - // Write the object table once. +void TestLogSubscribeCancel(const JobID &job_id, + std::shared_ptr client) { + // Add a log entry. ObjectID object_id = ObjectID::from_random(); + std::vector managers = {"jkl", "mno", "pqr"}; auto data = std::make_shared(); - data->managers.push_back("A"); - RAY_CHECK_OK(client->object_table().Add(job_id, object_id, data, nullptr)); + data->manager = managers[0]; + RAY_CHECK_OK(client->object_table().Append(job_id, object_id, data, nullptr)); - // The callback for subscription success. Once we've subscribed, request - // notifications for the second object that was added. - auto subscribe_callback = [job_id, object_id](gcs::AsyncGcsClient *client) { - test->IncrementNumCallbacks(); - // Request notifications for the object. We should receive a notification - // for the current value at the key. + // The callback for a notification from the object table. This should only be + // received for the object that we requested notifications for. + auto notification_callback = [object_id, managers]( + gcs::AsyncGcsClient *client, const ObjectID &id, + const std::vector &data) { + ASSERT_EQ(id, object_id); + // Check that we get a duplicate notification for the first write. We get a + // duplicate notification because the log is append-only and notifications + // are canceled after the first write, then requested again. + auto managers_copy = managers; + managers_copy.insert(managers_copy.begin(), managers_copy.front()); + for (const auto &entry : data) { + ASSERT_EQ(entry.manager, managers_copy[test->NumCallbacks()]); + test->IncrementNumCallbacks(); + } + if (test->NumCallbacks() == managers_copy.size()) { + test->Stop(); + } + }; + + // The callback for a notification from the table. This should only be + // received for keys that we requested notifications for. + auto subscribe_callback = [job_id, object_id, managers](gcs::AsyncGcsClient *client) { + // Request notifications, then cancel immediately. We should receive a + // notification for the current value at the key. RAY_CHECK_OK(client->object_table().RequestNotifications( job_id, object_id, client->client_table().GetLocalClientId())); - // Cancel notifications. RAY_CHECK_OK(client->object_table().CancelNotifications( job_id, object_id, client->client_table().GetLocalClientId())); - // Write the object table entry twice. Since we canceled notifications, we - // should not get notifications for either of these writes. - auto data = std::make_shared(); - data->managers.push_back("B"); - RAY_CHECK_OK(client->object_table().Add(job_id, object_id, data, nullptr)); - data = std::make_shared(); - data->managers.push_back("C"); - RAY_CHECK_OK(client->object_table().Add(job_id, object_id, data, nullptr)); - // Request notifications for the object again. We should only receive a - // notification for the current value at the key. + // Append to the key. Since we canceled notifications, we should not + // receive a notification for these writes. + auto remaining = std::vector(++managers.begin(), managers.end()); + for (const auto &manager : remaining) { + auto data = std::make_shared(); + data->manager = manager; + RAY_CHECK_OK(client->object_table().Append(job_id, object_id, data, nullptr)); + } + // Request notifications again. We should receive a notification for the + // current values at the key. RAY_CHECK_OK(client->object_table().RequestNotifications( job_id, object_id, client->client_table().GetLocalClientId())); }; - // The callback for a notification from the object table. - auto notification_callback = [object_id]( - gcs::AsyncGcsClient *client, const UniqueID &id, const ObjectTableDataT &data) { - ASSERT_EQ(id, object_id); - // Check that we only receive notifications for the key when we have - // requested notifications for it. We should not get a notification for the - // entry that began with "B" since we canceled notifications then. - if (test->NumCallbacks() == 1) { - ASSERT_EQ(data.managers.front(), "A"); - } else { - ASSERT_EQ(data.managers.front(), "C"); - } - test->IncrementNumCallbacks(); - if (test->NumCallbacks() == 3) { - test->Stop(); - } - }; - + // Subscribe to notifications for this client. This allows us to request and + // receive notifications for specific keys. RAY_CHECK_OK( client->object_table().Subscribe(job_id, client->client_table().GetLocalClientId(), notification_callback, subscribe_callback)); - // Run the event loop. The loop will only stop if the registered subscription - // callback is called (or an assertion failure). + // callback is called for the requested key. test->Start(); - // Check that we received one callback for subscription success and two - // callbacks for the Add notifications. - ASSERT_EQ(test->NumCallbacks(), 3); + // Check that we received a notification callback for the first append to the + // key, then a notification for all of the appends, because we cancel + // notifications in between. + ASSERT_EQ(test->NumCallbacks(), managers.size() + 1); } -TEST_F(TestGcsWithAe, TestSubscribeCancel) { +TEST_F(TestGcsWithAe, TestLogSubscribeCancel) { test = this; - TestSubscribeCancel(job_id_, client_); + TestLogSubscribeCancel(job_id_, client_); } -TEST_F(TestGcsWithAsio, TestSubscribeCancel) { +TEST_F(TestGcsWithAsio, TestLogSubscribeCancel) { test = this; - TestSubscribeCancel(job_id_, client_); + TestLogSubscribeCancel(job_id_, client_); } void ClientTableNotification(gcs::AsyncGcsClient *client, const ClientID &client_id, diff --git a/src/ray/gcs/format/gcs.fbs b/src/ray/gcs/format/gcs.fbs index 2025cc46fd97..c6451e7292a8 100644 --- a/src/ray/gcs/format/gcs.fbs +++ b/src/ray/gcs/format/gcs.fbs @@ -35,11 +35,9 @@ table FunctionTableData { } table ObjectTableData { - task_id: string; object_size: long; - is_put: bool; - never_created: bool; - managers: [string]; + manager: string; + is_eviction: bool; } enum SchedulingState:int { diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index 9b5916318896..434f6a80eacb 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -9,13 +9,13 @@ namespace gcs { template Status Log::Append(const JobID &job_id, const ID &id, - std::shared_ptr data, const Callback &done) { + std::shared_ptr data, const WriteCallback &done) { auto d = std::shared_ptr( - new CallbackData({id, data, done, nullptr, this, client_})); + new CallbackData({id, data, nullptr, nullptr, this, client_})); int64_t callback_index = - RedisCallbackManager::instance().add([d](const std::string &data) { - if (d->callback != nullptr) { - (d->callback)(d->client, d->id, {*d->data}); + RedisCallbackManager::instance().add([d, done](const std::string &data) { + if (done != nullptr) { + (done)(d->client, d->id, d->data); } return true; }); @@ -120,13 +120,13 @@ Status Log::CancelNotifications(const JobID &job_id, const ID &id, template Status Table::Add(const JobID &job_id, const ID &id, - std::shared_ptr data, const Callback &done) { + std::shared_ptr data, const WriteCallback &done) { auto d = std::shared_ptr( - new CallbackData({id, data, done, nullptr, this, client_})); + new CallbackData({id, data, nullptr, nullptr, this, client_})); int64_t callback_index = - RedisCallbackManager::instance().add([d](const std::string &data) { - if (d->callback != nullptr) { - (d->callback)(d->client, d->id, *d->data); + RedisCallbackManager::instance().add([d, done](const std::string &data) { + if (done != nullptr) { + (done)(d->client, d->id, d->data); } return true; }); @@ -232,8 +232,9 @@ void ClientTable::HandleNotification(AsyncGcsClient *client, } } -void ClientTable::HandleConnected(AsyncGcsClient *client, const ClientTableDataT &data) { - auto connected_client_id = ClientID::from_binary(data.client_id); +void ClientTable::HandleConnected(AsyncGcsClient *client, + const std::shared_ptr data) { + auto connected_client_id = ClientID::from_binary(data->client_id); RAY_CHECK(client_id_ == connected_client_id) << connected_client_id.hex() << " " << client_id_.hex(); } @@ -259,10 +260,9 @@ Status ClientTable::Connect() { // Callback to handle our own successful connection once we've added // ourselves. auto add_callback = [this](AsyncGcsClient *client, const UniqueID &log_key, - const std::vector &data) { + std::shared_ptr data) { RAY_CHECK(log_key == client_log_key_); - RAY_CHECK(data.size() == 1); - HandleConnected(client, data[0]); + HandleConnected(client, data); }; // Callback to add ourselves once we've successfully subscribed. auto subscription_callback = [this, data, add_callback](AsyncGcsClient *c) { @@ -282,9 +282,8 @@ Status ClientTable::Disconnect() { auto data = std::make_shared(local_client_); data->is_insertion = true; auto add_callback = [this](AsyncGcsClient *client, const ClientID &id, - const std::vector &data) { - RAY_CHECK(data.size() == 1); - HandleConnected(client, data[0]); + std::shared_ptr data) { + HandleConnected(client, data); RAY_CHECK_OK(CancelNotifications(JobID::nil(), client_log_key_, id)); }; RAY_RETURN_NOT_OK(Append(JobID::nil(), client_log_key_, data, add_callback)); @@ -306,9 +305,9 @@ const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) { } template class Log; +template class Log; template class Table; template class Table; -template class Table; } // namespace gcs diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 90e650b903d4..a1277c024989 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -41,6 +41,9 @@ class Log { using DataT = typename Data::NativeTableType; using Callback = std::function &data)>; + /// The callback to call when a write to a key succeeds. + using WriteCallback = std::function data)>; /// The callback to call when a SUBSCRIBE call completes and we are ready to /// request and receive notifications. using SubscriptionCallback = std::function; @@ -72,7 +75,7 @@ class Log { /// GCS. /// \return Status Status Append(const JobID &job_id, const ID &id, std::shared_ptr data, - const Callback &done); + const WriteCallback &done); /// Lookup the log values at a key asynchronously. /// @@ -158,6 +161,7 @@ class Table : private Log { using DataT = typename Log::DataT; using Callback = std::function; + using WriteCallback = typename Log::WriteCallback; /// The callback to call when a Lookup call returns an empty entry. using FailureCallback = std::function; /// The callback to call when a Subscribe call completes and we are ready to @@ -190,7 +194,7 @@ class Table : private Log { /// GCS. /// \return Status Status Add(const JobID &job_id, const ID &id, std::shared_ptr data, - const Callback &done); + const WriteCallback &done); /// Lookup an entry asynchronously. /// @@ -214,10 +218,10 @@ class Table : private Log { using Log::prefix_; }; -class ObjectTable : public Table { +class ObjectTable : public Log { public: ObjectTable(const std::shared_ptr &context, AsyncGcsClient *client) - : Table(context, client) { + : Log(context, client) { pubsub_channel_ = TablePubsub_OBJECT; prefix_ = TablePrefix_OBJECT; }; @@ -409,7 +413,8 @@ class ClientTable : private Log { /// Handle a client table notification. void HandleNotification(AsyncGcsClient *client, const ClientTableDataT ¬ifications); /// Handle this client's successful connection to the GCS. - void HandleConnected(AsyncGcsClient *client, const ClientTableDataT ¬ifications); + void HandleConnected(AsyncGcsClient *client, + const std::shared_ptr client_data); /// The key at which the log of client information is stored. This key must /// be kept the same across all instances of the ClientTable, so that all diff --git a/src/ray/gcs/task_table.cc b/src/ray/gcs/task_table.cc index ba36d0cd10e4..a60ab148e732 100644 --- a/src/ray/gcs/task_table.cc +++ b/src/ray/gcs/task_table.cc @@ -44,9 +44,9 @@ Status TaskTableAdd(AsyncGcsClient *gcs_client, Task *task) { TaskSpec *spec = execution_spec.Spec(); auto data = MakeTaskTableData(execution_spec, Task_local_scheduler(task), static_cast(Task_state(task))); - return gcs_client->task_table().Add( - ray::JobID::nil(), TaskSpec_task_id(spec), data, - [](gcs::AsyncGcsClient *client, const TaskID &id, const TaskTableDataT &data) {}); + return gcs_client->task_table().Add(ray::JobID::nil(), TaskSpec_task_id(spec), data, + [](gcs::AsyncGcsClient *client, const TaskID &id, + std::shared_ptr data) {}); } // TODO(pcm): This is a helper method that should go away once we get rid of