Extends ResultItem by timestamp #8202

Merged 1 commit on Oct 22, 2024
16 changes: 10 additions & 6 deletions src/tribler/core/recommender/community.py
@@ -68,16 +68,17 @@ def create_crawl_query_info(query_id: int) -> dict:
}


def create_crawl_query_info_response(query_id: int, results: int, chosen_index: int, query: str) -> dict:
def create_crawl_query_info_response(query_id: int, timestamp: int, results: int, chosen_index: int, query: str) -> dict:
"""
A response with the number of available results for the query with the id ``query_id``.
"""
return {
"version": 0,
"version": 1,
"type": "query_info",
"query_id": query_id,
"results": results,
"chosen_index": chosen_index,
"timestamp": timestamp,
"query": query
}
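For illustration only (not part of the diff): a minimal sketch of the version-1 payload the updated helper produces, assuming it is importable from the module above; the query id, timestamp, and query string are made-up values.

    from tribler.core.recommender.community import create_crawl_query_info_response

    payload = create_crawl_query_info_response(query_id=7, timestamp=1729593600,
                                               results=3, chosen_index=2, query="ubuntu")
    assert payload["version"] == 1              # bumped from 0 by this PR
    assert payload["timestamp"] == 1729593600   # new field carried in the response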

@@ -176,7 +177,8 @@ def process_query_info(self, peer: Peer, request: dict) -> None:
query_id=query.rowid,
results=len(unpacked["results"]),
chosen_index=unpacked["chosen_index"],
query=unpacked["query"]
timestamp=unpacked.get("timestamp", 0),
query=unpacked["query"],
)), b""))

@lazy_wrapper(Crawl)
@@ -220,6 +222,7 @@ def __init__(self, request_cache: RequestCache, peer: Peer, response: dict) -> N
self.total_results = response["results"]
self.results: list[ResultItem | None] = [None] * self.total_results
self.chosen_index = response["chosen_index"]
self.timestamp = response.get("timestamp", 0)
self.query = response["query"]

def get_next_range(self) -> tuple[int, int] | None:
@@ -300,14 +303,15 @@ def init_crawl_history(self) -> None:
self.crawl_history[peer_mid] = (max_id, missing)

def finalize_query(self, peer: Peer, query_id: int, query: str, chosen_index: int,
results: list[ResultItem]) -> None:
timestamp: int, results: list[ResultItem]) -> None:
"""
Update self.crawl_history and write the results to a file.
"""
query_dir = os.path.join(self.crawl_directory, hexlify(peer.mid).decode())
os.makedirs(query_dir, exist_ok=True)
json_dict = {
"query": query,
"timestamp": timestamp,
"chosen_index": chosen_index,
"results": results
}
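A hedged sketch of consuming the files written above: records crawled before this PR contain no "timestamp" key, so a reader should fall back to 0, mirroring the .get("timestamp", 0) defaults used elsewhere in this diff. The file name inside query_dir is not visible here, so the path below is purely illustrative.

    import json

    with open("crawl_data/abcdef/query.json") as handle:  # hypothetical path
        record = json.load(handle)

    timestamp = record.get("timestamp", 0)  # 0 for records written before this change
    print(record["query"], record["chosen_index"], timestamp)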
@@ -356,7 +360,7 @@ def process_query_info_response(self, peer: Peer, response: dict) -> None:

if next_range is None:
self.logger.info("Query %d is empty for %s.", response["query_id"], str(peer))
self.finalize_query(peer, cache.query_id, cache.query, cache.chosen_index, [])
self.finalize_query(peer, cache.query_id, cache.query, cache.chosen_index, cache.timestamp, [])
else:
self.request_cache.add(cache)
self.ez_send(peer, Crawl(peer.mid, self.json_pack(create_crawl_fragment(
@@ -375,7 +379,7 @@ def process_query_fragment_response(self, peer: Peer, response: dict) -> None:

if next_range is None:
self.logger.info("Query %d has completed for %s.", response["query_id"], str(peer))
self.finalize_query(peer, cache.query_id, cache.query, cache.chosen_index,
self.finalize_query(peer, cache.query_id, cache.query, cache.chosen_index, cache.timestamp,
cast(list[ResultItem] , cache.results))
else:
self.request_cache.add(cache) # Reset the two-minute timer
2 changes: 1 addition & 1 deletion src/tribler/core/recommender/manager.py
@@ -54,4 +54,4 @@ def add_query(self, json_data: str) -> None:
Inject data into our database.
"""
with db_session:
self.Query(version=0, json=json_data)
self.Query(version=1, json=json_data)
1 change: 1 addition & 0 deletions src/tribler/core/recommender/orm_query.py
@@ -29,6 +29,7 @@ class Query(Entity, metaclass=IterQuery):
"""
{
chosen_index: int,
timestamp: int,
query: str,
results: [{infohash: str, seeders: int, leechers: int}]
}
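An illustrative version-1 record matching the documented structure (values invented for the example); Manager.add_query() accepts it serialized as a JSON string.

    import json

    example = {
        "chosen_index": 0,
        "timestamp": 1729593600,
        "query": "ubuntu",
        "results": [{"infohash": "ab" * 20, "seeders": 10, "leechers": 2}],
    }
    json_data = json.dumps(example)  # suitable for manager.add_query(json_data)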
2 changes: 2 additions & 0 deletions src/tribler/core/recommender/restapi/endpoint.py
@@ -50,6 +50,7 @@ def __init__(self, middlewares: tuple = (), client_max_size: int = MAX_REQUEST_S
@json_schema(schema(ClickedRequest={
"query": (String, "The query that led to the list of results"),
"chosen_index": (String, "The winning result index in the results list"),
"timestamp": (Integer, "The timestamp of the query"),
"results": (List(Nested(schema(ClickedResult={"infohash": (String, "A displayed infohash"),
"seeders": (Integer, "Its displayed number of seeders"),
"leechers": (Integer, "Its displayed number of seeders")}))),
Expand All @@ -63,6 +64,7 @@ async def put_clicked(self, request: RequestType) -> RESTResponse:
{
query: str,
chosen_index: int,
timestamp: int,
results: list[{
infohash: str,
seeders: int,
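A hedged example of a ClickedRequest body for PUT /recommender/clicked (the path is taken from tribler.service.ts further down); all values are invented, and the timestamp follows the UI's Date.now(), i.e. milliseconds since the Unix epoch.

    clicked_request = {
        "query": "ubuntu",
        "chosen_index": 0,
        "timestamp": 1729593600000,  # milliseconds, as produced by Date.now() in the web UI
        "results": [
            {"infohash": "ab" * 20, "seeders": 42, "leechers": 5},
        ],
    }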
2 changes: 1 addition & 1 deletion src/tribler/core/torrent_checker/torrentchecker_session.py
@@ -283,7 +283,7 @@ class UdpTrackerSession(TrackerSession):
# A list of transaction IDs that have been used in order to avoid conflict.
_active_session_dict: dict[UdpTrackerSession, int] = {}

def __init__(self, tracker_url: str, tracker_address: tuple[str, int], announce_page: str, # noqa: PLR0913
def __init__(self, tracker_url: str, tracker_address: tuple[str, int], announce_page: str,
timeout: float, proxy: tuple, socket_mgr: UdpSocketManager) -> None:
"""
Create a session for UDP trackers.
33 changes: 31 additions & 2 deletions src/tribler/test_unit/core/recommender/test_community.py
@@ -30,7 +30,7 @@ def init_crawl_history(self) -> None:
"""

def finalize_query(self, peer: Peer, query_id: int, query: str, chosen_index: int,
results: list[ResultItem]) -> None:
timestamp: int, results: list[ResultItem]) -> None:
"""
Don't write to disk.
"""
@@ -122,7 +122,7 @@ async def test_crawl_table_empty(self) -> None:
self.assertEqual("table_size", response["type"])
self.assertEqual(0, response["total_queries"])

async def test_crawl_query(self) -> None:
async def test_crawl_query_v0(self) -> None:
"""
Test if a single query can be crawled.
"""
@@ -151,6 +151,35 @@ async def test_crawl_query(self) -> None:
self.assertEqual(1, self.crawler_overlay().crawl_history[self.mid(0)][0], "The known size should be 1")
self.assertSetEqual(set(), self.crawler_overlay().crawl_history[self.mid(0)][1], "There should be no missing")

async def test_crawl_query_v1(self) -> None:
"""
Test if a single query can be crawled.
"""
self.overlay(0).manager.add_query('{"query": "test query", "timestamp": 1234567890, "chosen_index": 2, "results": ['
f'{{"infohash": "{"01" * 20}", "seeders": 1, "leechers": 2}}, '
f'{{"infohash": "{"02" * 20}", "seeders": 3, "leechers": 4}}, '
f'{{"infohash": "{"03" * 20}", "seeders": 5, "leechers": 6}}'
']}')

with self.assertReceivedBy(1, [CrawlResponse, CrawlResponse, CrawlResponse]) as messages:
self.crawler_overlay().crawl_next(self.peer(0))
await self.deliver_messages()

response1 = json.loads(messages[0].data)
response2 = json.loads(messages[1].data)
response3 = json.loads(messages[2].data)
self.assertEqual("table_size", response1["type"])
self.assertEqual("query_info", response2["type"])
self.assertEqual(1, response2["query_id"])
self.assertEqual("query_fragment", response3["type"])
self.assertEqual(1, response3["query_id"])
self.assertListEqual(["01" * 20, "02" * 20, "03" * 20], response3["infohashes"])
self.assertListEqual([1, 3, 5], response3["seeders"])
self.assertListEqual([2, 4, 6], response3["leechers"])
self.assertIn(self.mid(0), self.crawler_overlay().crawl_history)
self.assertEqual(1, self.crawler_overlay().crawl_history[self.mid(0)][0], "The known size should be 1")
self.assertSetEqual(set(), self.crawler_overlay().crawl_history[self.mid(0)][1], "There should be no missing")
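Not part of the PR, but since create_crawl_query_info_response() now echoes the stored timestamp in the query_info message, the version-1 test could additionally assert that the injected value survives the crawl (the version-0 test would see the fallback of 0):

    self.assertEqual(1234567890, response2["timestamp"])  # hypothetical extra assertion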

async def test_crawl_query_done(self) -> None:
"""
Test if a crawl after completion leads to no further requests for data.
2 changes: 1 addition & 1 deletion src/tribler/test_unit/core/recommender/test_manager.py
@@ -27,7 +27,7 @@ def test_add_query(self) -> None:

self.assertEqual(1, size)
self.assertEqual(1, result.rowid)
self.assertEqual(0, result.version)
self.assertEqual(1, result.version)
self.assertEqual('{"key":"value"}', result.json)

def test_get_total_queries(self) -> None:
1 change: 1 addition & 0 deletions src/tribler/ui/src/services/tribler.service.ts
@@ -382,6 +382,7 @@ export class TriblerService {
return (await this.http.put(`/recommender/clicked`, {
query: query,
chosen_index: results.findIndex((e) => e.infohash == clicked.infohash),
timestamp: Date.now(),
results: results.map((x) => { return {
infohash: x.infohash,
seeders: x.num_seeders,
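One caveat: Date.now() returns milliseconds since the Unix epoch, so records submitted through the web UI carry millisecond-resolution timestamps, and this PR stores the value unchanged. Should second resolution ever be preferred server-side, the conversion would be a one-liner (hypothetical, not done anywhere in this diff):

    timestamp_seconds = 1729593600123 // 1000  # -> 1729593600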