Skip to content

Commit

Permalink
AWS Elasticache autodiscovery support (#572)
Browse files Browse the repository at this point in the history
* Add AWS ElastiCache client for pymemcache

A new client, AWSElastiCacheHashClient, was added to pymemcache to support AWS ElastiCache interactions. This client uses hashed key distribution and communicates with clusters running Amazon ElastiCache version 1.4.14 or higher. It retrieves its configuration from the AWS ElastiCache cluster itself.

* Refactor and enhance AWS ElastiCache client for pymemcache

* Require port on endpoint validation

* Add __init__.py

* Add unit tests for AWSElastiCacheHashClient

* Add kwargs parameter to MockMemcacheClient to allows use socket_keepalive or other params which might be called on mocking HashClient.

* Apply black formatting
  • Loading branch information
ityshchenko authored Feb 6, 2025
1 parent 2bf6507 commit 760088e
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 0 deletions.
Empty file.
205 changes: 205 additions & 0 deletions pymemcache/client/ext/aws_ec_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
import logging
import operator
import socket
import re
import time

from pymemcache import MemcacheUnknownCommandError
from pymemcache.client import Client
from pymemcache.client.base import normalize_server_spec
from pymemcache.client.hash import HashClient
from pymemcache.client.rendezvous import RendezvousHash


logger = logging.getLogger(__name__)

_RE_AWS_ENDPOINT = re.compile(
r"^(?:(?:[\w\d-]{0,61}[\w\d]\.)+[\w]{1,6}|\[(?:[\d]{1,3}\.){3}[\d]{1,3}\])\:\d{1,5}$"
)


class AWSElastiCacheHashClient(HashClient):
"""
This class is a subclass of HashClient and represents a client for interacting with an AWS ElastiCache cluster
using a hash-based algorithm for key distribution.
*Connection *
Supports version 1.4.14 or higher
Example:
>>> client = AWSElastiCacheServerlessClient('cluster.abcxyz.cfg.use1.cache.amazonaws.com')
"""

def __init__(
self,
cfg_node: object,
hasher: object = RendezvousHash,
serde: object = None,
serializer: object = None,
deserializer: object = None,
connect_timeout: object = None,
timeout: object = None,
no_delay: object = False,
socket_module: object = socket,
socket_keepalive: object = None,
key_prefix: object = b"",
max_pool_size: object = None,
pool_idle_timeout: object = 0,
lock_generator: object = None,
retry_attempts: object = 2,
retry_timeout: object = 1,
dead_timeout: object = 60,
use_pooling: object = False,
ignore_exc: object = False,
allow_unicode_keys: object = False,
default_noreply: object = True,
encoding: object = "ascii",
tls_context: object = None,
use_vpc: object = True,
) -> object:
"""
Constructor.
Args:
cfg_node: formatted string containing endpoint and port of the
ElastiCache cluster endpoint. Ex.:
`test-cluster.2os1zk.cfg.use1.cache.amazonaws.com:11211`
serde: optional serializer object, see notes in the class docs.
serializer: deprecated serialization function
deserializer: deprecated deserialization function
connect_timeout: optional float, seconds to wait for a connection to
the memcached server. Defaults to "forever" (uses the underlying
default socket timeout, which can be very long).
timeout: optional float, seconds to wait for send or recv calls on
the socket connected to memcached. Defaults to "forever" (uses the
underlying default socket timeout, which can be very long).
no_delay: optional bool, set the TCP_NODELAY flag, which may help
with performance in some cases. Defaults to False.
ignore_exc: optional bool, True to cause the "get", "gets",
"get_many" and "gets_many" calls to treat any errors as cache
misses. Defaults to False.
socket_module: socket module to use, e.g. gevent.socket. Defaults to
the standard library's socket module.
socket_keepalive: Activate the socket keepalive feature by passing
a KeepaliveOpts structure in this parameter. Disabled by default
(None). This feature is only supported on Linux platforms.
key_prefix: Prefix of key. You can use this as namespace. Defaults
to b''.
default_noreply: bool, the default value for 'noreply' as passed to
store commands (except from cas, incr, and decr, which default to
False).
allow_unicode_keys: bool, support unicode (utf8) keys
encoding: optional str, controls data encoding (defaults to 'ascii').
use_vpc: optional bool, if set False (defaults to True), the client
will use FQDN to connect to nodes instead of IP addresses. See
AWS Docs for extra info
https://docs.aws.amazon.com/AmazonElastiCache/latest/mem-ug/ClientConfig.DNS.html
Notes:
The constructor does not make a connection to memcached. The first
call to a method on the object will do that.
"""
if not (_RE_AWS_ENDPOINT.fullmatch(cfg_node) and isinstance(cfg_node, str)):
raise ValueError("Invalid AWS ElastiCache endpoint value '%s'" % cfg_node)

self._cfg_node = cfg_node
self.clients = {}
self.retry_attempts = retry_attempts
self.retry_timeout = retry_timeout
self.dead_timeout = dead_timeout
self.use_pooling = use_pooling
self.key_prefix = key_prefix
self.ignore_exc = ignore_exc
self.allow_unicode_keys = allow_unicode_keys
self._failed_clients = {}
self._dead_clients = {}
self._last_dead_check_time = time.time()

self.hasher = hasher()

self.default_kwargs = {
"connect_timeout": connect_timeout,
"timeout": timeout,
"no_delay": no_delay,
"socket_module": socket_module,
"socket_keepalive": socket_keepalive,
"key_prefix": key_prefix,
"serde": serde,
"serializer": serializer,
"deserializer": deserializer,
"allow_unicode_keys": allow_unicode_keys,
"default_noreply": default_noreply,
"encoding": encoding,
"tls_context": tls_context,
}

if use_pooling is True:
self.default_kwargs.update(
{
"max_pool_size": max_pool_size,
"pool_idle_timeout": pool_idle_timeout,
"lock_generator": lock_generator,
}
)

# server config returns as `[fqdn, ip, port]` if it's VPC installation you need to use ip
self._use_vpc = int(use_vpc)

self.reconfigure_nodes()

self.encoding = encoding
self.tls_context = tls_context

def reconfigure_nodes(self):
"""
Reconfigures the nodes in the server cluster based on the provided configuration node.
May useful on error handling during cluster scale down or scale up
"""
old_clients = self.clients.copy()
self.clients.clear()

for server in self._get_nodes_list():
self.add_server(normalize_server_spec(server))

for client in old_clients.values():
client.close()

def _get_nodes_list(self) -> list[tuple[str, int]]:
"""
Get the list of nodes from the cluster configuration.
Returns:
A list of tuples containing the address and port of each node in the cluster.
Each tuple has the format (address: str, port: int).
"""
addr, port = self._cfg_node.rsplit(":", maxsplit=1)
client = Client((addr, port), **self.default_kwargs)

# https://docs.aws.amazon.com/AmazonElastiCache/latest/mem-ug/AutoDiscovery.AddingToYourClientLibrary.html
try:
*_, config_line = client.raw_command(
b"config get cluster",
end_tokens=b"\n\r\nEND\r\n",
).splitlines()
except MemcacheUnknownCommandError:
logger.exception(
"Can't retrieve cluster configuration from '%s:%s' "
"Seems like it is ElastiCache Serverless or even isn't ElastiCache at all.",
client.server,
)
finally:
client.close()

servers = [
(server[self._use_vpc], server[2])
for server in map(
operator.methodcaller("split", "|"),
config_line.decode().split(" "),
)
]

logger.debug("Got the next nodes from cluster config: %s", servers)

return servers
84 changes: 84 additions & 0 deletions pymemcache/test/test_ext_aws_ec_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import pytest

from unittest.mock import MagicMock, patch

from pymemcache import Client
from pymemcache.client.ext.aws_ec_client import AWSElastiCacheHashClient

from .test_client import MockSocketModule, MockMemcacheClient


@pytest.mark.unit()
@pytest.mark.parametrize(
"connection_sting",
["cluster.abcxyz.cfg.use1.cache.amazonaws.com:11211", "1.1.1.1:11211"],
)
def test_init_valid_node_endpoint(connection_sting, monkeypatch):
with patch.object(
AWSElastiCacheHashClient, "reconfigure_nodes", new=MagicMock()
) as mock:
client = AWSElastiCacheHashClient(
connection_sting, socket_module=MockSocketModule()
)

assert client._cfg_node == connection_sting
assert mock.called


@pytest.mark.unit()
@pytest.mark.parametrize(
"connection_sting",
[
"cluster.abcxyz.cfg.use1.cache.amazonaws.com:abc",
"cluster.abcxyz.cfg.use1.cache.amazonaws.com",
"cluster.abcxyz.cfg.use1.cache.amazonaws.com:123123",
"1.1..1:11211",
],
)
def test_init_invalid_node_endpoint(connection_sting, monkeypatch):
with patch.object(
AWSElastiCacheHashClient, "reconfigure_nodes", new=MagicMock()
) as mock:
with pytest.raises(ValueError):
AWSElastiCacheHashClient(connection_sting, socket_module=MockSocketModule())


@pytest.mark.parametrize(
"server_configuration",
[
(True, ["10.0.0.1:11211", "10.0.0.2:11211"]),
(
False,
[
"cluster.abcxyz.0001.use1.cache.amazonaws.com:11211",
"cluster.abcxyz.0002.use1.cache.amazonaws.com:11211",
],
),
],
)
@pytest.mark.unit()
def test_get_cluster_config_command(server_configuration, monkeypatch):
use_vpc, configuration_list = server_configuration

raw_command = MagicMock(
return_value=b"CONFIG cluster 0 139\r\n"
b"4\n"
b"cluster.abcxyz.0001.use1.cache.amazonaws.com|10.0.0.1|11211 "
b"cluster.abcxyz.0002.use1.cache.amazonaws.com|10.0.0.2|11211"
)

with monkeypatch.context() as ctx:
ctx.setattr(Client, "raw_command", raw_command)
ctx.setattr(AWSElastiCacheHashClient, "client_class", MockMemcacheClient)

client = AWSElastiCacheHashClient(
"cluster.abcxyz.cfg.use1.cache.amazonaws.com:11211",
socket_module=MockSocketModule(),
use_vpc=use_vpc,
)

for name, client in client.clients.items():
assert isinstance(client, MockMemcacheClient)
assert name in configuration_list

assert raw_command.called
1 change: 1 addition & 0 deletions pymemcache/test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(
allow_unicode_keys=False,
encoding="ascii",
tls_context=None,
**kwargs,
):
self._contents = {}

Expand Down

0 comments on commit 760088e

Please sign in to comment.