
standard for discovering related hypercores #7

Open
8 of 11 tasks
Tracked by #1
serapath opened this issue Feb 26, 2020 · 5 comments

serapath commented Feb 26, 2020

@todo


serapath commented Mar 21, 2020

kappa-core / multifeed / multiplex

multifeed - mux

MULTIPLEXER

function Multiplexer (isInitiator, key, opts) { // `key` - protocol encryption key

  self._id = opts._id || Math.floor(Math.random() * 10000).toString(16)

  var LOCALOFFER          = []
  var REQUESTED_FEEDS     = []
  var REMOTE_OFFER        = []
  var ACTIVE_FEED_STREAMS = {}
  
  // Open a virtual feed that has the key set to the shared key
  self._feed = self.stream.open(key, function onopen () {
    // ...
    self._handshakeExt.send({ client: MULTIFEED, version: PROTOCOL_VERSION, userData: opts.userData })
  })
  
  // PROTOCOL:
 
  // PEER A send OFFER
  // 1. CREATE & EMIT 'manifest', called by `multifeed: mux.ready(...)` AND `multifeed: _addFeed(...)`
  self.offerFeeds = function (keys, opts) { // opts = custom data for 'want' selections
    LOCALOFFER = [...LOCALOFFER, ...keys] // 2. REMEMBER LOCAL OFFER
    self._manifestExt.send({keys})
  }
   
  // PEER B receive OFFER
  self._manifestExt = extension(EXT_MANIFEST, msg => { // 3. RECEIVE REMOTE OFFER
    REMOTE_OFFER = [...REMOTE_OFFER, ...msg.keys] // 4. ADD REMOTE OFFER
    self.emit('manifest', msg, self.requestFeeds) // => triggers REQUEST
  })
 
  // PEER B send REQUEST
  // 5. Sends your wishlist to remote AND `mux.on('manifest', function onManifest(m) { mux.requestFeeds(m.keys) }`
  // for classical multifeed `ACCEPT_ALL` behaviour both must call `want(remoteHas)`
  self.requestFeeds = function (keys) {
    REQUESTED_FEEDS = [...REQUESTED_FEEDS, ...keys] // 6. REMEMBER REQUESTED FEEDS
    self._requestFeedsExt.send(keys) // only request new feeds
  }

  // PEER A - receive REQUEST
  self._requestFeedsExt = extension(EXT_REQUEST_FEEDS, keys => { // 5a. by other PEER
    var filtered = uniq(keys.filter(key => {
      if (!~LOCALOFFER.indexOf(key)) return false // got request for a non-offered feed
      return true // All good, we accept the key request
    }))
    self._replicateFeedsExt.send(filtered) // Tell remote which keys we will replicate
    self._replicateFeeds(filtered) // Start replicating as promised
  })
 
 
  // PEER B - receive REPLICATION OFFER
  self._replicateFeedsExt = extension(EXT_REPLICATE_FEEDS, keys => { // feeds
    var filtered = keys.filter(key => !!~REQUESTED_FEEDS.indexOf(key)) // keep only keys we actually requested
    // Start replicating as requested.
    self._replicateFeeds(filtered, () => self.stream.emit('remote-feeds'))
  })


  // Initializes new replication streams for feeds and joins their streams into
  // the main stream.
  self._replicateFeeds = function (keys, cb) {

    self.emit('replicate', keys, once(startFeedReplication))

    return keys
    
    // PEER A + B
    function startFeedReplication (keys) {
      var feeds = keys
      var pending = feeds.length

      // only the feeds passed to `feeds` option will be replicated (sent or received)
      // hypercore-protocol has built in protection against receiving unexpected/not asked for data.

      feeds.forEach(feed => {
        var hexKey = feed.key.toString('hex')

        // prevent a feed from being folded into the main stream twice.
        if (typeof ACTIVE_FEED_STREAMS[hexKey] !== 'undefined') return (!--pending) ? cb() : void 0

        var fStream = feed.replicate(self._initiator, Object.assign({}, { // REPLICATE FEED
          live: opts.live, download: opts.download, upload: opts.upload, encrypt: opts.encrypt, stream: self.stream
        }))

        ACTIVE_FEED_STREAMS[hexKey] = fStream // Store reference to this particular feed stream

        function cleanup (_, res) { // delete feed stream reference
          if (ACTIVE_FEED_STREAMS[hexKey]) delete ACTIVE_FEED_STREAMS[hexKey]
        }
        fStream.once('end', cleanup)
        fStream.once('error', cleanup)

        if (!--pending) cb()
      })

      // Bail on replication entirely if there were no feeds to add, and none are pending or active.
      if (feeds.length === 0 && Object.keys(ACTIVE_FEED_STREAMS).length === 0) {
        debug('[REPLICATION] terminating mux: no feeds to sync')
        self._feed.close()
        process.nextTick(cb)
      }
    }
  }
}
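
For illustration, here is a minimal sketch (not part of multifeed itself) of a consumer reacting to the 'manifest' event emitted above: instead of the classical ACCEPT_ALL behaviour it only requests the subset of offered feeds that passes a custom filter. `isTrusted` is a hypothetical application-level check.

// a sketch, assuming `mux` is a Multiplexer instance as above
mux.on('manifest', function (msg, requestFeeds) {
  // msg.keys - hex feed keys offered by the remote peer
  var wanted = msg.keys.filter(function (key) { return isTrusted(key) }) // isTrusted is hypothetical
  requestFeeds(wanted) // sends EXT_REQUEST_FEEDS with only the wanted keys
})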

multifeed - index

LOCAL PEER STORAGE

self._streams = [] // all peers in the form of `mux` objects

LOCAL FEED STORAGE

self._feeds = {
  [feedkey1]: feed1,
  [feedkey2]: feed2,
  [feedkey3]: feed3,
}

on READY offers all its FEED KEYS to PEERS

mux.ready(function () {
  var keys = values(self._feeds).map(function (feed) { return feed.key.toString('hex') })
  mux.offerFeeds(keys)
})

stores a new FEED locally and offers it to PEERS

Multifeed.prototype._addFeed = function (feed, name) {
  var self = this
  self._feeds[name] = feed
  self._feedKeyToFeed[feed.key.toString('hex')] = feed
  feed.setMaxListeners(Infinity)
  self.emit('feed', feed, name)

  // forward live feed announcements
  if (!self._streams.length) return // no-op if no live-connections
  var hexKey = feed.key.toString('hex')
  // Tell each remote that we have a new key available unless it's already being replicated
  self._streams.forEach(function (mux) {
    if (!~mux.knownFeeds().indexOf(hexKey)) mux.offerFeeds([hexKey])
  })
}
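
For context, a new local feed typically enters through multifeed's `writer()` API; once the hypercore is created it passes through `_addFeed` above and is therefore announced live to every connected peer. A minimal sketch (not taken from the issue), assuming the multifeed 5.x API:

var multifeed = require('multifeed')
var ram = require('random-access-memory')

var multi = multifeed(ram, { valueEncoding: 'json' })

// creating (or re-opening) a named local writer ends up in `_addFeed`,
// which offers the new key to all connected peers via `mux.offerFeeds([hexKey])`
multi.writer('local', function (err, feed) {
  if (err) throw err
  feed.append({ type: 'hello' })
})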

add new peer (mux)

Multifeed.prototype.replicate = function (isInitiator, opts) {
  var self = this
  // All multifeeds get a random or passed-in `_id`
  // When "ready" they make a feed using a default or passed encryptionKey and set it as the `_root` feed

  // MAKE SESSION
  var mux = multiplexer(isInitiator, self._root.key, Object.assign({}, opts, {_id: this._id}))
  /* on ready */ self._streams.push(mux)

  // KEY EXCHANGE listener
  mux.on('manifest', function onManifest(m) { mux.requestFeeds(m.keys) })

  mux.on('replicate', function onReplicate(keys, done) { // REPLICATION REQUEST?
    addMissingKeys(keys, function () {
      // `addMissingKeys`:
      // 1. makes sure all keys are proper feed keys
      // 2. checks that none of the given feeds already exists
      // for each new key:
      //   1. makes a storage whose name `myKey` is derived from the current feed count
      //   2. makes a new hypercore for the new feed
      //   3. calls `self._addFeed(feed, myKey)`

      // => look up all existing feeds for the given keys
      var feeds = keys.map(key => self._feedKeyToFeed[key])
      // => call the `mux` callback to replicate those feeds
      done(feeds) // replicates
    })
  })

  return mux.stream
}
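
Putting it together, a sketch (again assuming the multifeed 5.x API, where both instances share the default protocol encryption key used as `_root`) of two multifeed instances replicating over a pair of piped streams, which drives the whole offer/request/replicate handshake above end to end:

var multifeed = require('multifeed')
var ram = require('random-access-memory')

var a = multifeed(ram, { valueEncoding: 'json' })
var b = multifeed(ram, { valueEncoding: 'json' })

// create a local writer on `a` so there is something to offer, then replicate with `b`
a.writer('local', function (err, feed) {
  if (err) throw err
  feed.append({ type: 'hello' }, function () {
    b.ready(function () {
      var s1 = a.replicate(true)   // initiator
      var s2 = b.replicate(false)  // non-initiator
      s1.pipe(s2).pipe(s1)         // manifests, requests, then feed blocks flow both ways

      // 'remote-feeds' fires on the stream once _replicateFeeds has started (see mux above)
      s2.on('remote-feeds', function () {
        console.log('b now knows about', b.feeds().length, 'feed(s)')
      })
    })
  })
})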

serapath commented Mar 22, 2020

proposal draft:
see datdotorg/datdot-research#17 (comment)

@okdistribute

@serapath it's multifeed that's doing that, not cabal, just to be clear :)

@serapath

@okdistribute when checking, I found that cabal-core uses kappa-core and kappa-core uses multifeed, so it felt like that's what's happening underneath at the level of exchanging hypercores.
Can you recommend where or what else I should be looking at?

okdistribute commented Mar 23, 2020

Sorry, I think I wasn't clear: the data structure that handles the feeds in cabal is multifeed, and in hyperdrive it's corestore. So any app built on multifeed or corestore could work with datdot, if you target those data structures rather than a particular application like hyperdrive or cabal.
