Skip to content

Commit 7d043a4

Browse files
author
Chris Rossi
authored
fix: detect cache write failure for MemcacheCache (#665)
Fixes #656
1 parent e84cd4d commit 7d043a4

File tree

4 files changed

+173
-9
lines changed

4 files changed

+173
-9
lines changed

packages/google-cloud-ndb/google/cloud/ndb/_cache.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ class _GlobalCacheSetBatch(_GlobalCacheBatch):
304304
def __init__(self, options):
305305
self.expires = options.get("expires")
306306
self.todo = {}
307-
self.futures = []
307+
self.futures = {}
308308

309309
def add(self, key, value):
310310
"""Add a key, value pair to store in the cache.
@@ -316,11 +316,52 @@ def add(self, key, value):
316316
Returns:
317317
tasklets.Future: Eventual result will be ``None``.
318318
"""
319+
future = self.futures.get(key)
320+
if future:
321+
if self.todo[key] != value:
322+
# I don't think this is likely to happen. I'd like to know about it if
323+
# it does because that might indicate a bad software design.
324+
future = tasklets.Future()
325+
future.set_exception(
326+
RuntimeError(
327+
"Key has already been set in this batch: {}".format(key)
328+
)
329+
)
330+
331+
return future
332+
319333
future = tasklets.Future(info=self.future_info(key, value))
320334
self.todo[key] = value
321-
self.futures.append(future)
335+
self.futures[key] = future
322336
return future
323337

338+
def done_callback(self, cache_call):
339+
"""Process results of call to global cache.
340+
341+
If there is an exception for the cache call, distribute that to waiting
342+
futures, otherwise examine the result of the cache call. If the result is
343+
:data:`None`, simply set the result to :data:`None` for all waiting futures.
344+
Otherwise, if the result is a `dict`, use that to propagate results for
345+
individual keys to waiting figures.
346+
"""
347+
exception = cache_call.exception()
348+
if exception:
349+
for future in self.futures.values():
350+
future.set_exception(exception)
351+
return
352+
353+
result = cache_call.result()
354+
if result:
355+
for key, future in self.futures.items():
356+
key_result = result.get(key, None)
357+
if isinstance(key_result, Exception):
358+
future.set_exception(key_result)
359+
else:
360+
future.set_result(key_result)
361+
else:
362+
for future in self.futures.values():
363+
future.set_result(None)
364+
324365
def make_call(self):
325366
"""Call :method:`GlobalCache.set`."""
326367
return _global_cache().set(self.todo, expires=self.expires)

packages/google-cloud-ndb/google/cloud/ndb/global_cache.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import threading
2525
import time
2626
import uuid
27+
import warnings
2728

2829
import pymemcache
2930
import redis as redis_module
@@ -106,6 +107,12 @@ def set(self, items, expires=None):
106107
items (Dict[bytes, Union[bytes, None]]): Mapping of keys to
107108
serialized entities.
108109
expires (Optional[float]): Number of seconds until value expires.
110+
111+
Returns:
112+
Optional[Dict[bytes, Any]]: May return :data:`None`, or a `dict` mapping
113+
keys to arbitrary results. If the result for a key is an instance of
114+
`Exception`, the result will be raised as an exception in that key's
115+
future.
109116
"""
110117
raise NotImplementedError
111118

@@ -446,9 +453,22 @@ class MemcacheCache(GlobalCache):
446453
errors in the cache layer. Default: :data:`True`.
447454
"""
448455

456+
class KeyNotSet(Exception):
457+
def __init__(self, key):
458+
self.key = key
459+
super(MemcacheCache.KeyNotSet, self).__init__(
460+
"SET operation failed in memcache for key: {}".format(key)
461+
)
462+
463+
def __eq__(self, other):
464+
if isinstance(other, type(self)):
465+
return self.key == other.key
466+
return NotImplemented
467+
449468
transient_errors = (
450469
IOError,
451470
ConnectionError,
471+
KeyNotSet,
452472
pymemcache.exceptions.MemcacheServerError,
453473
pymemcache.exceptions.MemcacheUnexpectedCloseError,
454474
)
@@ -561,9 +581,23 @@ def get(self, keys):
561581

562582
def set(self, items, expires=None):
563583
"""Implements :meth:`GlobalCache.set`."""
564-
items = {self._key(key): value for key, value in items.items()}
565584
expires = expires if expires else 0
566-
self.client.set_many(items, expire=expires)
585+
orig_items = items
586+
items = {}
587+
orig_keys = {}
588+
for orig_key, value in orig_items.items():
589+
key = self._key(orig_key)
590+
orig_keys[key] = orig_key
591+
items[key] = value
592+
593+
unset_keys = self.client.set_many(items, expire=expires, noreply=False)
594+
if unset_keys:
595+
unset_keys = [orig_keys[key] for key in unset_keys]
596+
warnings.warn(
597+
"Keys failed to set in memcache: {}".format(unset_keys),
598+
RuntimeWarning,
599+
)
600+
return {key: MemcacheCache.KeyNotSet(key) for key in unset_keys}
567601

568602
def delete(self, keys):
569603
"""Implements :meth:`GlobalCache.delete`."""

packages/google-cloud-ndb/tests/unit/test__cache.py

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -344,9 +344,17 @@ def test_with_expires(_batch, _global_cache):
344344

345345

346346
class Test_GlobalCacheSetBatch:
347+
@staticmethod
348+
def test_add_duplicate_key_and_value():
349+
batch = _cache._GlobalCacheSetBatch({})
350+
future1 = batch.add(b"foo", b"one")
351+
future2 = batch.add(b"foo", b"one")
352+
assert future1 is future2
353+
347354
@staticmethod
348355
def test_add_and_idle_and_done_callbacks(in_context):
349-
cache = mock.Mock()
356+
cache = mock.Mock(spec=("set",))
357+
cache.set.return_value = []
350358

351359
batch = _cache._GlobalCacheSetBatch({})
352360
future1 = batch.add(b"foo", b"one")
@@ -363,9 +371,29 @@ def test_add_and_idle_and_done_callbacks(in_context):
363371
assert future1.result() is None
364372
assert future2.result() is None
365373

374+
@staticmethod
375+
def test_add_and_idle_and_done_callbacks_with_duplicate_keys(in_context):
376+
cache = mock.Mock(spec=("set",))
377+
cache.set.return_value = []
378+
379+
batch = _cache._GlobalCacheSetBatch({})
380+
future1 = batch.add(b"foo", b"one")
381+
future2 = batch.add(b"foo", b"two")
382+
383+
assert batch.expires is None
384+
385+
with in_context.new(global_cache=cache).use():
386+
batch.idle_callback()
387+
388+
cache.set.assert_called_once_with({b"foo": b"one"}, expires=None)
389+
assert future1.result() is None
390+
with pytest.raises(RuntimeError):
391+
future2.result()
392+
366393
@staticmethod
367394
def test_add_and_idle_and_done_callbacks_with_expires(in_context):
368-
cache = mock.Mock()
395+
cache = mock.Mock(spec=("set",))
396+
cache.set.return_value = []
369397

370398
batch = _cache._GlobalCacheSetBatch({"expires": 5})
371399
future1 = batch.add(b"foo", b"one")
@@ -383,7 +411,8 @@ def test_add_and_idle_and_done_callbacks_with_expires(in_context):
383411
@staticmethod
384412
def test_add_and_idle_and_done_callbacks_w_error(in_context):
385413
error = Exception("spurious error")
386-
cache = mock.Mock()
414+
cache = mock.Mock(spec=("set",))
415+
cache.set.return_value = []
387416
cache.set.return_value = tasklets.Future()
388417
cache.set.return_value.set_exception(error)
389418

@@ -400,6 +429,28 @@ def test_add_and_idle_and_done_callbacks_w_error(in_context):
400429
assert future1.exception() is error
401430
assert future2.exception() is error
402431

432+
@staticmethod
433+
def test_done_callbacks_with_results(in_context):
434+
class SpeciousError(Exception):
435+
pass
436+
437+
cache_call = _future_result(
438+
{
439+
b"foo": "this is a result",
440+
b"bar": SpeciousError("this is also a kind of result"),
441+
}
442+
)
443+
444+
batch = _cache._GlobalCacheSetBatch({})
445+
future1 = batch.add(b"foo", b"one")
446+
future2 = batch.add(b"bar", b"two")
447+
448+
batch.done_callback(cache_call)
449+
450+
assert future1.result() == "this is a result"
451+
with pytest.raises(SpeciousError):
452+
assert future2.result()
453+
403454

404455
@pytest.mark.usefixtures("in_context")
405456
@mock.patch("google.cloud.ndb._cache._global_cache")
@@ -552,7 +603,8 @@ def test_with_expires(_batch, _global_cache):
552603
class Test_GlobalCacheCompareAndSwapBatch:
553604
@staticmethod
554605
def test_add_and_idle_and_done_callbacks(in_context):
555-
cache = mock.Mock()
606+
cache = mock.Mock(spec=("compare_and_swap",))
607+
cache.compare_and_swap.return_value = None
556608

557609
batch = _cache._GlobalCacheCompareAndSwapBatch({})
558610
future1 = batch.add(b"foo", b"one")
@@ -571,7 +623,8 @@ def test_add_and_idle_and_done_callbacks(in_context):
571623

572624
@staticmethod
573625
def test_add_and_idle_and_done_callbacks_with_expires(in_context):
574-
cache = mock.Mock()
626+
cache = mock.Mock(spec=("compare_and_swap",))
627+
cache.compare_and_swap.return_value = None
575628

576629
batch = _cache._GlobalCacheCompareAndSwapBatch({"expires": 5})
577630
future1 = batch.add(b"foo", b"one")

packages/google-cloud-ndb/tests/unit/test_global_cache.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,7 @@ def test_get():
449449
@staticmethod
450450
def test_set():
451451
client = mock.Mock(spec=("set_many",))
452+
client.set_many.return_value = []
452453
cache = global_cache.MemcacheCache(client)
453454
key1 = cache._key(b"one")
454455
key2 = cache._key(b"two")
@@ -464,11 +465,13 @@ def test_set():
464465
key2: "shoe",
465466
},
466467
expire=0,
468+
noreply=False,
467469
)
468470

469471
@staticmethod
470472
def test_set_w_expires():
471473
client = mock.Mock(spec=("set_many",))
474+
client.set_many.return_value = []
472475
cache = global_cache.MemcacheCache(client)
473476
key1 = cache._key(b"one")
474477
key2 = cache._key(b"two")
@@ -485,8 +488,41 @@ def test_set_w_expires():
485488
key2: "shoe",
486489
},
487490
expire=5,
491+
noreply=False,
488492
)
489493

494+
@staticmethod
495+
def test_set_failed_key():
496+
client = mock.Mock(spec=("set_many",))
497+
cache = global_cache.MemcacheCache(client)
498+
key1 = cache._key(b"one")
499+
key2 = cache._key(b"two")
500+
client.set_many.return_value = [key2]
501+
502+
unset = cache.set(
503+
{
504+
b"one": "bun",
505+
b"two": "shoe",
506+
}
507+
)
508+
assert unset == {b"two": global_cache.MemcacheCache.KeyNotSet(b"two")}
509+
510+
client.set_many.assert_called_once_with(
511+
{
512+
key1: "bun",
513+
key2: "shoe",
514+
},
515+
expire=0,
516+
noreply=False,
517+
)
518+
519+
@staticmethod
520+
def test_KeyNotSet():
521+
unset = global_cache.MemcacheCache.KeyNotSet(b"foo")
522+
assert unset == global_cache.MemcacheCache.KeyNotSet(b"foo")
523+
assert not unset == global_cache.MemcacheCache.KeyNotSet(b"goo")
524+
assert not unset == "hamburger"
525+
490526
@staticmethod
491527
def test_delete():
492528
client = mock.Mock(spec=("delete_many",))

0 commit comments

Comments
 (0)