Skip to content

Commit

Permalink
Add TagComponent
Browse files Browse the repository at this point in the history
  • Loading branch information
drew2a committed Sep 30, 2021
1 parent 64e6b64 commit cef9e7d
Show file tree
Hide file tree
Showing 11 changed files with 263 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from pony import orm
from pony.orm import composite_key


def define_binding(db):
class Tag(db.Entity):
id = orm.PrimaryKey(int, auto=True)
infohash = orm.Required(bytes)
tag = orm.Required(str)
creator_public_key = orm.Required(bytes)
creator_sign = orm.Required(bytes)

composite_key(infohash, tag, creator_public_key)

return Tag
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
json_node,
metadata_node,
misc,
torrent_metadata,
tag, torrent_metadata,
torrent_state,
tracker_state,
vsids,
Expand Down Expand Up @@ -201,6 +201,8 @@ def sqlite_disable_sync(_, connection):
self.ChannelVote = channel_vote.define_binding(self._db)
self.ChannelPeer = channel_peer.define_binding(self._db)
self.Vsids = vsids.define_binding(self._db)
self.Tag = tag.define_binding(self._db)


self.ChannelMetadata._channels_dir = channels_dir # pylint: disable=protected-access

Expand Down
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import json
import random
from binascii import unhexlify

from pony.orm import TransactionIntegrityError, db_session

from ipv8.lazy_community import lazy_wrapper
from tribler_core.components.metadata_store.db.store import MetadataStore
from tribler_core.components.tag.community.tag_payload import TagMessage, TagModel
from tribler_core.modules.tribler_community import TriblerCommunity

GOSSIP_RANDOM_TAGS_COUNT = 10
GOSSIP_RANDOM_PEERS_COUNT = 10


class TagCommunity(TriblerCommunity):
"""
Community for disseminating tags across the network.
"""

community_id = unhexlify('042020c5e5e2ee0727fe99d704b430698e308d98')

def __init__(self, *args, metadata_store: MetadataStore = None,
gossip_interval_for_tags_in_sec=5, **kwargs):
super().__init__(*args, **kwargs)
self.metadata_store = metadata_store

self.add_message_handler(TagMessage, self.on_message)
self.register_task("gossip_random_tags", self.gossip_random_tags, interval=gossip_interval_for_tags_in_sec)

self.logger.info('Tag community initialized')

@lazy_wrapper(TagMessage)
def on_message(self, peer, payload):
self.logger.debug(f'Message received: {payload}')
dictionary = json.loads(payload.json.decode("utf-8"))
tag_model = TagModel.parse_obj(dictionary)
self.logger.debug(f'Write to db: {tag_model}')
try:
with db_session():
self.metadata_store.Tag(**tag_model.to_pony_dict())
except TransactionIntegrityError: # ignore all duplicates
pass

def gossip_random_tags(self):
peer = random.choice(self.get_peers())
with db_session:
random_tags = list(self.metadata_store.Tag.select_random(GOSSIP_RANDOM_TAGS_COUNT))
self.logger.debug(f'Gossip tags({len(random_tags)})')

for tag in random_tags:
tag_model = TagModel.from_pony(tag)
self.logger.debug(f'Sending tag: f{tag_model}')
self.ez_send(peer, TagMessage(tag_model.json().encode("utf-8")))
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from types import SimpleNamespace

from pydantic import BaseModel

from ipv8.messaging.lazy_payload import VariablePayload, vp_compile


class TagModel(BaseModel):
infohash: str
tag: str
creator_public_key: str
creator_sign: str

@staticmethod
def from_pony(tag):
return TagModel(
infohash=tag.infohash.decode("utf-8"),
tag=tag.tag,
creator_public_key=tag.creator_public_key.decode("utf-8"),
creator_sign=tag.creator_sign.decode("utf-8")
)

def to_pony_dict(self):
return vars(SimpleNamespace(
infohash=self.infohash.encode("utf-8"),
tag=self.tag,
creator_public_key=self.creator_public_key.encode("utf-8"),
creator_sign=self.creator_sign.encode("utf-8")
))


@vp_compile
class TagMessage(VariablePayload):
msg_id = 1
format_list = ['varlenI']
names = ['json']
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from pony.orm import db_session

from ipv8.keyvault.crypto import default_eccrypto
from ipv8.test.base import TestBase
from ipv8.test.mocking.ipv8 import MockIPv8
from tribler_core.components.metadata_store.db.store import MetadataStore
from tribler_core.components.tag.community.tag_community import TagCommunity
from tribler_core.utilities.path_util import Path

GOSSIP_INTERVAL_FOR_RANDOM_TAGS = 0.1 # in seconds


class TestTagCommunity(TestBase):

def setUp(self):
super().setUp()
self.count = 0
self.metadata_store_set = set()
self.initialize(TagCommunity, 2)

async def tearDown(self):
for metadata_store in self.metadata_store_set:
metadata_store.shutdown()
await super().tearDown()

def create_node(self, *args, **kwargs):
metadata_store = MetadataStore(Path(self.temporary_directory()) / f"{self.count}",
Path(self.temporary_directory()),
default_eccrypto.generate_key("curve25519"))
self.metadata_store_set.add(metadata_store)

self.count += 1
return MockIPv8("curve25519", TagCommunity, metadata_store=metadata_store,
gossip_interval_for_tags_in_sec=GOSSIP_INTERVAL_FOR_RANDOM_TAGS)

async def test_gossip(self):
node0_db = self.overlay(0).metadata_store.Tag
node1_db = self.overlay(1).metadata_store.Tag
with db_session:
for i in range(5):
bytes_str = f'{i}'.encode() * 20
node1_db(infohash=bytes_str, tag=f'{i}', creator_public_key=bytes_str, creator_sign=bytes_str)

with db_session:
assert node0_db.select().count() == 0
assert node1_db.select().count() == 5

await self.introduce_nodes()
await self.deliver_messages(timeout=GOSSIP_INTERVAL_FOR_RANDOM_TAGS * 2)

with db_session:
assert node0_db.select().count() == 5
assert node1_db.select().count() == 5
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from types import SimpleNamespace

import pytest

from tribler_core.components.tag.community.tag_payload import TagModel

pytestmark = pytest.mark.asyncio


async def test_model_from_pony():
tag = SimpleNamespace(infohash=b'infohash', tag='tag', creator_public_key=b'creator_public_key',
creator_sign=b'creator_sign')
tag_model = TagModel.from_pony(tag)
assert tag_model.infohash == 'infohash'
assert tag_model.tag == 'tag'
assert tag_model.creator_public_key == 'creator_public_key'
assert tag_model.creator_sign == 'creator_sign'

assert tag_model.json() == '{"infohash": "infohash", "tag": "tag", "creator_public_key": "creator_public_key", ' \
'"creator_sign": "creator_sign"}'


async def test_pony_dict_from_model():
tag_model = TagModel(infohash='infohash', tag='tag', creator_public_key='creator_public_key',
creator_sign='creator_sign')
tag = tag_model.to_pony_dict()

assert tag['infohash'] == b'infohash'
assert tag['tag'] == 'tag'
assert tag['creator_public_key'] == b'creator_public_key'
assert tag['creator_sign'] == b'creator_sign'
43 changes: 43 additions & 0 deletions src/tribler-core/tribler_core/components/tag/tag_component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from ipv8.peerdiscovery.discovery import RandomWalk
from ipv8_service import IPv8

from tribler_core.components.base import Component
from tribler_core.components.gigachannel.community.sync_strategy import RemovePeers
from tribler_core.components.ipv8 import Ipv8Component
from tribler_core.components.metadata_store.metadata_store_component import MetadataStoreComponent
from tribler_core.components.tag.community.tag_community import TagCommunity

INFINITE = -1


class TagComponent(Component):
community: TagCommunity
_ipv8: IPv8

async def run(self):
await super().run()

ipv8_component = await self.require_component(Ipv8Component)
metadata_store_component = await self.require_component(MetadataStoreComponent)

self._ipv8 = ipv8_component.ipv8

community = TagCommunity(
ipv8_component.peer,
self._ipv8.endpoint,
self._ipv8.network,
metadata_store=metadata_store_component.mds
)
self.community = community

self._ipv8.add_strategy(community, RandomWalk(community), 30)
self._ipv8.add_strategy(community, RemovePeers(community), INFINITE)

community.bootstrappers.append(ipv8_component.make_bootstrapper())

# await self.init_endpoints(endpoints=['remote_query', 'channels', 'collections'],
# values={'gigachannel_community': community})

async def shutdown(self):
await super().shutdown()
await self._ipv8.unload_overlay(self.community)
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import pytest

from tribler_core.components.base import Session
from tribler_core.components.ipv8 import Ipv8Component
from tribler_core.components.masterkey.masterkey_component import MasterKeyComponent
from tribler_core.components.metadata_store.metadata_store_component import MetadataStoreComponent
from tribler_core.components.restapi import RESTComponent
from tribler_core.components.tag.tag_component import TagComponent


# pylint: disable=protected-access


@pytest.mark.asyncio
async def test_metadata_store_component(tribler_config):
tribler_config.ipv8.enabled = True
tribler_config.libtorrent.enabled = True
tribler_config.chant.enabled = True
components = [MasterKeyComponent(), Ipv8Component(), RESTComponent(), MetadataStoreComponent(), TagComponent()]
session = Session(tribler_config, components)
with session:
comp = TagComponent.instance()
await session.start()

assert comp.started.is_set() and not comp.failed
assert comp.community

await session.shutdown()

0 comments on commit cef9e7d

Please sign in to comment.