diff --git a/Discreet/DB/BlockBuffer.cs b/Discreet/DB/BlockBuffer.cs index d7d5b45..3f7394c 100644 --- a/Discreet/DB/BlockBuffer.cs +++ b/Discreet/DB/BlockBuffer.cs @@ -1,11 +1,14 @@ -using Discreet.Coin.Comparers; +using Discreet.Cipher; +using Discreet.Coin.Comparers; using Discreet.Coin.Models; using Discreet.Common; using Discreet.Common.Serialize; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -13,6 +16,7 @@ namespace Discreet.DB { /// /// Provides a multiple-writer, single-reader instance for writing blocks to the blockchain. Can be configured to write periodically or on each new block. + /// Maintains an internal state for the ValidationCache for validating block data prior to writing to the database. /// public class BlockBuffer { @@ -37,18 +41,199 @@ public static BlockBuffer Instance private static bool _flushEveryBlock = false; public static bool FlushEveryBlock { set { _flushEveryBlock = value; } } + private List buffer; + private HashSet spentKeys = new HashSet(); + private ConcurrentDictionary blockCache = new ConcurrentDictionary(); + private ConcurrentDictionary outputCache = new ConcurrentDictionary(); + private ConcurrentDictionary transactionCache = new ConcurrentDictionary(new SHA256EqualityComparer()); + private ConcurrentDictionary inputCache = new ConcurrentDictionary(new TTXInputEqualityComparer()); + private uint _pIndex; + private readonly object _pLock = new object(); + + private ReaderWriterLockSlim _readWriteLock = new ReaderWriterLockSlim(); + + /// + /// Tries to get a block header from either the DataView or the BlockBuffer's buffer. + /// + /// + /// + /// + public bool TryGetBlockHeader(SHA256 hash, out BlockHeader header) + { + var err = DataView.GetView().TryGetBlockHeader(hash, out var rv); + if (err) + { + header = rv; + return true; + } + else + { + // try getting it from the cache + Block blk = null; + lock (buffer) + { + blk = buffer.Where(x => x.Header.BlockHash == hash).FirstOrDefault(); + } + + if (blk == null) + { + header = null; + return false; + } + else + { + header = blk.Header; + return true; + } + } + } + + public uint GetOutputIndex() + { + lock (_pLock) + { + return Math.Max(_pIndex, DataView.GetView().GetOutputIndex()); + } + } + + public BlockHeader GetBlockHeader(SHA256 hash) + { + try + { + return DataView.GetView().GetBlockHeader(hash); + } + catch (Exception e) + { + try + { + Block blk = null; + lock (buffer) + { + blk = buffer.Where(x => x.Header.BlockHash == hash).FirstOrDefault(); + } + + if (blk == null) throw new Exception(); + return blk.Header; + } + catch + { + throw e; + } + } + } + + public bool BlockExists(SHA256 hash) + { + var succ = DataView.GetView().BlockExists(hash); + + if (succ) + { + return true; + } + else + { + Block blk = null; + lock (buffer) + { + blk = buffer.Where(x => x.Header.BlockHash == hash).FirstOrDefault(); + } + + return blk != null; + } + } + + public bool ContainsTransaction(SHA256 hash) + { + var succ = DataView.GetView().ContainsTransaction(hash); + if (succ) + { + return true; + } + else + { + return transactionCache.ContainsKey(hash); + } + } + + public TTXOutput GetPubOutput(TTXInput tin) + { + try + { + return DataView.GetView().GetPubOutput(tin); + } + catch (Exception e) + { + try + { + return inputCache[tin]; + } + catch + { + throw e; + } + } + } + + public bool CheckSpentKey(Key k) + { + lock (spentKeys) + { + return DataView.GetView().CheckSpentKey(k) && !spentKeys.Contains(k); + } + } + + public TXOutput GetOutput(uint idx) + { + try + { + return DataView.GetView().GetOutput(idx); + } + catch (Exception e) + { + try + { + return outputCache[idx]; + } + catch + { + throw e; + } + } + } + + public TXOutput[] GetMixins(uint[] idxs) + { + TXOutput[] rv = new TXOutput[idxs.Length]; + for (int i = 0; i < idxs.Length; i++) + { + rv[i] = GetOutput(idxs[i]); + } + + return rv; + } + public BlockBuffer() { _buffer = Channel.CreateUnbounded(); } + public long GetChainHeight() + { + lock (buffer) + { + if (buffer.Count == 0) return DataView.GetView().GetChainHeight(); + return Math.Max(DataView.GetView().GetChainHeight(), buffer.Select(x => x.Header.Height).Max()); + } + } + /// /// Starts the block buffer's flusher. /// /// public async Task Start() { - List buf = new List(); + _pIndex = DataView.GetView().GetOutputIndex(); + buffer = new List(); DateTime lastFlush = DateTime.MinValue; await foreach(var block in _buffer.Reader.ReadAllAsync()) @@ -59,21 +244,75 @@ public async Task Start() } else { - buf.Add(block); + lock (buffer) + { + buffer.Add(block); + } + + UpdateBuffers(block); + // check received if (DateTime.Now.Subtract(lastFlush) > _flushInterval) { // flush - Flush(buf); - buf.Clear(); + Flush(buffer); + + lock (buffer) + { + buffer.Clear(); + } + lastFlush = DateTime.Now; } } } } - public void Flush(List blocks) + private void UpdateBuffers(Block block) { + if (blockCache.ContainsKey(block.Header.Height)) return; + + blockCache[block.Header.Height] = block; + + foreach (var tx in block.Transactions) + { + transactionCache[tx.TxID] = tx; + + lock (spentKeys) + { + for (int i = 0; i < tx.NumPInputs; i++) + { + spentKeys.Add(tx.PInputs[i].KeyImage); + } + } + + for (int i = 0; i < tx.NumTInputs; i++) + { + if (inputCache.ContainsKey(tx.TInputs[i])) + { + inputCache.Remove(tx.TInputs[i], out _); + } + } + + for (int i = 0; i < tx.NumTOutputs; i++) + { + inputCache[new TTXInput { Offset = (byte)i, TxSrc = tx.TxID }] = tx.TOutputs[i]; + } + + lock (_pLock) + { + for (int i = 0; i < tx.NumPOutputs; i++) + { + outputCache[++_pIndex] = tx.POutputs[i]; + } + } + } + } + + private void Flush(List _blocks) + { + // sort blocks by height + var blocks = _blocks.OrderBy(x => x.Header.Height).ToList(); List updates = new List(); // get previous indexers @@ -85,6 +324,8 @@ public void Flush(List blocks) foreach (var block in blocks) { + if (DataView.GetView().BlockExists(block.Header.BlockHash)) continue; + // add block info updates.Add(new UpdateEntry { key = block.Header.BlockHash.Bytes, value = Serialization.Int64(block.Header.Height), rule = UpdateRule.ADD, type = UpdateType.BLOCKHEIGHT }); updates.Add(new UpdateEntry { key = Serialization.Int64(block.Header.Height), value = block.Serialize(), rule = UpdateRule.ADD, type = UpdateType.BLOCK }); @@ -144,6 +385,22 @@ public void Flush(List blocks) // perform flush DataView.GetView().Flush(updates); + + lock (_pLock) + { + _pIndex = DataView.GetView().GetOutputIndex(); + } + + // dump data + lock (spentKeys) + { + spentKeys.Clear(); + } + + blockCache.Clear(); + inputCache.Clear(); + transactionCache.Clear(); + outputCache.Clear(); } } } diff --git a/Discreet/DB/DataView.cs b/Discreet/DB/DataView.cs index 10e28b2..b4cfcf7 100644 --- a/Discreet/DB/DataView.cs +++ b/Discreet/DB/DataView.cs @@ -152,6 +152,7 @@ public bool TryGetBlockHeader(Cipher.SHA256 blockHash, out BlockHeader header) } catch (Exception ex) { + // Feb 24 2024 11:27 PM: why do we log on attempting to get a block header? Daemon.Logger.Error(ex.Message, ex); header = null; return false; diff --git a/Discreet/DB/ValidationCache.cs b/Discreet/DB/ValidationCache.cs index c4e7b15..3e5fc14 100644 --- a/Discreet/DB/ValidationCache.cs +++ b/Discreet/DB/ValidationCache.cs @@ -15,8 +15,9 @@ public class ValidationCache { /* raw data and db access */ private DataView dataView; + private BlockBuffer blockBuffer; private Block block; // for validating single block - private List blocks; // for validating multiple blocks + private List blocks = null; // for validating multiple blocks public Block CurBlock { get { return block; } } @@ -39,8 +40,9 @@ public class ValidationCache public ValidationCache(Block blk) { dataView = DataView.GetView(); + blockBuffer = BlockBuffer.Instance; block = blk; - pIndex = dataView.GetOutputIndex(); + pIndex = blockBuffer.GetOutputIndex(); tIndex = dataView.GetTransactionIndexer(); updates = new List(); @@ -49,15 +51,16 @@ public ValidationCache(Block blk) txs = new(new Cipher.KeyComparer()); newOutputs = new(); blocksCache = new(new Cipher.SHA256EqualityComparer()); - previousHeight = dataView.GetChainHeight(); + previousHeight = blockBuffer.GetChainHeight(); outputsCache = new(); } public ValidationCache(List blks) { dataView = DataView.GetView(); + blockBuffer = BlockBuffer.Instance; blocks = blks; - pIndex = dataView.GetOutputIndex(); + pIndex = blockBuffer.GetOutputIndex(); tIndex = dataView.GetTransactionIndexer(); updates = new List(); @@ -66,7 +69,7 @@ public ValidationCache(List blks) txs = new(new Cipher.KeyComparer()); newOutputs = new(); blocksCache = new(new Cipher.SHA256EqualityComparer()); - previousHeight = dataView.GetChainHeight(); + previousHeight = blockBuffer.GetChainHeight(); outputsCache = new(); } @@ -129,7 +132,10 @@ private Exception validate(bool many = false) /* validate basic data */ if (block.Header.Version != 1 && block.Header.Version != 2) return new VerifyException("Block", $"Unsupported version (blocks are either version 1 or 2); got version {block.Header.Version}"); if (!block.Hash().Equals(block.Header.BlockHash)) return new VerifyException("Block", $"Block hash in header does not match calculated block hash"); - if (dataView.BlockExists(block.Header.BlockHash)) return new VerifyException("Block", $"Block already present"); + if (blockBuffer.BlockExists(block.Header.BlockHash)) + { + return new VerifyException("Block", $"Block already present"); + } if (block.Transactions == null || block.Transactions.Length == 0) return new VerifyException("Block", "Block contains no transactions"); if (block.Header.NumTXs != block.Transactions.Length) return new VerifyException("Block", $"Block tx mismatch: expected {block.Header.NumTXs}; got {block.Transactions.Length}"); if (block.GetSize() != block.Header.BlockSize) return new VerifyException("Block", $"Block size mismatch: expected {block.Header.BlockSize}; got {block.GetSize()}"); @@ -225,7 +231,8 @@ private Exception validate(bool many = false) { if (blocksCache.Count == 0) { - var pbsucc = dataView.TryGetBlockHeader(block.Header.PreviousBlock, out var prevBlockHeader); + // use BlockBuffer instead + var pbsucc = BlockBuffer.Instance.TryGetBlockHeader(block.Header.PreviousBlock, out var prevBlockHeader); if (prevBlockHeader == null) return new VerifyException("Block", "Could not get previous block"); if (prevBlockHeader.Height + 1 != block.Header.Height) return new VerifyException("Block", "Previous block height + 1 does not equal block height"); if (previousHeight + 1 != block.Header.Height) return new VerifyException("Block", "Chain height + 1 does not equal block height"); @@ -240,15 +247,23 @@ private Exception validate(bool many = false) } else { - if (!dataView.BlockExists(block.Header.PreviousBlock)) + if (!blockBuffer.BlockExists(block.Header.PreviousBlock)) { - return new OrphanBlockException("Orphan block detected", dataView.GetChainHeight(), block.Header.Height, block); + return new OrphanBlockException("Orphan block detected", blockBuffer.GetChainHeight(), block.Header.Height, block); } else { - var prevHeader = dataView.GetBlockHeader(block.Header.PreviousBlock); - if (prevHeader.Height + 1 != block.Header.Height) return new VerifyException("Block", "Previous block height + 1 does not equal block height"); - if (dataView.GetChainHeight() + 1 != block.Header.Height) return new VerifyException("Block", "Chain height + 1 does not equal block height"); + try + { + var prevHeader = blockBuffer.GetBlockHeader(block.Header.PreviousBlock); + if (prevHeader.Height + 1 != block.Header.Height) return new VerifyException("Block", "Previous block height + 1 does not equal block height"); + if (blockBuffer.GetChainHeight() + 1 != block.Header.Height) return new VerifyException("Block", "Chain height + 1 does not equal block height"); + } + catch (Exception e) + { + + } + } } @@ -259,7 +274,7 @@ private Exception validate(bool many = false) /* reject if duplicate or in main branch */ if (txs.Contains(tx.TxID.ToKey())) return new VerifyException("Block", $"Transaction {tx.TxID.ToHexShort()} already present in block"); - if (dataView.ContainsTransaction(tx.TxID)) return new VerifyException("Block", $"Transaction {tx.TxID.ToHexShort()} already present in main branch"); + if (blockBuffer.ContainsTransaction(tx.TxID)) return new VerifyException("Block", $"Transaction {tx.TxID.ToHexShort()} already present in main branch"); /* transparent checks */ TTXOutput[] tinVals = new TTXOutput[tx.NumTInputs]; @@ -282,7 +297,7 @@ private Exception validate(bool many = false) HashSet missingTxs = new HashSet(); for (int j = 0; j < tx.NumTInputs; j++) { - if (!txs.Contains(tx.TInputs[j].TxSrc.ToKey()) && !dataView.ContainsTransaction(tx.TInputs[j].TxSrc)) missingTxs.Add(tx.TInputs[j].TxSrc); + if (!txs.Contains(tx.TInputs[j].TxSrc.ToKey()) && !blockBuffer.ContainsTransaction(tx.TInputs[j].TxSrc)) missingTxs.Add(tx.TInputs[j].TxSrc); } /* reject if missing outputs */ @@ -299,7 +314,7 @@ private Exception validate(bool many = false) { try { - tinVals[j] = dataView.GetPubOutput(tx.TInputs[j]); + tinVals[j] = blockBuffer.GetPubOutput(tx.TInputs[j]); } catch { @@ -331,7 +346,10 @@ private Exception validate(bool many = false) for (int j = 0; j < tx.NumPInputs; j++) { /* verify no duplicate spends in pool or main branch */ - if (!dataView.CheckSpentKey(tx.PInputs[j].KeyImage) || spentKeys.Contains(tx.PInputs[j].KeyImage)) return new VerifyException("Block", $"Private input's key image ({tx.PInputs[j].KeyImage.ToHexShort()}) already spent"); + if (!blockBuffer.CheckSpentKey(tx.PInputs[j].KeyImage) || spentKeys.Contains(tx.PInputs[j].KeyImage)) + { + return new VerifyException("Block", $"Private input's key image ({tx.PInputs[j].KeyImage.ToHexShort()}) already spent"); + } /* verify existence of all mixins */ if (many) @@ -344,7 +362,7 @@ private Exception validate(bool many = false) { try { - mixins[j][k] = dataView.GetOutput(tx.PInputs[j].Offsets[k]); + mixins[j][k] = blockBuffer.GetOutput(tx.PInputs[j].Offsets[k]); } catch { @@ -357,7 +375,7 @@ private Exception validate(bool many = false) { try { - mixins[j] = dataView.GetMixins(tx.PInputs[j].Offsets); + mixins[j] = blockBuffer.GetMixins(tx.PInputs[j].Offsets); } catch { @@ -596,16 +614,47 @@ private Exception validate(bool many = false) return null; } - public async Task Flush() + public async Task Flush(List goodBlocks = null) { // We no longer flush updates here; TODO: remove updates from ValidationCache //dataView.Flush(updates); - await BlockBuffer.Instance.Writer.WriteAsync(block); if (blocks != null) { - foreach (var block in blocks) + if (goodBlocks != null) + { + foreach (var blk in goodBlocks) + { + await BlockBuffer.Instance.Writer.WriteAsync(blk); + } + } + else { - Daemon.TXPool.GetTXPool().UpdatePool(block.Transactions); + foreach (var blk in blocks) + { + await BlockBuffer.Instance.Writer.WriteAsync(blk); + } + } + } + else + { + await BlockBuffer.Instance.Writer.WriteAsync(block); + } + + if (blocks != null) + { + if (goodBlocks != null) + { + foreach (var block in goodBlocks) + { + Daemon.TXPool.GetTXPool().UpdatePool(block.Transactions); + } + } + else + { + foreach (var block in blocks) + { + Daemon.TXPool.GetTXPool().UpdatePool(block.Transactions); + } } } else diff --git a/Discreet/Daemon/Daemon.cs b/Discreet/Daemon/Daemon.cs index 3aaeaf8..970586f 100644 --- a/Discreet/Daemon/Daemon.cs +++ b/Discreet/Daemon/Daemon.cs @@ -442,7 +442,7 @@ public async Task Start() // first, flush valid blocks Logger.Error(exc.Message, exc); Logger.Error($"An invalid block was found during syncing (height {_beginHeight}). Re-requesting all future blocks (up to height {_newHeight}) and flushing valid blocks to disk"); - await vcache.Flush(); + await vcache.Flush(goodBlocks); // publish to ZMQ and syncer queues if (goodBlocks != null && goodBlocks.Count > 0)