Add ReplicationClient #50

Open · wants to merge 1 commit into master
54 changes: 54 additions & 0 deletions spec/replication_client_spec.cr
@@ -0,0 +1,54 @@
require "./spec_helper"
require "../src/replication_client"

module Redis
describe ReplicationClient do
describe ".parse_replication_section" do
it "parses the master's replication section" do
section = <<-SECTION
# Replication\r
role:master\r
connected_slaves:2\r
slave0:ip=10.76.3.39,port=6379,state=stable_sync,lag=0\r
slave1:ip=10.76.1.130,port=6379,state=stable_sync,lag=0\r
master_replid:b08ca5082296cf5b2c1de7207f2bc16bb8da3d80\r
SECTION

data = ReplicationClient::Info::Replication.new(section)

data.role.master?.should eq true
data.connected_replicas.should eq 2
data.replicas.should contain ReplicationClient::Info::Replica.new(
ip: "10.76.3.39",
port: 6379,
state: :stable_sync,
lag: 0.seconds,
)
end

it "parses a replica's replication section" do
section = <<-SECTION
# Replication\r
role:replica\r
master_host:10.76.2.33\r
master_port:9999\r
master_link_status:up\r
master_last_io_seconds_ago:0\r
master_sync_in_progress:0\r
SECTION

data = ReplicationClient::Info::Replication.new(section)

data.role.master?.should eq false
data.role.replica?.should eq true
data.master_host.should eq "10.76.2.33"
data.master_port.should eq 9999
data.master_link_status.should eq "up"
data.master_last_io.not_nil!.should be_within 1.seconds, of: Time.utc
data.master_sync_in_progress?.should eq false
end
end
end
end
6 changes: 4 additions & 2 deletions src/client.cr
@@ -1,6 +1,8 @@
require "db/pool"
require "log"

require "./connection"
require "./log"

module Redis
# The Redis client is the expected entrypoint for this shard. By default, it will connect to localhost:6379, but you can also supply a `URI` to connect to an arbitrary Redis server. SSL, password authentication, and DB selection are all supported.
@@ -27,7 +29,7 @@ module Redis

# The client holds a pool of connections that expands and contracts as
# needed.
def initialize(uri : URI = URI.parse(ENV.fetch("REDIS_URL", "redis:///")))
def initialize(uri : URI = URI.parse(ENV.fetch("REDIS_URL", "redis:///")), @log = Log)
# defaults as per https://github.com/crystal-lang/crystal-db/blob/v0.11.0/src/db/pool.cr
initial_pool_size = uri.query_params.fetch("initial_pool_size", 1).to_i
max_pool_size = uri.query_params.fetch("max_pool_size", 0).to_i
@@ -46,7 +48,7 @@ module Redis
retry_attempts: retry_attempts,
retry_delay: retry_delay,
)) do
Connection.new(uri)
Connection.new(uri, log: log)
end
end

57 changes: 1 addition & 56 deletions src/cluster.cr
@@ -1,6 +1,7 @@
# require "./client"
require "./connection"
require "./commands"
require "./read_only_commands"
require "db/pool"
require "set"

@@ -214,62 +215,6 @@ module Redis
each_master(&.run({"flushdb"}))
end

# Add commands here to route them to read-only replicas.
private READ_ONLY_COMMANDS = %w[
dump
echo
eval_ro
evalsha_ro
exists
expiretime
get
getbit
getrange
hexists
hget
hgetall
hkeys
hlen
hmget
hstrlen
hvals
keys
lcs
lindex
llen
lpos
lrange
mget
pttl
randomkey
scard
sdiff
sinter
sintercard
sismember
smembers
smismember
srandmember
strlen
sunion
ttl
type
xlen
xrange
xrevrange
zcard
zcount
zdiff
zinter
zlexcount
zrandmember
zrange
zrangebylex
zrangebyscore
zrank
zrevrangebylex
].to_set

def run(command full_command)
if full_command.empty?
raise ArgumentError.new("Redis commands must have at least one component")
8 changes: 3 additions & 5 deletions src/connection.cr
@@ -8,23 +8,21 @@ require "./pipeline"
require "./value"
require "./transaction"
require "./writer"
require "./log"

module Redis
# The connection wraps the TCP connection to the Redis server.
class Connection
include Commands

# :nodoc:
LOG = ::Log.for(self)

@socket : TCPSocket | OpenSSL::SSL::Socket::Client

# We receive all connection information in the URI.
#
# SSL connections require specifying the `rediss://` scheme.
# Password authentication uses the URI password.
# DB selection uses the URI path.
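#
# For example, a sketch with a placeholder host, password, and DB index:
#
# ```
# # TLS, password authentication, and DB 1, all taken from the URI
# redis = Redis::Connection.new(URI.parse("rediss://:my-password@10.0.0.1:6380/1"))
# ```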
def initialize(@uri = URI.parse("redis:///"))
def initialize(@uri = URI.parse("redis:///"), @log = Log)
host = uri.host.presence || "localhost"
port = uri.port || 6379
socket = TCPSocket.new(host, port)
@@ -355,7 +353,7 @@ module Redis
@writer.encode command
flush
result = read
LOG.debug &.emit "redis", command: command[0...2].join(' '), duration_ms: (Time.monotonic - start).total_milliseconds
@log.debug &.emit "redis", command: command[0...2].join(' '), duration_ms: (Time.monotonic - start).total_milliseconds
return result
rescue ex : IO::Error
if retries > 0
6 changes: 6 additions & 0 deletions src/log.cr
@@ -0,0 +1,6 @@
require "log"

module Redis
# Default Redis log
Log = ::Log.for(self)
end
182 changes: 182 additions & 0 deletions src/read_only_commands.cr
@@ -0,0 +1,182 @@
module Redis
# Commands in this set are routed to replicas by `Redis::Cluster` and
# `Redis::ReplicationClient`.
#
# You can add additional commands that this shard does not yet know about
# (for example, one provided by a custom Redis module) by using the `<<` method:
#
# ```
# Redis::READ_ONLY_COMMANDS << "mymodule.mycommand"
# ```
READ_ONLY_COMMANDS = %w[
bf.card
bf.debug
bf.exists
bf.info
bf.mexists
bf.scandump
bitcount
bitfield_ro
bitpos
cf.compact
cf.count
cf.debug
cf.exists
cf.info
cf.mexists
cf.scandump
cms.info
cms.query
dbsize
dump
eval_ro
evalsha_ro
exists
expiretime
fcall_ro
ft._aliasaddifnx
ft._aliasdelifx
ft._list
ft.aggregate
ft.aliasadd
ft.aliasdel
ft.aliasupdate
ft.config
ft.cursor
ft.debug
ft.dictadd
ft.dictdel
ft.dictdump
ft.explain
ft.explaincli
ft.get
ft.info
ft.mget
ft.profile
ft.search
ft.spellcheck
ft.sugget
ft.suglen
ft.syndump
ft.tagvals
geodist
geohash
geopos
georadius_ro
georadiusbymember_ro
geosearch
get
getbit
getrange
hexists
hget
hgetall
hkeys
hlen
hmget
hrandfield
hscan
hstrlen
hvals
json.arrindex
json.arrlen
json.debug
json.get
json.mget
json.objkeys
json.objlen
json.resp
json.strlen
json.type
keys
lcs
lindex
llen
lolwut
lpos
lrange
mget
pexpiretime
pfcount
pttl
randomkey
redisgears_2.clusterset
redisgears_2.clustersetfromshard
redisgears_2.forceshardsconnection
redisgears_2.hello
redisgears_2.infocluster
redisgears_2.innercommunication
redisgears_2.networktest
redisgears_2.refreshcluster
scan
scard
sdiff
sinter
sintercard
sismember
smembers
smismember
sort_ro
srandmember
sscan
strlen
substr
sunion
tdigest.byrank
tdigest.byrevrank
tdigest.cdf
tdigest.info
tdigest.max
tdigest.min
tdigest.quantile
tdigest.rank
tdigest.revrank
tdigest.trimmed_mean
timeseries.clusterset
timeseries.clustersetfromshard
timeseries.forceshardsconnection
timeseries.hello
timeseries.infocluster
timeseries.innercommunication
timeseries.networktest
timeseries.refreshcluster
topk.info
topk.list
topk.query
touch
ts.get
ts.info
ts.mget
ts.mrange
ts.mrevrange
ts.queryindex
ts.range
ts.revrange
ttl
type
xlen
xpending
xrange
xread
xrevrange
zcard
zcount
zdiff
zinter
zintercard
zlexcount
zmscore
zrandmember
zrange
zrangebylex
zrangebyscore
zrank
zrevrange
zrevrangebylex
zrevrangebyscore
zrevrank
zscan
zscore
zunion
].to_set
end
421 changes: 421 additions & 0 deletions src/replication_client.cr
@@ -0,0 +1,421 @@
require "uri"
require "set"

require "./client"
require "./connection"
require "./read_only_commands"

# If you're using Redis replication, you can use `ReplicationClient` to send
# read commands to replicas and reduce load on the primary. This can be important
# when your Redis primary is CPU-bound.
#
# The commands that will be routed to replicas are listed in
# `Redis::READ_ONLY_COMMANDS`.
#
# NOTE: Redis replication does not provide consistency guarantees. Every
# mechanism in Redis to improve consistency, such as
# [WAIT](https://redis.io/commands/wait/#consistency-and-wait), is best-effort,
# but not guaranteed. If you require strong consistency from Redis, stick to
# using `Redis::Client`. If you require strong consistency but your Redis primary
# is CPU-bound, you may need to choose between consistency and performance or
# move that workload out of Redis.
#
# This client is useful for operations where strong consistency isn't typically
# needed, such as caching, full-text search with `Redis::FullText#search`,
# querying time-series data with `Redis::TimeSeries#mrange`, checking the current
# state of larger data structures without blocking the primary, etc.
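#
# A minimal usage sketch (the URI is a placeholder for one of your servers; any
# server in the replication group works as an entrypoint):
#
# ```
# redis = Redis::ReplicationClient.new(URI.parse("redis://10.0.0.1:6379"))
#
# redis.set "greeting", "hello" # write command, routed to the primary
# redis.get "greeting"          # read-only command, routed to a replica
# ```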
#
# ## Explicitly routing commands to a primary or replica
#
# This class provides `on_primary` and `on_replica` methods to ensure your
# command is routed to the server type you want. This is useful in several
# scenarios:
#
# - you want to ensure you retrieve a value that is consistent with the state of
#   the primary server, for example a value that changes frequently and whose
#   canonical state you need for observability purposes
# - a read-only command is being routed to the primary because this client does
#   not yet know about it
#   - you can add commands to `Redis::READ_ONLY_COMMANDS` yourself for one-off
#     cases
#   - feel free to [open an issue](https://github.com/jgaskins/redis/issues) or a
#     [pull request](https://github.com/jgaskins/redis/pulls) to add it here as well
#
# ## Topology changes
#
# If the replication topology changes (for example, new replicas are added,
# existing ones removed, or the primary fails over), `ReplicationClient` will
# automatically pick up the changes. You can set how often it checks for these
# changes with the `topology_ttl` argument to the constructor or leave it at its
# default of 10 seconds.
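#
# A sketch of tuning the refresh interval (the URI is a placeholder):
#
# ```
# redis = Redis::ReplicationClient.new(
#   URI.parse("redis://10.0.0.1:6379"),
#   topology_ttl: 30.seconds,
# )
# ```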
@[Experimental("`ReplicationClient` is currently in alpha testing. There may be rough edges.")]
class Redis::ReplicationClient
include Commands

Log = ::Log.for(self)

@master : Client
@replicas : Array(Client)
@master_uri : URI
@replica_uris : Array(URI)
getter topology_ttl : Time::Span

def self.new
new(entrypoint: URI.parse("redis:///"))
end

# Have the `ReplicationClient` discover the master and replicas on its own
# when given the URI of a single entrypoint. The replication topology will be
# refreshed with a max staleness of `topology_ttl`.
#
# ```
# # The URI below is a placeholder for one of your Redis servers
# redis = Redis::ReplicationClient.new(URI.parse("redis://10.0.0.1:6379"))
# ```
def initialize(entrypoint : URI, topology_ttl : Time::Span = 10.seconds)
connection = Connection.new(entrypoint, log: Log.for("redis.replication_client"))

begin
result = connection.run({"info", "replication"}).as(String)
ensure
connection.close
end

parsed = self.class.parse_replication_section(result)
case parsed.role
in .master?
initialize(
master_uri: entrypoint,
replica_uris: parsed
.replicas
.map do |replica|
entrypoint.dup.tap do |uri|
uri.host = replica.ip
uri.port = replica.port
# TODO: Should we ignore excessively lagged replicas?
end
end
.sort_by!(&.host.not_nil!),
topology_ttl: topology_ttl,
)
in .replica?
initialize(
entrypoint.dup.tap do |uri|
uri.host = parsed.master_host
# Dragonfly seems to report 9999 as a default port?
if parsed.master_port != 9999
uri.port = parsed.master_port
end
end,
topology_ttl: topology_ttl,
)
end
end

# Initialize the client with known master and replica URIs, keeping the
# topology up to date with at most `topology_ttl` staleness. If you don't wish
# to keep the replication topology up to date, you can simply set
# `topology_ttl` to `0.seconds`.
def initialize(@master_uri, @replica_uris, @topology_ttl = 10.seconds)
@master = Client.new(@master_uri, log: ::Log.for("redis.primary"))

@replicas = @replica_uris.map do |uri|
Client.new(uri, log: ::Log.for("redis.replica"))
end

if topology_ttl > 0.seconds
spawn do
replication = self.class.parse_replication_section(@master.run({"info", "replication"}).as(String))

until closed?
sleep @topology_ttl
# Check topology and update if needed
new_replication = self.class.parse_replication_section(@master.run({"info", "replication"}).as(String))
if new_replication != replication
Log.info &.emit "Topology changed, updating Redis::ReplicationClient"
topology_ttl = @topology_ttl
# Avoid re-spawning this fiber
initialize entrypoint: @master_uri, topology_ttl: 0.seconds
@topology_ttl = topology_ttl
replication = new_replication
end
end
end
end
end

Connection.set_return_types!

# :nodoc:
def finalize
close
end

# Close all connections to both the primary and all replicas.
def close
@master.close rescue nil
@replicas.each do |replica|
replica.close rescue nil
end

@closed = true
end

# Returns `true` if this `ReplicationClient` has been explicitly closed,
# `false` otherwise.
getter? closed = false

protected def self.parse_replication_section(text : String)
Info::Replication.new text
end

private module Info
struct Replication
getter role : Role

# Master
# connected_slaves:2\r
# slave0:ip=10.76.3.39,port=6379,state=stable_sync,lag=0\r
# slave1:ip=10.76.1.130,port=6379,state=stable_sync,lag=0\r
# master_replid:b08ca5082296cf5b2c1de7207f2bc16bb8da3d80\r
getter connected_replicas = 0
getter replicas : Array(Info::Replica) { [] of Info::Replica }
getter master_replid : String?

# Replica
# master_host:10.76.2.33\r
# master_port:9999\r
# master_link_status:up\r
# master_last_io_seconds_ago:0\r
# master_sync_in_progress:0\r
getter master_host : String?
getter master_port : Int32?
getter master_link_status : String?
getter master_last_io : Time?
getter? master_sync_in_progress : Bool?

def initialize(text : String)
found_role = false
role = ""

found_connected_replicas = false
connected_replicas = 0

master_replid = ""

text.each_line(chomp: true).with_index do |line, index|
next if line.starts_with? '#'

case line
when .starts_with? "role:"
found_role = true
role = line[5..]
when .starts_with? "connected_slaves:"
@connected_replicas = line["connected_slaves:".bytesize..].to_i
@replicas = Array(Info::Replica).new(initial_capacity: connected_replicas)
when .starts_with? "slave"
if separator_index = line.index(':')
replicas << Replica.new(line[separator_index + 1..])
else
raise ArgumentError.new("Cannot read line: #{line.inspect}")
end
when .starts_with? "master_host:"
@master_host = line["master_host:".bytesize..]
when .starts_with? "master_port:"
@master_port = line["master_port:".bytesize..].to_i
when .starts_with? "master_link_status:"
@master_link_status = line["master_link_status:".bytesize..]
when .starts_with? "master_last_io_seconds_ago:"
@master_last_io = line["master_last_io_seconds_ago:".bytesize..].to_i.seconds.ago
when .starts_with? "master_sync_in_progress:"
# No need to create a substring, we just need to check the last byte
@master_sync_in_progress = line.ends_with? '1'
end
end

if found_role
@role = Role.parse(role)
else
raise ArgumentError.new("Missing role")
end
end

enum Role
Master
Replica
end
end

struct Replica
getter ip : String
getter port : Int32
getter state : State
getter lag : Time::Span

def initialize(text : String)
found_ip = false
ip = ""

found_port = false
port = 0

found_state = false
state : State = :stable_sync

found_lag = false
lag = 0.seconds

token_start = 0
parse_state = ParseState::ReadingKey
key = ""
value = ""
text.size.times do |cursor|
case text[cursor]
when '='
parse_state = ParseState::KVSeparator
when ','
parse_state = ParseState::EntrySeparator
end
if cursor == text.size - 1
parse_state = ParseState::End
end

case parse_state
in .reading_key?
in .reading_value?
in .kv_separator?
key = text[token_start...cursor]
parse_state = ParseState::ReadingValue
token_start = cursor + 1
in .entry_separator?, .end?
value = text[token_start..(parse_state.entry_separator? ? cursor - 1 : cursor)]
parse_state = ParseState::ReadingKey
token_start = cursor + 1

case key
when "ip"
found_ip = true
ip = value
when "port"
found_port = true
port = value.to_i
when "state"
found_state = true
state = State.parse(value)
when "lag"
found_lag = true
lag = value.to_i.seconds
end
end
end

if found_ip && found_port && found_state && found_lag
initialize(ip: ip, port: port, state: state, lag: lag)
else
raise ArgumentError.new("Replica info string must contain ip, port, state, and lag. Received: #{text.inspect}.")
end
end

def initialize(@ip, @port, @state, @lag)
end

def ==(other : self)
ip == other.ip && port == other.port
end

enum State
StableSync
end

private enum ParseState
ReadingKey
ReadingValue
KVSeparator
EntrySeparator
End
end
end
end

def run(command full_command)
if full_command.empty?
raise ArgumentError.new("Redis commands must have at least one component")
end

if READ_ONLY_COMMANDS.includes? full_command[0].downcase
on_replica(&.run full_command)
else
@master.run full_command
end
end

# Route one or more commands to replicas. This should rarely be necessary, since
# known read-only commands (the only commands a replica can execute) are routed
# to replicas automatically, but it can be useful for a read-only command this
# shard does not yet know about (see `Redis::READ_ONLY_COMMANDS`). Alternatively,
# you can shovel additional commands into `Redis::READ_ONLY_COMMANDS` to avoid
# having to perform this explicit routing.
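#
# A sketch of explicit replica routing, where `mymodule.mycommand` stands in
# for a hypothetical read-only command this shard does not recognize:
#
# ```
# result = redis.on_replica &.run({"mymodule.mycommand", "some-key"})
# ```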
def on_replica
if @replicas.empty?
yield @master
else
yield @replicas.sample
end
end

# Route one or more commands to the primary to avoid consistency issues arising
# from replication latency.
#
# ```
# require "redis/replication_client"
#
# redis = Redis::ReplicationClient.new
#
# redis.incr "counter"
# value = redis.on_primary &.get("counter")
# ```
#
# This is useful for pipelining commands or executing transactions:
#
# ```
# redis.on_primary &.transaction do |txn|
# txn.incr "counter:#{queue}"
# txn.sadd "queues", queue
# txn.lpush "queue:#{queue}", job_data
# end
# ```
#
# … which is shorthand for this and removes the need for nesting blocks:
#
# ```
# redis.on_primary do |primary|
# primary.transaction do |txn|
# txn.incr "counter:#{queue}"
# txn.sadd "queues", queue
# txn.lpush "queue:#{queue}", job_data
# end
# end
# ```
#
# If you need to route many commands to the primary without necessarily
# pipelining or opening transactions, you can omit the `&.transaction` and
# call methods directly on the primary's `Redis::Client` in the block:
#
# ```
# redis.on_primary do |primary|
# counter = primary.incr "counter:#{queue}"
# primary.sadd "queues", queue
# end
# ```
#
# NOTE: The object yielded to the block is a `Redis::Client`, but if you try
# to use it outside the block you may run into errors because the replication
# topology could change, in which case this `Redis::Client` might not be the
# primary anymore.
def on_primary
on_master { |redis| yield redis }
end

# Alias of `on_primary`.
def on_master
yield @master
end
end