From 43dc4170041643a4a5aa8fa55da5217cc3dca33e Mon Sep 17 00:00:00 2001 From: Stephanie Date: Mon, 19 Mar 2018 22:26:48 -0700 Subject: [PATCH 1/3] Define string prefixes for all tables in the new GCS API --- src/common/redis_module/ray_redis_module.cc | 51 +++++++++++++++------ src/ray/gcs/format/gcs.fbs | 8 ++++ src/ray/gcs/redis_context.cc | 14 +++--- src/ray/gcs/redis_context.h | 4 +- src/ray/gcs/tables.h | 25 ++++++++-- 5 files changed, 74 insertions(+), 28 deletions(-) diff --git a/src/common/redis_module/ray_redis_module.cc b/src/common/redis_module/ray_redis_module.cc index 7a870ed74e2a..b22679ad35e8 100644 --- a/src/common/redis_module/ray_redis_module.cc +++ b/src/common/redis_module/ray_redis_module.cc @@ -49,6 +49,12 @@ return RedisModule_ReplyWithError(ctx, (MESSAGE)); \ } +static const char *table_prefixes[] = { + NULL, "TASK:", "CLIENT:", "OBJECT:", "FUNCTION:", +}; + +// TODO(swang): This helper function should be deprecated by the version below, +// which uses enums for table prefixes. RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, const char *prefix, RedisModuleString *keyname, @@ -61,6 +67,20 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, return key; } +RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, + RedisModuleString *prefix_enum, + RedisModuleString *keyname, + int mode) { + long long prefix_long; + RAY_CHECK(RedisModule_StringToLongLong(prefix_enum, &prefix_long) == + REDISMODULE_OK) + << "Prefix must be a valid TablePrefix"; + auto prefix = static_cast(prefix_long); + RAY_CHECK(prefix != TablePrefix_UNUSED) + << "This table has no prefix registered"; + return OpenPrefixedKey(ctx, table_prefixes[prefix], keyname, mode); +} + /** * This is a helper method to convert a redis module string to a flatbuffer * string. @@ -394,17 +414,18 @@ bool PublishObjectNotification(RedisModuleCtx *ctx, int TableAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - if (argc != 4) { + if (argc != 5) { return RedisModule_WrongArity(ctx); } - RedisModuleString *pubsub_channel_str = argv[1]; - RedisModuleString *id = argv[2]; - RedisModuleString *data = argv[3]; + RedisModuleString *prefix_str = argv[1]; + RedisModuleString *pubsub_channel_str = argv[2]; + RedisModuleString *id = argv[3]; + RedisModuleString *data = argv[4]; // Set the keys in the table. - RedisModuleKey *key = - OpenPrefixedKey(ctx, "T:", id, REDISMODULE_READ | REDISMODULE_WRITE); + RedisModuleKey *key = OpenPrefixedKey(ctx, prefix_str, id, + REDISMODULE_READ | REDISMODULE_WRITE); RedisModule_StringSet(key, data); RedisModule_CloseKey(key); @@ -517,13 +538,14 @@ int TableAdd_RedisCommand(RedisModuleCtx *ctx, int TableLookup_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - if (argc != 3) { + if (argc != 4) { return RedisModule_WrongArity(ctx); } - RedisModuleString *id = argv[2]; + RedisModuleString *prefix_str = argv[1]; + RedisModuleString *id = argv[3]; - RedisModuleKey *key = OpenPrefixedKey(ctx, "T:", id, REDISMODULE_READ); + RedisModuleKey *key = OpenPrefixedKey(ctx, prefix_str, id, REDISMODULE_READ); if (key == nullptr) { return RedisModule_ReplyWithNull(ctx); } @@ -554,14 +576,15 @@ bool is_nil(const std::string &data) { int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - if (argc != 4) { + if (argc != 5) { return RedisModule_WrongArity(ctx); } - RedisModuleString *id = argv[2]; - RedisModuleString *update_data = argv[3]; + RedisModuleString *prefix_str = argv[1]; + RedisModuleString *id = argv[3]; + RedisModuleString *update_data = argv[4]; - RedisModuleKey *key = - OpenPrefixedKey(ctx, "T:", id, REDISMODULE_READ | REDISMODULE_WRITE); + RedisModuleKey *key = OpenPrefixedKey(ctx, prefix_str, id, + REDISMODULE_READ | REDISMODULE_WRITE); size_t value_len = 0; char *value_buf = RedisModule_StringDMA(key, &value_len, REDISMODULE_READ); diff --git a/src/ray/gcs/format/gcs.fbs b/src/ray/gcs/format/gcs.fbs index 386873e331fa..e6483179bf50 100644 --- a/src/ray/gcs/format/gcs.fbs +++ b/src/ray/gcs/format/gcs.fbs @@ -4,6 +4,14 @@ enum Language:int { JAVA = 2 } +enum TablePrefix:int { + UNUSED = 0, + TASK, + CLIENT, + OBJECT, + FUNCTION +} + // The channel that Add operations to the Table should be published on, if any. enum TablePubsub:int { NO_PUBLISH = 0, diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc index d1b3dc94611b..67e6b4aef331 100644 --- a/src/ray/gcs/redis_context.cc +++ b/src/ray/gcs/redis_context.cc @@ -161,23 +161,23 @@ Status RedisContext::AttachToEventLoop(aeEventLoop *loop) { } Status RedisContext::RunAsync(const std::string &command, const UniqueID &id, - uint8_t *data, int64_t length, + uint8_t *data, int64_t length, const TablePrefix prefix, const TablePubsub pubsub_channel, int64_t callback_index) { if (length > 0) { - std::string redis_command = command + " %d %b %b"; + std::string redis_command = command + " %d %d %b %b"; int status = redisAsyncCommand( async_context_, reinterpret_cast(&GlobalRedisCallback), - reinterpret_cast(callback_index), redis_command.c_str(), pubsub_channel, - id.data(), id.size(), data, length); + reinterpret_cast(callback_index), redis_command.c_str(), prefix, + pubsub_channel, id.data(), id.size(), data, length); if (status == REDIS_ERR) { return Status::RedisError(std::string(async_context_->errstr)); } } else { - std::string redis_command = command + " %d %b"; + std::string redis_command = command + " %d %d %b"; int status = redisAsyncCommand( async_context_, reinterpret_cast(&GlobalRedisCallback), - reinterpret_cast(callback_index), redis_command.c_str(), pubsub_channel, - id.data(), id.size()); + reinterpret_cast(callback_index), redis_command.c_str(), prefix, + pubsub_channel, id.data(), id.size()); if (status == REDIS_ERR) { return Status::RedisError(std::string(async_context_->errstr)); } diff --git a/src/ray/gcs/redis_context.h b/src/ray/gcs/redis_context.h index c118a1f94a94..af0117d7dbbc 100644 --- a/src/ray/gcs/redis_context.h +++ b/src/ray/gcs/redis_context.h @@ -51,8 +51,8 @@ class RedisContext { Status Connect(const std::string &address, int port); Status AttachToEventLoop(aeEventLoop *loop); Status RunAsync(const std::string &command, const UniqueID &id, uint8_t *data, - int64_t length, const TablePubsub pubsub_channel, - int64_t callback_index); + int64_t length, const TablePrefix prefix, + const TablePubsub pubsub_channel, int64_t callback_index); Status SubscribeAsync(const ClientID &client_id, const TablePubsub pubsub_channel, int64_t callback_index); redisAsyncContext *async_context() { return async_context_; } diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 448aa3b8fd1d..8da11ea7e071 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -47,7 +47,10 @@ class Table { }; Table(const std::shared_ptr &context, AsyncGcsClient *client) - : context_(context), client_(client), pubsub_channel_(TablePubsub_NO_PUBLISH){}; + : context_(context), + client_(client), + pubsub_channel_(TablePubsub_NO_PUBLISH), + prefix_(TablePrefix_UNUSED){}; /// Add an entry to the table. /// @@ -71,7 +74,8 @@ class Table { fbb.ForceDefaults(true); fbb.Finish(Data::Pack(fbb, data.get())); RAY_RETURN_NOT_OK(context_->RunAsync("RAY.TABLE_ADD", id, fbb.GetBufferPointer(), - fbb.GetSize(), pubsub_channel_, callback_index)); + fbb.GetSize(), prefix_, pubsub_channel_, + callback_index)); return Status::OK(); } @@ -102,7 +106,7 @@ class Table { }); std::vector nil; RAY_RETURN_NOT_OK(context_->RunAsync("RAY.TABLE_LOOKUP", id, nil.data(), nil.size(), - pubsub_channel_, callback_index)); + prefix_, pubsub_channel_, callback_index)); return Status::OK(); } @@ -148,6 +152,7 @@ class Table { std::shared_ptr context_; AsyncGcsClient *client_; TablePubsub pubsub_channel_; + TablePrefix prefix_; }; class ObjectTable : public Table { @@ -155,6 +160,7 @@ class ObjectTable : public Table { ObjectTable(const std::shared_ptr &context, AsyncGcsClient *client) : Table(context, client) { pubsub_channel_ = TablePubsub_OBJECT; + prefix_ = TablePrefix_OBJECT; }; /// Set up a client-specific channel for receiving notifications about @@ -183,7 +189,14 @@ class ObjectTable : public Table { const std::vector &object_ids); }; -using FunctionTable = Table; +class FunctionTable : public Table { + public: + FunctionTable(const std::shared_ptr &context, AsyncGcsClient *client) + : Table(context, client) { + pubsub_channel_ = TablePubsub_NO_PUBLISH; + prefix_ = TablePrefix_FUNCTION; + }; +}; using ClassTable = Table; @@ -195,6 +208,7 @@ class TaskTable : public Table { TaskTable(const std::shared_ptr &context, AsyncGcsClient *client) : Table(context, client) { pubsub_channel_ = TablePubsub_TASK; + prefix_ = TablePrefix_TASK; }; using TestAndUpdateCallback = @@ -230,7 +244,7 @@ class TaskTable : public Table { flatbuffers::FlatBufferBuilder fbb; fbb.Finish(TaskTableTestAndUpdate::Pack(fbb, data.get())); RAY_RETURN_NOT_OK(context_->RunAsync("RAY.TABLE_TEST_AND_UPDATE", id, - fbb.GetBufferPointer(), fbb.GetSize(), + fbb.GetBufferPointer(), fbb.GetSize(), prefix_, pubsub_channel_, callback_index)); return Status::OK(); } @@ -281,6 +295,7 @@ class ClientTable : private Table { client_id_(ClientID::from_binary(local_client.client_id)), local_client_(local_client) { pubsub_channel_ = TablePubsub_CLIENT; + prefix_ = TablePrefix_CLIENT; // Add a nil client to the cache so that we can serve requests for clients // that we have not heard about. From 9a2a6039167d68ab1baa3a871566f58083345f4e Mon Sep 17 00:00:00 2001 From: Stephanie Date: Mon, 19 Mar 2018 22:47:50 -0700 Subject: [PATCH 2/3] Extra check for TablePrefix enum --- src/common/redis_module/ray_redis_module.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/common/redis_module/ray_redis_module.cc b/src/common/redis_module/ray_redis_module.cc index b22679ad35e8..6cb6c96909da 100644 --- a/src/common/redis_module/ray_redis_module.cc +++ b/src/common/redis_module/ray_redis_module.cc @@ -78,6 +78,8 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, auto prefix = static_cast(prefix_long); RAY_CHECK(prefix != TablePrefix_UNUSED) << "This table has no prefix registered"; + RAY_CHECK(prefix >= TablePrefix_MIN && prefix <= TablePrefix_MAX) + << "Prefix must be a valid TablePrefix"; return OpenPrefixedKey(ctx, table_prefixes[prefix], keyname, mode); } From f1a400a872c414dde11f6db1274b78045a629745 Mon Sep 17 00:00:00 2001 From: Stephanie Date: Tue, 20 Mar 2018 15:43:35 -0700 Subject: [PATCH 3/3] Remove unused field and add doc for existing fields --- src/ray/gcs/tables.h | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 8da11ea7e071..5ac9ad2651fe 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -148,10 +148,15 @@ class Table { Status Remove(const JobID &job_id, const ID &id, const Callback &done); protected: - std::unordered_map, UniqueIDHasher> callback_data_; + /// The connection to the GCS. std::shared_ptr context_; + /// The GCS client. AsyncGcsClient *client_; + /// The pubsub channel to subscribe to for notifications about keys in this + /// table. If no notifications are required, this may be set to + /// TablePubsub_NO_PUBLISH. TablePubsub pubsub_channel_; + /// The prefix to use for keys in this table. TablePrefix prefix_; };