From 60b12340dc43d5da4f84439fa2986989327cf280 Mon Sep 17 00:00:00 2001 From: Rodrigo Appelt Date: Sat, 30 Dec 2023 20:09:47 -0300 Subject: [PATCH] failed attempt to implement quic --- Unichain.P2P/Extensions.cs | 47 ++++++ Unichain.P2P/Nodes/Node.cs | 218 +++++++++++++++++++++++++ Unichain.P2P/Nodes/QuicNode.cs | 108 +++++++++++++ Unichain.P2P/Nodes/TcpNode.cs | 86 ++++++++++ Unichain.P2P/Pool.cs | 42 +++-- Unichain.P2P/Program.cs | 31 ++-- Unichain.P2P/StreamExtensions.cs | 25 --- Unichain.P2P/TcpNode.cs | 269 ------------------------------- Unichain.P2P/Unichain.P2P.csproj | 3 +- Unichain.P2P/UnichainNode.cs | 1 + 10 files changed, 499 insertions(+), 331 deletions(-) create mode 100644 Unichain.P2P/Extensions.cs create mode 100644 Unichain.P2P/Nodes/Node.cs create mode 100644 Unichain.P2P/Nodes/QuicNode.cs create mode 100644 Unichain.P2P/Nodes/TcpNode.cs delete mode 100644 Unichain.P2P/StreamExtensions.cs delete mode 100644 Unichain.P2P/TcpNode.cs diff --git a/Unichain.P2P/Extensions.cs b/Unichain.P2P/Extensions.cs new file mode 100644 index 0000000..ce2bef5 --- /dev/null +++ b/Unichain.P2P/Extensions.cs @@ -0,0 +1,47 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Runtime.CompilerServices; +using System.Text; +using System.Threading.Tasks; + +namespace Unichain.P2P; +public static class Extensions { + public static Guid ReadGuid(this BinaryReader reader) { + Span bytes = stackalloc byte[16]; + int read = reader.Read(bytes); + if (read != 16) { + throw new EndOfStreamException("Could not read 16 bytes for Guid"); + } + return new Guid(bytes); + } + public static void Write(this BinaryWriter writer, Guid guid) { + Span bytes = stackalloc byte[16]; + if (!guid.TryWriteBytes(bytes)) { + throw new EndOfStreamException("Could not write 16 bytes for Guid"); + } + writer.Write(bytes); + } + + /// + /// Gets the normalized IP address of a node Address in comparison with a origin node. + /// + /// The address to compare with + /// The address to normalize + /// The normalized address + public static IPAddress Normalize(this Address origin, Address other) { + if (origin.PublicIp.Equals(other.PublicIp)) { + // same network + if (origin.PrivateIp.Equals(other.PrivateIp)) { + // same computer + return IPAddress.Loopback; + } else { + return other.PrivateIp; + } + } else { + // different networks + return other.PublicIp; + } + } +} diff --git a/Unichain.P2P/Nodes/Node.cs b/Unichain.P2P/Nodes/Node.cs new file mode 100644 index 0000000..74b192c --- /dev/null +++ b/Unichain.P2P/Nodes/Node.cs @@ -0,0 +1,218 @@ +using System.Net.Sockets; +using System.Net; +using System.Security.Cryptography; +using System.Text.Json; +using System.Text; +using Unichain.P2P.Packets; + +namespace Unichain.P2P.Nodes; + +/// +/// A generic node to implement P2P communication for the Unichain network. +/// +public abstract class Node +{ + + #region Variables + + /// + public Address Address => address; + + /// + public List
Peers => new(peers); + + + /// + /// The address that identifies this node. + /// + protected readonly Address address; + + /// + /// A list with all the peers that this node is connected/knows about + /// + protected List
peers = []; + + /// + /// The internal thread that will run the node. + /// + protected readonly Thread thread; + + /// + /// Source for the cancellation token + /// + protected readonly CancellationTokenSource cancellationTokenSource = new(); + + + /// + /// A list to record recently sent broadcast messages + /// + private readonly FixedList lastPropagations = new(10); + + /// + /// Class to log messages to the console + /// + private Logger logger; + + #endregion + + #region Concrete Methods + + protected Node(int port) + { + address = IpManager.GetCurrentAddress(Guid.NewGuid(), port); + thread = new(ThreadMain); + logger = new(nameof(Node) + " " + port.ToString()); + } + + /// + /// Starts the internal thread of this node. + /// + /// The bootnode to get peers. If this is null, it will be + /// a new bootnode. Effectively creating a new network + /// + public virtual void Start(Address? bootnode) + { + if (bootnode is not null) + { + FetchPeers(bootnode); + } + logger.Log($"Starting node with {peers.Count} peers..."); + thread.Start(); + } + + /// + /// Asks the node to stop acception connections and sending messages + /// + public void Stop() + { + cancellationTokenSource.Cancel(); + try + { + thread.Join(); + } + catch (ThreadStateException e) + { + logger.LogError($"Failed to stop node! {e.Message}"); + } + catch (ThreadInterruptedException e) + { + logger.LogError($"Failed to stop node! {e.Message}"); + } + } + + /// + /// Gets a list of peers from the bootnode and broadcasts our address to them. + /// + /// The address of the bootnode + private void FetchPeers(Address bootnode) + { + // get the list of knowns peers from the bootnode + Request req = new RequestBuilder() + .WithMethod(RequestMethod.GET) + .WithRoute(Route.Peers) + .WithSender(address) + .Build(); + SendRequest(req, bootnode); + Response resp = ReadResponse(bootnode); + + if (resp.StatusCode != StatusCode.OK) + { + logger.LogError($"Failed to connect to the bootnode! Response: ${resp.StatusCode}"); + return; + } + + Encoding encoding = Encoding.GetEncoding(resp.Content.Headers["encoding"]); + string json = encoding.GetString(resp.Content.Payload); + var addresses = JsonSerializer.Deserialize>(json); + if (addresses is null) + { + logger.LogError($"Failed to deserialize peers!"); + return; + } + logger.Log($"Got {addresses.Count} peers from bootnode"); + peers = addresses; + + // send our address as a broadcast + byte[] payload = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(address)); + Content ctn = new ContentBuilder() + .WithHeader("encoding", Encoding.UTF8.WebName) + .WithPayload(payload) + .Build(); + + logger.Log($"Sending our address to the bootnode..."); + req = new RequestBuilder() + .WithMethod(RequestMethod.POST) + .WithRoute(Route.Peers_Join) + .WithSender(address) + .WithContent(ctn) + .WithBroadcast() + .Build(); + + SendRequest(req, bootnode); + lastPropagations.Add(Convert.ToHexString(req.GetHash())); + } + + /// + /// Spreads a broadcast across the network + /// + /// The request that was sent to this machine + protected void Broadcast(Request req) + { + string hash = Convert.ToHexString(req.GetHash()); + if (lastPropagations.Contains(hash)) + { + logger.Log($"I have already propagated {hash}!"); + return; + } + lastPropagations.Add(hash); + + Parallel.ForEach(peers, peer => + { + logger.Log($"Broadcasting to peer {peer}..."); + SendRequest(req, peer); + }); + } + + #endregion + + #region Abstract Methods + + /// + /// Reads a request from a + /// + /// The client that sent the request + /// The request object + protected abstract Request ReadRequest(Address address); + + /// + /// Reads a response sent from a + /// + /// The client that received the Request and sent the Response + /// + protected abstract Response ReadResponse(Address address); + + /// + /// Sends a request to a + /// + /// + /// + protected abstract void SendRequest(Request request, Address address); + + /// + /// Sends a response to a + /// + /// The response that will be sent + /// The client that made the request and will receive the response + protected abstract void SendResponse(Response response, Address address); + + /// + /// Performs the logic for a request. This is run in the internal thread of the node. + /// + /// The Request that was sent + /// The response object + protected abstract Response Process(Request request); + + protected abstract void ThreadMain(); + + #endregion +} diff --git a/Unichain.P2P/Nodes/QuicNode.cs b/Unichain.P2P/Nodes/QuicNode.cs new file mode 100644 index 0000000..e4b2a0c --- /dev/null +++ b/Unichain.P2P/Nodes/QuicNode.cs @@ -0,0 +1,108 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Quic; +using System.Net.Sockets; +using System.Runtime.Versioning; +using Unichain.P2P.Packets; + +namespace Unichain.P2P.Nodes; + +/// +/// A node that used the QUIC protocol to communicate with other nodes +/// +[SupportedOSPlatform("windows")] +[SupportedOSPlatform("linux")] +[SupportedOSPlatform("macOS")] +public class QuicNode : Node { + + #region Variables + + /// + /// The logger that manages the printing of messages to the console + /// + private readonly Logger logger; + + /// + /// The listener in the QUIC protocol + /// + private readonly QuicListener quicListener; + + #endregion + + /// + /// Initialized a new instance of the class + /// + /// The port that will be used + /// Thrown when the platform doesn't support QUIC + public QuicNode(int port) : base(port) { + if(!QuicListener.IsSupported || !QuicConnection.IsSupported) { + throw new NotSupportedException("QUIC is not supported on this platform. If running on linux, check if" + + "libmsquic is installed and check if TLS 1.3 is supported"); + } + + logger = new Logger(nameof(QuicNode) + " " + port.ToString()); + + } + + public override void Start(Address? bootnode) { + quicListener = await QuicListener.ListenAsync(new QuicListenerOptions() { + ListenEndPoint = new IPEndPoint(IPAddress.Any, address.Port) + }, cancellationTokenSource.Token); + base.Start(bootnode); + } + + + protected override Response Process(Request request) { + throw new NotImplementedException(); + } + + protected override Request ReadRequest(Address address) { + throw new NotImplementedException(); + } + + protected override Response ReadResponse(Address address) { + throw new NotImplementedException(); + } + + protected override void SendRequest(Request request, Address address) { + throw new NotImplementedException(); + } + + protected override void SendResponse(Response response, Address address) { + throw new NotImplementedException(); + } + + protected override void ThreadMain() => ThreadMainAsync().Wait(); + + private async Task ThreadMainAsync() { + logger.Log($"Listening..."); + + // the listen loop + while (!cancellationTokenSource.IsCancellationRequested) { + + + + NetworkStream inStream = incoming.GetStream(); + + // Read the request + Request request = Request.Read(inStream); + + // Process the request + Response response = Process(request); + + // Send the response or broadcast + if (!request.IsBroadcast) { + response.Write(inStream); + } else { + Broadcast(request); + } + + // Close the connection + logger.Log($"Closed connection with {((IPEndPoint)incoming.Client.RemoteEndPoint!).Address}"); + incoming.Close(); + } + + } +} diff --git a/Unichain.P2P/Nodes/TcpNode.cs b/Unichain.P2P/Nodes/TcpNode.cs new file mode 100644 index 0000000..8dc84d5 --- /dev/null +++ b/Unichain.P2P/Nodes/TcpNode.cs @@ -0,0 +1,86 @@ +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Text.Json; +using Unichain.P2P.Packets; + +namespace Unichain.P2P.Nodes; +public abstract class TcpNode : Node +{ + + #region Variables + + /// + /// Listener to receive messages from other nodes + /// + private readonly TcpListener tcpListener; + + /// + /// Logger to log messages to the console + /// + private readonly Logger logger; + + #endregion + + /// + /// Initializes variables for the + /// + /// The port that this node will listen + protected TcpNode(int port) : base(port) + { + tcpListener = new(new IPEndPoint(IPAddress.Any, port)); + logger = new Logger(nameof(TcpNode) + " " + port.ToString()); + } + + + #region Protected Methods + + + protected static Request ReadRequest(TcpClient client) => Request.Read(client.GetStream()); + + protected static void SendRequest(Request request, TcpClient client) => request.Write(client.GetStream()); + + protected static void SendResponse(Response response, TcpClient client) => response.Write(client.GetStream()); + + protected static Response ReadResponse(TcpClient client) => Response.Read(client.GetStream()); + + #endregion + + #region Private Methods + + protected override void ThreadMain() + { + tcpListener.Start(); + logger.Log($"Listening..."); + + // the listen loop + while (!cancellationTokenSource.IsCancellationRequested) + { + TcpClient incoming = tcpListener.AcceptTcpClient(); + NetworkStream inStream = incoming.GetStream(); + + // Read the request + Request request = Request.Read(inStream); + + // Process the request + Response response = Process(request); + + // Send the response or broadcast + if (!request.IsBroadcast) + { + response.Write(inStream); + } + else + { + Broadcast(request); + } + + // Close the connection + logger.Log($"Closed connection with {((IPEndPoint)incoming.Client.RemoteEndPoint!).Address}"); + incoming.Close(); + } + } + + #endregion + +} diff --git a/Unichain.P2P/Pool.cs b/Unichain.P2P/Pool.cs index 3d5a08a..922ef72 100644 --- a/Unichain.P2P/Pool.cs +++ b/Unichain.P2P/Pool.cs @@ -1,29 +1,27 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Net.Sockets; -using System.Text; -using System.Threading.Tasks; +namespace Unichain.P2P; -namespace Unichain.P2P { - internal class Pool(Func factory, int maxPoolSize) { - private readonly List pool; +/// +/// A simple object pool. +/// +/// The type that this pool will hold +/// A function to generate new objects +/// The maximum amount of objects this pool can hold +internal class Pool(Func factory, int maxPoolSize) { + private readonly List pool; - public T Get() { - if (pool.Count == 0) { - return factory(); - } - - T item = pool[0]; - pool.RemoveAt(0); - return item; + public T Get() { + if (pool.Count == 0) { + return factory(); } - public void Return(T item) { - if (pool.Count < maxPoolSize) { - pool.Add(item); - } + T item = pool[0]; + pool.RemoveAt(0); + return item; + } + + public void Return(T item) { + if (pool.Count < maxPoolSize) { + pool.Add(item); } - } } diff --git a/Unichain.P2P/Program.cs b/Unichain.P2P/Program.cs index 171905f..9267bef 100644 --- a/Unichain.P2P/Program.cs +++ b/Unichain.P2P/Program.cs @@ -1,20 +1,23 @@ -using System.Security.Cryptography; +using System.Net; +using System.Net.Quic; +using System.Security.Cryptography; using System.Text; using Unichain.P2P; -List nodes = []; -int bootnodePort = 1234; -const int nodeCount = 1; -var bootnode = new UnichainNode(bootnodePort); -for(int i= 1; i <= nodeCount; i++) { - var node = new UnichainNode(bootnodePort + i); - nodes.Add(node); -} +//List nodes = []; -bootnode.Start(null); +//int bootnodePort = 1234; +//const int nodeCount = 1; +//var bootnode = new UnichainNode(bootnodePort); +//for(int i= 1; i <= nodeCount; i++) { +// var node = new UnichainNode(bootnodePort + i); +// nodes.Add(node); +//} -Parallel.ForEach(nodes, node => { - node.Start(bootnode.Address); -}); -await bootnode.Stop(); \ No newline at end of file +//bootnode.Start(null); + +//Parallel.ForEach(nodes, node => { +// node.Start(bootnode.Address); +//}); +//bootnode.Stop(); \ No newline at end of file diff --git a/Unichain.P2P/StreamExtensions.cs b/Unichain.P2P/StreamExtensions.cs deleted file mode 100644 index 4bb8ca3..0000000 --- a/Unichain.P2P/StreamExtensions.cs +++ /dev/null @@ -1,25 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Unichain.P2P { - public static class StreamExtensions { - public static Guid ReadGuid(this BinaryReader reader) { - Span bytes = stackalloc byte[16]; - int read = reader.Read(bytes); - if (read != 16) { - throw new EndOfStreamException("Could not read 16 bytes for Guid"); - } - return new Guid(bytes); - } - public static void Write(this BinaryWriter writer, Guid guid) { - Span bytes = stackalloc byte[16]; - if (!guid.TryWriteBytes(bytes)) { - throw new EndOfStreamException("Could not write 16 bytes for Guid"); - } - writer.Write(bytes); - } - } -} diff --git a/Unichain.P2P/TcpNode.cs b/Unichain.P2P/TcpNode.cs deleted file mode 100644 index 1d099f6..0000000 --- a/Unichain.P2P/TcpNode.cs +++ /dev/null @@ -1,269 +0,0 @@ -using System.Net; -using System.Net.Sockets; -using System.Text; -using System.Text.Json; -using Unichain.P2P.Packets; - -namespace Unichain.P2P; -public abstract class TcpNode { - - #region Variables - - /// - /// The address that identifies this node. - /// - protected readonly Address address; - - /// - public Address Address => address; - - /// - /// A list with all the peers that this node is connected/knows about - /// - protected List
peers = []; - - /// - public List
Peers => new(peers); - - /// - /// Listener to receive messages from other nodes - /// - private readonly TcpListener tcpListener; - - /// - /// Logger to log messages to the console - /// - private readonly Logger logger; - - /// - /// The internal thread that will run the node. - /// - private readonly Thread thread; - - /// - /// Source for the cancellation token - /// - private readonly CancellationTokenSource cancellationTokenSource = new(); - - /// - /// A list to record recently sent broadcast messages - /// - private readonly FixedList lastPropagations = new(10); - - #endregion - - /// - /// Initializes variables for the - /// - /// The port that this node will listen - protected TcpNode(int port) { - address = IpManager.GetCurrentAddress(Guid.NewGuid(), port); - - tcpListener = new(new IPEndPoint(IPAddress.Any, port)); - logger = new Logger(nameof(TcpNode) + " " + port.ToString()); - thread = new(ThreadMain); - } - - - #region Public Methods - - /// - /// Starts the internal thread of this node. - /// - /// The bootnode to get peers. If this is null, it will be - /// a new bootnode. Effectively creating a new network - /// - public void Start(Address? bootnode) { - if (bootnode is not null) { - FetchPeers(bootnode); - } - logger.Log($"Starting node with {peers.Count} peers..."); - thread.Start(); - } - - /// - /// Asks the node to stop acception connections and sending messages - /// - /// - public async Task Stop() { - cancellationTokenSource.Cancel(); - try { - await Task.Run(thread.Join); - }catch(ThreadStateException e) { - logger.LogError($"Failed to stop node! {e.Message}"); - }catch(ThreadInterruptedException e) { - logger.LogError($"Failed to stop node! {e.Message}"); - } - } - - #endregion - - #region Protected Methods - - /// - /// Reads a request from a - /// - /// The client that sent the request - /// The request object - protected static Request ReadRequest(TcpClient client) => Request.Read(client.GetStream()); - - /// - /// Sends a request to a - /// - /// - /// - protected static void SendRequest(Request request, TcpClient client) => request.Write(client.GetStream()); - - /// - /// Performs the logic for a request. This is run in the internal thread of the node. - /// - /// The Request that was sent - /// The response object - protected abstract Response Process(Request request); - - /// - /// Sends a response to a - /// - /// The response that will be sent - /// The client that made the request and will receive the response - protected static void SendResponse(Response response, TcpClient client) => response.Write(client.GetStream()); - - /// - /// Reads a response sent from a - /// - /// The client that received the Request and sent the Response - /// - protected static Response ReadResponse(TcpClient client) => Response.Read(client.GetStream()); - - #endregion - - #region Private Methods - - private void ThreadMain() { - tcpListener.Start(); - logger.Log($"Listening..."); - - // the listen loop - while (!cancellationTokenSource.IsCancellationRequested) { - TcpClient incoming = tcpListener.AcceptTcpClient(); - NetworkStream inStream = incoming.GetStream(); - - // Read the request - Request request = Request.Read(inStream); - - // Process the request - Response response = Process(request); - - // Send the response or broadcast - if (!request.IsBroadcast) { - response.Write(inStream); - } else { - Broadcast(request); - } - - // Close the connection - logger.Log($"Closed connection with {((IPEndPoint)incoming.Client.RemoteEndPoint!).Address}"); - incoming.Close(); - } - } - - /// - /// Gets the peers from the bootnode - /// - /// The address of the bootnode - /// - private void FetchPeers(Address bootnode) { - // get the list of knowns peers from the bootnode - IPAddress ipAddr = GetNormalizedIp(bootnode); - - using (TcpClient tcpClient = new(new IPEndPoint(ipAddr, bootnode.Port))) { - Request req = new RequestBuilder() - .WithMethod(RequestMethod.GET) - .WithRoute(Route.Peers) - .WithSender(address) - .Build(); - if (!tcpClient.Connected) { - tcpClient.Connect(ipAddr, bootnode.Port); - } - - SendRequest(req, tcpClient); - Response resp = ReadResponse(tcpClient); - - if (resp.StatusCode != StatusCode.OK) { - logger.LogError($"Failed to connect to the bootnode! Response: ${resp.StatusCode}"); - return; - } - - Encoding encoding = Encoding.GetEncoding(resp.Content.Headers["encoding"]); - string json = encoding.GetString(resp.Content.Payload); - var addresses = JsonSerializer.Deserialize>(json); - if (addresses is null) { - logger.LogError($"Failed to deserialize peers!"); - return; - } - logger.Log($"Got {addresses.Count} peers from bootnode"); - peers = addresses; - } - - // now we send our address to the bootnode - using (TcpClient tcpClient = new(new IPEndPoint(ipAddr, bootnode.Port))) { - if (!tcpClient.Connected) { - tcpClient.Connect(ipAddr, bootnode.Port); - } - byte[] payload = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(address)); - Content ctn = new ContentBuilder() - .WithHeader("encoding", Encoding.UTF8.WebName) - .WithPayload(payload) - .Build(); - - logger.Log($"Sending our address to the bootnode..."); - Request req = new RequestBuilder() - .WithMethod(RequestMethod.POST) - .WithRoute(Route.Peers_Join) - .WithSender(address) - .WithContent(ctn) - .WithBroadcast() - .Build(); - - SendRequest(req, tcpClient); - lastPropagations.Add(Convert.ToHexString(req.GetHash())); - } - } - - /// - /// Spreads a broadcast across the network - /// - /// The request that was sent to this machine - private void Broadcast(Request req) { - string hash = Convert.ToHexString(req.GetHash()); - if (lastPropagations.Contains(hash)) { - logger.Log($"I have already propagated {hash}!"); - return; - } - lastPropagations.Add(hash); - - Parallel.ForEach(peers, peer => { - IPAddress ipAddr = GetNormalizedIp(peer); - using TcpClient tcpClient = new(new IPEndPoint(ipAddr, peer.Port)); - logger.Log($"Broadcasting to peer {peer}..."); - SendRequest(req, tcpClient); - }); - } - - private IPAddress GetNormalizedIp(Address nodeAddress) { - if (address.PublicIp.Equals(nodeAddress.PublicIp)) { - // same network - if (address.PrivateIp.Equals(nodeAddress.PrivateIp)) { - // same computer - return IPAddress.Loopback; - } else { - return nodeAddress.PrivateIp; - } - } else { - // different networks - return nodeAddress.PublicIp; - } - } - #endregion - -} diff --git a/Unichain.P2P/Unichain.P2P.csproj b/Unichain.P2P/Unichain.P2P.csproj index 91b464a..65fde15 100644 --- a/Unichain.P2P/Unichain.P2P.csproj +++ b/Unichain.P2P/Unichain.P2P.csproj @@ -1,10 +1,11 @@ - + Exe net8.0 enable enable + true diff --git a/Unichain.P2P/UnichainNode.cs b/Unichain.P2P/UnichainNode.cs index 4376791..053500f 100644 --- a/Unichain.P2P/UnichainNode.cs +++ b/Unichain.P2P/UnichainNode.cs @@ -2,6 +2,7 @@ using System.Net.Sockets; using System.Text; using System.Text.Json; +using Unichain.P2P.Nodes; using Unichain.P2P.Packets; namespace Unichain.P2P;