diff --git a/Adapter/RedisTagAwareAdapter.php b/Adapter/RedisTagAwareAdapter.php index eb416ab..6cd7e98 100644 --- a/Adapter/RedisTagAwareAdapter.php +++ b/Adapter/RedisTagAwareAdapter.php @@ -60,14 +60,23 @@ class RedisTagAwareAdapter extends AbstractTagAwareAdapter implements PruneableI * @var string|null detected eviction policy used on Redis server */ private $redisEvictionPolicy; + /** + * @var string|null detected redis version of Redis server + */ + private $redisVersion; + /** + * @var bool|null Indicate whether this "namespace" has been pruned and what the result was. + */ + private $pruneResult; private $namespace; /** * @param \Redis|\RedisArray|\RedisCluster|\Predis\ClientInterface|RedisProxy|RedisClusterProxy $redis The redis client * @param string $namespace The default namespace * @param int $defaultLifetime The default lifetime + * @param bool $pruneWithCompression Enable compressed prune. Way more resource intensive. */ - public function __construct($redis, string $namespace = '', int $defaultLifetime = 0, MarshallerInterface $marshaller = null) + public function __construct($redis, string $namespace = '', int $defaultLifetime = 0, MarshallerInterface $marshaller = null, bool $pruneWithCompression = true) { if ($redis instanceof \Predis\ClientInterface && $redis->getConnection() instanceof ClusterInterface && !$redis->getConnection() instanceof PredisCluster) { throw new InvalidArgumentException(sprintf('Unsupported Predis cluster connection: only "%s" is, "%s" given.', PredisCluster::class, get_debug_type($redis->getConnection()))); @@ -85,6 +94,7 @@ public function __construct($redis, string $namespace = '', int $defaultLifetime $this->init($redis, $namespace, $defaultLifetime, new TagAwareMarshaller($marshaller)); $this->namespace = $namespace; + $this->pruneWithCompression = $pruneWithCompression; } /** @@ -296,6 +306,36 @@ protected function doInvalidate(array $tagIds): bool return $success; } + /** + * @TODO Move to RedisTrait? It already has a version check - this would be handy. + * + * @return string + */ + private function getRedisVersion(): string + { + if (null !== $this->redisVersion) { + return $this->redisVersion; + } + + $hosts = $this->getHosts(); + $host = reset($hosts); + if ($host instanceof \Predis\Client && $host->getConnection() instanceof ReplicationInterface) { + // Predis supports info command only on the master in replication environments + $hosts = [$host->getClientFor('master')]; + } + + foreach ($hosts as $host) { + $info = $host->info('Server'); + + if ($info instanceof ErrorInterface) { + continue; + } + return $this->redisVersion = $info['redis_version']; + } + // Fallback to 2.0 like RedisTrait does. + return $this->redisVersion = '2.0'; + } + private function getRedisEvictionPolicy(): string { if (null !== $this->redisEvictionPolicy) { @@ -362,35 +402,62 @@ protected function getAllTagKeys(): array }); $setKeys = $results->valid() ? iterator_to_array($results) : []; - [$cursor, $ids] = $setKeys[$tagsPrefix] ?? [null, null]; - // merge the fetched ids together - $tagKeys = array_merge($tagKeys, $ids); + // $setKeys[$tagsPrefix] might be an RedisException object - + // check before just using it. + if (is_array($setKeys[$tagsPrefix])) { + [$cursor, $ids] = $setKeys[$tagsPrefix] ?? [null, null]; + // merge the fetched ids together + $tagKeys = array_merge($tagKeys, $ids); + } elseif (isset($setKeys[$tagsPrefix]) && $setKeys[$tagsPrefix] instanceof \Throwable) { + $this->logger->error($setKeys[$tagsPrefix]->getMessage()); + } } while ($cursor = (int) $cursor); return $tagKeys; } + /** - * Checks all tags in the cache for orphaned items and creates a "report" array. - * - * By default, only completely orphaned tag keys are reported. If - * compressMode is enabled the report will include all tag keys - * that have any orphaned references to cache items + * Applies a callback to all tag keys. * * @TODO Verify the LUA scripts are redis-cluster safe. - * @TODO Is there anything that can be done to reduce memory footprint? - * - * @return array{tagKeys: string[], orphanedTagKeys: string[], orphanedTagReferenceKeys?: array} - * tagKeys: List of all tags in the cache. - * orphanedTagKeys: List of tags that only reference orphaned cache items. - * orphanedTagReferenceKeys: List of all orphaned cache item references per tag. - * Keyed by tag, value is the list of orphaned cache item keys. */ - private function getOrphanedTagsStats(bool $compressMode = false): array + protected function processAllTagKeys(\Closure $generator): \Generator { $prefix = $this->getPrefix(); - $tagKeys = $this->getAllTagKeys(); + // need to trim the \0 for lua script + $tagsPrefix = trim(self::TAGS_PREFIX); + + // get all SET entries which are tagged + $getTagsLua = <<<'EOLUA' + redis.replicate_commands() + local cursor = ARGV[1] + local prefix = ARGV[2] + local tagPrefix = string.gsub(KEYS[1], prefix, "") + return redis.call('SCAN', cursor, 'COUNT', 5000, 'MATCH', '*' .. tagPrefix .. '*', 'TYPE', 'set') + EOLUA; + $cursor = 0; + do { + $results = $this->pipeline(function () use ($getTagsLua, $cursor, $prefix, $tagsPrefix) { + yield 'eval' => [$getTagsLua, [$tagsPrefix, $cursor, $prefix], 1]; + }); + $setKeys = $results->valid() ? iterator_to_array($results) : []; + // $setKeys[$tagsPrefix] might be an RedisException object - + // check before just using it. + if (is_array($setKeys[$tagsPrefix])) { + [$cursor, $tagKeys] = $setKeys[$tagsPrefix]; + // merge the fetched ids together + foreach ($tagKeys as $tagKey) { + yield $tagKey => $generator($tagKey); + } + } elseif (isset($setKeys[$tagsPrefix]) && $setKeys[$tagsPrefix] instanceof \Throwable) { + $this->logger->error($setKeys[$tagsPrefix]->getMessage()); + } + } while ($cursor = (int) $cursor); + } + private function processSetMembers(\Closure $generator, $key): \Generator + { // lua for fetching all entries/content from a SET $getSetContentLua = <<<'EOLUA' redis.replicate_commands() @@ -398,110 +465,275 @@ private function getOrphanedTagsStats(bool $compressMode = false): array return redis.call('SSCAN', KEYS[1], cursor, 'COUNT', 5000) EOLUA; - $orphanedTagReferenceKeys = []; - $orphanedTagKeys = []; - // Iterate over each tag and check if its entries reference orphaned - // cache items. - foreach ($tagKeys as $tagKey) { - $tagKey = substr($tagKey, \strlen($prefix)); - $cursor = 0; - $hasExistingKeys = false; - do { - // Fetch all referenced cache keys from the tag entry. - $results = $this->pipeline(function () use ($getSetContentLua, $tagKey, $cursor) { - yield 'eval' => [$getSetContentLua, [$tagKey, $cursor], 1]; - }); - [$cursor, $referencedCacheKeys] = $results->valid() ? $results->current() : [null, null]; + $cursor = 0; + do { + // Fetch all referenced cache keys from the tag entry. + $results = $this->pipeline(function () use ($getSetContentLua, $key, $cursor) { + yield 'eval' => [$getSetContentLua, [$key, $cursor], 1]; + }); + [$cursor, $setMembers] = $results->valid() ? $results->current() : [null, null]; + yield $cursor => $generator($setMembers); + } while ($cursor = (int) $cursor); + } - if (!empty($referencedCacheKeys)) { - // Counts how many of the referenced cache items exist. - $existingCacheKeysResult = $this->pipeline(function () use ($referencedCacheKeys) { - yield 'exists' => $referencedCacheKeys; - }); - $existingCacheKeysCount = $existingCacheKeysResult->valid() ? $existingCacheKeysResult->current() : 0; - $hasExistingKeys = $hasExistingKeys || ($existingCacheKeysCount > 0 ?? false); - - // If compression mode is enabled and the count between - // referenced and existing cache keys differs collect the - // missing references. - if ($compressMode && \count($referencedCacheKeys) > $existingCacheKeysCount) { - // In order to create the delta each single reference - // has to be checked. - foreach ($referencedCacheKeys as $cacheKey) { - $existingCacheKeyResult = $this->pipeline(function () use ($cacheKey) { - yield 'exists' => [$cacheKey]; - }); - if ($existingCacheKeyResult->valid() && !$existingCacheKeyResult->current()) { - $orphanedTagReferenceKeys[$tagKey][] = $cacheKey; - } - } - } - // Stop processing cursors in case compression mode is - // disabled and the tag references existing keys. - if (!$compressMode && $hasExistingKeys) { - break; + /** + * Accepts a list of cache keys and returns a list with orphaned keys. + * + * The method attempts to optimize the testing of the keys by batching the + * key tests and hence reduce the amount of redis calls. + * + * @param array $cacheKeys + * @param int $chunks Number of chunks to create when processing cacheKeys. + * + * @return array + */ + private function getOrphanedCacheKeys(array $cacheKeys, int $chunks = 2) + { + $orphanedCacheKeys = []; + if (version_compare($this->getRedisVersion(), '3.0.3', '>=')) { + // If we can check multiple keys at once divide and conquer to have + // faster execution. + $cacheKeysChunks = array_chunk($cacheKeys, max(1, floor(count($cacheKeys) / $chunks)), true); + foreach ($cacheKeysChunks as $cacheKeysChunk) { + $result = $this->pipeline(function () use ($cacheKeysChunk) { + yield 'exists' => [$cacheKeysChunk]; + }); + if ($result->valid()) { + $existingKeys = $result->current(); + if ($existingKeys === 0) { + // None of the chunk exists - register all. + $orphanedCacheKeys = array_merge($orphanedCacheKeys, $cacheKeysChunk); + } elseif ($existingKeys !== ($cacheKeysChunkCount = count($cacheKeysChunk))) { + // Some exists some don't - trigger another batch of chunks. + // The chunk size should be small enough that the there + // is a high possibility to hit an exists run with 0 + // existing items in it - thus breaking the recursion. + // The simplest approach would be to use the ratio + // between existing and missing keys - but this doesn't + // account for fragmentation. So instead the chunk size + // has to be smaller than the ratio in order to get + // block hits even if there's fragmentation. + // 10 keys, 3 orphans -> chunks min 3, with fragmentation probably more is better. + // For now the ratio between existing keys and total keys rounded up to next int plus 1 is used. + // This should account for some fragmentation. + // @TODO Someone got probabilistic stats on what's the + // best chunk size based on keys, orphanedKeys? + // @TODO At what chunk size is a single item comparison + // more efficient? + $newChunks = max(1, ceil($cacheKeysChunkCount / ($cacheKeysChunkCount - $existingKeys)) + 1); + $orphanedCacheKeys = array_merge($orphanedCacheKeys, $this->getOrphanedCacheKeys($cacheKeysChunk, $newChunks)); } } - } while ($cursor = (int) $cursor); - if (!$hasExistingKeys) { - $orphanedTagKeys[] = $tagKey; + } + } else { + // Without multi-key support in exists each single reference + // has to be checked individually to create the delta. + foreach ($cacheKeys as $cacheKey) { + $result = $this->pipeline(function () use ($cacheKey) { + yield 'exists' => [$cacheKey]; + }); + if ($result->valid() && !$result->current()) { + $orphanedCacheKeys[] = $cacheKey; + } } } + return $orphanedCacheKeys; + } - $stats = ['orphanedTagKeys' => $orphanedTagKeys, 'tagKeys' => $tagKeys]; - if ($compressMode) { - $stats['orphanedTagReferenceKeys'] = $orphanedTagReferenceKeys; - } + /** + * Checks all tags in the cache for orphaned items and creates a "report" array. + * + * @TODO Verify the LUA scripts are redis-cluster safe. + * + * @return array{tagKeys: string[], orphanedTagKeys: string[], orphanedTagReferenceKeyCount?: array} + * tagKeys: List of all tags in the cache. + * orphanedTagKeys: List of tags that only reference orphaned cache items. + * orphanedTagReferenceKeyCount: Number of orphaned cache item references per tag. + * Keyed by tag, value is the number of orphaned cache item keys. + */ + private function getOrphanedTagsStats(): array + { + // Iterates over all tags, analyzing the keys referencing the tag. + $tags = $this->processAllTagKeys(function ($tagKey) { + return $this->processSetMembers(function ($membersChunk) { + $keyReferencesCount = count($membersChunk); + // By default assume all references exist. + $existingKeysCount = count($membersChunk); + $result = $this->pipeline(function () use ($membersChunk) { + yield 'exists' => [$membersChunk]; + }); + if ($result->valid()) { + $existingKeysCount = $result->current(); + } + yield [ + 'keyReferencesCount' => $keyReferencesCount, + 'existingKeysCount' => $existingKeysCount, + ]; + }, $tagKey); + }); + $stats = [ + 'tagKeys' => [], + 'orphanedTagKeys' => [], + 'orphanedTagReferenceKeyCount' => [], + ]; + foreach ($tags as $tag => $referencedKeyChunks) { + $stats['tagKeys'][$tag] = $tag; + $orphanedReferencesCount = 0; + $tagHasExistingKeys = false; + foreach ($referencedKeyChunks as $referencedKeyChunk) { + foreach ($referencedKeyChunk as $chunkData) { + $tagHasExistingKeys = $tagHasExistingKeys || ($chunkData['existingKeysCount'] > 0); + $orphanedReferencesCount += $chunkData['keyReferencesCount'] - $chunkData['existingKeysCount']; + } + } + if (!$tagHasExistingKeys) { + $stats['orphanedTagKeys'][$tag] = $tag; + } else { + $stats['orphanedTagReferenceKeyCount'][$tag] = $orphanedReferencesCount; + } + } return $stats; } /** - * @TODO Verify the LUA scripts are redis-cluster safe. + * Iterates over all existing tag sets. Checks if the tag set has keys that + * exist. If none of the references exist anymore the tag is deleted. + * If a sub-set of references exists and compression is enabled the orphaned + * references are deleted once the orphan / existing references ratio is + * above the set threshold. + * + * The processing happens progressively to ensure a cleanup happens even in + * the event of an error somewhere in the later processing. + * + * @param bool $compression If enabled orphaned references in tag sets are + * removed too. Can be quite resource intensive. + * @param float $compressionRatioThreshold Compression is only executed if + * at least the given percentage of references of a tag are orphaned. + * @param int $pruneThrottlingMs Delay in milliseconds between major prune + * actions. Used to reduce the load on redis during pruning. + * */ - private function pruneOrphanedTags(bool $compressMode = false): bool - { + private function pruneOrphanedTags( + bool $compression = false, + float $compressionRatioThreshold = 0.3, + int $pruneThrottlingMs = 50 + ): bool { $success = true; - $orphanedTagsStats = $this->getOrphanedTagsStats($compressMode); + // Enable compression only if redis version is at least 3.0.3 otherwise + // the processing has to check each single key instead of batching. + $compression = $compression && version_compare($this->getRedisVersion(), '3.0.3', '>='); + + // Iterates over all tags, analyzing the keys referencing the tag. + $tags = $this->processAllTagKeys(function ($tagKey) use ($compression, $pruneThrottlingMs) { + return $this->processSetMembers(function ($membersChunk) use ($compression, $pruneThrottlingMs) { + $keyReferencesCount = count($membersChunk); + // By default assume all references exist. + $existingKeysCount = count($membersChunk); + $result = $this->pipeline(function () use ($membersChunk) { + yield 'exists' => [$membersChunk]; + }); + if ($result->valid()) { + $existingKeysCount = $result->current(); + } + $returnValue = [ + 'keyReferencesCount' => $keyReferencesCount, + 'existingKeysCount' => $existingKeysCount, + ]; + if ($compression) { + $returnValue['orphanedKeyReferences'] = $this->getOrphanedCacheKeys($membersChunk); + } + usleep($pruneThrottlingMs * 1000); + yield $returnValue; + }, $tagKey); + }); - // Delete all tags that don't reference any existing cache item. - foreach ($orphanedTagsStats['orphanedTagKeys'] as $orphanedTagKey) { - $result = $this->pipeline(function () use ($orphanedTagKey) { - yield 'del' => [$orphanedTagKey]; - }); - if (!$result->valid() || 1 !== $result->current()) { - $success = false; + $orphanedTags = []; + foreach ($tags as $tag => $referencedKeyChunks) { + usleep($pruneThrottlingMs * 1000); + $tagHasExistingKeys = false; + $tagOrphanedKeyReferences = []; + $tagKeyReferencesCount = 0; + foreach ($referencedKeyChunks as $referencedKeyChunk) { + foreach ($referencedKeyChunk as $chunkData) { + // This notation should ensure the existingKeysCount are only + // evaluated up until the point $tagHasExistingKeys is true + // after that there's no need to eval again. + $tagHasExistingKeys = $tagHasExistingKeys || (is_numeric($chunkData['existingKeysCount']) && $chunkData['existingKeysCount'] > 0); + $tagKeyReferencesCount += $chunkData['keyReferencesCount'] ?? 0; + // Collect the orphaned key references in the tag set. + // @TODO If the memory foot print is getting to big we might + // have to process the orphans right here or in the actual + // generator in order to avoid this collection of keys. + // However, collecting them allows for reduction in command + // execution. + if (!empty($chunkData['orphanedKeyReferences'])) { + $tagOrphanedKeyReferences = array_merge($tagOrphanedKeyReferences, $chunkData['orphanedKeyReferences']); + } + } } - } - // If orphaned cache key references are provided prune them too. - if (!empty($orphanedTagsStats['orphanedTagReferenceKeys'])) { - // lua for deleting member from a SET - $removeSetMemberLua = <<<'EOLUA' - redis.replicate_commands() - return redis.call('SREM', KEYS[1], KEYS[2]) - EOLUA; - // Loop through all tags with orphaned cache item references. - foreach ($orphanedTagsStats['orphanedTagReferenceKeys'] as $tagKey => $orphanedCacheKeys) { - // Remove each cache item reference from the tag set. - foreach ($orphanedCacheKeys as $orphanedCacheKey) { - $result = $this->pipeline(function () use ($tagKey, $orphanedCacheKey) { - yield 'srem' => [$tagKey, $orphanedCacheKey]; + if (!$tagHasExistingKeys) { + $orphanedTags[$tag] = $tag; + } + // Delete in batches of 250 to chip away even if there are errors. + if (count($orphanedTags) >= 250) { + try { + $result = $this->pipeline(function () use ($orphanedTags) { + yield 'del' => [$orphanedTags]; + }); + if (!$result->valid() || count($orphanedTags) !== $result->current()) { + $success = false; + } + } catch (\Throwable $e) { + $success = false; + } finally { + $orphanedTags = []; + } + } + // If compression mode is enabled and the count between + // referenced and existing cache keys differs by more than + // the compression ratio threshold run the compression. + // The ratio trigger ensures that a more sensible amount of + // items is processed. Currently, the set is processed as + // soon as at least a third of references are orphaned. + if ( + $tagHasExistingKeys + && $compression + && !empty($tagOrphanedKeyReferences) + && (count($tagOrphanedKeyReferences) / $tagKeyReferencesCount) >= $compressionRatioThreshold + ) { + try { + // Remove orphaned cache item references from the tag set. + // SREM supports multiple members as of redis 2.4.0 - the + // compression handling is enabled as of redis 3.0.3. + $result = $this->pipeline(function () use ($tag, $tagOrphanedKeyReferences) { + yield 'sRem' => array_merge([$tag], $tagOrphanedKeyReferences); }); - if (!$result->valid() || 1 !== $result->current()) { + if (!$result->valid() || count($tagOrphanedKeyReferences) !== $result->current()) { $success = false; } + } catch (\Throwable $e) { + $success = false; } } } - + if (!empty($orphanedTags)) { + $result = $this->pipeline(function () use ($orphanedTags) { + yield 'del' => [$orphanedTags]; + }); + if (!$result->valid() || 1 !== $result->current()) { + $success = false; + } + } return $success; } - /** - * @TODO Make compression mode flag configurable. - */ public function prune(): bool { - return $this->pruneOrphanedTags(true); + // Only prune once per prune run. + if (!isset($this->pruneResult)) { + $this->pruneResult = $this->pruneOrphanedTags($this->pruneWithCompression); + } + return $this->pruneResult; } }