#!/usr/bin/env node
//
// Small libp2p test tool that transfers a file over a raw stream.
//
// Command-line options (parsed with minimist):
//   --connect <addr>  Dial the given peer address after start-up and open a
//                     stream to it.
//   --send <file>     Send the content of <file> on every stream (incoming or
//                     outgoing). Without this option the node receives instead.
//   --size <n>        Size in bytes of the chunks the file is split into
//                     (default: 100). Must be a positive integer.
//   --debug           Log every chunk that is prepared, sent or received.

const fs = require("fs");
const pipe = require("it-pipe");
const minimist = require("minimist");
const util = require("util");

// Libp2p imports.
const { NOISE } = require("@chainsafe/libp2p-noise");
const libp2p = require("libp2p");
const libp2pGetPeer = require("libp2p/src/get-peer");
const KadDHT = require("libp2p-kad-dht");
const Mplex = require("libp2p-mplex");
const TCP = require("libp2p-tcp");

// Global variables.
let _args = null;
let _localNode = null;

// Chunk size used when `--size` is not given (or is 0).
const _DEFAULT_PACKET_SIZE = 100;

/**
 * Script entry point: parses the command line, starts the local libp2p node,
 * registers the stream handler and, when `--connect` is given, dials the
 * remote peer and runs the transfer on a freshly opened stream.
 *
 * @returns {Promise<void>} Resolves once start-up (and the outgoing transfer,
 *   if any) is complete.
 * @throws {Error} If `--size` is invalid, if the `--connect` address yields no
 *   multiaddrs, or if any libp2p / transfer step rejects.
 */
async function _main$() {
  // Parse arguments.
  _args = minimist(process.argv.slice(2));

  // Fail fast on a bad `--size` instead of discovering it on the first stream.
  if (_args.send) {
    _resolvePacketSize();
  }

  // Start the libp2p node.
  _localNode = await libp2p.create({
    addresses: {
      // Port 0: let the OS pick a free TCP port on the loopback interface.
      listen: ["/ip4/127.0.0.1/tcp/0"],
    },
    modules: {
      transport: [TCP],
      streamMuxer: [Mplex],
      connEncryption: [NOISE],
      dht: KadDHT,
    },
    config: {
      dht: {
        enabled: true,
      },
    },
  });
  _localNode.connectionManager.on("peer:connect", (libp2pConnection) => {
    _log(`New connection ${_repr(libp2pConnection)}`);
  });

  const _PROTOCOLS = ["/myproject/raw"];
  await _localNode.handle(_PROTOCOLS, ({ stream, connection, protocol }) => {
    _log(`Incoming stream ${_repr(stream)} for connection ${_repr(connection)} (protocol ${_repr(protocol)})`);
    // The handler is not awaited by its caller here, so a rejection must be
    // caught locally: otherwise a single failing stream becomes an unhandled
    // promise rejection.
    _onStream$(connection, stream).catch((err) => {
      _log(`Stream handling failed for connection ${_repr(connection)}: ${_repr(err)}`);
    });
  });

  await _localNode.start();
  _log(`Libp2p node successully started with peer-id ${_localNode.peerId.toB58String()}, listening on:`);
  for (const _multiaddr of _localNode.multiaddrs) {
    _log(` - ${_multiaddr}`);
  }

  // Connect to a remote node.
  if (_args.connect) {
    const _peerInfo = libp2pGetPeer(_args.connect);
    if (_peerInfo.multiaddrs === undefined) {
      throw new Error(`Invalid peer address ${_repr(_args.connect)}`);
    }
    // Register the addresses first so the peer can then be dialed by id.
    _localNode.peerStore.addressBook.set(_peerInfo.id, _peerInfo.multiaddrs);
    const _remoteConnection = await _localNode.dial(_peerInfo.id);
    _log(`Successully connected with ${_args.connect} (${_repr(_remoteConnection)})`);
    const _streamOut = await _remoteConnection.newStream(_PROTOCOLS);
    _log(`Stream successfully opened with ${_args.connect}: ${_repr(_streamOut)}`);
    await _onStream$(_remoteConnection, _streamOut.stream);
  }
}

/**
 * Runs the transfer on a stream: sends the `--send` file when that option is
 * set, otherwise drains and counts whatever the remote side sends.
 *
 * @param {object} connection - Connection the stream belongs to (only used in
 *   log messages).
 * @param {object} stream - Duplex stream passed to `pipe` as sink (sending) or
 *   source (receiving).
 * @returns {Promise<void>} Resolves when the transfer is finished.
 * @throws {Error} If the file cannot be read, `--size` is invalid, or the
 *   pipe rejects.
 */
async function _onStream$(connection, stream) {
  if (_args.send) {
    await _sendFile$(connection, stream);
  } else {
    await _receiveData$(connection, stream);
  }
}

/**
 * Returns the chunk size requested with `--size`, or the default.
 *
 * @returns {number} A positive integer number of bytes.
 * @throws {Error} If `--size` is not a positive integer. A negative value
 *   would make the chunking loop run forever; a non-numeric one would
 *   silently produce a single empty chunk.
 */
function _resolvePacketSize() {
  const _packetSize = _args.size || _DEFAULT_PACKET_SIZE;
  if (!Number.isInteger(_packetSize) || _packetSize <= 0) {
    throw new Error(`Invalid packet size ${_repr(_args.size)}: expected a positive integer`);
  }
  return _packetSize;
}

/**
 * Reads the `--send` file, splits it into chunks and writes them to `stream`.
 *
 * @param {object} connection - Connection the stream belongs to (logging only).
 * @param {object} stream - Stream used as the sink of the pipe.
 * @returns {Promise<void>}
 */
async function _sendFile$(connection, stream) {
  // Read the file, prepare chunks.
  const _packetSize = _resolvePacketSize();
  const _bytes = fs.readFileSync(_args.send, { flag: "r" });
  const _packets = [];
  for (let _offset = 0; _offset < _bytes.length; _offset += _packetSize) {
    // `subarray` returns a view on the file buffer, no copy is made.
    const _packet = _bytes.subarray(_offset, _offset + _packetSize);
    _packets.push(_packet);
    if (_args.debug) {
      _log(` - chunk#${_packets.length}: ${_repr(_packet)} (${_packet.length} bytes)`);
    }
  }

  // Send the data.
  _log(`Sending '${_args.send}' to ${_repr(connection)}:`);
  const _res = await pipe(
    async function* () {
      for (const _packet of _packets) {
        if (_args.debug) {
          _log(`Sending ${_repr(_packet)} (${_packet.length} bytes)`);
        }
        yield _packet;
      }
    },
    stream,
  );
  _log(`${_args.send} successfully sent to ${_repr(connection)} (${_bytes.length} bytes, res = ${_repr(_res)})`);
}

/**
 * Reads `stream` until it ends and logs how many packets and bytes arrived.
 * The received data is only counted, it is not stored anywhere.
 *
 * @param {object} connection - Connection the stream belongs to (logging only).
 * @param {object} stream - Stream used as the source of the pipe.
 * @returns {Promise<void>}
 */
async function _receiveData$(connection, stream) {
  _log(`Receiving data from ${_repr(connection)}`);

  // Receive data.
  const _packets = [];
  const _res = await pipe(
    stream,
    async function (source) {
      for await (const _bl of source) {
        if (_args.debug) {
          _log(`Received from pipe: ${_repr(_bl)} (${_bl.length} bytes)`);
        }
        _packets.push(_bl.slice());
      }
    },
  );

  // Final status.
  const _bytes = _packets.reduce((_total, _packet) => _total + _packet.length, 0);
  _log(`Data successfully received from ${_repr(connection)} (${_packets.length} packets, ${_bytes} bytes, res = ${_repr(_res)})`);
}

/**
 * Prints a message on stdout, prefixed with the current ISO-8601 timestamp.
 *
 * @param {string} msg - Message to print.
 */
function _log(msg) {
  console.log(`${new Date().toISOString()} - ${msg}`);
}

/**
 * Returns a compact, single-line representation of any value for logging.
 *
 * @param {*} obj - Value to describe.
 * @returns {string} Output of `util.inspect` limited to one nesting level.
 */
function _repr(obj) {
  // Array and string length limits are left at the util.inspect defaults so
  // that large buffers do not flood the log.
  return util.inspect(obj, {
    depth: 1,
    compact: true,
    breakLength: Infinity,
  });
}

// Report any start-up or transfer failure explicitly and exit with a non-zero
// status, instead of leaving an unhandled promise rejection behind. The
// explicit exit is needed because a started node keeps the event loop alive.
_main$().catch((err) => {
  console.error(`${new Date().toISOString()} - Fatal error: ${_repr(err)}`);
  process.exit(1);
});