#!/usr/bin/env node
/**
 * Small libp2p test tool: starts a local node listening on 127.0.0.1 (TCP,
 * random port) and exchanges raw data over the "/myproject/raw" protocol.
 *
 * Command-line options (parsed with minimist):
 *   --connect <multiaddr>  Dial this address (it must include the peer id)
 *                          and open a stream on it.
 *   --send <path>          Send the content of this file on every stream
 *                          (incoming or outgoing). Without it, the node
 *                          reads whatever the remote side sends.
 *   --size <n>             Packet size in bytes used with --send (default 100).
 *   --debug                Log every packet sent or received.
 */

import * as fs from "fs";
import { pipe } from "it-pipe";
import minimist from "minimist";
import * as util from "util";

// Libp2p imports.
import * as libp2p from "libp2p";
import { Noise } from "@chainsafe/libp2p-noise";
import { KadDHT } from "@libp2p/kad-dht";
import { Mplex } from "@libp2p/mplex";
import { peerIdFromString } from "@libp2p/peer-id";
import { TCP } from "@libp2p/tcp";
import { Multiaddr } from "@multiformats/multiaddr";

// Protocols handled for incoming streams and requested for outgoing ones.
const _PROTOCOLS = ["/myproject/raw"];

// Packet size (in bytes) used when --size is not given.
const _DEFAULT_PACKET_SIZE = 100;

// Global variables.
let _args = null;
let _localNode = null;

/**
 * Entry point: parses the command line, starts the local libp2p node and,
 * when --connect is given, dials the remote peer and opens a stream on it.
 *
 * @returns {Promise<void>}
 * @throws {Error} If the --connect address does not contain a peer id.
 */
async function _main$() {
  // Parse arguments.
  _args = minimist(process.argv.slice(2));

  // Start the libp2p node.
  _localNode = await libp2p.createLibp2p({
    addresses: {
      listen: ["/ip4/127.0.0.1/tcp/0"],
    },
    transports: [new TCP()],
    streamMuxers: [new Mplex()],
    connectionEncryption: [new Noise()],
    dht: new KadDHT(),
  });

  _localNode.connectionManager.addEventListener("peer:connect", (event) => {
    _log(`New connection ${_repr(event)}`);
  });

  await _localNode.handle(_PROTOCOLS, ({ stream, connection, protocol }) => {
    _log(`Incoming stream ${_repr(stream)} for connection ${_repr(connection)} (protocol ${_repr(protocol)})`);
    // The handler's promise is not awaited by anyone, so failures must be
    // handled here; otherwise they surface as unhandled rejections.
    _onStream$(connection, stream).catch((err) => {
      _log(`Stream handling failed for connection ${_repr(connection)}: ${_repr(err)}`);
    });
  });

  await _localNode.start();
  _log(`Libp2p node successully started with peer-id ${_localNode.peerId}, listening on:`);
  for (const _multiaddr of _localNode.getMultiaddrs()) {
    _log(`  - ${_multiaddr}`);
  }

  // Connect to a remote node.
  if (_args.connect) {
    const _addr = new Multiaddr(_args.connect);
    const _peerIdStr = _addr.getPeerId();
    if (!_peerIdStr) {
      throw new Error(`Invalid peer address ${_repr(_args.connect)}`);
    }
    const _remotePeerId = peerIdFromString(_peerIdStr);

    // Register the address before dialing by peer id. The write is awaited so
    // that the dial cannot run before the address is stored (awaiting is a
    // no-op if the call happens to be synchronous).
    await _localNode.peerStore.addressBook.set(_remotePeerId, [_addr]);

    const _remoteConnection = await _localNode.dial(_remotePeerId);
    _log(`Successully connected with ${_args.connect} (${_repr(_remoteConnection)})`);

    _log(`Opening stream with ${_repr(_args.connect)} on ${_repr(_PROTOCOLS)}`);
    const _streamOut = await _remoteConnection.newStream(_PROTOCOLS);
    _log(`Stream successfully opened with ${_args.connect}: ${_repr(_streamOut)}`);

    await _onStream$(_remoteConnection, _streamOut);
  }
}

/**
 * Handles one stream: sends the --send file on it when that option is set,
 * otherwise drains the stream and reports how much data was received.
 *
 * @param {object} connection - Connection the stream belongs to (only used in log messages).
 * @param {object} stream - Stream passed to it-pipe as sink (send) or source (receive).
 * @returns {Promise<void>}
 */
async function _onStream$(connection, stream) {
  if (_args.send) {
    await _sendFile$(connection, stream);
  } else {
    await _receiveData$(connection, stream);
  }
}

/**
 * Returns the packet size to use when sending.
 *
 * A missing (or otherwise falsy) --size falls back to the default; any other
 * value must be a positive integer, because the chunking loop advances by
 * this amount and would never terminate with a negative step.
 *
 * @returns {number} Packet size in bytes.
 * @throws {Error} If --size is not a positive integer.
 */
function _getPacketSize() {
  const _size = _args.size || _DEFAULT_PACKET_SIZE;
  if (!Number.isInteger(_size) || _size <= 0) {
    throw new Error(`Invalid packet size ${_repr(_args.size)}: expected a positive integer`);
  }
  return _size;
}

/**
 * Reads the --send file, splits it into packets of at most --size bytes and
 * writes them to the stream.
 *
 * @param {object} connection - Connection the stream belongs to (only used in log messages).
 * @param {object} stream - Stream used as the sink of the pipe.
 * @returns {Promise<void>}
 */
async function _sendFile$(connection, stream) {
  // Read the file, prepare chunks.
  const _packetSize = _getPacketSize();
  const _bytes = fs.readFileSync(_args.send, { flag: "r" });
  const _packets = [];
  for (let _offset = 0; _offset < _bytes.length; _offset += _packetSize) {
    // subarray() returns a view on the file buffer, like the deprecated Buffer#slice.
    const _packet = _bytes.subarray(_offset, _offset + _packetSize);
    _packets.push(_packet);
    if (_args.debug) {
      _log("  " + [
        `- chunk#${_packets.length}:`,
        _repr(_packet),
        `(${_packet.length} bytes)`,
      ].join(" "));
    }
  }

  // Send the data.
  _log(`Sending '${_args.send}' to ${_repr(connection)}:`);
  const _res = await pipe(
    async function* () {
      for (const _packet of _packets) {
        if (_args.debug) {
          _log(`Sending ${_repr(_packet)} (${_packet.length} bytes)`);
        }
        yield _packet;
      }
    },
    stream,
  );
  _log(`${_args.send} successfully sent to ${_repr(connection)} (${_bytes.length} bytes, res = ${_repr(_res)})`);
}

/**
 * Reads the stream until it ends, keeping a copy of each chunk, then logs
 * the number of packets and bytes received.
 *
 * @param {object} connection - Connection the stream belongs to (only used in log messages).
 * @param {object} stream - Stream used as the source of the pipe.
 * @returns {Promise<void>}
 */
async function _receiveData$(connection, stream) {
  _log(`Receiving data from ${_repr(connection)}`);

  // Receive data.
  const _packets = [];
  const _res = await pipe(
    stream,
    async (source) => {
      for await (const _chunk of source) {
        // Convert once and reuse the buffer for both logging and storage.
        const _packet = Buffer.from(_chunk.subarray());
        if (_args.debug) {
          _log(`Received from pipe: ${_repr(_packet)} (${_chunk.length} bytes)`);
        }
        _packets.push(_packet);
      }
    },
  );

  // Final status.
  const _byteCount = _packets.reduce((total, packet) => total + packet.length, 0);
  _log(`Data successfully received from ${_repr(connection)} (${_packets.length} packets, ${_byteCount} bytes, res = ${_repr(_res)})`);
}

/**
 * Prints a message on stdout, prefixed with the current ISO-8601 timestamp.
 *
 * @param {string} msg - Message to print.
 */
function _log(msg) {
  console.log(`${new Date().toISOString()} - ${msg}`);
}

/**
 * Returns a compact, single-line representation of a value for log messages.
 *
 * Nested objects are expanded one level deep only; long arrays and strings
 * are truncated according to util.inspect's default limits.
 *
 * @param {*} obj - Value to format.
 * @returns {string} The formatted value.
 */
function _repr(obj) {
  return util.inspect(obj, {
    depth: 1,
    compact: true,
    breakLength: Infinity,
  });
}

// Report startup/transfer failures explicitly and exit with a failure status.
_main$().catch((err) => {
  console.error(err);
  process.exit(1);
});