diff --git a/index.js b/index.js
index e0f63c91..b448d8bf 100644
--- a/index.js
+++ b/index.js
@@ -1,17 +1,15 @@
 /*! create-torrent. MIT License. WebTorrent LLC */
 const bencode = require('bencode')
-const BlockStream = require('block-stream2')
+const blockIterator = require('block-iterator')
 const calcPieceLength = require('piece-length')
 const corePath = require('path')
-const { BlobReadStream } = require('fast-blob-stream')
 const isFile = require('is-file')
 const junk = require('junk')
 const joinIterator = require('join-async-iterator')
-const once = require('once')
 const parallel = require('run-parallel')
 const queueMicrotask = require('queue-microtask')
 const sha1 = require('simple-sha1')
-const { Transform, PassThrough, Readable } = require('streamx')
+require('fast-readable-async-iterator') // side-effect import: patches readable streams with Symbol.asyncIterator where the engine lacks it
 const getFiles = require('./get-files') // browser exclude
@@ -176,10 +174,10 @@ function _parseInput (input, opts, cb) {
     const file = {}
 
     if (isBlob(item)) {
-      file.getStream = getBlobStream(item)
+      file.getStream = item.stream()
       file.length = item.size
     } else if (Buffer.isBuffer(item)) {
-      file.getStream = getBufferStream(item)
+      file.getStream = [item] // wrap in an array so the iterable yields the whole buffer as a single chunk; iterating a bare buffer yields individual bytes
       file.length = item.length
     } else if (isReadable(item)) {
       file.getStream = getStreamStream(item, file)
@@ -206,73 +204,41 @@ function _parseInput (input, opts, cb) {
 const MAX_OUTSTANDING_HASHES = 5
 
-function getPieceList (files, pieceLength, estimatedTorrentLength, opts, cb) {
-  cb = once(cb)
+async function getPieceList (files, pieceLength, estimatedTorrentLength, opts, cb) {
   const pieces = []
   let length = 0
   let hashedLength = 0
 
   const streams = files.map(file => file.getStream)
+  const onProgress = opts.onProgress
 
   let remainingHashes = 0
   let pieceNum = 0
   let ended = false
 
-  const multistream = Readable.from(joinIterator(streams))
-  const blockstream = new BlockStream(pieceLength, { zeroPadding: false })
-
-  multistream.on('error', onError)
-
-  multistream
-    .pipe(blockstream)
-    .on('data', onData)
-    .on('end', onEnd)
-    .on('error', onError)
-
-  function onData (chunk) {
-    length += chunk.length
-
-    const i = pieceNum
-    sha1(chunk, hash => {
-      pieces[i] = hash
-      remainingHashes -= 1
-      if (remainingHashes < MAX_OUTSTANDING_HASHES) {
-        blockstream.resume()
-      }
-      hashedLength += chunk.length
-      if (opts.onProgress) opts.onProgress(hashedLength, estimatedTorrentLength)
-      maybeDone()
-    })
-    remainingHashes += 1
-    if (remainingHashes >= MAX_OUTSTANDING_HASHES) {
-      blockstream.pause()
+  const iterator = blockIterator(joinIterator(streams), pieceLength, { zeroPadding: false })
+  try {
+    for await (const chunk of iterator) {
+      await new Promise(resolve => {
+        length += chunk.length
+        const i = pieceNum
+        sha1(chunk, hash => {
+          pieces[i] = hash
+          --remainingHashes
+          hashedLength += chunk.length
+          if (onProgress) onProgress(hashedLength, estimatedTorrentLength)
+          resolve()
+          if (ended && remainingHashes === 0) cb(null, Buffer.from(pieces.join(''), 'hex'), length)
+        })
+        ++pieceNum
+        // below the cap: resolve immediately so the loop keeps pulling chunks;
+        // at the cap: stay parked until a sha1 callback drains a hash (replaces pause()/resume())
+        if (++remainingHashes < MAX_OUTSTANDING_HASHES) resolve()
+      })
    }
-    pieceNum += 1
-  }
-
-  function onEnd () {
     ended = true
-    maybeDone()
-  }
-
-  function onError (err) {
-    cleanup()
+    // if every hash finished before the loop exited, the callbacks above all saw
+    // ended === false, so fire the completion callback here instead
+    if (remainingHashes === 0) cb(null, Buffer.from(pieces.join(''), 'hex'), length)
+  } catch (err) {
     cb(err)
   }
-
-  function cleanup () {
-    multistream.removeListener('error', onError)
-    blockstream.removeListener('data', onData)
-    blockstream.removeListener('end', onEnd)
-    blockstream.removeListener('error', onError)
-  }
-
-  function maybeDone () {
-    if (ended && remainingHashes === 0) {
-      cleanup()
-      cb(null, Buffer.from(pieces.join(''), 'hex'), length)
-    }
-  }
 }
 
 function onFiles (files, opts, cb) {
@@ -409,45 +375,18 @@ function isReadable (obj) {
 }
 
 /**
- * Convert a `File` to a lazy readable stream.
- * @param {File|Blob} file
- * @return {function}
- */
-function getBlobStream (file) {
-  return () => new BlobReadStream(file)
-}
-
-/**
- * Convert a `Buffer` to a lazy readable stream.
- * @param {Buffer} buffer
- * @return {function}
- */
-function getBufferStream (buffer) {
-  return () => {
-    const s = new PassThrough()
-    s.end(buffer)
-    return s
-  }
-}
-
-/**
- * Convert a readable stream to a lazy readable stream. Adds instrumentation to track
+ * Convert a readable stream to a lazy async iterator. Adds instrumentation to track
  * the number of bytes in the stream and set `file.length`.
  *
+ * @generator
  * @param {Stream} readable
  * @param {Object} file
- * @return {function}
+ * @yields {Uint8Array} stream data chunk
  */
-function getStreamStream (readable, file) {
-  return () => {
-    const counter = new Transform()
-    counter._transform = function (data, cb) {
-      file.length += data.length
-      this.push(data)
-      cb()
-    }
-    readable.pipe(counter)
-    return counter
+async function * getStreamStream (readable, file) {
+  for await (const chunk of readable) {
+    file.length += chunk.length
+    yield chunk
   }
 }
diff --git a/package.json b/package.json
index 851ebdef..3e1f1107 100644
--- a/package.json
+++ b/package.json
@@ -19,18 +19,16 @@
   },
   "dependencies": {
     "bencode": "^2.0.3",
-    "block-stream2": "^2.1.0",
-    "fast-blob-stream": "^1.1.1",
+    "block-iterator": "^1.0.1",
+    "fast-readable-async-iterator": "^1.1.1",
     "is-file": "^1.0.0",
     "join-async-iterator": "^1.1.1",
     "junk": "^3.1.0",
     "minimist": "^1.2.5",
-    "once": "^1.4.0",
     "piece-length": "^2.0.1",
     "queue-microtask": "^1.2.3",
     "run-parallel": "^1.2.0",
-    "simple-sha1": "^3.1.0",
-    "streamx": "^2.12.4"
+    "simple-sha1": "^3.1.0"
   },
   "devDependencies": {
     "@webtorrent/semantic-release-config": "1.0.8",
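Reviewer note: after this change `file.getStream` is no longer a lazy function returning a stream — it is any sync or async iterable of byte chunks, which `joinIterator` concatenates and `blockIterator` re-slices into piece-length blocks. A minimal sketch of the three shapes, assuming Node 18+ (global `Blob`, async-iterable `ReadableStream`); `counted` is a hypothetical stand-in for `getStreamStream`:

```js
const { Readable } = require('stream')
const blockIterator = require('block-iterator')
const joinIterator = require('join-async-iterator')

// hypothetical stand-in for getStreamStream: counts bytes while passing chunks through
async function * counted (readable, file) {
  for await (const chunk of readable) {
    file.length += chunk.length
    yield chunk
  }
}

async function main () {
  const file = { length: 0 }
  const sources = [
    [Buffer.from('aaaa')],                               // Buffer, array-wrapped → one whole chunk
    new Blob(['bbbb']).stream(),                         // Blob → WHATWG ReadableStream
    counted(Readable.from([Buffer.from('cccc')]), file)  // Node stream via async generator
  ]
  // concatenate the sources, then re-chunk into fixed-size pieces;
  // with zeroPadding: false the final piece may come up short
  for await (const piece of blockIterator(joinIterator(sources), 4, { zeroPadding: false })) {
    console.log(piece.length) // 4, 4, 4
  }
  console.log(file.length) // 4 — only the instrumented source updates file.length
}

main()
```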
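The `await new Promise(...)` wrapper in the new `getPieceList` doubles as backpressure: while fewer than `MAX_OUTSTANDING_HASHES` hashes are in flight the promise resolves immediately and the loop pulls the next piece; at the cap it stays parked until a `sha1` callback drains one. The same idiom in isolation, with a hypothetical callback-style `work` function standing in for `sha1`:

```js
const MAX_OUTSTANDING = 3

async function run (chunks, work, onDone) {
  let outstanding = 0
  let ended = false
  for await (const chunk of chunks) {
    await new Promise(resolve => {
      work(chunk, () => {
        --outstanding
        resolve() // wake the loop if it was parked at the cap (no-op otherwise)
        if (ended && outstanding === 0) onDone()
      })
      // below the cap: resolve now so the loop keeps pulling chunks
      if (++outstanding < MAX_OUTSTANDING) resolve()
    })
  }
  ended = true
  if (outstanding === 0) onDone() // all callbacks fired before the loop exited
}

// run([1, 2, 3], (n, cb) => setTimeout(cb, 10), () => console.log('done'))
```

The post-loop check matters: a callback that fires before `ended` flips to true sees `ended === false` and skips completion, so without it the done callback can be stranded — which is why `getPieceList` also checks `remainingHashes === 0` right after `ended = true`.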
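The public API is untouched by the swap; a quick smoke test sketch, assuming `onProgress` is forwarded through `opts` as `getPieceList` reads it:

```js
const createTorrent = require('create-torrent')

createTorrent(Buffer.from('hello, world'), {
  name: 'hello.txt', // needed for Buffer input, since a bare buffer carries no name
  onProgress: (hashed, total) => console.log(`hashed ${hashed} of ~${total} bytes`)
}, (err, torrent) => {
  if (err) throw err
  console.log(torrent.length) // bencoded .torrent metadata, as a Buffer
})
```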