Skip to content

Commit 7df6677

Browse files
authored
Fix race condition in multilevel cache replication (#232)
* Fix race condition in multilevel cache replication This commit fixes a race condition that is likely to happen when one of the layers for a multilevel cache has to do some networking. The issue arises from the calling `get` to see if a key exists, and then calling `ttl` when replicating the key to other layers. This means that the calls are happening in a non-atomic way, which in turn means that by the time `ttl` gets called the key might have expired already (the race condition). The fix is detecting that the key has expired by checking for `nil` in the `ttl` call result, and in this case not replicating the cached entry to the other layers.
1 parent d47e568 commit 7df6677

File tree

3 files changed

+154
-5
lines changed

3 files changed

+154
-5
lines changed

lib/nebulex/adapters/multilevel.ex

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -616,12 +616,18 @@ defmodule Nebulex.Adapters.Multilevel do
616616
end
617617

618618
defp maybe_replicate({value, [level_meta | [_ | _] = levels]}, key, :inclusive) do
619-
ttl = with_dynamic_cache(level_meta, :ttl, [key]) || :infinity
620-
621619
:ok =
622-
Enum.each(levels, fn l_meta ->
623-
_ = with_dynamic_cache(l_meta, :put, [key, value, [ttl: ttl]])
624-
end)
620+
case with_dynamic_cache(level_meta, :ttl, [key]) do
621+
nil ->
622+
# the cache entry expired between the `get` and `ttl` calls
623+
# don't replicate the entry
624+
:ok
625+
626+
ttl ->
627+
Enum.each(levels, fn l_meta ->
628+
_ = with_dynamic_cache(l_meta, :put, [key, value, [ttl: ttl]])
629+
end)
630+
end
625631

626632
value
627633
end

test/nebulex/adapters/multilevel_inclusive_test.exs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ defmodule Nebulex.Adapters.MultilevelInclusiveTest do
99

1010
alias Nebulex.Adapters.Local.Generation
1111
alias Nebulex.Cache.Cluster
12+
alias Nebulex.TestCache.DelayedReadAdapter
1213
alias Nebulex.TestCache.Multilevel
1314
alias Nebulex.TestCache.Multilevel.{L1, L2, L3}
15+
alias Nebulex.TestCache.MultilevelWithDelay
1416

1517
@gc_interval :timer.hours(1)
1618

@@ -155,6 +157,39 @@ defmodule Nebulex.Adapters.MultilevelInclusiveTest do
155157
end
156158
end
157159

160+
describe "delayed multilevel" do
161+
setup_with_dynamic_cache(MultilevelWithDelay, :multilevel_inclusive_with_delay,
162+
model: :inclusive,
163+
levels: [
164+
{MultilevelWithDelay.L1,
165+
name: :multilevel_inclusive_with_delay_l1,
166+
gc_interval: @gc_interval,
167+
backend: :shards,
168+
partitions: 2},
169+
{MultilevelWithDelay.L2,
170+
name: :multilevel_inclusive_with_delay_l2,
171+
gc_interval: @gc_interval,
172+
backend: :shards,
173+
partitions: 2}
174+
]
175+
)
176+
177+
test "does not replicate the data if the cache expires during replication" do
178+
# reading from L2 will take 500ms
179+
DelayedReadAdapter.put_read_delay(500)
180+
181+
# since we call both `get` and `ttl` the total read time will be 1000ms
182+
:ok = MultilevelWithDelay.put(:key, :data, ttl: 700, level: 2)
183+
184+
# the key should expire between the `get` and `tl` calls, so the data
185+
# should be returned but not replicated
186+
assert MultilevelWithDelay.get(:key) == :data
187+
assert MultilevelWithDelay.get(:key, level: 1) == nil
188+
189+
assert MultilevelWithDelay.ttl(:key) == nil
190+
end
191+
end
192+
158193
describe "distributed levels" do
159194
test "return cluster nodes" do
160195
assert Cluster.get_nodes(:multilevel_inclusive_l2) == [node()]

test/support/test_cache.ex

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,114 @@ defmodule Nebulex.TestCache do
113113
end
114114
end
115115

116+
defmodule DelayedReadAdapter do
117+
@moduledoc false
118+
119+
require Nebulex.Adapters.Local
120+
121+
@behaviour Nebulex.Adapter
122+
@behaviour Nebulex.Adapter.Entry
123+
@behaviour Nebulex.Adapter.Queryable
124+
125+
@fallback_adapter Nebulex.Adapters.Local
126+
127+
@impl true
128+
defmacro __before_compile__(opts) do
129+
quote do
130+
require unquote(@fallback_adapter)
131+
132+
unquote(@fallback_adapter).__before_compile__(unquote(Macro.escape(opts)))
133+
end
134+
end
135+
136+
@impl true
137+
defdelegate init(opts), to: @fallback_adapter
138+
139+
@impl true
140+
def get(adapter_meta, key, opts) do
141+
delay()
142+
@fallback_adapter.get(adapter_meta, key, opts)
143+
end
144+
145+
@impl true
146+
def get_all(adapter_meta, list, opts) do
147+
delay()
148+
@fallback_adapter.get_all(adapter_meta, list, opts)
149+
end
150+
151+
@impl true
152+
defdelegate put(adapter_meta, key, value, ttl, on_write, opts), to: @fallback_adapter
153+
154+
@impl true
155+
defdelegate put_all(adapter_meta, entries, ttl, on_write, opts), to: @fallback_adapter
156+
157+
@impl true
158+
defdelegate delete(adapter_meta, key, opts), to: @fallback_adapter
159+
160+
@impl true
161+
defdelegate take(adapter_meta, key, opts), to: @fallback_adapter
162+
163+
@impl true
164+
def has_key?(adapter_meta, key) do
165+
delay()
166+
@fallback_adapter.has_key?(adapter_meta, key)
167+
end
168+
169+
@impl true
170+
def ttl(adapter_meta, key) do
171+
delay()
172+
@fallback_adapter.ttl(adapter_meta, key)
173+
end
174+
175+
@impl true
176+
defdelegate expire(adapter_meta, key, ttl), to: @fallback_adapter
177+
178+
@impl true
179+
defdelegate touch(adapter_meta, key), to: @fallback_adapter
180+
181+
@impl true
182+
defdelegate update_counter(adapter_meta, key, amount, ttl, default, opts), to: @fallback_adapter
183+
184+
@impl true
185+
defdelegate execute(adapter_meta, command, args, opts), to: @fallback_adapter
186+
187+
@impl true
188+
defdelegate stream(adapter_meta, query, opts), to: @fallback_adapter
189+
190+
@read_delay_key {__MODULE__, :read_delay}
191+
192+
def put_read_delay(delay) when is_integer(delay) do
193+
Process.put(@read_delay_key, delay)
194+
end
195+
196+
defp delay do
197+
delay = Process.get(@read_delay_key, 1000)
198+
199+
Process.sleep(delay)
200+
end
201+
end
202+
203+
defmodule MultilevelWithDelay do
204+
@moduledoc false
205+
use Nebulex.Cache,
206+
otp_app: :nebulex,
207+
adapter: Nebulex.Adapters.Multilevel
208+
209+
defmodule L1 do
210+
@moduledoc false
211+
use Nebulex.Cache,
212+
otp_app: :nebulex,
213+
adapter: Nebulex.Adapters.Local
214+
end
215+
216+
defmodule L2 do
217+
@moduledoc false
218+
use Nebulex.Cache,
219+
otp_app: :nebulex,
220+
adapter: Nebulex.TestCache.DelayedReadAdapter
221+
end
222+
end
223+
116224
## Mocks
117225

118226
defmodule AdapterMock do

0 commit comments

Comments
 (0)