Sending large files over webrtc data channel using Simple peer disconnects the sending peer #393

Closed
SaiMadhav9494 opened this issue Nov 26, 2018 · 5 comments

Comments

@SaiMadhav9494

I'm trying to send large files (~500 MB) over the data channel to the other peer with a chunk size of 16 KB. I've noticed that the read stream runs faster and pushes chunks into the data channel while the write stream on the other side freezes for a while; it only starts writing at about the time the read stream finishes sending chunks. Meanwhile the sending peer goes offline, but the weird part is that the data transfer doesn't stop until the write stream has written the data. I'm trying to figure out why the sending peer is getting disconnected while the data transfer is going on!
this.peer = new SimplePeer({
  initiator: true,
  trickle: true,
  reconnectTimer: 1000,
  channelConfig: {
    maxRetransmits: 5,
    reliable: false,
    ordered: false,
    maxPacketLifeTime: 3000
  },
  iceTransportPolicy: 'all',
  config: { iceServers: token.iceServers } // Config iceServers comes from twilio token
});

@t-mullen
Collaborator

t-mullen commented Dec 4, 2018

My guess is that you are writing the chunks to the peer without throttling. This might be causing your sending peer to run out of memory, or maybe the chunks are overflowing the internal datachannel buffer. Chunk, throttle, write.

If it's not that, please post how you are reading/chunking/writing to the peer.

You also might want to look at how instant.io+webtorrent implements large file transfer: https://github.com/webtorrent/instant.io/blob/master/client/index.js

FYI: disabling ordered or reliable packets may cause your file to be corrupted.
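A minimal sketch of the "chunk, throttle, write" idea follows. It assumes only that the peer behaves like a Node.js writable stream, meaning write() returns false when its buffer is full and a 'drain' event fires once it has emptied. chunkBuffer and writeThrottled are hypothetical helper names, not part of simple-peer.

```javascript
// Split a Buffer into fixed-size chunks (16 KiB by default, as in the original post).
function* chunkBuffer(buffer, chunkSize = 16 * 1024) {
  for (let offset = 0; offset < buffer.length; offset += chunkSize) {
    // subarray() creates a view on the same memory, so nothing is copied here.
    yield buffer.subarray(offset, offset + chunkSize);
  }
}

// Write chunks one at a time. Whenever write() reports a full buffer,
// wait for 'drain' before handing over the next chunk.
async function writeThrottled(writable, chunks) {
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      await new Promise((resolve) => writable.once('drain', resolve));
    }
  }
}
```

With a connected peer this might be called as writeThrottled(peer, chunkBuffer(fileBuffer)). For a ~500 MB file the chunks would more likely come from a read stream than from one in-memory Buffer.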

@SaiMadhav9494
Author

Hey Mullen, thanks for responding. I figured that backpressure is causing these problems and made some changes accordingly. I tried to handle backpressure between the read stream and the peer data channel stream. Now I've noticed that the receiving peer goes offline. I guess I have to handle backpressure where the data comes out of the peer channel and flows into the write stream. I'm not sure if this is the right approach; I will also go through the link you've provided. Thanks for the help. Below is the simplified implementation I'm working on.

Sending peer:

readStream.on('data', function(data) {
    event.sender.send('readerData', (event, data));
 });

this.electronService.ipcRenderer.on('readerData', (event, data) => {
      if(this.noOverflow) {
        this.noOverflow = this.peer.write(JSON.stringify(data));   //sending data to recie
      }
      this.peer.once('drain', () => {
        this.noOverflow = this.peer.write(JSON.stringify(data));
      });
 });

Receiving peer:

this.peer.on('data', (data) => {
 var data = JSON.parse(data);
 
 this.electronService.ipcRenderer.send('writeFile', data);
});

ipcMain.on('writeFile', (event, data) => {
 // Buffer.from is a static factory, so it is called without `new`.
 var buffer = Buffer.from(data);
 channelData.writeStream.write(buffer);
});
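One thing to note in the sending code above: a new 'drain' listener is registered for every chunk, and that listener writes the chunk regardless of noOverflow, so the read stream is never actually slowed down. A hedged sketch of the usual pause/resume pattern is below. It assumes the read stream and the peer live in the same process; in the Electron setup above, the pause and resume signals would have to be relayed over IPC. pumpWithBackpressure is a hypothetical name.

```javascript
// Forward chunks from a readable stream to a writable peer, pausing the
// source whenever the peer's internal buffer is full.
function pumpWithBackpressure(readStream, peer) {
  return new Promise((resolve, reject) => {
    readStream.on('data', (chunk) => {
      // write() returns false once the writable's buffer has reached its limit.
      if (!peer.write(chunk)) {
        readStream.pause(); // stop emitting 'data' until the peer catches up
        peer.once('drain', () => readStream.resume());
      }
    });
    readStream.on('end', resolve); // all chunks have been handed to the peer
    readStream.on('error', reject);
  });
}
```

Because the source is paused before the single 'drain' listener is attached, each chunk is written exactly once and in order. The same pattern applies on the receiving side between the peer and the file write stream.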

@anderspitman

@SaiMadhav9494 did you ever get this working? I'd recommend a pull-based backpressure scheme, like what Reactive Streams use.

If you're interested, I'm working on a more general (language- and transport- agnostic) solution to this problem I'm calling "omnistreams". You can check it out here, but it's still very early.
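To illustrate what "pull-based" means here, the sketch below has the consumer announce how many chunks it is ready for, and the producer never sends more than that. It is only an illustration of the demand idea: it is not the Reactive Streams API or the omnistreams API, and PullProducer, request and onChunk are hypothetical names.

```javascript
// Producer side: only emits chunks the consumer has explicitly asked for.
class PullProducer {
  constructor(chunks, onChunk) {
    this.chunks = chunks;   // data waiting to be sent
    this.next = 0;          // index of the next chunk to send
    this.demand = 0;        // how many chunks the consumer has requested
    this.onChunk = onChunk; // stand-in for "send over the data channel"
  }

  // Called when a "request n" message arrives from the consumer.
  request(n) {
    this.demand += n;
    while (this.demand > 0 && this.next < this.chunks.length) {
      this.demand -= 1;
      this.onChunk(this.chunks[this.next++]);
    }
  }
}

// Consumer side: ask for two chunks up front, then one more per chunk written.
const delivered = [];
const producer = new PullProducer(['c1', 'c2', 'c3', 'c4', 'c5'], (chunk) => {
  delivered.push(chunk);
});
producer.request(2); // delivered now holds c1 and c2, nothing more is sent
```

In a real transfer the request(n) call would travel from the receiver to the sender as a small control message, issued after the receiver has flushed a chunk to disk.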

@Demenus

Demenus commented Apr 15, 2019

I was facing the same issue and worked around it by modifying the _write method with a cork call:

if (self._channel.bufferedAmount > MAX_BUFFERED_AMOUNT) {
  self._debug('start backpressure: bufferedAmount %d', self._channel.bufferedAmount)
  self._cb = cb
  self.cork()
}

And calling uncork in:

Peer.prototype._onChannelBufferedAmountLow = function () {
  var self = this
  if (self.destroyed || !self._cb) return
  self._debug('ending backpressure: bufferedAmount %d', self._channel.bufferedAmount)
  var cb = self._cb
  self._cb = null
  self.uncork()
  cb(null)
}

I believe the data is being buffered if we are facing the problem of a bufferedAmount value over MAX_BUFFERED_AMOUNT.

However, this is not fully tested, and we are keeping our old message queue, which looks like this:

    send(message) {
        // This is just an array of messages
        this.webRTCMessageQueue.push(message);

        if (this.webRTCPaused) {
            return;
        }

        this.sendMessageQueued();
    }

    sendMessageQueued() {
        this.webRTCPaused = false;
        let message = this.webRTCMessageQueue.shift();

        while (message) {
            if (this.simpePeer._channel.bufferedAmount && this.simpePeer._channel.bufferedAmount > BUFFER_FULL_THRESHOLD) {
                this.webRTCPaused = true;
                this.webRTCMessageQueue.push(message);

                const listener = () => {
                    this.simpePeer._channel.removeEventListener('bufferedamountlow', listener);
                    this.sendMessageQueued();
                };
                this.simpePeer._channel.addEventListener('bufferedamountlow', listener);
                return;
            }

            try {
                this.simpePeer.send(message);
                message = this.webRTCMessageQueue.shift();
            } catch (error) {
                throw new ApplicationError(`Error sending message, reason: ${error.name} - ${error.message}`);
            }
        }
    }

Hope it helps!

@over-engineer

@Demenus I use a variation of your message queue, thanks for sharing that! 😄

However, for future readers, I think this.webRTCMessageQueue.push(message) in the while loop has to be this.webRTCMessageQueue.unshift(message) so the message gets back at the beginning of the queue. Otherwise, the receiving peer might not receive chunks in order.
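A small standalone illustration of the difference, using plain arrays rather than a real data channel (the chunk names are made up):

```javascript
// 'chunk-1' has been shifted off the queue but could not be sent because
// the channel buffer was full, so it has to be put back.
const blocked = 'chunk-1';
const remaining = ['chunk-2', 'chunk-3'];

// push() appends it to the end, so it would be sent after chunk-3.
const withPush = [...remaining];
withPush.push(blocked);

// unshift() puts it back at the front, preserving the original order.
const withUnshift = [...remaining];
withUnshift.unshift(blocked);

console.log(withPush.join(', '));
console.log(withUnshift.join(', '));
```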

I extend simple-peer’s Peer class, so it looks like this:

import SimplePeer from 'simple-peer';

// …

class Peer extends SimplePeer {
  constructor(opts) {
    super(opts);

    this.webRTCPaused = false;
    this.webRTCMessageQueue = [];
  }

  sendMessageQueued() {
    this.webRTCPaused = false;
    
    let message = this.webRTCMessageQueue.shift();

    while (message) {
      if (this._channel.bufferedAmount && this._channel.bufferedAmount > BUFFER_FULL_THRESHOLD) {
        this.webRTCPaused = true;
        this.webRTCMessageQueue.unshift(message);

        const listener = () => {
          this._channel.removeEventListener('bufferedamountlow', listener);
          this.sendMessageQueued();
        };

        this._channel.addEventListener('bufferedamountlow', listener);
        return;
      }

      try {
        super.send(message);
        message = this.webRTCMessageQueue.shift();
      } catch (error) {
        throw new Error(`Error send message, reason: ${error.name} - ${error.message}`);
      }
    }
  }

  send(chunk) {
    this.webRTCMessageQueue.push(chunk);

    if (this.webRTCPaused) {
      return;
    }

    this.sendMessageQueued();
  }
};

// …

export default Peer;
