Skip to content

Commit

Permalink
Fix too many open files error (#430)
Browse files Browse the repository at this point in the history
* Release resources when they are no longer needed

* Avoid too many connections are connecting to a peer at a time

* Update dependencies: Akka 1.3.10
  • Loading branch information
yongjiema authored and erikzhang committed Nov 3, 2018
1 parent 6ff2b9a commit c401c05
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 47 deletions.
23 changes: 21 additions & 2 deletions neo/Network/P2P/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ private class WsConnected { public WebSocket Socket; public IPEndPoint Remote; p
private readonly Dictionary<IPAddress, int> ConnectedAddresses = new Dictionary<IPAddress, int>();
protected readonly ConcurrentDictionary<IActorRef, IPEndPoint> ConnectedPeers = new ConcurrentDictionary<IActorRef, IPEndPoint>();
protected ImmutableHashSet<IPEndPoint> UnconnectedPeers = ImmutableHashSet<IPEndPoint>.Empty;
protected ImmutableHashSet<IPEndPoint> ConnectingPeers = ImmutableHashSet<IPEndPoint>.Empty;

public int ListenerPort { get; private set; }
protected abstract int ConnectedMax { get; }
protected abstract int UnconnectedMax { get; }
protected virtual int ConnectingMax => ConnectedMax - ConnectedPeers.Count;

static Peer()
{
Expand All @@ -62,7 +64,12 @@ protected void ConnectToPeer(IPEndPoint endPoint)
if (ConnectedAddresses.TryGetValue(endPoint.Address, out int count) && count >= MaxConnectionsPerAddress)
return;
if (ConnectedPeers.Values.Contains(endPoint)) return;
tcp_manager.Tell(new Tcp.Connect(endPoint));
ImmutableInterlocked.Update(ref ConnectingPeers, p =>
{
if (p.Count >= ConnectingMax || p.Contains(endPoint)) return p;
tcp_manager.Tell(new Tcp.Connect(endPoint));
return p.Add(endPoint);
});
}

private static bool IsIntranetAddress(IPAddress address)
Expand Down Expand Up @@ -100,7 +107,8 @@ protected override void OnReceive(object message)
case Tcp.Bound _:
tcp_listener = Sender;
break;
case Tcp.CommandFailed _:
case Tcp.CommandFailed commandFailed:
OnTcpCommandFailed(commandFailed.Cmd);
break;
case Terminated terminated:
OnTerminated(terminated.ActorRef);
Expand Down Expand Up @@ -139,6 +147,7 @@ private void OnStart(int port, int ws_port)

private void OnTcpConnected(IPEndPoint remote, IPEndPoint local)
{
ImmutableInterlocked.Update(ref ConnectingPeers, p => p.Remove(remote));
ConnectedAddresses.TryGetValue(remote.Address, out int count);
if (count >= MaxConnectionsPerAddress)
{
Expand All @@ -154,6 +163,16 @@ private void OnTcpConnected(IPEndPoint remote, IPEndPoint local)
}
}

private void OnTcpCommandFailed(Tcp.Command cmd)
{
switch (cmd)
{
case Tcp.Connect connect:
ImmutableInterlocked.Update(ref ConnectingPeers, p => p.Remove(((IPEndPoint)connect.RemoteAddress).Unmap()));
break;
}
}

private void OnTerminated(IActorRef actorRef)
{
if (ConnectedPeers.TryRemove(actorRef, out IPEndPoint endPoint))
Expand Down
96 changes: 52 additions & 44 deletions neo/Network/UPnP.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,53 @@ public class UPnP

public static bool Discover()
{
Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
s.ReceiveTimeout = (int)TimeOut.TotalMilliseconds;
s.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Broadcast, 1);
string req = "M-SEARCH * HTTP/1.1\r\n" +
"HOST: 239.255.255.250:1900\r\n" +
"ST:upnp:rootdevice\r\n" +
"MAN:\"ssdp:discover\"\r\n" +
"MX:3\r\n\r\n";
byte[] data = Encoding.ASCII.GetBytes(req);
IPEndPoint ipe = new IPEndPoint(IPAddress.Broadcast, 1900);
using (Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp))
{
s.ReceiveTimeout = (int)TimeOut.TotalMilliseconds;
s.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Broadcast, 1);
string req = "M-SEARCH * HTTP/1.1\r\n" +
"HOST: 239.255.255.250:1900\r\n" +
"ST:upnp:rootdevice\r\n" +
"MAN:\"ssdp:discover\"\r\n" +
"MX:3\r\n\r\n";
byte[] data = Encoding.ASCII.GetBytes(req);
IPEndPoint ipe = new IPEndPoint(IPAddress.Broadcast, 1900);

DateTime start = DateTime.Now;
DateTime start = DateTime.Now;

s.SendTo(data, ipe);
s.SendTo(data, ipe);
s.SendTo(data, ipe);
s.SendTo(data, ipe);
s.SendTo(data, ipe);
s.SendTo(data, ipe);

byte[] buffer = new byte[0x1000];

do
{
int length;
try
{
length = s.Receive(buffer);
byte[] buffer = new byte[0x1000];

string resp = Encoding.ASCII.GetString(buffer, 0, length).ToLower();
if (resp.Contains("upnp:rootdevice"))
do
{
int length;
try
{
resp = resp.Substring(resp.ToLower().IndexOf("location:") + 9);
resp = resp.Substring(0, resp.IndexOf("\r")).Trim();
if (!string.IsNullOrEmpty(_serviceUrl = GetServiceUrl(resp)))
length = s.Receive(buffer);

string resp = Encoding.ASCII.GetString(buffer, 0, length).ToLower();
if (resp.Contains("upnp:rootdevice"))
{
return true;
resp = resp.Substring(resp.ToLower().IndexOf("location:") + 9);
resp = resp.Substring(0, resp.IndexOf("\r")).Trim();
if (!string.IsNullOrEmpty(_serviceUrl = GetServiceUrl(resp)))
{
return true;
}
}
}
catch
{
continue;
}
}
catch
{
continue;
}
}
while (DateTime.Now - start < TimeOut);
while (DateTime.Now - start < TimeOut);

return false;
return false;
}
}

private static string GetServiceUrl(string resp)
Expand All @@ -69,8 +71,10 @@ private static string GetServiceUrl(string resp)
{
XmlDocument desc = new XmlDocument();
HttpWebRequest request = WebRequest.CreateHttp(resp);
WebResponse response = request.GetResponse();
desc.Load(response.GetResponseStream());
using (WebResponse response = request.GetResponse())
{
desc.Load(response.GetResponseStream());
}
XmlNamespaceManager nsMgr = new XmlNamespaceManager(desc.NameTable);
nsMgr.AddNamespace("tns", "urn:schemas-upnp-org:device-1-0");
XmlNode typen = desc.SelectSingleNode("//tns:device/tns:deviceType/text()", nsMgr);
Expand Down Expand Up @@ -141,13 +145,17 @@ private static XmlDocument SOAPRequest(string url, string soap, string function)
byte[] b = Encoding.UTF8.GetBytes(req);
r.Headers["SOAPACTION"] = "\"urn:schemas-upnp-org:service:WANIPConnection:1#" + function + "\"";
r.ContentType = "text/xml; charset=\"utf-8\"";
Stream reqs = r.GetRequestStream();
reqs.Write(b, 0, b.Length);
XmlDocument resp = new XmlDocument();
WebResponse wres = r.GetResponse();
Stream ress = wres.GetResponseStream();
resp.Load(ress);
return resp;
using (Stream reqs = r.GetRequestStream())
{
reqs.Write(b, 0, b.Length);
XmlDocument resp = new XmlDocument();
WebResponse wres = r.GetResponse();
using (Stream ress = wres.GetResponseStream())
{
resp.Load(ress);
return resp;
}
}
}
}
}
2 changes: 1 addition & 1 deletion neo/neo.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Akka" Version="1.3.9" />
<PackageReference Include="Akka" Version="1.3.10" />
<PackageReference Include="Microsoft.AspNetCore.ResponseCompression" Version="2.1.1" />
<PackageReference Include="Microsoft.AspNetCore.Server.Kestrel" Version="2.1.3" />
<PackageReference Include="Microsoft.AspNetCore.Server.Kestrel.Https" Version="2.1.3" />
Expand Down

0 comments on commit c401c05

Please sign in to comment.