diff --git a/Discreet/DB/BlockBuffer.cs b/Discreet/DB/BlockBuffer.cs index 89ac7b8..1f8f959 100644 --- a/Discreet/DB/BlockBuffer.cs +++ b/Discreet/DB/BlockBuffer.cs @@ -269,7 +269,15 @@ public async Task ForceFlush() public async Task Start() { _pIndex = DataView.GetView().GetOutputIndex(); - DateTime lastFlush = DateTime.MinValue; + + _ = Task.Factory.StartNew(async () => + { + var timer = new PeriodicTimer(_flushInterval); + while (await timer.WaitForNextTickAsync()) + { + await _buffer.Writer.WriteAsync(_signaler); + } + }); await foreach(var block in _buffer.Reader.ReadAllAsync()) { @@ -304,20 +312,6 @@ public async Task Start() } UpdateBuffers(block); - - // check received - if (DateTime.Now.Subtract(lastFlush) > _flushInterval) - { - // flush - Flush(buffer); - - lock (buffer) - { - buffer.Clear(); - } - - lastFlush = DateTime.Now; - } } } } diff --git a/Discreet/Network/Handler.cs b/Discreet/Network/Handler.cs index a8dff4b..14b2f23 100644 --- a/Discreet/Network/Handler.cs +++ b/Discreet/Network/Handler.cs @@ -124,7 +124,7 @@ public async Task NeededInventoryStart(CancellationToken token) } } - public void RegisterNeeded(IPacketBody packet, IPEndPoint req, long durMilliseconds = 0, Action callback = null) + public bool RegisterNeeded(IPacketBody packet, IPEndPoint req, long durMilliseconds = 0, Action callback = null) { bool success = NeededInventory.TryGetValue(req, out var reqset); if (!success || reqset == null) @@ -139,37 +139,65 @@ public void RegisterNeeded(IPacketBody packet, IPEndPoint req, long durMilliseco var timestamp = DateTime.UtcNow.Ticks; var durTicks = durMilliseconds * 10_000L; + bool needed = false; if (packet.GetType() == typeof(GetTransactionsPacket)) { var gettx = packet as GetTransactionsPacket; + var newGettxs = new List(); foreach (var tx in gettx.Transactions) { var ivref = new InventoryVectorRef(new InventoryVector(ObjectType.Transaction, tx), reqset, callback, req); - reqset.Add(ivref); - InventoryTimeouts.TryAdd(ivref, (timestamp, durTicks)); + if (!reqset.Contains(ivref)) + { + newGettxs.Add(tx); + needed = true; + reqset.Add(ivref); + InventoryTimeouts.TryAdd(ivref, (timestamp, durTicks)); + } } + + gettx.Transactions = newGettxs.ToArray(); } else if (packet.GetType() == typeof(GetBlocksPacket)) { var gettx = packet as GetBlocksPacket; + var newGettxs = new List(); foreach (var block in gettx.Blocks) { var ivref = new InventoryVectorRef(new InventoryVector(ObjectType.Block, block), reqset, callback, req); - reqset.Add(ivref); - InventoryTimeouts.TryAdd(ivref, (timestamp, durTicks)); + if (!reqset.Contains(ivref)) + { + newGettxs.Add(block); + needed = true; + reqset.Add(ivref); + InventoryTimeouts.TryAdd(ivref, (timestamp, durTicks)); + } } + + gettx.Blocks = newGettxs.ToArray(); } else if (packet.GetType() == typeof(GetHeadersPacket)) { var gettx = packet as GetHeadersPacket; + var newGettxs = new List(); if (gettx.Headers == null) { for (long i = gettx.StartingHeight; i < gettx.StartingHeight + gettx.Count; i++) { var ivref = new InventoryVectorRef(new InventoryVector(ObjectType.BlockHeader, new Cipher.SHA256(i)), reqset, callback, req); - reqset.Add(ivref); - InventoryTimeouts.TryAdd(ivref, (timestamp, durTicks)); + if (!reqset.Contains(ivref)) + { + newGettxs.Add(new Cipher.SHA256(i)); + needed = true; + reqset.Add(ivref); + InventoryTimeouts.TryAdd(ivref, (timestamp, durTicks)); + } + } + + if (newGettxs.Count == gettx.Count) + { + newGettxs = null; } } else @@ -177,15 +205,27 @@ public void RegisterNeeded(IPacketBody packet, IPEndPoint req, long durMilliseco foreach (var header in gettx.Headers) { var ivref = new InventoryVectorRef(new InventoryVector(ObjectType.BlockHeader, header), reqset, callback, req); - reqset.Add(ivref); - InventoryTimeouts.TryAdd(ivref, (timestamp, durTicks)); + if (!reqset.Contains(ivref)) + { + newGettxs.Add(header); + needed = true; + reqset.Add(ivref); + InventoryTimeouts.TryAdd(ivref, (timestamp, durTicks)); + } } } + + if (newGettxs != null) + { + gettx.Headers = newGettxs.ToArray(); + } } else { Daemon.Logger.Error($"Handler.RegisterNeeded: cannot accept packet of type {packet.GetType()}"); } + + return needed; } internal (bool, List) CheckFulfillment(IPacketBody packet, IPEndPoint resp) diff --git a/Discreet/Network/Peerbloom/Network.cs b/Discreet/Network/Peerbloom/Network.cs index 15a700c..994d5fa 100644 --- a/Discreet/Network/Peerbloom/Network.cs +++ b/Discreet/Network/Peerbloom/Network.cs @@ -749,22 +749,23 @@ public bool SendRequest(Connection conn, Core.Packet packet, long durationMillis Daemon.Logger.Info($"Network.SendRequest: Sending request {packet.Header.Command} to {conn.Receiver}", verbose: 2); // hacky; make specific functions for sending packets which call this instead (in the future) + bool success = false; if (packet.Header.Command == Core.PacketType.GETBLOCKS) { - handler.RegisterNeeded((Core.Packets.GetBlocksPacket)packet.Body, conn.Receiver, durationMilliseconds, callback); + success = handler.RegisterNeeded((Core.Packets.GetBlocksPacket)packet.Body, conn.Receiver, durationMilliseconds, callback); } else if (packet.Header.Command == Core.PacketType.GETTXS) { - handler.RegisterNeeded((Core.Packets.GetTransactionsPacket)packet.Body, conn.Receiver, durationMilliseconds, callback); + success = handler.RegisterNeeded((Core.Packets.GetTransactionsPacket)packet.Body, conn.Receiver, durationMilliseconds, callback); } else if (packet.Header.Command == Core.PacketType.GETHEADERS) { - handler.RegisterNeeded((Core.Packets.GetHeadersPacket)packet.Body, conn.Receiver, durationMilliseconds, callback); + success = handler.RegisterNeeded((Core.Packets.GetHeadersPacket)packet.Body, conn.Receiver, durationMilliseconds, callback); } - conn.Send(packet); + if (success) conn.Send(packet); - return true; + return success; } public bool Send(IPEndPoint endpoint, Core.Packet packet)