Extends ResultItem with a timestamp (#8202)
qstokkink authored Oct 22, 2024
2 parents 71d4d97 + 62fa9f1 commit f377d1a
Showing 8 changed files with 48 additions and 11 deletions.
16 changes: 10 additions & 6 deletions src/tribler/core/recommender/community.py
@@ -68,16 +68,17 @@ def create_crawl_query_info(query_id: int) -> dict:
}


def create_crawl_query_info_response(query_id: int, results: int, chosen_index: int, query: str) -> dict:
def create_crawl_query_info_response(query_id: int, timestamp: int, results: int, chosen_index: int, query: str) -> dict:
"""
A response with the number of available results for the query with the id ``query_id``.
"""
return {
"version": 0,
"version": 1,
"type": "query_info",
"query_id": query_id,
"results": results,
"chosen_index": chosen_index,
"timestamp": timestamp,
"query": query
}

@@ -176,7 +177,8 @@ def process_query_info(self, peer: Peer, request: dict) -> None:
query_id=query.rowid,
results=len(unpacked["results"]),
chosen_index=unpacked["chosen_index"],
query=unpacked["query"]
timestamp=unpacked.get("timestamp", 0),
query=unpacked["query"],
)), b""))

@lazy_wrapper(Crawl)
@@ -220,6 +222,7 @@ def __init__(self, request_cache: RequestCache, peer: Peer, response: dict) -> N
self.total_results = response["results"]
self.results: list[ResultItem | None] = [None] * self.total_results
self.chosen_index = response["chosen_index"]
self.timestamp = response.get("timestamp", 0)
self.query = response["query"]

def get_next_range(self) -> tuple[int, int] | None:
@@ -300,14 +303,15 @@ def init_crawl_history(self) -> None:
self.crawl_history[peer_mid] = (max_id, missing)

def finalize_query(self, peer: Peer, query_id: int, query: str, chosen_index: int,
results: list[ResultItem]) -> None:
timestamp: int, results: list[ResultItem]) -> None:
"""
Update self.crawl_history and write the results to a file.
"""
query_dir = os.path.join(self.crawl_directory, hexlify(peer.mid).decode())
os.makedirs(query_dir, exist_ok=True)
json_dict = {
"query": query,
"timestamp": timestamp,
"chosen_index": chosen_index,
"results": results
}
@@ -356,7 +360,7 @@ def process_query_info_response(self, peer: Peer, response: dict) -> None:

if next_range is None:
self.logger.info("Query %d is empty for %s.", response["query_id"], str(peer))
self.finalize_query(peer, cache.query_id, cache.query, cache.chosen_index, [])
self.finalize_query(peer, cache.query_id, cache.query, cache.chosen_index, cache.timestamp, [])
else:
self.request_cache.add(cache)
self.ez_send(peer, Crawl(peer.mid, self.json_pack(create_crawl_fragment(
@@ -375,7 +379,7 @@ def process_query_fragment_response(self, peer: Peer, response: dict) -> None:

if next_range is None:
self.logger.info("Query %d has completed for %s.", response["query_id"], str(peer))
self.finalize_query(peer, cache.query_id, cache.query, cache.chosen_index,
self.finalize_query(peer, cache.query_id, cache.query, cache.chosen_index, cache.timestamp,
cast(list[ResultItem] , cache.results))
else:
self.request_cache.add(cache) # Reset the two-minute timer
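For context, the version bump above keeps old records readable: a query stored before this change has no "timestamp" key, so both process_query_info and the crawl cache fall back to 0 via .get("timestamp", 0). A minimal, standalone sketch of that behaviour (the helper mirrors create_crawl_query_info_response; the example record and its values are made up):

import json


def build_query_info_response(query_id: int, timestamp: int, results: int,
                              chosen_index: int, query: str) -> dict:
    # Mirrors create_crawl_query_info_response() after this commit:
    # version 1 payloads carry the click timestamp alongside the result count.
    return {
        "version": 1,
        "type": "query_info",
        "query_id": query_id,
        "results": results,
        "chosen_index": chosen_index,
        "timestamp": timestamp,
        "query": query,
    }


# A record stored before this change (version 0) has no "timestamp" key.
old_record = json.loads('{"query": "ubuntu", "chosen_index": 0, "results": []}')

response = build_query_info_response(
    query_id=1,
    timestamp=old_record.get("timestamp", 0),  # same fallback as process_query_info()
    results=len(old_record["results"]),
    chosen_index=old_record["chosen_index"],
    query=old_record["query"],
)
print(response["version"], response["timestamp"])  # -> 1 0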
2 changes: 1 addition & 1 deletion src/tribler/core/recommender/manager.py
@@ -54,4 +54,4 @@ def add_query(self, json_data: str) -> None:
Inject data into our database.
"""
with db_session:
self.Query(version=0, json=json_data)
self.Query(version=1, json=json_data)
1 change: 1 addition & 0 deletions src/tribler/core/recommender/orm_query.py
@@ -29,6 +29,7 @@ class Query(Entity, metaclass=IterQuery):
"""
{
chosen_index: int,
timestamp: int,
query: str,
results: [{infohash: str, seeders: int, leechers: int}]
}
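Read together with Manager.add_query() from the previous file, a stored version 1 record now matches the shape documented above. A small sketch of building such a record (field names come from this diff; the concrete values are illustrative only):

import json

# Illustrative version 1 record; field names follow the docstring above,
# the values are made up for this example.
record = {
    "query": "ubuntu",
    "timestamp": 1729590000000,
    "chosen_index": 0,
    "results": [
        {"infohash": "aa" * 20, "seeders": 10, "leechers": 2},
    ],
}

json_data = json.dumps(record)
# This is the string a caller would hand to Manager.add_query(json_data),
# which after this commit persists it as a version=1 Query row.
assert json.loads(json_data)["timestamp"] == 1729590000000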
2 changes: 2 additions & 0 deletions src/tribler/core/recommender/restapi/endpoint.py
@@ -50,6 +50,7 @@ def __init__(self, middlewares: tuple = (), client_max_size: int = MAX_REQUEST_S
@json_schema(schema(ClickedRequest={
"query": (String, "The query that led to the list of results"),
"chosen_index": (String, "The winning result index in the results list"),
"timestamp": (Integer, "The timestamp of the query"),
"results": (List(Nested(schema(ClickedResult={"infohash": (String, "A displayed infohash"),
"seeders": (Integer, "Its displayed number of seeders"),
"leechers": (Integer, "Its displayed number of seeders")}))),
@@ -63,6 +64,7 @@ async def put_clicked(self, request: RequestType) -> RESTResponse:
{
query: str,
chosen_index: int,
timestamp: int,
results: list[{
infohash: str,
seeders: int,
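For reference, a request body accepted by the updated put_clicked schema would look roughly like the sketch below; the field names follow the ClickedRequest schema above, the endpoint path /recommender/clicked is taken from the UI service further down, and the values are invented:

import json

# Sketch of a ClickedRequest body after this change; "timestamp" is the new
# field, everything else follows the schema above. Values are illustrative.
clicked_request = {
    "query": "ubuntu",
    "chosen_index": 1,
    "timestamp": 1729590000000,
    "results": [
        {"infohash": "01" * 20, "seeders": 10, "leechers": 2},
        {"infohash": "02" * 20, "seeders": 5, "leechers": 1},
    ],
}

# e.g. sent as PUT /recommender/clicked with this JSON body (path taken from
# tribler.service.ts below).
print(json.dumps(clicked_request, indent=2))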
2 changes: 1 addition & 1 deletion src/tribler/core/torrent_checker/torrentchecker_session.py
@@ -283,7 +283,7 @@ class UdpTrackerSession(TrackerSession):
# A list of transaction IDs that have been used in order to avoid conflict.
_active_session_dict: dict[UdpTrackerSession, int] = {}

def __init__(self, tracker_url: str, tracker_address: tuple[str, int], announce_page: str, # noqa: PLR0913
def __init__(self, tracker_url: str, tracker_address: tuple[str, int], announce_page: str,
timeout: float, proxy: tuple, socket_mgr: UdpSocketManager) -> None:
"""
Create a session for UDP trackers.
33 changes: 31 additions & 2 deletions src/tribler/test_unit/core/recommender/test_community.py
@@ -30,7 +30,7 @@ def init_crawl_history(self) -> None:
"""

def finalize_query(self, peer: Peer, query_id: int, query: str, chosen_index: int,
results: list[ResultItem]) -> None:
timestamp: int, results: list[ResultItem]) -> None:
"""
Don't write to disk.
"""
@@ -122,7 +122,7 @@ async def test_crawl_table_empty(self) -> None:
self.assertEqual("table_size", response["type"])
self.assertEqual(0, response["total_queries"])

async def test_crawl_query(self) -> None:
async def test_crawl_query_v0(self) -> None:
"""
Test if a single query can be crawled.
"""
@@ -151,6 +151,35 @@ async def test_crawl_query(self) -> None:
self.assertEqual(1, self.crawler_overlay().crawl_history[self.mid(0)][0], "The known size should be 1")
self.assertSetEqual(set(), self.crawler_overlay().crawl_history[self.mid(0)][1], "There should be no missing")

async def test_crawl_query_v1(self) -> None:
"""
Test if a single version 1 query (including its timestamp) can be crawled.
"""
self.overlay(0).manager.add_query('{"query": "test query", "timestamp": 1234567890, "chosen_index": 2, "results": ['
f'{{"infohash": "{"01" * 20}", "seeders": 1, "leechers": 2}}, '
f'{{"infohash": "{"02" * 20}", "seeders": 3, "leechers": 4}}, '
f'{{"infohash": "{"03" * 20}", "seeders": 5, "leechers": 6}}'
']}')

with self.assertReceivedBy(1, [CrawlResponse, CrawlResponse, CrawlResponse]) as messages:
self.crawler_overlay().crawl_next(self.peer(0))
await self.deliver_messages()

response1 = json.loads(messages[0].data)
response2 = json.loads(messages[1].data)
response3 = json.loads(messages[2].data)
self.assertEqual("table_size", response1["type"])
self.assertEqual("query_info", response2["type"])
self.assertEqual(1, response2["query_id"])
self.assertEqual("query_fragment", response3["type"])
self.assertEqual(1, response3["query_id"])
self.assertListEqual(["01" * 20, "02" * 20, "03" * 20], response3["infohashes"])
self.assertListEqual([1, 3, 5], response3["seeders"])
self.assertListEqual([2, 4, 6], response3["leechers"])
self.assertIn(self.mid(0), self.crawler_overlay().crawl_history)
self.assertEqual(1, self.crawler_overlay().crawl_history[self.mid(0)][0], "The known size should be 1")
self.assertSetEqual(set(), self.crawler_overlay().crawl_history[self.mid(0)][1], "There should be no missing")

async def test_crawl_query_done(self) -> None:
"""
Test if a crawl after completion leads to no further requests for data.
2 changes: 1 addition & 1 deletion src/tribler/test_unit/core/recommender/test_manager.py
@@ -27,7 +27,7 @@ def test_add_query(self) -> None:

self.assertEqual(1, size)
self.assertEqual(1, result.rowid)
self.assertEqual(0, result.version)
self.assertEqual(1, result.version)
self.assertEqual('{"key":"value"}', result.json)

def test_get_total_queries(self) -> None:
1 change: 1 addition & 0 deletions src/tribler/ui/src/services/tribler.service.ts
@@ -382,6 +382,7 @@ export class TriblerService {
return (await this.http.put(`/recommender/clicked`, {
query: query,
chosen_index: results.findIndex((e) => e.infohash == clicked.infohash),
timestamp: Date.now(),
results: results.map((x) => { return {
infohash: x.infohash,
seeders: x.num_seeders,
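Note that Date.now() returns milliseconds since the Unix epoch, so the integer stored with each clicked query is a millisecond timestamp. A consumer of the crawled records could convert it back along these lines (a sketch; it assumes the UI stays the only writer of this field and uses an illustrative value):

from datetime import datetime, timezone

# Date.now() yields milliseconds since the Unix epoch; divide by 1000 to get
# the seconds that datetime.fromtimestamp() expects.
timestamp_ms = 1729590000000  # illustrative value
clicked_at = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
print(clicked_at.isoformat())  # -> 2024-10-22T09:40:00+00:00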