Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bugs in BlockBuffer #99

Merged
merged 1 commit into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 263 additions & 6 deletions Discreet/DB/BlockBuffer.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
using Discreet.Coin.Comparers;
using Discreet.Cipher;
using Discreet.Coin.Comparers;
using Discreet.Coin.Models;
using Discreet.Common;
using Discreet.Common.Serialize;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace Discreet.DB
{
/// <summary>
/// Provides a multiple-writer, single-reader instance for writing blocks to the blockchain. Can be configured to write periodically or on each new block.
/// Maintains an internal state for the ValidationCache for validating block data prior to writing to the database.
/// </summary>
public class BlockBuffer
{
Expand All @@ -37,18 +41,199 @@ public static BlockBuffer Instance
private static bool _flushEveryBlock = false;
public static bool FlushEveryBlock { set { _flushEveryBlock = value; } }

private List<Block> buffer;
private HashSet<Key> spentKeys = new HashSet<Key>();
private ConcurrentDictionary<long, Block> blockCache = new ConcurrentDictionary<long, Block>();
private ConcurrentDictionary<uint, TXOutput> outputCache = new ConcurrentDictionary<uint, TXOutput>();
private ConcurrentDictionary<SHA256, FullTransaction> transactionCache = new ConcurrentDictionary<SHA256, FullTransaction>(new SHA256EqualityComparer());
private ConcurrentDictionary<TTXInput, TTXOutput> inputCache = new ConcurrentDictionary<TTXInput, TTXOutput>(new TTXInputEqualityComparer());
private uint _pIndex;
private readonly object _pLock = new object();

private ReaderWriterLockSlim _readWriteLock = new ReaderWriterLockSlim();

/// <summary>
/// Tries to get a block header from either the DataView or the BlockBuffer's buffer.
/// </summary>
/// <param name="hash"></param>
/// <param name="header"></param>
/// <returns></returns>
public bool TryGetBlockHeader(SHA256 hash, out BlockHeader header)
{
var err = DataView.GetView().TryGetBlockHeader(hash, out var rv);
if (err)
{
header = rv;
return true;
}
else
{
// try getting it from the cache
Block blk = null;
lock (buffer)
{
blk = buffer.Where(x => x.Header.BlockHash == hash).FirstOrDefault();
}

if (blk == null)
{
header = null;
return false;
}
else
{
header = blk.Header;
return true;
}
}
}

public uint GetOutputIndex()
{
lock (_pLock)
{
return Math.Max(_pIndex, DataView.GetView().GetOutputIndex());
}
}

public BlockHeader GetBlockHeader(SHA256 hash)
{
try
{
return DataView.GetView().GetBlockHeader(hash);
}
catch (Exception e)
{
try
{
Block blk = null;
lock (buffer)
{
blk = buffer.Where(x => x.Header.BlockHash == hash).FirstOrDefault();
}

if (blk == null) throw new Exception();
return blk.Header;
}
catch
{
throw e;
}
}
}

public bool BlockExists(SHA256 hash)
{
var succ = DataView.GetView().BlockExists(hash);

if (succ)
{
return true;
}
else
{
Block blk = null;
lock (buffer)
{
blk = buffer.Where(x => x.Header.BlockHash == hash).FirstOrDefault();
}

return blk != null;
}
}

public bool ContainsTransaction(SHA256 hash)
{
var succ = DataView.GetView().ContainsTransaction(hash);
if (succ)
{
return true;
}
else
{
return transactionCache.ContainsKey(hash);
}
}

public TTXOutput GetPubOutput(TTXInput tin)
{
try
{
return DataView.GetView().GetPubOutput(tin);
}
catch (Exception e)
{
try
{
return inputCache[tin];
}
catch
{
throw e;
}
}
}

public bool CheckSpentKey(Key k)
{
lock (spentKeys)
{
return DataView.GetView().CheckSpentKey(k) && !spentKeys.Contains(k);
}
}

public TXOutput GetOutput(uint idx)
{
try
{
return DataView.GetView().GetOutput(idx);
}
catch (Exception e)
{
try
{
return outputCache[idx];
}
catch
{
throw e;
}
}
}

public TXOutput[] GetMixins(uint[] idxs)
{
TXOutput[] rv = new TXOutput[idxs.Length];
for (int i = 0; i < idxs.Length; i++)
{
rv[i] = GetOutput(idxs[i]);
}

return rv;
}

public BlockBuffer()
{
_buffer = Channel.CreateUnbounded<Block>();
}

public long GetChainHeight()
{
lock (buffer)
{
if (buffer.Count == 0) return DataView.GetView().GetChainHeight();
return Math.Max(DataView.GetView().GetChainHeight(), buffer.Select(x => x.Header.Height).Max());
}
}

/// <summary>
/// Starts the block buffer's flusher.
/// </summary>
/// <returns></returns>
public async Task Start()
{
List<Block> buf = new List<Block>();
_pIndex = DataView.GetView().GetOutputIndex();
buffer = new List<Block>();
DateTime lastFlush = DateTime.MinValue;

await foreach(var block in _buffer.Reader.ReadAllAsync())
Expand All @@ -59,21 +244,75 @@ public async Task Start()
}
else
{
buf.Add(block);
lock (buffer)
{
buffer.Add(block);
}

UpdateBuffers(block);

// check received
if (DateTime.Now.Subtract(lastFlush) > _flushInterval)
{
// flush
Flush(buf);
buf.Clear();
Flush(buffer);

lock (buffer)
{
buffer.Clear();
}

lastFlush = DateTime.Now;
}
}
}
}

public void Flush(List<Block> blocks)
private void UpdateBuffers(Block block)
{
if (blockCache.ContainsKey(block.Header.Height)) return;

blockCache[block.Header.Height] = block;

foreach (var tx in block.Transactions)
{
transactionCache[tx.TxID] = tx;

lock (spentKeys)
{
for (int i = 0; i < tx.NumPInputs; i++)
{
spentKeys.Add(tx.PInputs[i].KeyImage);
}
}

for (int i = 0; i < tx.NumTInputs; i++)
{
if (inputCache.ContainsKey(tx.TInputs[i]))
{
inputCache.Remove(tx.TInputs[i], out _);
}
}

for (int i = 0; i < tx.NumTOutputs; i++)
{
inputCache[new TTXInput { Offset = (byte)i, TxSrc = tx.TxID }] = tx.TOutputs[i];
}

lock (_pLock)
{
for (int i = 0; i < tx.NumPOutputs; i++)
{
outputCache[++_pIndex] = tx.POutputs[i];
}
}
}
}

private void Flush(List<Block> _blocks)
{
// sort blocks by height
var blocks = _blocks.OrderBy(x => x.Header.Height).ToList();
List<UpdateEntry> updates = new List<UpdateEntry>();

// get previous indexers
Expand All @@ -85,6 +324,8 @@ public void Flush(List<Block> blocks)

foreach (var block in blocks)
{
if (DataView.GetView().BlockExists(block.Header.BlockHash)) continue;

// add block info
updates.Add(new UpdateEntry { key = block.Header.BlockHash.Bytes, value = Serialization.Int64(block.Header.Height), rule = UpdateRule.ADD, type = UpdateType.BLOCKHEIGHT });
updates.Add(new UpdateEntry { key = Serialization.Int64(block.Header.Height), value = block.Serialize(), rule = UpdateRule.ADD, type = UpdateType.BLOCK });
Expand Down Expand Up @@ -144,6 +385,22 @@ public void Flush(List<Block> blocks)

// perform flush
DataView.GetView().Flush(updates);

lock (_pLock)
{
_pIndex = DataView.GetView().GetOutputIndex();
}

// dump data
lock (spentKeys)
{
spentKeys.Clear();
}

blockCache.Clear();
inputCache.Clear();
transactionCache.Clear();
outputCache.Clear();
}
}
}
1 change: 1 addition & 0 deletions Discreet/DB/DataView.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public bool TryGetBlockHeader(Cipher.SHA256 blockHash, out BlockHeader header)
}
catch (Exception ex)
{
// Feb 24 2024 11:27 PM: why do we log on attempting to get a block header?
Daemon.Logger.Error(ex.Message, ex);
header = null;
return false;
Expand Down
Loading
Loading