From 2ab304e325c3ab57ef00b3b5577d4cc099c7f109 Mon Sep 17 00:00:00 2001 From: Jeremy Woertink Date: Sun, 5 May 2024 15:52:15 -0700 Subject: [PATCH] Big refactor to make the backend registerable and swappable. Ref #88 --- spec/spec_helper.cr | 3 +- src/backend/dev/backend.cr | 4 +-- src/backend/redis/backend.cr | 11 ++++-- src/backend/redis/legacy/backend.cr | 6 ++-- src/cable.cr | 7 ++-- src/cable/backend_core.cr | 35 +++++++++++++++++-- .../{redis_pinger.cr => backend_pinger.cr} | 10 +++--- src/cable/channel.cr | 2 -- src/cable/server.cr | 8 ++--- 9 files changed, 60 insertions(+), 26 deletions(-) rename src/cable/{redis_pinger.cr => backend_pinger.cr} (54%) diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 0bd6f0d..de59fa8 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -12,7 +12,8 @@ require "./support/channels/*" Cable.configure do |settings| settings.route = "/updates" settings.token = "test_token" - settings.redis_ping_interval = 2.seconds + settings.url = "redis://localhost:6379" + settings.backend_ping_interval = 2.seconds settings.restart_error_allowance = 2 settings.on_error = ->(exception : Exception, message : String) do FakeExceptionService.notify(exception, message: message) diff --git a/src/backend/dev/backend.cr b/src/backend/dev/backend.cr index 52c0c88..16e7111 100644 --- a/src/backend/dev/backend.cr +++ b/src/backend/dev/backend.cr @@ -38,10 +38,10 @@ module Cable @@subscriptions.delete(stream_identifier) end - def ping_redis_subscribe + def ping_subscribe_connection end - def ping_redis_publish + def ping_publish_connection end end end diff --git a/src/backend/redis/backend.cr b/src/backend/redis/backend.cr index 00d419b..56705d1 100644 --- a/src/backend/redis/backend.cr +++ b/src/backend/redis/backend.cr @@ -1,5 +1,10 @@ +require "redis" + module Cable class RedisBackend < Cable::BackendCore + register "redis" # redis:// + register "rediss" # rediss:// + # connection management getter redis_subscribe : Redis::Connection = Redis::Connection.new(URI.parse(Cable.settings.url)) getter redis_publish : Redis::Client = Redis::Client.new(URI.parse(Cable.settings.url)) @@ -72,13 +77,13 @@ module Cable # then publish a special channel/message broadcast # the @server.redis_subscribe picks up this special combination # and calls ping on the block loop for us - def ping_redis_subscribe + def ping_subscribe_connection Cable.server.publish(Cable::INTERNAL[:channel], "ping") end - def ping_redis_publish + def ping_publish_connection result = redis_publish.run({"ping"}) - Cable::Logger.debug { "Cable::RedisPinger.ping_redis_publish -> #{result}" } + Cable::Logger.debug { "Cable::BackendPinger.ping_publish_connection -> #{result}" } end end end diff --git a/src/backend/redis/legacy/backend.cr b/src/backend/redis/legacy/backend.cr index b13deec..7847d1a 100644 --- a/src/backend/redis/legacy/backend.cr +++ b/src/backend/redis/legacy/backend.cr @@ -106,15 +106,15 @@ # ping/pong - def ping_redis_subscribe + def ping_subscribe_connection Cable.server.publish("_internal", "ping") end - def ping_redis_publish + def ping_publish_connection request = Redis::Request.new request << "ping" result = redis_subscribe._connection.send(request) - Cable::Logger.debug { "Cable::RedisPinger.ping_redis_publish -> #{result}" } + Cable::Logger.debug { "Cable::BackendPinger.ping_publish_connection -> #{result}" } end end end diff --git a/src/cable.cr b/src/cable.cr index 644a1f7..c9adcf5 100644 --- a/src/cable.cr +++ b/src/cable.cr @@ -1,6 +1,5 @@ require "habitat" require "json" -require "redis" require "./cable/**" # TODO: Write documentation for `Cable` @@ -30,10 +29,10 @@ module Cable Habitat.create do setting route : String = Cable.message(:default_mount_path), example: "/cable" setting token : String = "token", example: "token" - setting url : String = ENV.fetch("REDIS_URL", "redis://localhost:6379"), example: "redis://localhost:6379" + setting url : String = ENV["CABLE_BACKEND_URL"], example: "redis://localhost:6379" setting disable_sec_websocket_protocol_header : Bool = false - setting backend_class : Cable::BackendCore.class = Cable::RedisBackend, example: "Cable::RedisBackend" - setting redis_ping_interval : Time::Span = 15.seconds + setting backend_class : Cable::BackendCore.class = Cable::BackendRegistry, example: "Cable::RedisBackend" + setting backend_ping_interval : Time::Span = 15.seconds setting restart_error_allowance : Int32 = 20 setting on_error : Proc(Exception, String, Nil) = ->(exception : Exception, message : String) do Cable::Logger.error(exception: exception) { message } diff --git a/src/cable/backend_core.cr b/src/cable/backend_core.cr index 9d04b9c..61de5cf 100644 --- a/src/cable/backend_core.cr +++ b/src/cable/backend_core.cr @@ -1,5 +1,9 @@ module Cable abstract class BackendCore + def self.register(uri_scheme : String, backend : BackendCore.class = self) + ::Cable::BackendRegistry.register uri_scheme, backend + end + # connection management abstract def subscribe_connection abstract def publish_connection @@ -17,8 +21,35 @@ module Cable abstract def unsubscribe(stream_identifier : String) # ping/pong + abstract def ping_subscribe_connection + abstract def ping_publish_connection + end + + class BackendRegistry < BackendCore + REGISTERED_BACKENDS = {} of String => BackendCore.class + + def self.register(uri_scheme : String, backend : BackendCore.class = self) + REGISTERED_BACKENDS[uri_scheme] = backend + end + + @backend : BackendCore + + def initialize + @backend = REGISTERED_BACKENDS[URI.parse(::Cable.settings.url).scheme].new + end - abstract def ping_redis_subscribe - abstract def ping_redis_publish + delegate( + subscribe_connection, + publish_connection, + close_subscribe_connection, + close_publish_connection, + open_subscribe_connection, + publish_message, + subscribe, + unsubscribe, + ping_subscribe_connection, + ping_publish_connection, + to: @backend + ) end end diff --git a/src/cable/redis_pinger.cr b/src/cable/backend_pinger.cr similarity index 54% rename from src/cable/redis_pinger.cr rename to src/cable/backend_pinger.cr index b07200d..b447ba6 100644 --- a/src/cable/redis_pinger.cr +++ b/src/cable/backend_pinger.cr @@ -1,16 +1,16 @@ require "tasker" module Cable - class RedisPinger + class BackendPinger private getter task : Tasker::Task def initialize(@server : Cable::Server) - @task = Tasker.every(Cable.settings.redis_ping_interval) do - @server.backend.ping_redis_subscribe - @server.backend.ping_redis_publish + @task = Tasker.every(Cable.settings.backend_ping_interval) do + @server.backend.ping_subscribe_connection + @server.backend.ping_publish_connection rescue e stop - Cable::Logger.error { "Cable::RedisPinger Exception: #{e.class.name} -> #{e.message}" } + Cable::Logger.error { "Cable::BackendPinger Exception: #{e.class.name} -> #{e.message}" } # Restart cable if something happened Cable.server.count_error! Cable.restart if Cable.server.restart? diff --git a/src/cable/channel.cr b/src/cable/channel.cr index 4150e86..ef28e4d 100644 --- a/src/cable/channel.cr +++ b/src/cable/channel.cr @@ -1,7 +1,5 @@ module Cable class Channel - class CloseRedisFiber < Exception; end - CHANNELS = {} of String => Cable::Channel.class macro inherited diff --git a/src/cable/server.cr b/src/cable/server.cr index 7cd9ebe..d7eee1b 100644 --- a/src/cable/server.cr +++ b/src/cable/server.cr @@ -27,8 +27,8 @@ module Cable getter connections = {} of String => Cable::Connection getter errors = 0 getter fiber_channel = ::Channel({String, String}).new - getter pinger : Cable::RedisPinger do - Cable::RedisPinger.new(self) + getter pinger : Cable::BackendPinger do + Cable::BackendPinger.new(self) end getter backend : Cable::BackendCore do Cable.settings.backend_class.new @@ -112,7 +112,7 @@ module Cable end end - # redis only accepts strings, so we should be strict here + # Some backends only accept strings, so we should be strict here def publish(channel : String, message : String) backend.publish_message(channel, message) end @@ -169,7 +169,7 @@ module Cable backend.close_publish_connection rescue e : IO::Error # the @writer IO is closed already - Cable::Logger.debug { "Cable::Server#shutdown Connection to redis was severed: #{e.message}" } + Cable::Logger.debug { "Cable::Server#shutdown Connection to backend was severed: #{e.message}" } end pinger.stop connections.each do |_k, v|