From 2841e72d74b2a5838aa8760694c72f343dcc28ec Mon Sep 17 00:00:00 2001 From: drew2a Date: Thu, 14 Oct 2021 12:28:19 +0200 Subject: [PATCH] PR fixes --- .../components/metadata_store/utils.py | 2 +- .../components/tag/community/tag_community.py | 15 ++++++------- .../components/tag/community/tag_payload.py | 2 +- .../tag/community/tests/test_tag_community.py | 6 +++--- .../tag/community/tests/test_tag_payload.py | 4 ++-- .../tribler_core/components/tag/db/tag_db.py | 21 ++++++++----------- .../components/tag/db/tests/test_tag_db.py | 19 +++++++++-------- .../components/tag/restapi/tags_endpoint.py | 6 +++--- 8 files changed, 37 insertions(+), 38 deletions(-) diff --git a/src/tribler-core/tribler_core/components/metadata_store/utils.py b/src/tribler-core/tribler_core/components/metadata_store/utils.py index 518013446e..2a83b3295e 100644 --- a/src/tribler-core/tribler_core/components/metadata_store/utils.py +++ b/src/tribler-core/tribler_core/components/metadata_store/utils.py @@ -34,7 +34,7 @@ def tag_torrent(infohash, tags_db, tags=None): for tag in tags: cur_time = int(time.time()) for key in [random_key_1, random_key_2]: # Each tag should be proposed by two unique users - message = TagOperationMessage(infohash=infohash, tag=tag, operation=TagOperation.ADD, time=cur_time, + message = TagOperationMessage(infohash=infohash, tag=tag, operation=TagOperation.ADD, counter=cur_time, creator_public_key=key.pub().key_to_bin()) tags_db.add_tag_operation(message, b"") diff --git a/src/tribler-core/tribler_core/components/tag/community/tag_community.py b/src/tribler-core/tribler_core/components/tag/community/tag_community.py index 0aa67f75e0..cac46bc8e3 100644 --- a/src/tribler-core/tribler_core/components/tag/community/tag_community.py +++ b/src/tribler-core/tribler_core/components/tag/community/tag_community.py @@ -2,7 +2,7 @@ from binascii import unhexlify from cryptography.exceptions import InvalidSignature -from pony.orm import TransactionIntegrityError, db_session +from pony.orm import db_session from ipv8.lazy_community import lazy_wrapper from ipv8.types import Key @@ -15,14 +15,14 @@ REQUESTED_TAGS_COUNT = 10 REQUEST_INTERVAL = 5 # 5 sec -TIME_DELTA_FOR_TAGS_THAT_READY_TO_GOSSIP = {'seconds': 60} CLEAR_ALL_REQUESTS_INTERVAL = 10 * 60 # 10 minutes +TIME_DELTA_FOR_TAGS_THAT_READY_TO_GOSSIP = {'minutes': 1} class TagCommunity(TriblerCommunity): """ Community for disseminating tags across the network. - Only tags that older than 1 minute will be gossiped. + Only tags are older than 1 minute will be gossiped. """ community_id = unhexlify('042020c5e5e2ee0727fe99d704b430698e308d98') @@ -63,8 +63,6 @@ def on_message(self, peer, signature, payload): self.db.add_tag_operation(payload, signature.signature) self.logger.info(f'Tag added: {payload.tag}:{payload.infohash}') - except TransactionIntegrityError: # db error - pass except PeerValidationError as e: # peer has exhausted his response count self.logger.warning(e) except (ValueError, AssertionError) as e: # validation error @@ -78,7 +76,10 @@ def on_request(self, peer, payload): self.logger.info(f'On request {tags_count} tags') with db_session: - random_tag_operations = self.db.get_tags_operations_for_gossip(count=tags_count) + random_tag_operations = self.db.get_tags_operations_for_gossip( + count=tags_count, + time_delta=TIME_DELTA_FOR_TAGS_THAT_READY_TO_GOSSIP + ) self.logger.debug(f'Response {len(random_tag_operations)} tags') for tag_operation in random_tag_operations: @@ -86,7 +87,7 @@ def on_request(self, peer, payload): payload = TagOperationMessage( infohash=tag_operation.torrent_tag.torrent.infohash, operation=tag_operation.operation, - time=tag_operation.time, + counter=tag_operation.counter, creator_public_key=tag_operation.peer.public_key, tag=tag_operation.torrent_tag.tag.name, ) diff --git a/src/tribler-core/tribler_core/components/tag/community/tag_payload.py b/src/tribler-core/tribler_core/components/tag/community/tag_payload.py index d49293de0a..133682c96f 100644 --- a/src/tribler-core/tribler_core/components/tag/community/tag_payload.py +++ b/src/tribler-core/tribler_core/components/tag/community/tag_payload.py @@ -23,7 +23,7 @@ class TagOperationMessage: """ infohash: type_from_format('20s') operation: int - time: int + counter: int creator_public_key: type_from_format('74s') tag: str diff --git a/src/tribler-core/tribler_core/components/tag/community/tests/test_tag_community.py b/src/tribler-core/tribler_core/components/tag/community/tests/test_tag_community.py index 4ace9bb810..e4a77e44ed 100644 --- a/src/tribler-core/tribler_core/components/tag/community/tests/test_tag_community.py +++ b/src/tribler-core/tribler_core/components/tag/community/tests/test_tag_community.py @@ -2,7 +2,7 @@ from unittest.mock import MagicMock, Mock from cryptography.exceptions import InvalidSignature -from pony.orm import TransactionIntegrityError, db_session +from pony.orm import db_session from ipv8.test.base import TestBase from ipv8.test.mocking.ipv8 import MockIPv8 @@ -26,7 +26,7 @@ def create_node(self, *args, **kwargs): return MockIPv8("curve25519", TagCommunity, db=TagDatabase(), request_interval=REQUEST_INTERVAL_FOR_RANDOM_TAGS) def create_message(self, tag=''): - return TagOperationMessage(infohash=b'1' * 20, operation=TagOperation.ADD, time=1, + return TagOperationMessage(infohash=b'1' * 20, operation=TagOperation.ADD, counter=1, creator_public_key=self.overlay(0).my_peer.public_key.key_to_bin(), tag=tag) @@ -78,7 +78,7 @@ async def test_gossip_no_fresh_tags(self): async def test_on_message_eat_exceptions(self): # Tests that except blocks in on_message function works as expected # some exceptions should be eaten silently - exception_to_be_tested = {TransactionIntegrityError, PeerValidationError, ValueError, AssertionError, + exception_to_be_tested = {PeerValidationError, ValueError, AssertionError, InvalidSignature} await self.fill_db() for exception_class in exception_to_be_tested: diff --git a/src/tribler-core/tribler_core/components/tag/community/tests/test_tag_payload.py b/src/tribler-core/tribler_core/components/tag/community/tests/test_tag_payload.py index a4ae98c9ba..cdbe8fe75b 100644 --- a/src/tribler-core/tribler_core/components/tag/community/tests/test_tag_payload.py +++ b/src/tribler-core/tribler_core/components/tag/community/tests/test_tag_payload.py @@ -6,9 +6,9 @@ pytestmark = pytest.mark.asyncio -def create_message(infohash=b'infohash', operation: int = TagOperation.ADD, time: int = 0, +def create_message(infohash=b'infohash', operation: int = TagOperation.ADD, counter: int = 0, creator_public_key=b'creator_public_key', tag: str = 'tag') -> TagOperationMessage: - return TagOperationMessage(infohash=infohash, operation=operation, time=time, creator_public_key=creator_public_key, + return TagOperationMessage(infohash=infohash, operation=operation, counter=counter, creator_public_key=creator_public_key, tag=tag) diff --git a/src/tribler-core/tribler_core/components/tag/db/tag_db.py b/src/tribler-core/tribler_core/components/tag/db/tag_db.py index 1722fd429b..f822b64c34 100644 --- a/src/tribler-core/tribler_core/components/tag/db/tag_db.py +++ b/src/tribler-core/tribler_core/components/tag/db/tag_db.py @@ -26,7 +26,6 @@ class Peer(db.Entity): public_key = orm.Required(bytes, unique=True) added_at = orm.Optional(datetime.datetime, default=datetime.datetime.utcnow) operations = orm.Set(lambda: TorrentTagOp) - last_time = orm.Required(int, default=0) class Torrent(db.Entity): id = orm.PrimaryKey(int, auto=True) @@ -76,7 +75,7 @@ class TorrentTagOp(db.Entity): peer = orm.Required(lambda: Peer) operation = orm.Required(int) - time = orm.Required(int) + counter = orm.Required(int) signature = orm.Required(bytes) updated_at = orm.Required(datetime.datetime, default=datetime.datetime.utcnow) @@ -110,8 +109,7 @@ def add_tag_operation(self, message: TagOperationMessage, signature: bytes, is_l Returns: """ self.logger.debug(f'Add tag operation. Infohash: {hexlify(message.infohash)}, tag: {message.tag}') - peer = self._get_or_create(self.instance.Peer, public_key=message.creator_public_key, - create_kwargs={'last_time': message.time}) + peer = self._get_or_create(self.instance.Peer, public_key=message.creator_public_key) tag = self._get_or_create(self.instance.Tag, name=message.tag) torrent = self._get_or_create(self.instance.Torrent, infohash=message.infohash) torrent_tag = self._get_or_create(self.instance.TorrentTag, tag=tag, torrent=torrent) @@ -119,12 +117,12 @@ def add_tag_operation(self, message: TagOperationMessage, signature: bytes, is_l if not op: # then insert self.instance.TorrentTagOp(torrent_tag=torrent_tag, peer=peer, operation=message.operation, - time=message.time, signature=signature) + counter=message.counter, signature=signature) torrent_tag.update_counter(message.operation, is_local_peer=is_local_peer) return # if it is a message from the past, then return - if message.time <= op.time: + if message.counter <= op.counter: return # To prevent endless incrementing of the operation, we apply the following logic: @@ -134,7 +132,7 @@ def add_tag_operation(self, message: TagOperationMessage, signature: bytes, is_l # 2. Increment new operation torrent_tag.update_counter(message.operation, is_local_peer=is_local_peer) # 3. Update the operation entity - op.set(operation=message.operation, time=message.time, signature=signature, + op.set(operation=message.operation, counter=message.counter, signature=signature, updated_at=datetime.datetime.utcnow()) def get_tags(self, infohash: bytes) -> List[str]: @@ -156,8 +154,8 @@ def show_condition(torrent_tag): query = orm.select(tt.tag.name for tt in query) return list(query) - def get_last_time_of_operation(self, infohash: bytes, tag: str, peer_public_key: bytes) -> int: - """ Get time of operation + def get_last_operation_counter(self, infohash: bytes, tag: str, peer_public_key: bytes) -> int: + """ Get counter of operation Args: infohash: the infohash of a torrent to which tag will be added tag: the string representation of a tag @@ -177,16 +175,15 @@ def get_last_time_of_operation(self, infohash: bytes, tag: str, peer_public_key: return 0 op = self.instance.TorrentTagOp.get(torrent_tag=torrent_tag, peer=peer) - return op.time if op else 0 + return op.counter if op else 0 - def get_tags_operations_for_gossip(self, time_delta=None, count: int = 10) -> List: + def get_tags_operations_for_gossip(self, time_delta, count: int = 10) -> List: """ Get random operations from the DB that older than time_delta. Args: time_delta: a dictionary for `datetime.timedelta` count: a limit for a resulting query """ - time_delta = time_delta or {'minutes': 1} updated_at = datetime.datetime.utcnow() - datetime.timedelta(**time_delta) return list(self.instance.TorrentTagOp .select(lambda tto: tto.updated_at <= updated_at) diff --git a/src/tribler-core/tribler_core/components/tag/db/tests/test_tag_db.py b/src/tribler-core/tribler_core/components/tag/db/tests/test_tag_db.py index ae2694ccac..b702d898f4 100644 --- a/src/tribler-core/tribler_core/components/tag/db/tests/test_tag_db.py +++ b/src/tribler-core/tribler_core/components/tag/db/tests/test_tag_db.py @@ -15,7 +15,7 @@ class TestTagDB(TestBase): def setUp(self): super().setUp() self.db = TagDatabase() - self.timer = 0 + self.counter = 0 async def tearDown(self): await super().tearDown() @@ -29,8 +29,8 @@ def create_torrent_tag(self, tag='tag', infohash=b'infohash'): def add_operation(self, infohash=b'', tag='', peer=b'', time_increment=1, operation=TagOperation.ADD, is_local_peer=False): - self.timer += time_increment - message = TagOperationMessage(infohash=infohash, tag=tag, operation=operation, time=self.timer, + self.counter += time_increment + message = TagOperationMessage(infohash=infohash, tag=tag, operation=operation, counter=self.counter, creator_public_key=peer) self.db.add_tag_operation(message, signature=b'', is_local_peer=is_local_peer) commit() @@ -42,14 +42,15 @@ async def test_get_or_create(self): assert self.db.instance.Peer.select().count() == 0 # test create - self.db._get_or_create(self.db.instance.Peer, public_key=b'123', create_kwargs={'last_time': 42}) + peer = self.db._get_or_create(self.db.instance.Peer, public_key=b'123') commit() + assert peer.public_key == b'123' assert self.db.instance.Peer.select().count() == 1 # test get - peer = self.db._get_or_create(self.db.instance.Peer, public_key=b'123', create_kwargs={'last_time': 24}) + peer = self.db._get_or_create(self.db.instance.Peer, public_key=b'123') assert peer.public_key == b'123' - assert peer.last_time == 42 + assert self.db.instance.Peer.select().count() == 1 @db_session async def test_update_counter_add(self): @@ -230,20 +231,20 @@ async def test_hide_local_tags(self): @db_session async def test_get_last_time_of_operation_no_data(self): # Test that if there is no data, then get_last_time_of_operation() should return 0 - assert self.db.get_last_time_of_operation(b'infohash', 'tag', b'peer_public_key') == 0 + assert self.db.get_last_operation_counter(b'infohash', 'tag', b'peer_public_key') == 0 @db_session async def test_get_last_time_of_operation_data_exists(self): self.add_operation(b'infohash', 'tag', b'peer_public_key', operation=TagOperation.ADD) self.add_operation(b'infohash', 'tag', b'peer_public_key', operation=TagOperation.REMOVE) - assert self.db.get_last_time_of_operation(b'infohash', 'tag', b'peer_public_key') == 2 + assert self.db.get_last_operation_counter(b'infohash', 'tag', b'peer_public_key') == 2 @db_session async def test_get_last_time_of_operation_no_torrent_tag(self): # Test that if there is no torrent_tag, get_last_time_of_operation() should return 0 self.add_operation(b'infohash', 'tag', b'peer_public_key', operation=TagOperation.ADD) self.db.instance.TorrentTag.get = Mock(return_value=None) - assert self.db.get_last_time_of_operation(b'infohash', 'tag', b'peer_public_key') == 0 + assert self.db.get_last_operation_counter(b'infohash', 'tag', b'peer_public_key') == 0 @db_session async def test_get_tags_operations_for_gossip(self): diff --git a/src/tribler-core/tribler_core/components/tag/restapi/tags_endpoint.py b/src/tribler-core/tribler_core/components/tag/restapi/tags_endpoint.py index f5862bcefc..5cb64652f9 100644 --- a/src/tribler-core/tribler_core/components/tag/restapi/tags_endpoint.py +++ b/src/tribler-core/tribler_core/components/tag/restapi/tags_endpoint.py @@ -84,8 +84,8 @@ def modify_tags(self, infohash: bytes, new_tags: Set[str]): public_key = self.community.my_peer.key.pub().key_to_bin() for tag in added_tags.union(removed_tags): operation = TagOperation.ADD if tag in added_tags else TagOperation.REMOVE - t = self.db.get_last_time_of_operation(infohash, tag, public_key) + 1 - message = TagOperationMessage(infohash=infohash, operation=operation, time=t, creator_public_key=public_key, - tag=tag) + counter = self.db.get_last_operation_counter(infohash, tag, public_key) + 1 + message = TagOperationMessage(infohash=infohash, operation=operation, counter=counter, + creator_public_key=public_key, tag=tag) signature = self.community.sign(message) self.db.add_tag_operation(message, signature, is_local_peer=True)