Skip to content

Commit

Permalink
add Redis7 sharded pubsub support
Browse files Browse the repository at this point in the history
  • Loading branch information
slact committed May 17, 2022
1 parent 11c67c9 commit eb1da51
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 35 deletions.
20 changes: 10 additions & 10 deletions src/store/redis/rdsstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -957,8 +957,7 @@ static void redis_subscriber_callback(redisAsyncContext *c, void *r, void *privd
}
}

else if(CHECK_REPLY_STRVAL(reply->element[0], "subscribe") && CHECK_REPLY_INT(reply->element[2])) {

else if((CHECK_REPLY_STRVAL(reply->element[0], "subscribe") || CHECK_REPLY_STRVAL(reply->element[0], "ssubscribe")) && CHECK_REPLY_INT(reply->element[2])) {
if(chid) {
chanhead = find_chanhead_for_pubsub_callback(chid);
if(chanhead != NULL) {
Expand Down Expand Up @@ -997,7 +996,7 @@ static void redis_subscriber_callback(redisAsyncContext *c, void *r, void *privd

DBG("REDIS: PUB/SUB subscribed to %s (%i total)", reply->element[1]->str, reply->element[2]->integer);
}
else if(CHECK_REPLY_STRVAL(reply->element[0], "unsubscribe") && CHECK_REPLY_INT(reply->element[2])) {
else if((CHECK_REPLY_STRVAL(reply->element[0], "unsubscribe") || CHECK_REPLY_STRVAL(reply->element[0], "sunsubscribe")) && CHECK_REPLY_INT(reply->element[2])) {

if(chid) {
DBG("received UNSUBSCRIBE acknowledgement for channel %V", chid);
Expand Down Expand Up @@ -1358,9 +1357,8 @@ ngx_int_t ensure_chanhead_pubsub_subscribed_if_needed(rdstore_channel_head_t *ch
&& nodeset_ready(ch->redis.nodeset)
) {
pubsub_node = nodeset_node_pubsub_find_by_chanhead(ch);
DBG("SUBSCRIBING to %V{channel:%V}:pubsub", namespace, &ch->id);
ch->pubsub_status = REDIS_PUBSUB_SUBSCRIBING;
redis_subscriber_command(pubsub_node, redis_subscriber_callback, pubsub_node, "SUBSCRIBE %b", STR(&ch->redis.pubsub_id));
redis_subscriber_command(pubsub_node, redis_subscriber_callback, pubsub_node, "%s %b", pubsub_node->nodeset->use_spublish ? "SSUBSCRIBE" : "SUBSCRIBE", STR(&ch->redis.pubsub_id));
}
return NGX_OK;
}
Expand Down Expand Up @@ -1651,7 +1649,7 @@ static ngx_int_t nchan_store_delete_channel_send(redis_nodeset_t *ns, void *pd)
redis_channel_callback_data_t *d = pd;
if(nodeset_ready(ns)) {
redis_node_t *node = nodeset_node_find_by_channel_id(ns, d->channel_id);
nchan_redis_script(delete, node, &redisChannelDeleteCallback, d, d->channel_id, "");
nchan_redis_script(delete, node, &redisChannelDeleteCallback, d, d->channel_id, "%s",ns->use_spublish ? "SPUBLISH" : "PUBLISH");
return NGX_OK;
}
else {
Expand Down Expand Up @@ -2275,9 +2273,10 @@ static ngx_int_t redis_publish_message_send(redis_nodeset_t *nodeset, void *pd)
}
redis_command(node, publish_callback, d,
//can't use the prebaked pubsub channel id because we don't have the chanhead here, just its id
"PUBLISH %b{channel:%b}:pubsub "
"%s %b{channel:%b}:pubsub "
"\x9A\xA3msg\xCE%b\xCE%b%b%b%b\xDB%b%b\xD9%b%b\xD9%b%b%b",

nodeset->use_spublish ? "SPUBLISH" : "PUBLISH",
STR(nodeset->settings.namespace),
STR(d->channel_id),

Expand All @@ -2304,10 +2303,10 @@ static ngx_int_t redis_publish_message_send(redis_nodeset_t *nodeset, void *pd)

}
else {
//input: keys: [], values: [namespace, channel_id, time, message, content_type, eventsource_event, compression, msg_ttl, max_msg_buf_size, pubsub_msgpacked_size_cutoff]
//input: keys: [], values: [namespace, channel_id, time, message, content_type, eventsource_event, compression, msg_ttl, max_msg_buf_size, pubsub_msgpacked_size_cutoff, optimize_target, use_spublish]
//output: message_time, message_tag, channel_hash {ttl, time_last_seen, subscribers, messages}
nchan_redis_script(publish, node, &redisPublishCallback, d, d->channel_id,
"%i %b %b %b %i %i %i %i %i",
"%i %b %b %b %i %i %i %i %i %s",
msg->id.time,
STR(&msgstr),
STR((msg->content_type ? msg->content_type : &empty)),
Expand All @@ -2316,7 +2315,8 @@ static ngx_int_t redis_publish_message_send(redis_nodeset_t *nodeset, void *pd)
d->message_timeout,
d->max_messages,
redis_publish_message_msgkey_size,
nodeset->settings.optimize_target
nodeset->settings.optimize_target,
nodeset->use_spublish ? "SPUBLISH" : "PUBLISH"
);
}
if(mmapped && munmap(msgstr.data, msgstr.len) == -1) {
Expand Down
3 changes: 2 additions & 1 deletion src/store/redis/redis-lua-scripts/delete.lua
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
--input: keys: [], values: [ namespace, channel_id ]
--input: keys: [], values: [ namespace, channel_id, publish_command ]
--output: channel_hash {ttl, time_last_seen, subscribers, messages} or nil
-- delete this channel and all its messages
local ns = ARGV[1]
local id = ARGV[2]
local publish_command = ARGV[3]
local ch = ('%s{channel:%s}'):format(ns, id)
local key_msg= ch..':msg:%s' --not finished yet
local key_channel=ch
Expand Down
6 changes: 4 additions & 2 deletions src/store/redis/redis-lua-scripts/publish.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
--input: keys: [], values: [namespace, channel_id, time, message, content_type, eventsource_event, compression_setting, msg_ttl, max_msg_buf_size, pubsub_msgpacked_size_cutoff, optimize_target]
--input: keys: [], values: [namespace, channel_id, time, message, content_type, eventsource_event, compression_setting, msg_ttl, max_msg_buf_size, pubsub_msgpacked_size_cutoff, optimize_target, use_spublish]
--output: channel_hash {ttl, time_last_subscriber_seen, subscribers, last_message_id, messages}, channel_created_just_now?

local ns, id=ARGV[1], ARGV[2]
Expand All @@ -16,6 +16,7 @@ end
local msgpacked_pubsub_cutoff = tonumber(ARGV[10])

local optimize_target = tonumber(ARGV[11]) == 2 and "bandwidth" or "cpu"
local publish_command = ARGV[12]

local time
if optimize_target == "cpu" and redis.replicate_commands then
Expand Down Expand Up @@ -271,7 +272,8 @@ local msgpacked
--but now that we're subscribing to slaves this is not possible
--so just PUBLISH always.
msgpacked = cmsgpack.pack(unpacked)
redis.call('PUBLISH', channel_pubsub, msgpacked)

redis.call(publish_command, channel_pubsub, msgpacked)

local num_messages = redis.call('llen', key.messages)

Expand Down
4 changes: 2 additions & 2 deletions src/store/redis/redis-lua-scripts/publish_status.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
--input: keys: [], values: [namespace, channel_id, status_code]
--input: keys: [], values: [namespace, channel_id, status_code, publish_command]
--output: current_subscribers

redis.call('echo', ' ####### PUBLISH STATUS ####### ')
Expand All @@ -17,7 +17,7 @@ local chan_key = ch

for k,channel_key in pairs(redis.call('SMEMBERS', subs_key)) do
--not efficient, but useful for a few short-term subscriptions
redis.call('PUBLISH', channel_key, pubmsg)
redis.call(publish_command, channel_key, pubmsg)
end
--clear short-term subscriber list
redis.call('DEL', subs_key)
Expand Down
19 changes: 11 additions & 8 deletions src/store/redis/redis_lua_commands.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,13 @@ redis_lua_scripts_t redis_lua_scripts = {
" return -1\n"
"end\n"},

{"delete", "a6008241827ea3bdec215b4460e96ad498cad8cb",
"--input: keys: [], values: [ namespace, channel_id ]\n"
{"delete", "121cb1d32fb1b52c3f32cc8b8a3384e99fa10358",
"--input: keys: [], values: [ namespace, channel_id, publish_command ]\n"
"--output: channel_hash {ttl, time_last_seen, subscribers, messages} or nil\n"
"-- delete this channel and all its messages\n"
"local ns = ARGV[1]\n"
"local id = ARGV[2]\n"
"local publish_command = ARGV[3]\n"
"local ch = ('%s{channel:%s}'):format(ns, id)\n"
"local key_msg= ch..':msg:%s' --not finished yet\n"
"local key_channel=ch\n"
Expand Down Expand Up @@ -403,8 +404,8 @@ redis_lua_scripts_t redis_lua_scripts = {
"\n"
"return val\n"},

{"publish", "be28281114137d8e85a25a913c50bed2e93d0c7d",
"--input: keys: [], values: [namespace, channel_id, time, message, content_type, eventsource_event, compression_setting, msg_ttl, max_msg_buf_size, pubsub_msgpacked_size_cutoff, optimize_target]\n"
{"publish", "6b7588a37712d713cee194f947b375403e58a21d",
"--input: keys: [], values: [namespace, channel_id, time, message, content_type, eventsource_event, compression_setting, msg_ttl, max_msg_buf_size, pubsub_msgpacked_size_cutoff, optimize_target, use_spublish]\n"
"--output: channel_hash {ttl, time_last_subscriber_seen, subscribers, last_message_id, messages}, channel_created_just_now?\n"
"\n"
"local ns, id=ARGV[1], ARGV[2]\n"
Expand All @@ -422,6 +423,7 @@ redis_lua_scripts_t redis_lua_scripts = {
"local msgpacked_pubsub_cutoff = tonumber(ARGV[10])\n"
"\n"
"local optimize_target = tonumber(ARGV[11]) == 2 and \"bandwidth\" or \"cpu\"\n"
"local publish_command = ARGV[12]\n"
"\n"
"local time\n"
"if optimize_target == \"cpu\" and redis.replicate_commands then\n"
Expand Down Expand Up @@ -677,7 +679,8 @@ redis_lua_scripts_t redis_lua_scripts = {
"--but now that we're subscribing to slaves this is not possible\n"
"--so just PUBLISH always.\n"
"msgpacked = cmsgpack.pack(unpacked)\n"
"redis.call('PUBLISH', channel_pubsub, msgpacked)\n"
"\n"
"redis.call(publish_command, channel_pubsub, msgpacked)\n"
"\n"
"local num_messages = redis.call('llen', key.messages)\n"
"\n"
Expand All @@ -692,8 +695,8 @@ redis_lua_scripts_t redis_lua_scripts = {
"\n"
"return {ch, new_channel}\n"},

{"publish_status", "2f2ce1443b22c8c9cf069d5588bad4bab58d70aa",
"--input: keys: [], values: [namespace, channel_id, status_code]\n"
{"publish_status", "868da11fde87043c24d82285d0b31adbbdcef406",
"--input: keys: [], values: [namespace, channel_id, status_code, publish_command]\n"
"--output: current_subscribers\n"
"\n"
"redis.call('echo', ' ####### PUBLISH STATUS ####### ')\n"
Expand All @@ -712,7 +715,7 @@ redis_lua_scripts_t redis_lua_scripts = {
"\n"
"for k,channel_key in pairs(redis.call('SMEMBERS', subs_key)) do\n"
" --not efficient, but useful for a few short-term subscriptions\n"
" redis.call('PUBLISH', channel_key, pubmsg)\n"
" redis.call(publish_command, channel_key, pubmsg)\n"
"end\n"
"--clear short-term subscriber list\n"
"redis.call('DEL', subs_key)\n"
Expand Down
8 changes: 4 additions & 4 deletions src/store/redis/redis_lua_commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ typedef struct {
//output: seconds until next keepalive is expected, or -1 for "let it disappear"
redis_lua_script_t channel_keepalive;

//input: keys: [], values: [ namespace, channel_id ]
//input: keys: [], values: [ namespace, channel_id, publish_command ]
//output: channel_hash {ttl, time_last_seen, subscribers, messages} or nil
// delete this channel and all its messages
redis_lua_script_t delete;
Expand All @@ -44,11 +44,11 @@ typedef struct {
//output: next_unique_request_id_integer
redis_lua_script_t get_subscriber_info_id;

//input: keys: [], values: [namespace, channel_id, time, message, content_type, eventsource_event, compression_setting, msg_ttl, max_msg_buf_size, pubsub_msgpacked_size_cutoff, optimize_target]
//input: keys: [], values: [namespace, channel_id, time, message, content_type, eventsource_event, compression_setting, msg_ttl, max_msg_buf_size, pubsub_msgpacked_size_cutoff, optimize_target, use_spublish]
//output: channel_hash {ttl, time_last_subscriber_seen, subscribers, last_message_id, messages}, channel_created_just_now?
redis_lua_script_t publish;

//input: keys: [], values: [namespace, channel_id, status_code]
//input: keys: [], values: [namespace, channel_id, status_code, publish_command]
//output: current_subscribers
redis_lua_script_t publish_status;

Expand All @@ -75,7 +75,7 @@ typedef struct {
} redis_lua_scripts_t;
extern redis_lua_scripts_t redis_lua_scripts;
extern const int redis_lua_scripts_count;
#define REDIS_LUA_SCRIPTS_ALL_HASHES "fd2ee193a2518982e607f99623c8200849b21098 7e213d514486887425875dc9835564edfe14e677 a6008241827ea3bdec215b4460e96ad498cad8cb d027f1bc80eb1afc33f909759dfb7428cf1f9063 fb9c46d33b3798a11d4eca6e0f7a3f92beba8685 304efcd42590f99e0016686572530defd3de1383 3490d5bc3fdc7ed065d9d54b4a0cb8ad6b62c180 be28281114137d8e85a25a913c50bed2e93d0c7d 2f2ce1443b22c8c9cf069d5588bad4bab58d70aa 93c500e094dfc5364251854eeac8d4331a0223c0 2fca046fa783d6cc25e493c993c407e59998e6e8 0418d941e41ce9d8cb938860fd340d85c121d4cc a98e07b21485951a7d34cf80736e53db1b6e87a6"
#define REDIS_LUA_SCRIPTS_ALL_HASHES "fd2ee193a2518982e607f99623c8200849b21098 7e213d514486887425875dc9835564edfe14e677 121cb1d32fb1b52c3f32cc8b8a3384e99fa10358 d027f1bc80eb1afc33f909759dfb7428cf1f9063 fb9c46d33b3798a11d4eca6e0f7a3f92beba8685 304efcd42590f99e0016686572530defd3de1383 3490d5bc3fdc7ed065d9d54b4a0cb8ad6b62c180 6b7588a37712d713cee194f947b375403e58a21d 868da11fde87043c24d82285d0b31adbbdcef406 93c500e094dfc5364251854eeac8d4331a0223c0 2fca046fa783d6cc25e493c993c407e59998e6e8 0418d941e41ce9d8cb938860fd340d85c121d4cc a98e07b21485951a7d34cf80736e53db1b6e87a6"
#define REDIS_LUA_SCRIPTS_COUNT 13
#define REDIS_LUA_SCRIPTS_EACH(script) \
for((script)=(redis_lua_script_t *)&redis_lua_scripts; (script) < (redis_lua_script_t *)(&redis_lua_scripts + 1); (script)++)
Expand Down
57 changes: 51 additions & 6 deletions src/store/redis/redis_nodeset.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ static int nodeset_recover_cluster(redis_nodeset_t *ns);
static int nodeset_reset_cluster_node_info(redis_nodeset_t *ns);
static void nodeset_cluster_check_event(ngx_event_t *ev);
static void nodeset_check_status_event(ngx_event_t *ev);
static void nodeset_check_spublish_availability(redis_nodeset_t *ns);
static int node_discover_cluster_peer(redis_node_t *node, cluster_nodes_line_t *l, redis_node_t **known_node);
static int node_skip_cluster_peer(redis_node_t *node, cluster_nodes_line_t *l, int log_action);
static int node_set_cluster_slots(redis_node_t *node, cluster_nodes_line_t *l, char *errbuf, size_t max_err_len);
Expand Down Expand Up @@ -338,6 +339,8 @@ redis_nodeset_t *nodeset_create(nchan_loc_conf_t *lcf) {
ns->cluster.current_epoch = 0;
rbtree_init(&ns->cluster.keyslots, "redis cluster node (by keyslot) data", rbtree_cluster_keyslots_node_id, rbtree_cluster_keyslots_bucketer, rbtree_cluster_keyslots_compare);

ns->use_spublish = 0;

//urls
if(rcf->upstream) {
nchan_srv_conf_t *scf = NULL;
Expand Down Expand Up @@ -707,9 +710,7 @@ static void node_ping_event(ngx_event_t *ev) {
if(node->state == REDIS_NODE_READY) {
assert(node->ctx.cmd);

//we used to PUBLISH to the correct keyslot-mapped cluster node
//but Redis clusters don't shard the PUBSUB keyspace, so this discrimination isn't necessary
//just publish the damn thing if this is a master node, and just a PING for slaves
//we want to do this on EVERY node to announce our presence. definitely don't use SPUBLISH here
if(node->role == REDIS_NODE_ROLE_MASTER) {
redisAsyncCommand(node->ctx.cmd, ping_command_callback, node, "PUBLISH %s ping", redis_worker_id);
}
Expand Down Expand Up @@ -892,6 +893,7 @@ static int nodeset_recover_cluster(redis_nodeset_t *ns) {
redisAsyncCommand(node->ctx.cmd, NULL, NULL, "MULTI");
redisAsyncCommand(node->ctx.cmd, NULL, NULL, "CLUSTER INFO");
redisAsyncCommand(node->ctx.cmd, NULL, NULL, "CLUSTER NODES");
redisAsyncCommand(node->ctx.cmd, NULL, NULL, "COMMAND INFO SPUBLISH");
redisAsyncCommand(node->ctx.cmd, nodeset_recover_cluster_handler, node, "EXEC");

return 1;
Expand All @@ -914,7 +916,7 @@ static void nodeset_recover_cluster_handler(redisAsyncContext *ac, void *rep, vo
ngx_snprintf(errbuf, 1024, "reply not ok%Z");
goto fail;
}
if(reply->type != REDIS_REPLY_ARRAY || reply->elements != 2) {
if(reply->type != REDIS_REPLY_ARRAY || reply->elements != 3) {
ngx_snprintf(errbuf, 1024, "got something other than an array of size 2%Z");
goto fail;
}
Expand Down Expand Up @@ -970,6 +972,7 @@ static void nodeset_recover_cluster_handler(redisAsyncContext *ac, void *rep, vo
if(node_skip_cluster_peer(node, l, 0)) {
continue;
}

if(node_discover_cluster_peer(node, l, &peer)) {
discovered++;
continue;
Expand Down Expand Up @@ -997,6 +1000,9 @@ static void nodeset_recover_cluster_handler(redisAsyncContext *ac, void *rep, vo
//we will set actual master-slave associations later.
}

//does this node have SPUBLISH support?
node->have_spublish = reply->element[2]->type == REDIS_REPLY_ARRAY;

//connect the disconnected
redis_node_t *cur;
for(cur = nchan_list_first(&ns->nodes); cur != NULL; cur = nchan_list_next(cur)) {
Expand Down Expand Up @@ -1085,6 +1091,7 @@ static redis_node_t *nodeset_node_create_with_space(redis_nodeset_t *ns, redis_c
node->discovered = 0;
node->connecting = 0;
node->recovering = 0;
node->have_spublish = 0;
node->connect_timeout = NULL;
node->connect_params = *rcp;
node->connect_params.peername.data = node_blob->peername;
Expand Down Expand Up @@ -2209,6 +2216,7 @@ static void node_connector_callback(redisAsyncContext *ac, void *rep, void *priv
redisAsyncCommand(node->ctx.pubsub, node_connector_callback, node, "INFO SERVER");
node->state++;
break;

case REDIS_NODE_PUBSUB_GETTING_INFO:
if(reply && reply->type == REDIS_REPLY_ERROR && nchan_cstr_startswith(reply->str, "NOAUTH")) {
return node_connector_fail(node, "authentication required");
Expand All @@ -2234,7 +2242,7 @@ static void node_connector_callback(redisAsyncContext *ac, void *rep, void *priv
node->state++;
break;

case REDIS_NODE_SUBSCRIBING_WORKER:
case REDIS_NODE_SUBSCRIBING_WORKER:
if(!reply) {
return node_connector_fail(node, "disconnected while subscribing to worker PUBSUB channel");
}
Expand Down Expand Up @@ -2285,6 +2293,20 @@ static void node_connector_callback(redisAsyncContext *ac, void *rep, void *priv
node->cluster.ok=1;
node->state++;
/* fall through */

case REDIS_NODE_GET_SHARDED_PUBSUB_SUPPORT:
redisAsyncCommand(node->ctx.cmd, node_connector_callback, node, "COMMAND INFO SPUBLISH");
node->state++;
break;

case REDIS_NODE_GETTING_SHARDED_PUBSUB_SUPPORT:
if(reply == NULL || reply->type == REDIS_REPLY_ERROR) {
return node_connector_fail(node, "COMMAND INFO reply not ok");
}
node->have_spublish = reply->element[0]->type == REDIS_REPLY_ARRAY;
node->state++;
/* fall through */

case REDIS_NODE_GET_CLUSTER_NODES:
if(!node->ctx.cmd) {
return node_connector_fail(node, "cmd connection missing, can't send CLUSTER NODES command");
Expand Down Expand Up @@ -2424,7 +2446,7 @@ static void node_make_ready(redis_node_t *node) {
invalid on the server for unknown reasons.
So, the response handler is NULL
*/
node_batch_command_init(&unsub, node, NULL, NULL, 1, "UNSUBSCRIBE");
node_batch_command_init(&unsub, node, NULL, NULL, 1, node->nodeset->use_spublish ? "SUNSUBSCRIBE" : "UNSUBSCRIBE");

int unsubbed_count = 0;
for(cur = nchan_slist_first(pubsub); cur != NULL; cur = next) {
Expand Down Expand Up @@ -2718,6 +2740,7 @@ ngx_int_t nodeset_set_status(redis_nodeset_t *nodeset, redis_nodeset_status_t st
if(nodeset->cluster.enabled) {
nodeset_start_cluster_check_timer(nodeset);
}
nodeset_check_spublish_availability(nodeset);
nodeset->current_reconnect_delay = 0;
nodeset_reconnect_disconnected_channels(nodeset);
nodeset_run_on_ready_callbacks(nodeset);
Expand All @@ -2731,6 +2754,28 @@ ngx_int_t nodeset_set_status(redis_nodeset_t *nodeset, redis_nodeset_status_t st
return NGX_OK;
}

static void nodeset_check_spublish_availability(redis_nodeset_t *ns) {
if(!ns->cluster.enabled) {
//no cluster? we don't care.
return;
}

redis_node_t *cur;

int no_spublish = 0;
for(cur = nchan_list_first(&ns->nodes); cur != NULL; cur = nchan_list_next(cur)) {
if(cur->state == REDIS_NODE_READY && !cur->have_spublish) {
no_spublish ++;
}
}
ns->use_spublish = no_spublish == 0;

if(no_spublish) {
nodeset_log_warning(ns, "This cluster has nodes running Redis version < 7. Nchan is forced to use non-sharded pubsub commands that scale inversely to the cluster size. Upgrade to Redis >= 7 for much better scalability.");
}

}

static void node_find_slaves_callback(redisAsyncContext *ac, void *rep, void *pd) {
redis_node_t *node = pd;
redisReply *reply = rep;
Expand Down
Loading

0 comments on commit eb1da51

Please sign in to comment.