Skip to content

Commit

Permalink
Merge pull request #6475 from drew2a/feature/tag_counter
Browse files Browse the repository at this point in the history
Tag System: Revert {pk, tag, infohash} clock
  • Loading branch information
drew2a authored Oct 21, 2021
2 parents c6a56c0 + 8126885 commit fec9ef7
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 49 deletions.
2 changes: 1 addition & 1 deletion src/pyipv8
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ def tag_torrent(infohash, tags_db, tags=None, suggested_tags=None):
# Give each torrent some tags
for tag in tags:
for key in [random_key_1, random_key_2]: # Each tag should be proposed by two unique users
counter = tags_db.get_next_operation_counter()
operation = TagOperation(infohash=infohash, tag=tag, operation=TagOperationEnum.ADD, timestamp=counter,
operation = TagOperation(infohash=infohash, tag=tag, operation=TagOperationEnum.ADD, clock=0,
creator_public_key=key.pub().key_to_bin())
operation.clock = tags_db.get_clock(operation) + 1
tags_db.add_tag_operation(operation, b"")

# Make sure we have some suggestions
for tag in suggested_tags:
counter = tags_db.get_next_operation_counter()
operation = TagOperation(infohash=infohash, tag=tag, operation=TagOperationEnum.ADD, timestamp=counter,
operation = TagOperation(infohash=infohash, tag=tag, operation=TagOperationEnum.ADD, clock=0,
creator_public_key=random_key_3.pub().key_to_bin())
operation.clock = tags_db.get_clock(operation) + 1
tags_db.add_tag_operation(operation, b"")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def on_request(self, peer, operation):
operation = TagOperation(
infohash=tag_operation.torrent_tag.torrent.infohash,
operation=tag_operation.operation,
timestamp=tag_operation.timestamp,
clock=tag_operation.clock,
creator_public_key=tag_operation.peer.public_key,
tag=tag_operation.torrent_tag.tag.name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TagOperation:
"""
infohash: type_from_format('20s')
operation: int
timestamp: int
clock: int # this is the lamport-like clock that unique for each triple {public_key, infohash, tag}
creator_public_key: type_from_format('74s')
tag: str

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ def create_node(self, *args, **kwargs):
return MockIPv8("curve25519", TagCommunity, db=TagDatabase(), tags_key=LibNaCLSK(),
request_interval=REQUEST_INTERVAL_FOR_RANDOM_TAGS)

def create_message(self, tag=''):
def create_operation(self, tag=''):
community = self.overlay(0)
return TagOperation(infohash=b'1' * 20,
operation=TagOperationEnum.ADD,
timestamp=community.db.get_next_operation_counter(),
creator_public_key=community.tags_key.pub().key_to_bin(),
tag=tag)
operation = TagOperation(infohash=b'1' * 20, operation=TagOperationEnum.ADD, clock=0,
creator_public_key=community.tags_key.pub().key_to_bin(), tag=tag)
operation.clock = community.db.get_clock(operation) + 1
return operation

@db_session
async def fill_db(self):
Expand All @@ -42,7 +41,7 @@ async def fill_db(self):
# next 5 of them are incorrect
community = self.overlay(0)
for i in range(10):
message = self.create_message(f'{i}' * 3)
message = self.create_operation(f'{i}' * 3)
signature = community.sign(message)
# 5 of them are signed incorrectly
if i >= 5:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@


def create_message(operation: int = TagOperationEnum.ADD, tag: str = 'tag') -> TagOperation:
return TagOperation(infohash=b'infohash', operation=operation, timestamp=0,
creator_public_key=b'creator_public_key', tag=tag)
return TagOperation(infohash=b'infohash', operation=operation, clock=0, creator_public_key=b'peer', tag=tag)


async def test_correct_tag_size():
Expand Down
33 changes: 18 additions & 15 deletions src/tribler-core/tribler_core/components/tag/db/tag_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ def __init__(self, filename: Optional[str] = None):

@staticmethod
def define_binding(db):
class LocalPeer(db.Entity): # pylint: disable=unused-variable
counter = orm.Required(int, default=0, size=64)

class Peer(db.Entity):
id = orm.PrimaryKey(int, auto=True)
public_key = orm.Required(bytes, unique=True)
Expand Down Expand Up @@ -83,7 +80,7 @@ class TorrentTagOp(db.Entity):
peer = orm.Required(lambda: Peer)

operation = orm.Required(int)
timestamp = orm.Required(int)
clock = orm.Required(int)
signature = orm.Required(bytes)
updated_at = orm.Required(datetime.datetime, default=datetime.datetime.utcnow)

Expand Down Expand Up @@ -125,12 +122,12 @@ def add_tag_operation(self, operation: TagOperation, signature: bytes, is_local_

if not op: # then insert
self.instance.TorrentTagOp(torrent_tag=torrent_tag, peer=peer, operation=operation.operation,
timestamp=operation.timestamp, signature=signature)
clock=operation.clock, signature=signature)
torrent_tag.update_counter(operation.operation, is_local_peer=is_local_peer)
return

# if it is a message from the past, then return
if operation.timestamp <= op.timestamp:
if operation.clock <= op.clock:
return

# To prevent endless incrementing of the operation, we apply the following logic:
Expand All @@ -140,7 +137,7 @@ def add_tag_operation(self, operation: TagOperation, signature: bytes, is_local_
# 2. Increment new operation
torrent_tag.update_counter(operation.operation, is_local_peer=is_local_peer)
# 3. Update the operation entity
op.set(operation=operation.operation, timestamp=operation.timestamp, signature=signature,
op.set(operation=operation.operation, clock=operation.clock, signature=signature,
updated_at=datetime.datetime.utcnow())

def _get_tags(self, infohash: bytes, condition: Callable[[], bool]) -> List[str]:
Expand Down Expand Up @@ -185,15 +182,21 @@ def show_suggestions_condition(torrent_tag):

return self._get_tags(infohash, show_suggestions_condition)

def get_next_operation_counter(self) -> int:
""" Get counter of last operation and increment this counter in DB.
Returns: Counter that represented by integer (starts from 1).
def get_clock(self, operation: TagOperation) -> int:
""" Get the clock (int) of operation.
"""
local_peer = self._get_or_create(self.instance.LocalPeer)
next_operation_counter = local_peer.counter + 1
local_peer.set(counter=next_operation_counter)

return next_operation_counter
peer = self.instance.Peer.get(public_key=operation.creator_public_key)
tag = self.instance.Tag.get(name=operation.tag)
torrent = self.instance.Torrent.get(infohash=operation.infohash)
if not torrent or not tag:
return 0

torrent_tag = self.instance.TorrentTag.get(tag=tag, torrent=torrent)
if not torrent_tag or not peer:
return 0

op = self.instance.TorrentTagOp.get(torrent_tag=torrent_tag, peer=peer)
return op.clock if op else 0

def get_tags_operations_for_gossip(self, time_delta, count: int = 10) -> List:
""" Get random operations from the DB that older than time_delta.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ def create_torrent_tag(self, tag='tag', infohash=b'infohash'):

return torrent_tag

def add_operation(self, infohash=b'infohash', tag='', peer=b'', operation=TagOperationEnum.ADD,
is_local_peer=False, timestamp=None):
timestamp = timestamp or self.db.get_next_operation_counter()
message = TagOperation(infohash=infohash, tag=tag, operation=operation,
timestamp=timestamp, creator_public_key=peer)
self.db.add_tag_operation(message, signature=b'', is_local_peer=is_local_peer)
@staticmethod
def create_operation(infohash=b'infohash', tag='tag', peer=b'', operation=TagOperationEnum.ADD, clock=0):
return TagOperation(infohash=infohash, tag=tag, operation=operation, clock=clock, creator_public_key=peer)

def add_operation(self, infohash=b'infohash', tag='tag', peer=b'', operation=TagOperationEnum.ADD,
is_local_peer=False, clock=None):
operation = self.create_operation(infohash, tag, peer, operation, clock)
operation.clock = clock or self.db.get_clock(operation) + 1
self.db.add_tag_operation(operation, signature=b'', is_local_peer=is_local_peer)
commit()

@db_session
Expand Down Expand Up @@ -71,7 +74,7 @@ async def test_update_counter_remove(self):
assert not torrent_tag.local_operation

@db_session
async def test_update_counter_Local(self):
async def test_update_counter_local(self):
torrent_tag = self.create_torrent_tag()

# let's update local operation
Expand All @@ -98,19 +101,19 @@ def assert_all_tables_have_the_only_one_entity():
assert_all_tables_have_the_only_one_entity()

# add an operation from the past
self.add_operation(b'infohash', 'tag', b'peer1', timestamp=0)
self.add_operation(b'infohash', 'tag', b'peer1', clock=0)
assert_all_tables_have_the_only_one_entity()

# add a duplicate operation but from the future
self.add_operation(b'infohash', 'tag', b'peer1', timestamp=1000)
self.add_operation(b'infohash', 'tag', b'peer1', clock=1000)
assert_all_tables_have_the_only_one_entity()

assert self.db.instance.TorrentTagOp.get().operation == TagOperationEnum.ADD
assert self.db.instance.TorrentTag.get().added_count == 1
assert self.db.instance.TorrentTag.get().removed_count == 0

# add a unique operation from the future
self.add_operation(b'infohash', 'tag', b'peer1', operation=TagOperationEnum.REMOVE, timestamp=1001)
self.add_operation(b'infohash', 'tag', b'peer1', operation=TagOperationEnum.REMOVE, clock=1001)
assert_all_tables_have_the_only_one_entity()
assert self.db.instance.TorrentTagOp.get().operation == TagOperationEnum.REMOVE
assert self.db.instance.TorrentTag.get().added_count == 0
Expand Down Expand Up @@ -242,9 +245,12 @@ async def test_suggestions(self):
assert not self.db.get_suggestions(b'infohash') # below the threshold

@db_session
async def test_get_next_operation_counter(self):
assert self.db.get_next_operation_counter() == 1
assert self.db.get_next_operation_counter() == 2
async def test_get_clock_of_operation(self):
operation = self.create_operation(tag='tag1')
assert self.db.get_clock(operation) == 0

self.add_operation(infohash=operation.infohash, tag=operation.tag, peer=operation.creator_public_key, clock=1)
assert self.db.get_clock(operation) == 1

@db_session
async def test_get_tags_operations_for_gossip(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ def modify_tags(self, infohash: bytes, new_tags: Set[str]):
# Create individual tag operations for the added/removed tags
public_key = self.community.my_peer.key.pub().key_to_bin()
for tag in added_tags.union(removed_tags):
operation = TagOperationEnum.ADD if tag in added_tags else TagOperationEnum.REMOVE
counter = self.db.get_next_operation_counter()
operation = TagOperation(infohash=infohash, operation=operation, timestamp=counter,
type_of_operation = TagOperationEnum.ADD if tag in added_tags else TagOperationEnum.REMOVE
operation = TagOperation(infohash=infohash, operation=type_of_operation, clock=0,
creator_public_key=public_key, tag=tag)
operation.clock = self.db.get_clock(operation) + 1
signature = self.community.sign(operation)
self.db.add_tag_operation(operation, signature, is_local_peer=True)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ async def test_get_suggestions(rest_api, tags_db):
# Add a suggestion to the database
with db_session:
random_key = default_eccrypto.generate_key('low')
counter = tags_db.get_next_operation_counter()
operation = TagOperation(infohash=infohash, tag="test", operation=TagOperationEnum.ADD, timestamp=counter,
operation = TagOperation(infohash=infohash, tag="test", operation=TagOperationEnum.ADD, clock=0,
creator_public_key=random_key.pub().key_to_bin())
tags_db.add_tag_operation(operation, b"")

Expand Down

0 comments on commit fec9ef7

Please sign in to comment.