Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add lock for orphans #106

Merged
merged 1 commit into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions Discreet/Daemon/Daemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,8 @@ public async Task TestnetMinter(TimeSpan interval, int n, int pid, ChannelReader
bool paused = true;
bool reloop = false;

HashSet<long> produced = new HashSet<long>();

// we start off paused, waiting for the first call to us
while (paused)
{
Expand Down Expand Up @@ -826,7 +828,7 @@ public async Task TestnetMinter(TimeSpan interval, int n, int pid, ChannelReader
if (numProduced % n == pid)
{
await MintLocker.WaitAsync();
await MintTestnetBlock();
await MintTestnetBlock(produced);
MintLocker.Release();
}

Expand Down Expand Up @@ -930,7 +932,7 @@ public async Task MintTestnet()
}
}

public async Task MintTestnetBlock()
public async Task MintTestnetBlock(HashSet<long> debugProduced = null)
{
try
{
Expand All @@ -940,6 +942,18 @@ public async Task MintTestnetBlock()
var txs = txpool.GetTransactionsForBlock();
var blk = Block.Build(txs, new StealthAddress(SQLiteWallet.Wallets["TESTNET_EMISSIONS"].Accounts[0].Address), sigKey);

if (debugProduced != null)
{
if (debugProduced.Contains(blk.Header.Height))
{
Logger.Critical($"Discreet.Daemon: produced a block at a previously seen height!");
}
else
{
debugProduced.Add(blk.Header.Height);
}
}

try
{
DB.ValidationCache vCache = new(blk);
Expand Down
103 changes: 68 additions & 35 deletions Discreet/Network/Handler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -486,14 +486,14 @@ public bool IsParallel(Packet p)
case PacketType.SENDTX:
case PacketType.GETTXS:
case PacketType.TXS:
case PacketType.BLOCKS:
case PacketType.GETHEADERS:
case PacketType.HEADERS:
case PacketType.GETPOOL:
case PacketType.POOL:
return true;
case PacketType.SENDBLOCK:
case PacketType.SENDBLOCKS:
case PacketType.BLOCKS:
case PacketType.HEADERS:
case PacketType.ALERT:
case PacketType.NONE:
case PacketType.INVENTORY:
Expand Down Expand Up @@ -1185,18 +1185,26 @@ public async Task HandleSendBlock(SendBlockPacket p, Peerbloom.Connection conn,
{
if (!MessageCache.GetMessageCache().OrphanBlockParents.ContainsKey(p.Block.Header.PreviousBlock))
{
await MessageCache.GetMessageCache().OrphanLock.WaitAsync();

Daemon.Logger.Warn($"HandleSendBlock: orphan block ({p.Block.Header.BlockHash.ToHexShort()}, height {p.Block.Header.Height}) added; querying {conn.Receiver} for previous block", verbose: 3);
MessageCache.GetMessageCache().OrphanBlocks[p.Block.Header.PreviousBlock] = p.Block;
MessageCache.GetMessageCache().OrphanBlockParents[p.Block.Header.BlockHash] = p.Block.Header.PreviousBlock;
Peerbloom.Network.GetNetwork().SendRequest(conn, new Packet(PacketType.GETBLOCKS, new GetBlocksPacket { Blocks = new Cipher.SHA256[] { p.Block.Header.PreviousBlock } }), durationMilliseconds: 60000);

MessageCache.GetMessageCache().OrphanLock.Release();
return;
}
else
{
await MessageCache.GetMessageCache().OrphanLock.WaitAsync();

Daemon.Logger.Warn($"HandleSendBlock: orphan block ({p.Block.Header.BlockHash.ToHexShort()}, height {p.Block.Header.Height}) added", verbose: 3);
MessageCache.GetMessageCache().OrphanBlocks[p.Block.Header.PreviousBlock] = p.Block;
MessageCache.GetMessageCache().OrphanBlockParents[p.Block.Header.BlockHash] = p.Block.Header.PreviousBlock;
CheckRoot(p.Block, conn);

MessageCache.GetMessageCache().OrphanLock.Release();
return;
}
}
Expand Down Expand Up @@ -1349,17 +1357,28 @@ public async Task HandleBlocks(BlocksPacket p, Peerbloom.Connection conn)
{
Daemon.Logger.Warn($"HandleBlocks: orphan block ({p.Blocks[0].Header.BlockHash.ToHexShort()}, height {p.Blocks[0].Header.Height}) added; querying {conn.Receiver} for previous block", verbose: 3);

await MessageCache.GetMessageCache().OrphanLock.WaitAsync();

MessageCache.GetMessageCache().OrphanBlocks[p.Blocks[0].Header.PreviousBlock] = p.Blocks[0];
MessageCache.GetMessageCache().OrphanBlockParents[p.Blocks[0].Header.BlockHash] = p.Blocks[0].Header.PreviousBlock;
Peerbloom.Network.GetNetwork().SendRequest(conn, new Packet(PacketType.GETBLOCKS, new GetBlocksPacket { Blocks = new Cipher.SHA256[] { p.Blocks[0].Header.PreviousBlock } }), durationMilliseconds: 60000);

MessageCache.GetMessageCache().OrphanLock.Release();

return;
}
else
{
Daemon.Logger.Warn($"HandleBlocks: orphan block ({p.Blocks[0].Header.BlockHash.ToHexShort()}, height {p.Blocks[0].Header.Height}) added", verbose: 1);

await MessageCache.GetMessageCache().OrphanLock.WaitAsync();

MessageCache.GetMessageCache().OrphanBlocks[p.Blocks[0].Header.PreviousBlock] = p.Blocks[0];
MessageCache.GetMessageCache().OrphanBlockParents[p.Blocks[0].Header.BlockHash] = p.Blocks[0].Header.PreviousBlock;
CheckRoot(p.Blocks[0], conn);

MessageCache.GetMessageCache().OrphanLock.Release();

return;
}
}
Expand All @@ -1368,7 +1387,11 @@ public async Task HandleBlocks(BlocksPacket p, Peerbloom.Connection conn)
Daemon.Logger.Error($"HandleBlocks: Malformed or invalid block received from peer {conn.Receiver}: {err.Message} (bogus block for orphan requirement)", err);

/* for now assume invalid root always has invalid leaves */

await MessageCache.GetMessageCache().OrphanLock.WaitAsync();
TossOrphans(p.Blocks[0].Header.BlockHash);
MessageCache.GetMessageCache().OrphanLock.Release();

return;
}

Expand Down Expand Up @@ -1575,44 +1598,54 @@ public void CheckRoot(Block block, Peerbloom.Connection conn)
public async Task AcceptOrphans(Cipher.SHA256 bHash)
{
MessageCache mCache = MessageCache.GetMessageCache();
while (mCache.OrphanBlocks.ContainsKey(bHash))

await mCache.OrphanLock.WaitAsync();

try
{
mCache.OrphanBlocks.Remove(bHash, out var block);
DB.ValidationCache vCache = new DB.ValidationCache(block);
var err = vCache.Validate();
if (err is OrphanBlockException)
while (mCache.OrphanBlocks.ContainsKey(bHash))
{
// simply return
return;
}
if (err != null)
{
Daemon.Logger.Error($"AcceptOrphans: Malformed or invalid block in orphan branch {bHash.ToHexShort()} (height {block.Header.Height}): {err.Message}; tossing branch", err);
TossOrphans(bHash);
return;
}
mCache.OrphanBlocks.Remove(bHash, out var block);
DB.ValidationCache vCache = new DB.ValidationCache(block);
var err = vCache.Validate();
if (err is OrphanBlockException)
{
// simply return
return;
}
if (err != null)
{
Daemon.Logger.Error($"AcceptOrphans: Malformed or invalid block in orphan branch {bHash.ToHexShort()} (height {block.Header.Height}): {err.Message}; tossing branch", err);
TossOrphans(bHash);
return;
}

try
{
await vCache.Flush();
}
catch (Exception e)
{
Daemon.Logger.Error($"AcceptOrphans: an error was encountered while flushing validation cache for block at height {block.Header.Height}: {e.Message}", e);
}
try
{
await vCache.Flush();
}
catch (Exception e)
{
Daemon.Logger.Error($"AcceptOrphans: an error was encountered while flushing validation cache for block at height {block.Header.Height}: {e.Message}", e);
}

try
{
daemon.ProcessBlock(block);
}
catch (Exception e)
{
Daemon.Logger.Error($"AcceptOrphans: an error was encountered while processing block at height {block.Header.Height}: {e.Message}", e);
}
try
{
daemon.ProcessBlock(block);
}
catch (Exception e)
{
Daemon.Logger.Error($"AcceptOrphans: an error was encountered while processing block at height {block.Header.Height}: {e.Message}", e);
}

OnBlockSuccess?.Invoke(new BlockSuccessEventArgs { Block = block });
bHash = block.Header.BlockHash;
mCache.OrphanBlockParents.Remove(bHash, out _);
OnBlockSuccess?.Invoke(new BlockSuccessEventArgs { Block = block });
bHash = block.Header.BlockHash;
mCache.OrphanBlockParents.Remove(bHash, out _);
}
}
finally
{
mCache.OrphanLock.Release();
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions Discreet/Network/MessageCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Collections.Concurrent;
using Discreet.Coin.Models;
using Discreet.DB;
using System.Threading;

namespace Discreet.Network
{
Expand Down Expand Up @@ -39,6 +40,7 @@ public static MessageCache GetMessageCache()

public ConcurrentDictionary<Cipher.SHA256, Block> OrphanBlocks;
public ConcurrentDictionary<Cipher.SHA256, Cipher.SHA256> OrphanBlockParents = new(new Cipher.SHA256EqualityComparer());
public readonly SemaphoreSlim OrphanLock = new SemaphoreSlim(1, 1);

public MessageCache()
{
Expand Down
Loading