Skip to content

Commit

Permalink
PR fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
drew2a committed Oct 14, 2021
1 parent 7b47c18 commit 2841e72
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def tag_torrent(infohash, tags_db, tags=None):
for tag in tags:
cur_time = int(time.time())
for key in [random_key_1, random_key_2]: # Each tag should be proposed by two unique users
message = TagOperationMessage(infohash=infohash, tag=tag, operation=TagOperation.ADD, time=cur_time,
message = TagOperationMessage(infohash=infohash, tag=tag, operation=TagOperation.ADD, counter=cur_time,
creator_public_key=key.pub().key_to_bin())
tags_db.add_tag_operation(message, b"")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from binascii import unhexlify

from cryptography.exceptions import InvalidSignature
from pony.orm import TransactionIntegrityError, db_session
from pony.orm import db_session

from ipv8.lazy_community import lazy_wrapper
from ipv8.types import Key
Expand All @@ -15,14 +15,14 @@
REQUESTED_TAGS_COUNT = 10

REQUEST_INTERVAL = 5 # 5 sec
TIME_DELTA_FOR_TAGS_THAT_READY_TO_GOSSIP = {'seconds': 60}
CLEAR_ALL_REQUESTS_INTERVAL = 10 * 60 # 10 minutes
TIME_DELTA_FOR_TAGS_THAT_READY_TO_GOSSIP = {'minutes': 1}


class TagCommunity(TriblerCommunity):
""" Community for disseminating tags across the network.
Only tags that older than 1 minute will be gossiped.
Only tags are older than 1 minute will be gossiped.
"""

community_id = unhexlify('042020c5e5e2ee0727fe99d704b430698e308d98')
Expand Down Expand Up @@ -63,8 +63,6 @@ def on_message(self, peer, signature, payload):
self.db.add_tag_operation(payload, signature.signature)
self.logger.info(f'Tag added: {payload.tag}:{payload.infohash}')

except TransactionIntegrityError: # db error
pass
except PeerValidationError as e: # peer has exhausted his response count
self.logger.warning(e)
except (ValueError, AssertionError) as e: # validation error
Expand All @@ -78,15 +76,18 @@ def on_request(self, peer, payload):
self.logger.info(f'On request {tags_count} tags')

with db_session:
random_tag_operations = self.db.get_tags_operations_for_gossip(count=tags_count)
random_tag_operations = self.db.get_tags_operations_for_gossip(
count=tags_count,
time_delta=TIME_DELTA_FOR_TAGS_THAT_READY_TO_GOSSIP
)

self.logger.debug(f'Response {len(random_tag_operations)} tags')
for tag_operation in random_tag_operations:
try:
payload = TagOperationMessage(
infohash=tag_operation.torrent_tag.torrent.infohash,
operation=tag_operation.operation,
time=tag_operation.time,
counter=tag_operation.counter,
creator_public_key=tag_operation.peer.public_key,
tag=tag_operation.torrent_tag.tag.name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class TagOperationMessage:
"""
infohash: type_from_format('20s')
operation: int
time: int
counter: int
creator_public_key: type_from_format('74s')
tag: str

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from unittest.mock import MagicMock, Mock

from cryptography.exceptions import InvalidSignature
from pony.orm import TransactionIntegrityError, db_session
from pony.orm import db_session

from ipv8.test.base import TestBase
from ipv8.test.mocking.ipv8 import MockIPv8
Expand All @@ -26,7 +26,7 @@ def create_node(self, *args, **kwargs):
return MockIPv8("curve25519", TagCommunity, db=TagDatabase(), request_interval=REQUEST_INTERVAL_FOR_RANDOM_TAGS)

def create_message(self, tag=''):
return TagOperationMessage(infohash=b'1' * 20, operation=TagOperation.ADD, time=1,
return TagOperationMessage(infohash=b'1' * 20, operation=TagOperation.ADD, counter=1,
creator_public_key=self.overlay(0).my_peer.public_key.key_to_bin(),
tag=tag)

Expand Down Expand Up @@ -78,7 +78,7 @@ async def test_gossip_no_fresh_tags(self):
async def test_on_message_eat_exceptions(self):
# Tests that except blocks in on_message function works as expected
# some exceptions should be eaten silently
exception_to_be_tested = {TransactionIntegrityError, PeerValidationError, ValueError, AssertionError,
exception_to_be_tested = {PeerValidationError, ValueError, AssertionError,
InvalidSignature}
await self.fill_db()
for exception_class in exception_to_be_tested:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
pytestmark = pytest.mark.asyncio


def create_message(infohash=b'infohash', operation: int = TagOperation.ADD, time: int = 0,
def create_message(infohash=b'infohash', operation: int = TagOperation.ADD, counter: int = 0,
creator_public_key=b'creator_public_key', tag: str = 'tag') -> TagOperationMessage:
return TagOperationMessage(infohash=infohash, operation=operation, time=time, creator_public_key=creator_public_key,
return TagOperationMessage(infohash=infohash, operation=operation, counter=counter, creator_public_key=creator_public_key,
tag=tag)


Expand Down
21 changes: 9 additions & 12 deletions src/tribler-core/tribler_core/components/tag/db/tag_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class Peer(db.Entity):
public_key = orm.Required(bytes, unique=True)
added_at = orm.Optional(datetime.datetime, default=datetime.datetime.utcnow)
operations = orm.Set(lambda: TorrentTagOp)
last_time = orm.Required(int, default=0)

class Torrent(db.Entity):
id = orm.PrimaryKey(int, auto=True)
Expand Down Expand Up @@ -76,7 +75,7 @@ class TorrentTagOp(db.Entity):
peer = orm.Required(lambda: Peer)

operation = orm.Required(int)
time = orm.Required(int)
counter = orm.Required(int)
signature = orm.Required(bytes)
updated_at = orm.Required(datetime.datetime, default=datetime.datetime.utcnow)

Expand Down Expand Up @@ -110,21 +109,20 @@ def add_tag_operation(self, message: TagOperationMessage, signature: bytes, is_l
Returns:
"""
self.logger.debug(f'Add tag operation. Infohash: {hexlify(message.infohash)}, tag: {message.tag}')
peer = self._get_or_create(self.instance.Peer, public_key=message.creator_public_key,
create_kwargs={'last_time': message.time})
peer = self._get_or_create(self.instance.Peer, public_key=message.creator_public_key)
tag = self._get_or_create(self.instance.Tag, name=message.tag)
torrent = self._get_or_create(self.instance.Torrent, infohash=message.infohash)
torrent_tag = self._get_or_create(self.instance.TorrentTag, tag=tag, torrent=torrent)
op = self.instance.TorrentTagOp.get_for_update(torrent_tag=torrent_tag, peer=peer)

if not op: # then insert
self.instance.TorrentTagOp(torrent_tag=torrent_tag, peer=peer, operation=message.operation,
time=message.time, signature=signature)
counter=message.counter, signature=signature)
torrent_tag.update_counter(message.operation, is_local_peer=is_local_peer)
return

# if it is a message from the past, then return
if message.time <= op.time:
if message.counter <= op.counter:
return

# To prevent endless incrementing of the operation, we apply the following logic:
Expand All @@ -134,7 +132,7 @@ def add_tag_operation(self, message: TagOperationMessage, signature: bytes, is_l
# 2. Increment new operation
torrent_tag.update_counter(message.operation, is_local_peer=is_local_peer)
# 3. Update the operation entity
op.set(operation=message.operation, time=message.time, signature=signature,
op.set(operation=message.operation, counter=message.counter, signature=signature,
updated_at=datetime.datetime.utcnow())

def get_tags(self, infohash: bytes) -> List[str]:
Expand All @@ -156,8 +154,8 @@ def show_condition(torrent_tag):
query = orm.select(tt.tag.name for tt in query)
return list(query)

def get_last_time_of_operation(self, infohash: bytes, tag: str, peer_public_key: bytes) -> int:
""" Get time of operation
def get_last_operation_counter(self, infohash: bytes, tag: str, peer_public_key: bytes) -> int:
""" Get counter of operation
Args:
infohash: the infohash of a torrent to which tag will be added
tag: the string representation of a tag
Expand All @@ -177,16 +175,15 @@ def get_last_time_of_operation(self, infohash: bytes, tag: str, peer_public_key:
return 0

op = self.instance.TorrentTagOp.get(torrent_tag=torrent_tag, peer=peer)
return op.time if op else 0
return op.counter if op else 0

def get_tags_operations_for_gossip(self, time_delta=None, count: int = 10) -> List:
def get_tags_operations_for_gossip(self, time_delta, count: int = 10) -> List:
""" Get random operations from the DB that older than time_delta.
Args:
time_delta: a dictionary for `datetime.timedelta`
count: a limit for a resulting query
"""
time_delta = time_delta or {'minutes': 1}
updated_at = datetime.datetime.utcnow() - datetime.timedelta(**time_delta)
return list(self.instance.TorrentTagOp
.select(lambda tto: tto.updated_at <= updated_at)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class TestTagDB(TestBase):
def setUp(self):
super().setUp()
self.db = TagDatabase()
self.timer = 0
self.counter = 0

async def tearDown(self):
await super().tearDown()
Expand All @@ -29,8 +29,8 @@ def create_torrent_tag(self, tag='tag', infohash=b'infohash'):

def add_operation(self, infohash=b'', tag='', peer=b'', time_increment=1, operation=TagOperation.ADD,
is_local_peer=False):
self.timer += time_increment
message = TagOperationMessage(infohash=infohash, tag=tag, operation=operation, time=self.timer,
self.counter += time_increment
message = TagOperationMessage(infohash=infohash, tag=tag, operation=operation, counter=self.counter,
creator_public_key=peer)
self.db.add_tag_operation(message, signature=b'', is_local_peer=is_local_peer)
commit()
Expand All @@ -42,14 +42,15 @@ async def test_get_or_create(self):
assert self.db.instance.Peer.select().count() == 0

# test create
self.db._get_or_create(self.db.instance.Peer, public_key=b'123', create_kwargs={'last_time': 42})
peer = self.db._get_or_create(self.db.instance.Peer, public_key=b'123')
commit()
assert peer.public_key == b'123'
assert self.db.instance.Peer.select().count() == 1

# test get
peer = self.db._get_or_create(self.db.instance.Peer, public_key=b'123', create_kwargs={'last_time': 24})
peer = self.db._get_or_create(self.db.instance.Peer, public_key=b'123')
assert peer.public_key == b'123'
assert peer.last_time == 42
assert self.db.instance.Peer.select().count() == 1

@db_session
async def test_update_counter_add(self):
Expand Down Expand Up @@ -230,20 +231,20 @@ async def test_hide_local_tags(self):
@db_session
async def test_get_last_time_of_operation_no_data(self):
# Test that if there is no data, then get_last_time_of_operation() should return 0
assert self.db.get_last_time_of_operation(b'infohash', 'tag', b'peer_public_key') == 0
assert self.db.get_last_operation_counter(b'infohash', 'tag', b'peer_public_key') == 0

@db_session
async def test_get_last_time_of_operation_data_exists(self):
self.add_operation(b'infohash', 'tag', b'peer_public_key', operation=TagOperation.ADD)
self.add_operation(b'infohash', 'tag', b'peer_public_key', operation=TagOperation.REMOVE)
assert self.db.get_last_time_of_operation(b'infohash', 'tag', b'peer_public_key') == 2
assert self.db.get_last_operation_counter(b'infohash', 'tag', b'peer_public_key') == 2

@db_session
async def test_get_last_time_of_operation_no_torrent_tag(self):
# Test that if there is no torrent_tag, get_last_time_of_operation() should return 0
self.add_operation(b'infohash', 'tag', b'peer_public_key', operation=TagOperation.ADD)
self.db.instance.TorrentTag.get = Mock(return_value=None)
assert self.db.get_last_time_of_operation(b'infohash', 'tag', b'peer_public_key') == 0
assert self.db.get_last_operation_counter(b'infohash', 'tag', b'peer_public_key') == 0

@db_session
async def test_get_tags_operations_for_gossip(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ def modify_tags(self, infohash: bytes, new_tags: Set[str]):
public_key = self.community.my_peer.key.pub().key_to_bin()
for tag in added_tags.union(removed_tags):
operation = TagOperation.ADD if tag in added_tags else TagOperation.REMOVE
t = self.db.get_last_time_of_operation(infohash, tag, public_key) + 1
message = TagOperationMessage(infohash=infohash, operation=operation, time=t, creator_public_key=public_key,
tag=tag)
counter = self.db.get_last_operation_counter(infohash, tag, public_key) + 1
message = TagOperationMessage(infohash=infohash, operation=operation, counter=counter,
creator_public_key=public_key, tag=tag)
signature = self.community.sign(message)
self.db.add_tag_operation(message, signature, is_local_peer=True)

0 comments on commit 2841e72

Please sign in to comment.