From b956ec93a08fa7310d03b46f0e04d2adabd9832d Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Fri, 3 Mar 2017 13:27:55 -0800 Subject: [PATCH 01/14] Extract common logic for {index, key} parsing. --- src/sentry/scripts/similarity/index.lua | 82 ++++++++++++------------- 1 file changed, 38 insertions(+), 44 deletions(-) diff --git a/src/sentry/scripts/similarity/index.lua b/src/sentry/scripts/similarity/index.lua index ea26bf47f5bd02..4158db15caa832 100644 --- a/src/sentry/scripts/similarity/index.lua +++ b/src/sentry/scripts/similarity/index.lua @@ -226,6 +226,36 @@ local function scale_to_total(values) return result end +local function collect_index_key_pairs(arguments, validator) + if validator == nil then + validator = function (entry) end + end + + local entries = table.ireduce( + arguments, + function (state, token) + if state.active == nil then + state.active = { + index = token, + key = nil, + } + else + state.active.key = token + validator(state.active) + table.insert( + state.completed, + state.active + ) + state.active = nil + end + return state + end, + {active = nil, completed = {}} + ) + assert(entries.active == nil, 'unexpected end of input') + return entries.completed +end + -- Command Parsing @@ -466,28 +496,12 @@ local commands = { MERGE = takes_configuration( function (configuration, arguments) local destination_key = arguments[1] - local entries = table.ireduce( + local sources = collect_index_key_pairs( table.slice(arguments, 2), - function (state, token) - if state.active == nil then - state.active = { - index = token, - key = nil, - } - else - assert(token ~= destination_key, 'cannot merge destination into itself') - state.active.key = token - table.insert( - state.completed, - state.active - ) - state.active = nil - end - return state - end, - {active = nil, completed = {}} + function (entry) + assert(entry.key ~= destination_key, 'cannot merge destination into itself') + end ) - assert(entries.active == nil, 'unexpected end of input') local 
time_series = get_active_indices( configuration.interval, @@ -495,7 +509,7 @@ local commands = { configuration.timestamp ) - for _, source in ipairs(entries.completed) do + for _, source in ipairs(sources) do for band = 1, configuration.bands do for _, time in ipairs(time_series) do local source_bucket_frequency_key = get_bucket_frequency_key( @@ -566,27 +580,7 @@ local commands = { ), DELETE = takes_configuration( function (configuration, arguments) - local entries = table.ireduce( - arguments, - function (state, token) - if state.active == nil then - state.active = { - index = token, - key = nil, - } - else - state.active.key = token - table.insert( - state.completed, - state.active - ) - state.active = nil - end - return state - end, - {active = nil, completed = {}} - ) - assert(entries.active == nil, 'unexpected end of input') + local sources = collect_index_key_pairs(arguments) local time_series = get_active_indices( configuration.interval, @@ -594,7 +588,7 @@ local commands = { configuration.timestamp ) - for _, source in ipairs(entries.completed) do + for _, source in ipairs(sources) do for band = 1, configuration.bands do for _, time in ipairs(time_series) do local source_bucket_frequency_key = get_bucket_frequency_key( @@ -631,7 +625,7 @@ local commands = { end end end - ) + ), } From 842126ade44fdbac9304820fc0540aa7209ed2de Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Fri, 3 Mar 2017 13:33:32 -0800 Subject: [PATCH 02/14] Add command stubs. 
--- src/sentry/scripts/similarity/index.lua | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/sentry/scripts/similarity/index.lua b/src/sentry/scripts/similarity/index.lua index 4158db15caa832..c3cab9629d0ab5 100644 --- a/src/sentry/scripts/similarity/index.lua +++ b/src/sentry/scripts/similarity/index.lua @@ -626,6 +626,16 @@ local commands = { end end ), + IMPORT = takes_configuration( + function (configuration, arguments) + error("not implemented") + end + ), + EXPORT = takes_configuration( + function (configuration, arguments) + error("not implemented") + end + ) } From cfbfc3a4dc17bd068ca1df92bc741e64fe0d797f Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Fri, 3 Mar 2017 14:25:27 -0800 Subject: [PATCH 03/14] Add EXPORT command. --- src/sentry/scripts/similarity/index.lua | 37 ++++++++++++++++++++++++- src/sentry/similarity.py | 22 +++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/src/sentry/scripts/similarity/index.lua b/src/sentry/scripts/similarity/index.lua index c3cab9629d0ab5..36e28d8f286d02 100644 --- a/src/sentry/scripts/similarity/index.lua +++ b/src/sentry/scripts/similarity/index.lua @@ -633,7 +633,42 @@ local commands = { ), EXPORT = takes_configuration( function (configuration, arguments) - error("not implemented") + local bands = range(1, configuration.bands) + local time_series = get_active_indices( + configuration.interval, + configuration.retention, + configuration.timestamp + ) + return table.imap( + collect_index_key_pairs(arguments), + function (source) + return cmsgpack.pack( + table.imap( + bands, + function (band) + return table.imap( + time_series, + function (time) + return { + time, + redis.call( + 'HGETALL', + get_bucket_frequency_key( + configuration.scope, + source.index, + time, + band, + source.key + ) + ) + } + end + ) + end + ) + ) + end + ) end ) } diff --git a/src/sentry/similarity.py b/src/sentry/similarity.py index f59889b5174cb5..4f1d3be5e93ca3 100644 --- 
a/src/sentry/similarity.py +++ b/src/sentry/similarity.py @@ -143,6 +143,28 @@ def delete(self, scope, items, timestamp=None): arguments, ) + def export(self, scope, items, timestamp=None): + if timestamp is None: + timestamp = int(time.time()) + + arguments = [ + 'EXPORT', + timestamp, + len(self.bands), + self.interval, + self.retention, + scope, + ] + + for idx, key in items: + arguments.extend([idx, key]) + + return index( + self.cluster.get_local_client_for_key(scope), + [], + arguments, + ) + FRAME_ITEM_SEPARATOR = b'\x00' FRAME_PAIR_SEPARATOR = b'\x01' From 2fdf842a81ce40ecd637c06a7ad934b4b679d0ef Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Fri, 3 Mar 2017 15:01:00 -0800 Subject: [PATCH 04/14] Add IMPORT command. --- src/sentry/scripts/similarity/index.lua | 106 +++++++++++++++++++++--- src/sentry/similarity.py | 22 +++++ 2 files changed, 117 insertions(+), 11 deletions(-) diff --git a/src/sentry/scripts/similarity/index.lua b/src/sentry/scripts/similarity/index.lua index 36e28d8f286d02..38e70a6fdd215b 100644 --- a/src/sentry/scripts/similarity/index.lua +++ b/src/sentry/scripts/similarity/index.lua @@ -628,7 +628,82 @@ local commands = { ), IMPORT = takes_configuration( function (configuration, arguments) - error("not implemented") + local entries = table.ireduce( + arguments, + function (state, token) + if state.active == nil then + -- When there is no active entry, we need to initialize + -- a new one. The first token is the index identifier. + state.active = { + index = token, + key = nil, + data = nil, + } + elseif state.active.key == nil then + state.active.key = token + else + state.active.data = cmsgpack.unpack(token) + table.insert(state.completed, state.active) + state.active = nil + end + return state + end, + {active = nil, completed = {}} + ) + + -- If there are any entries in progress when we are completed, that + -- means the input was in an incorrect format and we should error + -- before we record any bad data. 
+ assert(entries.active == nil, 'unexpected end of input') + + for _, entry in ipairs(entries.completed) do + for band, data in ipairs(entry.data) do + for _, item in ipairs(data) do + local time, buckets = item[1], item[2] + local expiration_time = get_index_expiration_time( + configuration.interval, + configuration.retention, + time + ) + local destination_bucket_frequency_key = get_bucket_frequency_key( + configuration.scope, + entry.index, + time, + band, + entry.key + ) + + for _, bucket in ipairs(buckets) do + local bucket_key, count = bucket[1], bucket[2] + local bucket_membership_key = get_bucket_membership_key( + configuration.scope, + entry.index, + time, + band, + bucket_key + ) + redis.call('SADD', bucket_membership_key, entry.key) + redis.call('EXPIREAT', bucket_membership_key, expiration_time) + + redis.call( + 'HINCRBY', + destination_bucket_frequency_key, + bucket_key, + count + ) + end + + -- The destination bucket frequency key may have not + -- existed previously, so we need to make sure we set + -- the expiration on it in case it is new. 
+ redis.call( + 'EXPIREAT', + destination_bucket_frequency_key, + expiration_time + ) + end + end + end end ), EXPORT = takes_configuration( @@ -649,18 +724,27 @@ local commands = { return table.imap( time_series, function (time) + -- TODO(tkaemming): i hate this so much, this is dumb af + local response = redis.call( + 'HGETALL', + get_bucket_frequency_key( + configuration.scope, + source.index, + time, + band, + source.key + ) + ) + local result = {} + for i = 1, #response, 2 do + table.insert( + result, + {response[i], response[i+1]} + ) + end return { time, - redis.call( - 'HGETALL', - get_bucket_frequency_key( - configuration.scope, - source.index, - time, - band, - source.key - ) - ) + result, } end ) diff --git a/src/sentry/similarity.py b/src/sentry/similarity.py index 4f1d3be5e93ca3..a0309cca62a371 100644 --- a/src/sentry/similarity.py +++ b/src/sentry/similarity.py @@ -165,6 +165,28 @@ def export(self, scope, items, timestamp=None): arguments, ) + def import_(self, scope, items, timestamp=None): + if timestamp is None: + timestamp = int(time.time()) + + arguments = [ + 'IMPORT', + timestamp, + len(self.bands), + self.interval, + self.retention, + scope, + ] + + for idx, key, data in items: + arguments.extend([idx, key, data]) + + return index( + self.cluster.get_local_client_for_key(scope), + [], + arguments, + ) + FRAME_ITEM_SEPARATOR = b'\x00' FRAME_PAIR_SEPARATOR = b'\x01' From bdc3ec998176f0ac1a8710209c3a362d1127383f Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Fri, 3 Mar 2017 16:36:23 -0800 Subject: [PATCH 05/14] Add some weak tests --- setup.py | 1 + tests/sentry/test_similarity.py | 52 +++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/setup.py b/setup.py index 79b367f1515295..5055871c96def1 100755 --- a/setup.py +++ b/setup.py @@ -78,6 +78,7 @@ 'cqlsh', # /cassandra 'datadog', + 'msgpack', 'pytest-cov>=1.8.0,<1.9.0', 'pytest-timeout>=0.5.0,<0.6.0', 'pytest-xdist>=1.11.0,<1.12.0', diff --git 
a/tests/sentry/test_similarity.py b/tests/sentry/test_similarity.py index cc96df5fd9ad91..782d78f27639da 100644 --- a/tests/sentry/test_similarity.py +++ b/tests/sentry/test_similarity.py @@ -1,5 +1,8 @@ from __future__ import absolute_import +import time + +import msgpack import pytest from sentry.similarity import ( @@ -129,3 +132,52 @@ def test_index(self): assert results[2][0] in ('3', '4') # equidistant pairs, order doesn't really matter assert results[3][0] in ('3', '4') assert results[4][0] == '5' + + def test_export_import(self): + bands = 2 + retention = 12 + index = MinHashIndex( + redis.clusters.get('default'), + 0xFFFF, + bands, + 2, + 60 * 60, + retention, + ) + + index.record('example', '1', [('index', 'hello world')]) + + timestamp = int(time.time()) + result = index.export('example', [('index', 1)], timestamp=timestamp) + assert len(result) == 1 + + data = msgpack.unpackb(result[0]) + assert len(data) == bands + + for band in data: + assert len(band) == (retention + 1) + assert sum(sum(int(count) for bucket, count in chunk) for _, chunk in band) == 1 + + index.import_('example', [('index', 2, result[0])], timestamp=timestamp) + + assert index.export( + 'example', + [('index', 1)], + timestamp=timestamp + ) == index.export( + 'example', + [('index', 2)], + timestamp=timestamp + ) + + index.import_('example', [('index', 2, result[0])], timestamp=timestamp) + + result = index.export('example', [('index', 2)], timestamp=timestamp) + assert len(result) == 1 + + data = msgpack.unpackb(result[0]) + assert len(data) == bands + + for band in data: + assert len(band) == (retention + 1) + assert sum(sum(int(count) for bucket, count in chunk) for _, chunk in band) == 2 From f41895ee82dc2cf285b7b0f79fcab4f455b9948e Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Tue, 7 Mar 2017 17:51:07 -0800 Subject: [PATCH 06/14] This test was always wrong --- src/sentry/scripts/similarity/index.lua | 108 ++++++++++++++---------- tests/sentry/test_similarity.py | 4 +- 2 
files changed, 66 insertions(+), 46 deletions(-) diff --git a/src/sentry/scripts/similarity/index.lua b/src/sentry/scripts/similarity/index.lua index 38e70a6fdd215b..cda8bef453853b 100644 --- a/src/sentry/scripts/similarity/index.lua +++ b/src/sentry/scripts/similarity/index.lua @@ -30,6 +30,10 @@ if not pcall(redis.replicate_commands) then redis.log(redis.LOG_DEBUG, 'Could not enable script effects replication.') end +local function identity(value) + return value +end + local function range(start, stop) local result = {} for i = start, stop do @@ -149,6 +153,20 @@ local function get_index_expiration_time(interval, retention, index) end +-- Redis Helpers + +local function redis_hgetall_response_to_table(response, value_type) + if value_type == nil then + value_type = identity + end + local result = {} + for i = 1, #response, 2 do + result[response[i]] = value_type(response[i + 1]) + end + return result +end + + -- Generic Configuration local configuration_parser = build_argument_parser({ @@ -228,7 +246,7 @@ end local function collect_index_key_pairs(arguments, validator) if validator == nil then - validator = function (entry) end + validator = identity end local entries = table.ireduce( @@ -360,21 +378,23 @@ local commands = { table.imap( time_series, function (time) - return redis.call( - 'HGETALL', - get_bucket_frequency_key( - configuration.scope, - index, - time, - band, - key - ) + return redis_hgetall_response_to_table( + redis.call( + 'HGETALL', + get_bucket_frequency_key( + configuration.scope, + index, + time, + band, + key + ) + ), + tonumber ) end ), function (result, response) - for i = 1, #response, 2 do - local bucket, count = response[i], response[i + 1] + for bucket, count in pairs(response) do result[bucket] = (result[bucket] or 0) + count end return result @@ -532,13 +552,15 @@ local commands = { time ) - local response = redis.call( - 'HGETALL', - source_bucket_frequency_key + local response = redis_hgetall_response_to_table( + redis.call( + 
'HGETALL', + source_bucket_frequency_key + ), + tonumber ) - for i = 1, #response, 2 do - local bucket, count = response[i], response[i + 1] + for bucket, count in pairs(response) do -- Remove the source from the bucket membership -- set, and add the destination to the membership -- set. @@ -562,6 +584,7 @@ local commands = { ) end + -- TODO: We only need to do this if the bucket has contents. -- The destination bucket frequency key may have not -- existed previously, so we need to make sure we set -- the expiration on it in case it is new. @@ -599,13 +622,12 @@ local commands = { source.key ) - local response = redis.call( - 'HGETALL', + local buckets = redis.call( + 'HGETKEYS', source_bucket_frequency_key ) - for i = 1, #response, 2 do - local bucket = response[i] + for _, bucket in ipairs(buckets) do redis.call( 'SREM', get_bucket_membership_key( @@ -640,9 +662,14 @@ local commands = { data = nil, } elseif state.active.key == nil then + -- The second token is the key. state.active.key = token else + -- The third and final item is the message packed data + -- from ``EXPORT``. state.active.data = cmsgpack.unpack(token) + -- When the item is marked complete, we can add it to + -- the completed set, and reset the active state. 
table.insert(state.completed, state.active) state.active = nil end @@ -673,14 +700,13 @@ local commands = { entry.key ) - for _, bucket in ipairs(buckets) do - local bucket_key, count = bucket[1], bucket[2] + for bucket, count in pairs(buckets) do local bucket_membership_key = get_bucket_membership_key( configuration.scope, entry.index, time, band, - bucket_key + bucket ) redis.call('SADD', bucket_membership_key, entry.key) redis.call('EXPIREAT', bucket_membership_key, expiration_time) @@ -688,7 +714,7 @@ local commands = { redis.call( 'HINCRBY', destination_bucket_frequency_key, - bucket_key, + bucket, count ) end @@ -724,27 +750,21 @@ local commands = { return table.imap( time_series, function (time) - -- TODO(tkaemming): i hate this so much, this is dumb af - local response = redis.call( - 'HGETALL', - get_bucket_frequency_key( - configuration.scope, - source.index, - time, - band, - source.key - ) - ) - local result = {} - for i = 1, #response, 2 do - table.insert( - result, - {response[i], response[i+1]} - ) - end return { time, - result, + redis_hgetall_response_to_table( + redis.call( + 'HGETALL', + get_bucket_frequency_key( + configuration.scope, + source.index, + time, + band, + source.key + ) + ), + tonumber + ), } end ) diff --git a/tests/sentry/test_similarity.py b/tests/sentry/test_similarity.py index 782d78f27639da..248dfba2ba77c3 100644 --- a/tests/sentry/test_similarity.py +++ b/tests/sentry/test_similarity.py @@ -156,7 +156,7 @@ def test_export_import(self): for band in data: assert len(band) == (retention + 1) - assert sum(sum(int(count) for bucket, count in chunk) for _, chunk in band) == 1 + assert sum(sum(dict(bucket_frequencies).values()) for index, bucket_frequencies in band) == 1 index.import_('example', [('index', 2, result[0])], timestamp=timestamp) @@ -180,4 +180,4 @@ def test_export_import(self): for band in data: assert len(band) == (retention + 1) - assert sum(sum(int(count) for bucket, count in chunk) for _, chunk in band) == 2 + 
assert sum(sum(dict(bucket_frequencies).values()) for index, bucket_frequencies in band) == 2 From e5791c32ca3ade64413eab11d42c4f1c7ea80095 Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Tue, 7 Mar 2017 18:11:34 -0800 Subject: [PATCH 07/14] TODO --- src/sentry/scripts/similarity/index.lua | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sentry/scripts/similarity/index.lua b/src/sentry/scripts/similarity/index.lua index cda8bef453853b..375b94d95def6e 100644 --- a/src/sentry/scripts/similarity/index.lua +++ b/src/sentry/scripts/similarity/index.lua @@ -722,6 +722,7 @@ local commands = { -- The destination bucket frequency key may have not -- existed previously, so we need to make sure we set -- the expiration on it in case it is new. + -- TODO(tkaemming): Only need to call this if we mutated anything redis.call( 'EXPIREAT', destination_bucket_frequency_key, From cd87116b42b472d1184452c495fd59afe55afb66 Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Wed, 8 Mar 2017 11:23:11 -0800 Subject: [PATCH 08/14] Oops --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 5055871c96def1..98e8a68ce5a67c 100755 --- a/setup.py +++ b/setup.py @@ -78,7 +78,7 @@ 'cqlsh', # /cassandra 'datadog', - 'msgpack', + 'msgpack-python', 'pytest-cov>=1.8.0,<1.9.0', 'pytest-timeout>=0.5.0,<0.6.0', 'pytest-xdist>=1.11.0,<1.12.0', From 48c3a89d67caa62201759473a701a0042096b39f Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Wed, 8 Mar 2017 14:02:08 -0800 Subject: [PATCH 09/14] Explain tests a lil bit --- tests/sentry/test_similarity.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/sentry/test_similarity.py b/tests/sentry/test_similarity.py index 248dfba2ba77c3..14f8e93850c1ca 100644 --- a/tests/sentry/test_similarity.py +++ b/tests/sentry/test_similarity.py @@ -158,6 +158,7 @@ def test_export_import(self): assert len(band) == (retention + 1) assert sum(sum(dict(bucket_frequencies).values()) for index, bucket_frequencies in 
band) == 1 + # Copy the data from key 1 to key 2. index.import_('example', [('index', 2, result[0])], timestamp=timestamp) assert index.export( @@ -170,6 +171,7 @@ def test_export_import(self): timestamp=timestamp ) + # Copy the data again to key 2 (duplicating all of the data.) index.import_('example', [('index', 2, result[0])], timestamp=timestamp) result = index.export('example', [('index', 2)], timestamp=timestamp) From 248868f6f539a4d0dfc3589a02bc5a6608f926a9 Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Wed, 8 Mar 2017 15:10:57 -0800 Subject: [PATCH 10/14] Behold `build_variadic_argument_parser` in all of its majesty --- src/sentry/scripts/similarity/index.lua | 108 ++++++++++-------------- 1 file changed, 44 insertions(+), 64 deletions(-) diff --git a/src/sentry/scripts/similarity/index.lua b/src/sentry/scripts/similarity/index.lua index 375b94d95def6e..cea4a37463dd3f 100644 --- a/src/sentry/scripts/similarity/index.lua +++ b/src/sentry/scripts/similarity/index.lua @@ -117,14 +117,17 @@ local function parse_integer(value) end local function build_argument_parser(fields) - return function (arguments) + return function (arguments, offset) + if offset == nil then + offset = 0 + end local results = {} for i = 1, #fields do local name, parser = unpack(fields[i]) local value = arguments[i] local ok, result = pcall(parser, value) if not ok then - error(string.format('received invalid argument for %q in position %s with value %q; %s', name, i, value, result)) + error(string.format('received invalid argument for %q in position %s with value %q; %s', name, offset + i, value, result)) else results[name] = result end @@ -133,6 +136,31 @@ local function build_argument_parser(fields) end end +local function build_variadic_argument_parser(fields, validator) + if validator == nil then + validator = identity + end + local parser = build_argument_parser(fields) + return function (arguments, offset) + if offset == nil then + offset = 0 + end + if #arguments % #fields ~= 0 
then + -- TODO: make this error less crummy + error('invalid number of arguments') + end + local results = {} + for i = 1, #arguments, #fields do + local value, _ = parser(table.slice(arguments, i, i + #fields - 1), i) + table.insert( + results, + validator(value) + ) + end + return results + end +end + -- Time Series @@ -245,33 +273,10 @@ local function scale_to_total(values) end local function collect_index_key_pairs(arguments, validator) - if validator == nil then - validator = identity - end - - local entries = table.ireduce( - arguments, - function (state, token) - if state.active == nil then - state.active = { - index = token, - key = nil, - } - else - state.active.key = token - validator(state.active) - table.insert( - state.completed, - state.active - ) - state.active = nil - end - return state - end, - {active = nil, completed = {}} - ) - assert(entries.active == nil, 'unexpected end of input') - return entries.completed + return build_variadic_argument_parser({ + {"index", identity}, + {"key", identity}, + }, validator)(arguments) end @@ -520,6 +525,7 @@ local commands = { table.slice(arguments, 2), function (entry) assert(entry.key ~= destination_key, 'cannot merge destination into itself') + return entry end ) @@ -604,7 +610,6 @@ local commands = { DELETE = takes_configuration( function (configuration, arguments) local sources = collect_index_key_pairs(arguments) - local time_series = get_active_indices( configuration.interval, configuration.retention, @@ -650,40 +655,15 @@ local commands = { ), IMPORT = takes_configuration( function (configuration, arguments) - local entries = table.ireduce( - arguments, - function (state, token) - if state.active == nil then - -- When there is no active entry, we need to initialize - -- a new one. The first token is the index identifier. - state.active = { - index = token, - key = nil, - data = nil, - } - elseif state.active.key == nil then - -- The second token is the key. 
- state.active.key = token - else - -- The third and final item is the message packed data - -- from ``EXPORT``. - state.active.data = cmsgpack.unpack(token) - -- When the item is marked complete, we can add it to - -- the completed set, and reset the active state. - table.insert(state.completed, state.active) - state.active = nil - end - return state - end, - {active = nil, completed = {}} - ) - - -- If there are any entries in progress when we are completed, that - -- means the input was in an incorrect format and we should error - -- before we record any bad data. - assert(entries.active == nil, 'unexpected end of input') - - for _, entry in ipairs(entries.completed) do + local entries = build_variadic_argument_parser({ + {'index', identity}, + {'key', identity}, + {'data', function (value) + return cmsgpack.unpack(value) + end} + })(arguments) + + for _, entry in ipairs(entries) do for band, data in ipairs(entry.data) do for _, item in ipairs(data) do local time, buckets = item[1], item[2] From 4fd004d06fc882a5b8a31a4c5627a5cc26e15138 Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Fri, 10 Mar 2017 11:33:57 -0800 Subject: [PATCH 11/14] Fix TODO, add comments explaining import/export formats --- src/sentry/scripts/similarity/index.lua | 40 +++++++++++++++++++++---- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/src/sentry/scripts/similarity/index.lua b/src/sentry/scripts/similarity/index.lua index cea4a37463dd3f..bdb96ba8030d3f 100644 --- a/src/sentry/scripts/similarity/index.lua +++ b/src/sentry/scripts/similarity/index.lua @@ -654,6 +654,16 @@ local commands = { end ), IMPORT = takes_configuration( + --[[ + Loads data returned by the ``EXPORT`` command into the location + specified by the ``index`` and ``key`` arguments. Data can be loaded + into multiple indices or keys by providing additional arguments. + + If the destination specified by ``index`` and ``key`` does not exist + (relocating data to a new key, for example), it will be created. 
If + data already exists at the new destination, the imported data will be + appended to the existing data. + ]]-- function (configuration, arguments) local entries = build_variadic_argument_parser({ {'index', identity}, @@ -680,6 +690,7 @@ local commands = { entry.key ) + local touched = false for bucket, count in pairs(buckets) do local bucket_membership_key = get_bucket_membership_key( configuration.scope, @@ -697,23 +708,40 @@ local commands = { bucket, count ) + touched = true end -- The destination bucket frequency key may have not -- existed previously, so we need to make sure we set -- the expiration on it in case it is new. - -- TODO(tkaemming): Only need to call this if we mutated anything - redis.call( - 'EXPIREAT', - destination_bucket_frequency_key, - expiration_time - ) + if touched then + redis.call( + 'EXPIREAT', + destination_bucket_frequency_key, + expiration_time + ) + end end end end end ), EXPORT = takes_configuration( + --[[ + Exports data that is located at the provided ``index`` and ``key`` pairs. + + Generally, this data should be treated as an opaque payload, extracted + in order to be provided to the ``IMPORT`` command. Exported data is + returned in the same order as the arguments are provided. Each item is + a messagepacked blob that is, at the top level, a list where each member + represents the data contained within one band. Each item in the band + list is another list, where each member represents one time series + interval. Each item in the time series list is a tuple containing the + time series index and a mapping containing the counts for each bucket + within the interval. (Due to the Lua data model, an empty mapping will + be represented as an empty list. The consumer of this data must convert + it back to the correct type.) 
+ ]]-- function (configuration, arguments) local bands = range(1, configuration.bands) local time_series = get_active_indices( From c8caa8d54818b579ccd087f4701ca442d507fe15 Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Tue, 14 Mar 2017 11:12:47 -0700 Subject: [PATCH 12/14] First-classify function --- src/sentry/scripts/similarity/index.lua | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/sentry/scripts/similarity/index.lua b/src/sentry/scripts/similarity/index.lua index bdb96ba8030d3f..2e6d1e3145de0b 100644 --- a/src/sentry/scripts/similarity/index.lua +++ b/src/sentry/scripts/similarity/index.lua @@ -668,9 +668,7 @@ local commands = { local entries = build_variadic_argument_parser({ {'index', identity}, {'key', identity}, - {'data', function (value) - return cmsgpack.unpack(value) - end} + {'data', cmsgpack.unpack}, })(arguments) for _, entry in ipairs(entries) do From b397ff9c2062db388843b82fdf2f43e7f4d2d6a1 Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Tue, 14 Mar 2017 11:15:17 -0700 Subject: [PATCH 13/14] Simplify conditional EXPIREAT in IMPORT --- src/sentry/scripts/similarity/index.lua | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/sentry/scripts/similarity/index.lua b/src/sentry/scripts/similarity/index.lua index 2e6d1e3145de0b..437ec3fc80eb15 100644 --- a/src/sentry/scripts/similarity/index.lua +++ b/src/sentry/scripts/similarity/index.lua @@ -688,7 +688,6 @@ local commands = { entry.key ) - local touched = false for bucket, count in pairs(buckets) do local bucket_membership_key = get_bucket_membership_key( configuration.scope, @@ -706,13 +705,13 @@ local commands = { bucket, count ) - touched = true end -- The destination bucket frequency key may have not -- existed previously, so we need to make sure we set - -- the expiration on it in case it is new. - if touched then + -- the expiration on it in case it is new. (We only + -- have to do this if we changed any bucket counts.) 
+ if next(buckets) ~= nil then redis.call( 'EXPIREAT', destination_bucket_frequency_key, From 2cbd6bf848262c89803641df39f152c67b67d3fc Mon Sep 17 00:00:00 2001 From: Ted Kaemming Date: Tue, 14 Mar 2017 11:20:07 -0700 Subject: [PATCH 14/14] Add upper bound to msgpack-python test requirement --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 98e8a68ce5a67c..ccce143ec9f82f 100755 --- a/setup.py +++ b/setup.py @@ -78,7 +78,7 @@ 'cqlsh', # /cassandra 'datadog', - 'msgpack-python', + 'msgpack-python<0.5.0', 'pytest-cov>=1.8.0,<1.9.0', 'pytest-timeout>=0.5.0,<0.6.0', 'pytest-xdist>=1.11.0,<1.12.0',