Skip to content

Commit

Permalink
feat: Neo4j backend for popular tables personalization (#233)
Browse files Browse the repository at this point in the history
Signed-off-by: Josh Howard <joshthoward@gmail.com>
  • Loading branch information
joshthoward authored Jan 5, 2021
1 parent a7b2ec5 commit d045efa
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 7 deletions.
43 changes: 40 additions & 3 deletions metadata_service/proxy/neo4j_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,8 +819,8 @@ def get_latest_updated_ts(self) -> Optional[int]:
return None

@timer_with_counter
@_CACHE.cache('_get_popular_tables_uris', expire=_GET_POPULAR_TABLE_CACHE_EXPIRY_SEC)
def _get_popular_tables_uris(self, num_entries: int) -> List[str]:
@_CACHE.cache('_get_global_popular_tables_uris', _GET_POPULAR_TABLE_CACHE_EXPIRY_SEC)
def _get_global_popular_tables_uris(self, num_entries: int) -> List[str]:
"""
Retrieve popular table uris. Will provide tables with top x popularity score.
Popularity score = number of distinct readers * log(total number of reads)
Expand All @@ -846,6 +846,38 @@ def _get_popular_tables_uris(self, num_entries: int) -> List[str]:

return [record['table_key'] for record in records]

@timer_with_counter
# The expiry must be passed as the ``expire`` keyword. Assuming _CACHE is a
# Beaker CacheManager (TODO confirm), extra positional arguments to ``cache``
# are folded into the cache key instead of being used as the expiry time.
@_CACHE.cache('_get_personal_popular_tables_uris', expire=_GET_POPULAR_TABLE_CACHE_EXPIRY_SEC)
def _get_personal_popular_tables_uris(self, num_entries: int,
                                      user_id: str) -> List[str]:
    """
    Retrieve personalized popular table uris. Will provide tables with top
    popularity score that have been read by a peer of the user_id provided.

    A peer ("co-reader") is a user who has read at least one table that the
    given user has also read. For every table read by such peers the query
    computes:

        score = (number of distinct co-readers) * log(total co-reads)

    Tables with fewer distinct co-readers than the configured
    POPULAR_TABLE_MINIMUM_READER_COUNT are dropped before ranking.

    The result of this method will be cached based on the key
    (num_entries, user_id), and the cache will be expired based on
    _GET_POPULAR_TABLE_CACHE_EXPIRY_SEC.

    :param num_entries: maximum number of table uris to return
    :param user_id: key of the User node to personalize the results for
    :return: table uris (table keys) ordered by descending score
    """
    statement = textwrap.dedent("""
    MATCH (:User {key:$user_id})<-[:READ_BY]-(:Table)-[:READ_BY]->
         (coUser:User)<-[coRead:READ_BY]-(table:Table)
    WITH table.key AS table_key, count(DISTINCT coUser) AS co_readers,
         sum(coRead.read_count) AS total_co_reads
    WHERE co_readers >= $num_readers
    RETURN table_key, (co_readers * log(total_co_reads)) AS score
    ORDER BY score DESC LIMIT $num_entries;
    """)
    LOGGER.info('Querying popular tables URIs')
    # Minimum number of distinct co-readers a table needs to be considered.
    num_readers = current_app.config['POPULAR_TABLE_MINIMUM_READER_COUNT']
    records = self._execute_cypher_query(statement=statement,
                                         param_dict={'user_id': user_id,
                                                     'num_readers': num_readers,
                                                     'num_entries': num_entries})

    return [record['table_key'] for record in records]

@timer_with_counter
def get_popular_tables(self, *,
num_entries: int,
Expand All @@ -857,8 +889,13 @@ def get_popular_tables(self, *,
:param num_entries:
:return: Iterable of PopularTable
"""
if user_id is None:
# Get global popular table URIs
table_uris = self._get_global_popular_tables_uris(num_entries)
else:
# Get personalized popular table URIs
table_uris = self._get_personal_popular_tables_uris(num_entries, user_id)

table_uris = self._get_popular_tables_uris(num_entries)
if not table_uris:
return []

Expand Down
19 changes: 15 additions & 4 deletions tests/unit/proxy/test_neo4j_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,17 +515,28 @@ def test_get_neo4j_latest_updated_ts(self) -> None:
self.assertIsNone(neo4j_last_updated_ts)

def test_get_popular_tables(self) -> None:
# Test cache hit
# Test cache hit for global popular tables
with patch.object(GraphDatabase, 'driver'), patch.object(Neo4jProxy, '_execute_cypher_query') as mock_execute:
mock_execute.return_value = [{'table_key': 'foo'}, {'table_key': 'bar'}]

neo4j_proxy = Neo4jProxy(host='DOES_NOT_MATTER', port=0000)
self.assertEqual(neo4j_proxy._get_popular_tables_uris(2), ['foo', 'bar'])
self.assertEqual(neo4j_proxy._get_popular_tables_uris(2), ['foo', 'bar'])
self.assertEqual(neo4j_proxy._get_popular_tables_uris(2), ['foo', 'bar'])
self.assertEqual(neo4j_proxy._get_global_popular_tables_uris(2), ['foo', 'bar'])
self.assertEqual(neo4j_proxy._get_global_popular_tables_uris(2), ['foo', 'bar'])
self.assertEqual(neo4j_proxy._get_global_popular_tables_uris(2), ['foo', 'bar'])

self.assertEqual(mock_execute.call_count, 1)

# Test cache hit for personal popular tables
with patch.object(GraphDatabase, 'driver'), patch.object(Neo4jProxy, '_execute_cypher_query') as mock_execute:
mock_execute.return_value = [{'table_key': 'foo'}, {'table_key': 'bar'}]

neo4j_proxy = Neo4jProxy(host='DOES_NOT_MATTER', port=0000)
self.assertEqual(neo4j_proxy._get_personal_popular_tables_uris(2, 'test_id'), ['foo', 'bar'])
self.assertEqual(neo4j_proxy._get_personal_popular_tables_uris(2, 'test_id'), ['foo', 'bar'])
self.assertEqual(neo4j_proxy._get_personal_popular_tables_uris(2, 'other_id'), ['foo', 'bar'])

self.assertEqual(mock_execute.call_count, 2)

with patch.object(GraphDatabase, 'driver'), patch.object(Neo4jProxy, '_execute_cypher_query') as mock_execute:
mock_execute.return_value = [
{'database_name': 'db', 'cluster_name': 'clstr', 'schema_name': 'sch', 'table_name': 'foo',
Expand Down

0 comments on commit d045efa

Please sign in to comment.