diff --git a/setup.py b/setup.py index 79b367f1515295..ccce143ec9f82f 100755 --- a/setup.py +++ b/setup.py @@ -78,6 +78,7 @@ 'cqlsh', # /cassandra 'datadog', + 'msgpack-python<0.5.0', 'pytest-cov>=1.8.0,<1.9.0', 'pytest-timeout>=0.5.0,<0.6.0', 'pytest-xdist>=1.11.0,<1.12.0', diff --git a/src/sentry/scripts/similarity/index.lua b/src/sentry/scripts/similarity/index.lua index ea26bf47f5bd02..437ec3fc80eb15 100644 --- a/src/sentry/scripts/similarity/index.lua +++ b/src/sentry/scripts/similarity/index.lua @@ -30,6 +30,10 @@ if not pcall(redis.replicate_commands) then redis.log(redis.LOG_DEBUG, 'Could not enable script effects replication.') end +local function identity(value) + return value +end + local function range(start, stop) local result = {} for i = start, stop do @@ -113,14 +117,17 @@ local function parse_integer(value) end local function build_argument_parser(fields) - return function (arguments) + return function (arguments, offset) + if offset == nil then + offset = 0 + end local results = {} for i = 1, #fields do local name, parser = unpack(fields[i]) local value = arguments[i] local ok, result = pcall(parser, value) if not ok then - error(string.format('received invalid argument for %q in position %s with value %q; %s', name, i, value, result)) + error(string.format('received invalid argument for %q in position %s with value %q; %s', name, offset + i, value, result)) else results[name] = result end @@ -129,6 +136,31 @@ local function build_argument_parser(fields) end end +local function build_variadic_argument_parser(fields, validator) + if validator == nil then + validator = identity + end + local parser = build_argument_parser(fields) + return function (arguments, offset) + if offset == nil then + offset = 0 + end + if #arguments % #fields ~= 0 then + -- TODO: make this error less crummy + error('invalid number of arguments') + end + local results = {} + for i = 1, #arguments, #fields do + local value, _ = parser(table.slice(arguments, i, i + #fields - 
1), i) + table.insert( + results, + validator(value) + ) + end + return results + end +end + -- Time Series @@ -149,6 +181,20 @@ local function get_index_expiration_time(interval, retention, index) end +-- Redis Helpers + +local function redis_hgetall_response_to_table(response, value_type) + if value_type == nil then + value_type = identity + end + local result = {} + for i = 1, #response, 2 do + result[response[i]] = value_type(response[i + 1]) + end + return result +end + + -- Generic Configuration local configuration_parser = build_argument_parser({ @@ -226,6 +272,13 @@ local function scale_to_total(values) return result end +local function collect_index_key_pairs(arguments, validator) + return build_variadic_argument_parser({ + {"index", identity}, + {"key", identity}, + }, validator)(arguments) +end + -- Command Parsing @@ -330,21 +383,23 @@ local commands = { table.imap( time_series, function (time) - return redis.call( - 'HGETALL', - get_bucket_frequency_key( - configuration.scope, - index, - time, - band, - key - ) + return redis_hgetall_response_to_table( + redis.call( + 'HGETALL', + get_bucket_frequency_key( + configuration.scope, + index, + time, + band, + key + ) + ), + tonumber ) end ), function (result, response) - for i = 1, #response, 2 do - local bucket, count = response[i], response[i + 1] + for bucket, count in pairs(response) do result[bucket] = (result[bucket] or 0) + count end return result @@ -466,28 +521,13 @@ local commands = { MERGE = takes_configuration( function (configuration, arguments) local destination_key = arguments[1] - local entries = table.ireduce( + local sources = collect_index_key_pairs( table.slice(arguments, 2), - function (state, token) - if state.active == nil then - state.active = { - index = token, - key = nil, - } - else - assert(token ~= destination_key, 'cannot merge destination into itself') - state.active.key = token - table.insert( - state.completed, - state.active - ) - state.active = nil - end - return state - 
end, - {active = nil, completed = {}} + function (entry) + assert(entry.key ~= destination_key, 'cannot merge destination into itself') + return entry + end ) - assert(entries.active == nil, 'unexpected end of input') local time_series = get_active_indices( configuration.interval, @@ -495,7 +535,7 @@ local commands = { configuration.timestamp ) - for _, source in ipairs(entries.completed) do + for _, source in ipairs(sources) do for band = 1, configuration.bands do for _, time in ipairs(time_series) do local source_bucket_frequency_key = get_bucket_frequency_key( @@ -518,13 +558,15 @@ local commands = { time ) - local response = redis.call( - 'HGETALL', - source_bucket_frequency_key + local response = redis_hgetall_response_to_table( + redis.call( + 'HGETALL', + source_bucket_frequency_key + ), + tonumber ) - for i = 1, #response, 2 do - local bucket, count = response[i], response[i + 1] + for bucket, count in pairs(response) do -- Remove the source from the bucket membership -- set, and add the destination to the membership -- set. @@ -548,6 +590,7 @@ local commands = { ) end + -- TODO: We only need to do this if the bucket has contents. -- The destination bucket frequency key may have not -- existed previously, so we need to make sure we set -- the expiration on it in case it is new. 
@@ -566,35 +609,14 @@ local commands = { ), DELETE = takes_configuration( function (configuration, arguments) - local entries = table.ireduce( - arguments, - function (state, token) - if state.active == nil then - state.active = { - index = token, - key = nil, - } - else - state.active.key = token - table.insert( - state.completed, - state.active - ) - state.active = nil - end - return state - end, - {active = nil, completed = {}} - ) - assert(entries.active == nil, 'unexpected end of input') - + local sources = collect_index_key_pairs(arguments) local time_series = get_active_indices( configuration.interval, configuration.retention, configuration.timestamp ) - for _, source in ipairs(entries.completed) do + for _, source in ipairs(sources) do for band = 1, configuration.bands do for _, time in ipairs(time_series) do local source_bucket_frequency_key = get_bucket_frequency_key( @@ -605,13 +627,12 @@ local commands = { source.key ) - local response = redis.call( - 'HGETALL', + local buckets = redis.call( + 'HKEYS', source_bucket_frequency_key ) - for i = 1, #response, 2 do - local bucket = response[i] + for _, bucket in ipairs(buckets) do redis.call( 'SREM', get_bucket_membership_key( @@ -631,6 +652,134 @@ local commands = { end end end + ), + IMPORT = takes_configuration( + --[[ + Loads data returned by the ``EXPORT`` command into the location + specified by the ``index`` and ``key`` arguments. Data can be loaded + into multiple indices or keys by providing additional arguments. + + If the destination specified by ``index`` and ``key`` does not exist + (relocating data to a new key, for example), it will be created. If + data already exists at the new destination, the imported data will be + appended to the existing data. 
+ ]]-- + function (configuration, arguments) + local entries = build_variadic_argument_parser({ + {'index', identity}, + {'key', identity}, + {'data', cmsgpack.unpack}, + })(arguments) + + for _, entry in ipairs(entries) do + for band, data in ipairs(entry.data) do + for _, item in ipairs(data) do + local time, buckets = item[1], item[2] + local expiration_time = get_index_expiration_time( + configuration.interval, + configuration.retention, + time + ) + local destination_bucket_frequency_key = get_bucket_frequency_key( + configuration.scope, + entry.index, + time, + band, + entry.key + ) + + for bucket, count in pairs(buckets) do + local bucket_membership_key = get_bucket_membership_key( + configuration.scope, + entry.index, + time, + band, + bucket + ) + redis.call('SADD', bucket_membership_key, entry.key) + redis.call('EXPIREAT', bucket_membership_key, expiration_time) + + redis.call( + 'HINCRBY', + destination_bucket_frequency_key, + bucket, + count + ) + end + + -- The destination bucket frequency key may have not + -- existed previously, so we need to make sure we set + -- the expiration on it in case it is new. (We only + -- have to do this if there we changed any bucket counts.) + if next(buckets) ~= nil then + redis.call( + 'EXPIREAT', + destination_bucket_frequency_key, + expiration_time + ) + end + end + end + end + end + ), + EXPORT = takes_configuration( + --[[ + Exports data that is located at the provided ``index`` and ``key`` pairs. + + Generally, this data should be treated as opaque method for extracting + data to be provided to the ``IMPORT`` command. Exported data is + returned in the same order as the arguments are provided. Each item is + a messagepacked blob that is at the top level list, where each member + represents the data contained within one band. Each item in the band + list is another list, where each member represents one time series + interval. 
Each item in the time series list is a tuple containing the + time series index and a mapping containing the counts for each bucket + within the interval. (Due to the Lua data model, an empty mapping will + be represented as an empty list. The consumer of this data must convert + it back to the correct type.) + ]]-- + function (configuration, arguments) + local bands = range(1, configuration.bands) + local time_series = get_active_indices( + configuration.interval, + configuration.retention, + configuration.timestamp + ) + return table.imap( + collect_index_key_pairs(arguments), + function (source) + return cmsgpack.pack( + table.imap( + bands, + function (band) + return table.imap( + time_series, + function (time) + return { + time, + redis_hgetall_response_to_table( + redis.call( + 'HGETALL', + get_bucket_frequency_key( + configuration.scope, + source.index, + time, + band, + source.key + ) + ), + tonumber + ), + } + end + ) + end + ) + ) + end + ) + end ) } diff --git a/src/sentry/similarity.py b/src/sentry/similarity.py index f59889b5174cb5..a0309cca62a371 100644 --- a/src/sentry/similarity.py +++ b/src/sentry/similarity.py @@ -143,6 +143,50 @@ def delete(self, scope, items, timestamp=None): arguments, ) + def export(self, scope, items, timestamp=None): + if timestamp is None: + timestamp = int(time.time()) + + arguments = [ + 'EXPORT', + timestamp, + len(self.bands), + self.interval, + self.retention, + scope, + ] + + for idx, key in items: + arguments.extend([idx, key]) + + return index( + self.cluster.get_local_client_for_key(scope), + [], + arguments, + ) + + def import_(self, scope, items, timestamp=None): + if timestamp is None: + timestamp = int(time.time()) + + arguments = [ + 'IMPORT', + timestamp, + len(self.bands), + self.interval, + self.retention, + scope, + ] + + for idx, key, data in items: + arguments.extend([idx, key, data]) + + return index( + self.cluster.get_local_client_for_key(scope), + [], + arguments, + ) + FRAME_ITEM_SEPARATOR = b'\x00' 
FRAME_PAIR_SEPARATOR = b'\x01' diff --git a/tests/sentry/test_similarity.py b/tests/sentry/test_similarity.py index cc96df5fd9ad91..14f8e93850c1ca 100644 --- a/tests/sentry/test_similarity.py +++ b/tests/sentry/test_similarity.py @@ -1,5 +1,8 @@ from __future__ import absolute_import +import time + +import msgpack import pytest from sentry.similarity import ( @@ -129,3 +132,54 @@ def test_index(self): assert results[2][0] in ('3', '4') # equidistant pairs, order doesn't really matter assert results[3][0] in ('3', '4') assert results[4][0] == '5' + + def test_export_import(self): + bands = 2 + retention = 12 + index = MinHashIndex( + redis.clusters.get('default'), + 0xFFFF, + bands, + 2, + 60 * 60, + retention, + ) + + index.record('example', '1', [('index', 'hello world')]) + + timestamp = int(time.time()) + result = index.export('example', [('index', 1)], timestamp=timestamp) + assert len(result) == 1 + + data = msgpack.unpackb(result[0]) + assert len(data) == bands + + for band in data: + assert len(band) == (retention + 1) + assert sum(sum(dict(bucket_frequencies).values()) for index, bucket_frequencies in band) == 1 + + # Copy the data from key 1 to key 2. + index.import_('example', [('index', 2, result[0])], timestamp=timestamp) + + assert index.export( + 'example', + [('index', 1)], + timestamp=timestamp + ) == index.export( + 'example', + [('index', 2)], + timestamp=timestamp + ) + + # Copy the data again to key 2 (duplicating all of the data.) + index.import_('example', [('index', 2, result[0])], timestamp=timestamp) + + result = index.export('example', [('index', 2)], timestamp=timestamp) + assert len(result) == 1 + + data = msgpack.unpackb(result[0]) + assert len(data) == bands + + for band in data: + assert len(band) == (retention + 1) + assert sum(sum(dict(bucket_frequencies).values()) for index, bucket_frequencies in band) == 2