diff --git a/Discreet/Cipher/Native/DiscreetCore.cs b/Discreet/Cipher/Native/DiscreetCore.cs index 065d05f..60f2fad 100644 --- a/Discreet/Cipher/Native/DiscreetCore.cs +++ b/Discreet/Cipher/Native/DiscreetCore.cs @@ -265,8 +265,8 @@ public static extern Triptych triptych_PROVE( [MarshalAs(UnmanagedType.Struct)] Key message); [DllImport("DiscreetCore")] - [return: MarshalAs(UnmanagedType.Bool)] - public static extern bool triptych_VERIFY( + [return: MarshalAs(UnmanagedType.U1)] + public static extern byte triptych_VERIFY( [In, Out] Triptych bp, [MarshalAs(UnmanagedType.LPArray, SizeConst = 64, ArraySubType = UnmanagedType.Struct)] Key[] M, [MarshalAs(UnmanagedType.LPArray, SizeConst = 64, ArraySubType = UnmanagedType.Struct)] Key[] P, diff --git a/Discreet/Cipher/Native/Native.cs b/Discreet/Cipher/Native/Native.cs index 9cc6329..87fb793 100644 --- a/Discreet/Cipher/Native/Native.cs +++ b/Discreet/Cipher/Native/Native.cs @@ -124,8 +124,8 @@ public delegate Triptych triptych_PROVEDelegate( [MarshalAs(UnmanagedType.Struct)] Key r, [MarshalAs(UnmanagedType.Struct)] Key s, [MarshalAs(UnmanagedType.Struct)] Key message); - [return: MarshalAs(UnmanagedType.Bool)] - public delegate bool triptych_VERIFYDelegate( + [return: MarshalAs(UnmanagedType.U1)] + public delegate byte triptych_VERIFYDelegate( [In, Out] Triptych bp, [MarshalAs(UnmanagedType.LPArray, SizeConst = 64, ArraySubType = UnmanagedType.Struct)] Key[] M, [MarshalAs(UnmanagedType.LPArray, SizeConst = 64, ArraySubType = UnmanagedType.Struct)] Key[] P, diff --git a/Discreet/Cipher/Triptych.cs b/Discreet/Cipher/Triptych.cs index dcaa440..0fe0586 100644 --- a/Discreet/Cipher/Triptych.cs +++ b/Discreet/Cipher/Triptych.cs @@ -92,7 +92,7 @@ public static Triptych Prove(Key[] M, Key[] P, Key C_offset, uint l, Key r, Key return proof; } - public static bool triptych_VERIFY(Triptych bp, Key[] M, Key[] P, Key C_offset, Key message) => Native.Native.Instance.triptych_VERIFY(bp, M, P, C_offset, message); + public static bool triptych_VERIFY(Triptych bp, Key[] M, Key[] P, Key C_offset, Key message) => Native.Native.Instance.triptych_VERIFY(bp, M, P, C_offset, message) != 0; public static bool Verify(Triptych bp, Key[] M, Key[] P, Key C_offset, Key message) { diff --git a/Discreet/Coin/Converters/TransactionConverter.cs b/Discreet/Coin/Converters/TransactionConverter.cs index 6d66b2f..6a83815 100644 --- a/Discreet/Coin/Converters/TransactionConverter.cs +++ b/Discreet/Coin/Converters/TransactionConverter.cs @@ -70,6 +70,7 @@ public override Models.FullTransaction Read(ref Utf8JsonReader reader, Type type reader.Read(); version = reader.GetByte(); if (version >= (byte)Config.TransactionVersions.END) throw new JsonException("FullTransaction version exceeds defined range"); + transaction.Version = version; switch (version) { @@ -85,7 +86,7 @@ public override Models.FullTransaction Read(ref Utf8JsonReader reader, Type type while (reader.Read()) { - if (reader.TokenType == JsonTokenType.EndObject) return transaction; + if (reader.TokenType == JsonTokenType.EndObject) break; if (reader.TokenType == JsonTokenType.PropertyName) { @@ -206,6 +207,16 @@ public override Models.FullTransaction Read(ref Utf8JsonReader reader, Type type } } + if (transaction.Version == 3) + { + transaction.NumTInputs = transaction.NumInputs; + transaction.NumTOutputs = transaction.NumOutputs; + } + else if (transaction.Version == 2 || transaction.Version == 1 || transaction.Version == 0) + { + transaction.NumPInputs = transaction.NumInputs; + transaction.NumPOutputs = transaction.NumOutputs; + } return transaction; } diff --git a/Discreet/Coin/Models/Block.cs b/Discreet/Coin/Models/Block.cs index af6c068..186751b 100644 --- a/Discreet/Coin/Models/Block.cs +++ b/Discreet/Coin/Models/Block.cs @@ -9,6 +9,7 @@ using Discreet.Common.Serialize; using System.Text.Json; using Discreet.Daemon; +using Discreet.DB; namespace Discreet.Coin.Models { @@ -84,8 +85,9 @@ public static Block Build(List txs, StealthAddress miner, Key s block.Header.BlockSize += txs[i].GetSize(); } - DB.DataView dataView = DB.DataView.GetView(); - + // because of block buffer, we need to use that instead + IView dataView = BlockBuffer.Instance; + block.Header.Height = dataView.GetChainHeight() + 1; if (block.Header.Height > 0) diff --git a/Discreet/DB/AlreadyPresentException.cs b/Discreet/DB/AlreadyPresentException.cs new file mode 100644 index 0000000..b449ac5 --- /dev/null +++ b/Discreet/DB/AlreadyPresentException.cs @@ -0,0 +1,18 @@ +using Discreet.Common.Exceptions; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Discreet.DB +{ + public class AlreadyPresentException : VerifyException + { + public AlreadyPresentException(string msg) : base("Discreet.Coin.Verify: " + msg) { } + + public AlreadyPresentException(string type, string msg) : base("Discreet.Coin." + type + ".Verify: " + msg) { } + + public AlreadyPresentException(string type, string vertype, string msg) : base("Discreet.Coin." + type + "Verify" + vertype + ": " + msg) { } + } +} diff --git a/Discreet/DB/BlockBuffer.cs b/Discreet/DB/BlockBuffer.cs new file mode 100644 index 0000000..669cc61 --- /dev/null +++ b/Discreet/DB/BlockBuffer.cs @@ -0,0 +1,575 @@ +using Discreet.Cipher; +using Discreet.Coin.Comparers; +using Discreet.Coin.Models; +using Discreet.Common; +using Discreet.Common.Serialize; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Security.Policy; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace Discreet.DB +{ + /// + /// Provides a multiple-writer, single-reader instance for writing blocks to the blockchain. Can be configured to write periodically or on each new block. + /// Maintains an internal state for the ValidationCache for validating block data prior to writing to the database. + /// Wraps DataView for providing an IView. + /// + public class BlockBuffer : IView + { + private ConcurrentQueue _buffer; + + private static BlockBuffer _instance; + + public static BlockBuffer Instance + { + get + { + if (_instance == null) _instance = new BlockBuffer(); + return _instance; + } + } + + private static TimeSpan _flushInterval = TimeSpan.FromSeconds(1); + public static TimeSpan FlushInterval { set { _flushInterval = TimeSpan.FromTicks(Math.Max(value.Ticks, TimeSpan.FromSeconds(1).Ticks)); } } + + private static bool _flushEveryBlock = false; + public static bool FlushEveryBlock { set { _flushEveryBlock = value; } } + + private List buffer = new List(); + private HashSet spentKeys = new HashSet(); + private ConcurrentDictionary blockCache = new ConcurrentDictionary(); + private ConcurrentDictionary outputCache = new ConcurrentDictionary(); + private ConcurrentDictionary transactionCache = new ConcurrentDictionary(new SHA256EqualityComparer()); + private ConcurrentDictionary inputCache = new ConcurrentDictionary(new TTXInputEqualityComparer()); + private uint _pIndex; + private readonly object _pLock = new object(); + + private DataView _view; + + private static readonly Block _signaler = new Block(); + + /// + /// Tries to get a block header from either the DataView or the BlockBuffer's buffer. + /// + /// + /// + /// + public bool TryGetBlockHeader(SHA256 hash, out BlockHeader header) + { + var err =_view.TryGetBlockHeader(hash, out var rv); + if (err) + { + header = rv; + return true; + } + else + { + // try getting it from the cache + Block blk = null; + lock (buffer) + { + blk = buffer.Where(x => x.Header.BlockHash == hash).FirstOrDefault(); + } + + if (blk == null) + { + header = null; + return false; + } + else + { + header = blk.Header; + return true; + } + } + } + + public uint GetOutputIndex() + { + lock (_pLock) + { + return Math.Max(_pIndex, DataView.GetView().GetOutputIndex()); + } + } + + public BlockHeader GetBlockHeader(SHA256 hash) + { + try + { + return _view.GetBlockHeader(hash); + } + catch (Exception e) + { + try + { + Block blk = null; + lock (buffer) + { + blk = buffer.Where(x => x.Header.BlockHash == hash).FirstOrDefault(); + } + + if (blk == null) throw new Exception(); + return blk.Header; + } + catch + { + throw e; + } + } + } + + public BlockHeader GetBlockHeader(long height) + { + try + { + return _view.GetBlockHeader(height); + } + catch (Exception e) + { + try + { + Block blk = null; + lock (buffer) + { + blk = buffer.Where(x => x.Header.Height == height).FirstOrDefault(); + } + + if (blk == null) throw new Exception(); + return blk.Header; + } + catch + { + throw e; + } + } + } + + public bool BlockExists(SHA256 hash) + { + var succ = _view.BlockExists(hash); + + if (succ) + { + return true; + } + else + { + Block blk = null; + lock (buffer) + { + blk = buffer.Where(x => x.Header.BlockHash == hash).FirstOrDefault(); + } + + return blk != null; + } + } + + public bool ContainsTransaction(SHA256 hash) + { + var succ = DataView.GetView().ContainsTransaction(hash); + if (succ) + { + return true; + } + else + { + return transactionCache.ContainsKey(hash); + } + } + + public TTXOutput GetPubOutput(TTXInput tin) + { + try + { + return _view.GetPubOutput(tin); + } + catch (Exception e) + { + try + { + return inputCache[tin]; + } + catch + { + throw e; + } + } + } + + public bool CheckSpentKey(Key k) + { + lock (spentKeys) + { + return _view.CheckSpentKey(k) && !spentKeys.Contains(k); + } + } + + public TXOutput GetOutput(uint idx) + { + try + { + return DataView.GetView().GetOutput(idx); + } + catch (Exception e) + { + try + { + return outputCache[idx]; + } + catch + { + throw e; + } + } + } + + public TXOutput[] GetMixins(uint[] idxs) + { + TXOutput[] rv = new TXOutput[idxs.Length]; + for (int i = 0; i < idxs.Length; i++) + { + rv[i] = GetOutput(idxs[i]); + } + + return rv; + } + + public BlockBuffer() + { + _buffer = new ConcurrentQueue(); + _view = DataView.GetView(); + } + + public long GetChainHeight() + { + lock (buffer) + { + if (buffer.Count == 0) return DataView.GetView().GetChainHeight(); + return Math.Max(DataView.GetView().GetChainHeight(), buffer.Select(x => x.Header.Height).Max()); + } + } + + public void WriteToBuffer(Block blk) + { + _buffer.Enqueue(blk); + UpdateBuffers(blk); + } + + public void ForceFlush() + { + lock (buffer) + { + if (buffer.Count == 0) + { + return; + } + + Flush(buffer); + buffer.Clear(); + } + } + + /// + /// Starts the block buffer's flusher. + /// + /// + public async Task Start() + { + _pIndex = DataView.GetView().GetOutputIndex(); + + var timer = new PeriodicTimer(_flushInterval); + while (await timer.WaitForNextTickAsync()) + { + lock (buffer) + { + while (_buffer.TryDequeue(out var block)) + { + buffer.Add(block); + } + + if (buffer.Count == 0) + { + continue; + } + + Flush(buffer); + buffer.Clear(); + } + } + } + + public IEnumerable GetBlocks(long startHeight, long limit) => _view.GetBlocks(startHeight, limit); + + public void AddBlockToCache(Block blk) => _view.AddBlockToCache(blk); + + public bool BlockCacheHas(SHA256 blk) => _view.BlockCacheHas(blk); + + public void ClearBlockCache() => _view.ClearBlockCache(); + + public void AddBlock(Block blk) => _buffer.Enqueue(blk); + + public Dictionary GetBlockCache() => _view.GetBlockCache(); + + public void RemovePubOutput(TTXInput inp) => _view.RemovePubOutput(inp); + + public uint[] GetOutputIndices(SHA256 hash) => _view.GetOutputIndices(hash); + + public (TXOutput[], int) GetMixins(uint idx) => _view.GetMixins(idx); + + public (TXOutput[], int) GetMixinsUniform(uint idx) => _view.GetMixinsUniform(idx); + + public FullTransaction GetTransaction(SHA256 hash) + { + try + { + return _view.GetTransaction(hash); + } + catch(Exception e) + { + try + { + return transactionCache[hash]; + } + catch + { + throw e; + } + } + } + + public FullTransaction GetTransaction(ulong txid) => _view.GetTransaction(txid); + + public Block GetBlock(long height) + { + try + { + return _view.GetBlock(height); + } + catch (Exception e) + { + try + { + Block blk = null; + lock (buffer) + { + blk = buffer.Where(x => x.Header.Height == height).FirstOrDefault(); + } + + if (blk == null) throw new Exception(); + return blk; + } + catch + { + throw e; + } + } + } + + public Block GetBlock(SHA256 hash) + { + try + { + return _view.GetBlock(hash); + } + catch (Exception e) + { + try + { + Block blk = null; + lock (buffer) + { + blk = buffer.Where(x => x.Header.BlockHash == hash).FirstOrDefault(); + } + + if (blk == null) throw new Exception(); + return blk; + } + catch + { + throw e; + } + } + } + + public ulong GetTransactionIndexer() => _view.GetTransactionIndexer(); + + public ulong GetTransactionIndex(SHA256 h) => _view.GetTransactionIndex(h); + + public long GetBlockHeight(SHA256 hash) + { + try + { + return _view.GetBlockHeight(hash); + } + catch (Exception e) + { + try + { + Block blk = null; + lock (buffer) + { + blk = buffer.Where(x => x.Header.BlockHash == hash).FirstOrDefault(); + } + + if (blk == null) throw new Exception(); + return blk.Header.Height; + } + catch + { + throw e; + } + } + } + + public bool BlockHeightExists(long h) + { + lock (buffer) + { + return _view.BlockHeightExists(h) || buffer.Any(x => x.Header.Height == h); + } + } + + public void Flush(IEnumerable updates) => throw new Exception("Discreet.DB.BlockBuffer: cannot directly update the block buffer's database!"); + + private void UpdateBuffers(Block block) + { + if (blockCache.ContainsKey(block.Header.Height)) return; + + blockCache[block.Header.Height] = block; + + foreach (var tx in block.Transactions) + { + transactionCache[tx.TxID] = tx; + + lock (spentKeys) + { + for (int i = 0; i < tx.NumPInputs; i++) + { + spentKeys.Add(tx.PInputs[i].KeyImage); + } + } + + for (int i = 0; i < tx.NumTInputs; i++) + { + if (inputCache.ContainsKey(tx.TInputs[i])) + { + inputCache.Remove(tx.TInputs[i], out _); + } + } + + for (int i = 0; i < tx.NumTOutputs; i++) + { + inputCache[new TTXInput { Offset = (byte)i, TxSrc = tx.TxID }] = tx.TOutputs[i]; + } + + lock (_pLock) + { + for (int i = 0; i < tx.NumPOutputs; i++) + { + outputCache[++_pIndex] = tx.POutputs[i]; + } + } + } + } + + private void Flush(List _blocks) + { + // sort blocks by height + var blocks = _blocks.OrderBy(x => x.Header.Height).ToList(); + List updates = new List(); + + // get previous indexers + var tIndex = DataView.GetView().GetTransactionIndexer(); + var pIndex = DataView.GetView().GetOutputIndex(); + var bIndex = blocks.Select(x => x.Header.Height).Max(); + + Dictionary pubUpdates = new Dictionary(new TTXInputEqualityComparer()); + + foreach (var block in blocks) + { + if (DataView.GetView().BlockExists(block.Header.BlockHash)) continue; + + // add block info + updates.Add(new UpdateEntry { key = block.Header.BlockHash.Bytes, value = Serialization.Int64(block.Header.Height), rule = UpdateRule.ADD, type = UpdateType.BLOCKHEIGHT }); + updates.Add(new UpdateEntry { key = Serialization.Int64(block.Header.Height), value = block.Serialize(), rule = UpdateRule.ADD, type = UpdateType.BLOCK }); + updates.Add(new UpdateEntry { key = Serialization.Int64(block.Header.Height), value = block.Header.Serialize(), rule = UpdateRule.ADD, type = UpdateType.BLOCKHEADER }); + + // add transactions + foreach (var tx in block.Transactions) + { + tIndex++; + updates.Add(new UpdateEntry { key = tx.TxID.Bytes, value = Serialization.UInt64(tIndex), rule = UpdateRule.ADD, type = UpdateType.TXINDEX }); + updates.Add(new UpdateEntry { key = Serialization.UInt64(tIndex), value = tx.Serialize(), rule = UpdateRule.ADD, type = UpdateType.TX }); + + // pouts + uint[] uarr = new uint[tx.NumPOutputs]; + for (int i = 0; i < tx.NumPOutputs; i++) + { + pIndex++; + updates.Add(new UpdateEntry { key = Serialization.UInt32(pIndex), value = tx.POutputs[i].Serialize(), rule = UpdateRule.ADD, type = UpdateType.OUTPUT }); + uarr[i] = pIndex; + } + + // pout indices + updates.Add(new UpdateEntry { key = tx.TxID.Bytes, value = Serialization.UInt32Array(uarr), rule = UpdateRule.ADD, type = UpdateType.OUTPUTINDICES }); + + // spent keys + for (int i = 0; i < tx.NumPInputs; i++) + { + updates.Add(new UpdateEntry { key = tx.PInputs[i].KeyImage.bytes, value = ChainDB.ZEROKEY, rule = UpdateRule.ADD, type = UpdateType.SPENTKEY }); + } + + // tinputs + for (int i = 0; i < tx.NumTInputs; i++) + { + if (pubUpdates.ContainsKey(tx.TInputs[i])) pubUpdates.Remove(tx.TInputs[i]); + else updates.Add(new UpdateEntry { key = tx.TInputs[i].Serialize(), value = ChainDB.ZEROKEY, rule = UpdateRule.DEL, type = UpdateType.PUBOUTPUT }); + } + + // touts + for (int i = 0; i < tx.NumTOutputs; i++) + { + pubUpdates[new TTXInput { TxSrc = tx.TxID, Offset = (byte)i }] = tx.TOutputs[i]; + } + } + } + + // new touts + foreach ((var txi, var txo) in pubUpdates) + { + txo.TransactionSrc = txi.TxSrc; + updates.Add(new UpdateEntry { key = txi.Serialize(), value = txo.Serialize(), rule = UpdateRule.ADD, type = UpdateType.PUBOUTPUT }); + } + + // update indexers + updates.Add(new UpdateEntry { key = Encoding.ASCII.GetBytes("indexer_tx"), value = Serialization.UInt64(tIndex), rule = UpdateRule.UPDATE, type = UpdateType.TXINDEXER }); + updates.Add(new UpdateEntry { key = Encoding.ASCII.GetBytes("indexer_output"), value = Serialization.UInt32(pIndex), rule = UpdateRule.UPDATE, type = UpdateType.OUTPUTINDEXER }); + updates.Add(new UpdateEntry { key = Encoding.ASCII.GetBytes("height"), value = Serialization.Int64(bIndex), rule = UpdateRule.UPDATE, type = UpdateType.HEIGHT }); + + // perform flush + DataView.GetView().Flush(updates); + + lock (_pLock) + { + _pIndex = DataView.GetView().GetOutputIndex(); + } + + // dump data + lock (spentKeys) + { + spentKeys.Clear(); + } + + blockCache.Clear(); + inputCache.Clear(); + transactionCache.Clear(); + outputCache.Clear(); + } + } +} diff --git a/Discreet/DB/DataView.cs b/Discreet/DB/DataView.cs index 10e28b2..b4cfcf7 100644 --- a/Discreet/DB/DataView.cs +++ b/Discreet/DB/DataView.cs @@ -152,6 +152,7 @@ public bool TryGetBlockHeader(Cipher.SHA256 blockHash, out BlockHeader header) } catch (Exception ex) { + // Feb 24 2024 11:27 PM: why do we log on attempting to get a block header? Daemon.Logger.Error(ex.Message, ex); header = null; return false; diff --git a/Discreet/DB/ValidationCache.cs b/Discreet/DB/ValidationCache.cs index b8b2aa7..f307dea 100644 --- a/Discreet/DB/ValidationCache.cs +++ b/Discreet/DB/ValidationCache.cs @@ -3,6 +3,7 @@ using Discreet.Common; using Discreet.Common.Exceptions; using Discreet.Common.Serialize; +using Discreet.Daemon; using System; using System.Collections.Generic; using System.Linq; @@ -15,8 +16,9 @@ public class ValidationCache { /* raw data and db access */ private DataView dataView; + private BlockBuffer blockBuffer; private Block block; // for validating single block - private List blocks; // for validating multiple blocks + private List blocks = null; // for validating multiple blocks public Block CurBlock { get { return block; } } @@ -39,8 +41,9 @@ public class ValidationCache public ValidationCache(Block blk) { dataView = DataView.GetView(); + blockBuffer = BlockBuffer.Instance; block = blk; - pIndex = dataView.GetOutputIndex(); + pIndex = blockBuffer.GetOutputIndex(); tIndex = dataView.GetTransactionIndexer(); updates = new List(); @@ -49,15 +52,16 @@ public ValidationCache(Block blk) txs = new(new Cipher.KeyComparer()); newOutputs = new(); blocksCache = new(new Cipher.SHA256EqualityComparer()); - previousHeight = dataView.GetChainHeight(); + previousHeight = blockBuffer.GetChainHeight(); outputsCache = new(); } public ValidationCache(List blks) { dataView = DataView.GetView(); + blockBuffer = BlockBuffer.Instance; blocks = blks; - pIndex = dataView.GetOutputIndex(); + pIndex = blockBuffer.GetOutputIndex(); tIndex = dataView.GetTransactionIndexer(); updates = new List(); @@ -66,7 +70,7 @@ public ValidationCache(List blks) txs = new(new Cipher.KeyComparer()); newOutputs = new(); blocksCache = new(new Cipher.SHA256EqualityComparer()); - previousHeight = dataView.GetChainHeight(); + previousHeight = blockBuffer.GetChainHeight(); outputsCache = new(); } @@ -127,9 +131,13 @@ private Exception validateMany() private Exception validate(bool many = false) { /* validate basic data */ + if (block == null) return new VerifyException("Block", "Block was null"); if (block.Header.Version != 1 && block.Header.Version != 2) return new VerifyException("Block", $"Unsupported version (blocks are either version 1 or 2); got version {block.Header.Version}"); if (!block.Hash().Equals(block.Header.BlockHash)) return new VerifyException("Block", $"Block hash in header does not match calculated block hash"); - if (dataView.BlockExists(block.Header.BlockHash)) return new VerifyException("Block", $"Block already present"); + if (blockBuffer.BlockExists(block.Header.BlockHash)) + { + return new AlreadyPresentException("Block", $"Block already present"); + } if (block.Transactions == null || block.Transactions.Length == 0) return new VerifyException("Block", "Block contains no transactions"); if (block.Header.NumTXs != block.Transactions.Length) return new VerifyException("Block", $"Block tx mismatch: expected {block.Header.NumTXs}; got {block.Transactions.Length}"); if (block.GetSize() != block.Header.BlockSize) return new VerifyException("Block", $"Block size mismatch: expected {block.Header.BlockSize}; got {block.GetSize()}"); @@ -225,7 +233,8 @@ private Exception validate(bool many = false) { if (blocksCache.Count == 0) { - var pbsucc = dataView.TryGetBlockHeader(block.Header.PreviousBlock, out var prevBlockHeader); + // use BlockBuffer instead + var pbsucc = BlockBuffer.Instance.TryGetBlockHeader(block.Header.PreviousBlock, out var prevBlockHeader); if (prevBlockHeader == null) return new VerifyException("Block", "Could not get previous block"); if (prevBlockHeader.Height + 1 != block.Header.Height) return new VerifyException("Block", "Previous block height + 1 does not equal block height"); if (previousHeight + 1 != block.Header.Height) return new VerifyException("Block", "Chain height + 1 does not equal block height"); @@ -240,15 +249,23 @@ private Exception validate(bool many = false) } else { - if (!dataView.BlockExists(block.Header.PreviousBlock)) + if (!blockBuffer.BlockExists(block.Header.PreviousBlock)) { - return new OrphanBlockException("Orphan block detected", dataView.GetChainHeight(), block.Header.Height, block); + return new OrphanBlockException("Orphan block detected", blockBuffer.GetChainHeight(), block.Header.Height, block); } else { - var prevHeader = dataView.GetBlockHeader(block.Header.PreviousBlock); - if (prevHeader.Height + 1 != block.Header.Height) return new VerifyException("Block", "Previous block height + 1 does not equal block height"); - if (dataView.GetChainHeight() + 1 != block.Header.Height) return new VerifyException("Block", "Chain height + 1 does not equal block height"); + try + { + var prevHeader = blockBuffer.GetBlockHeader(block.Header.PreviousBlock); + if (prevHeader.Height + 1 != block.Header.Height) return new VerifyException("Block", "Previous block height + 1 does not equal block height"); + if (blockBuffer.GetChainHeight() + 1 != block.Header.Height) return new VerifyException("Block", "Chain height + 1 does not equal block height"); + } + catch (Exception e) + { + + } + } } @@ -259,7 +276,7 @@ private Exception validate(bool many = false) /* reject if duplicate or in main branch */ if (txs.Contains(tx.TxID.ToKey())) return new VerifyException("Block", $"Transaction {tx.TxID.ToHexShort()} already present in block"); - if (dataView.ContainsTransaction(tx.TxID)) return new VerifyException("Block", $"Transaction {tx.TxID.ToHexShort()} already present in main branch"); + if (blockBuffer.ContainsTransaction(tx.TxID)) return new VerifyException("Block", $"Transaction {tx.TxID.ToHexShort()} already present in main branch"); /* transparent checks */ TTXOutput[] tinVals = new TTXOutput[tx.NumTInputs]; @@ -282,7 +299,7 @@ private Exception validate(bool many = false) HashSet missingTxs = new HashSet(); for (int j = 0; j < tx.NumTInputs; j++) { - if (!txs.Contains(tx.TInputs[j].TxSrc.ToKey()) && !dataView.ContainsTransaction(tx.TInputs[j].TxSrc)) missingTxs.Add(tx.TInputs[j].TxSrc); + if (!txs.Contains(tx.TInputs[j].TxSrc.ToKey()) && !blockBuffer.ContainsTransaction(tx.TInputs[j].TxSrc)) missingTxs.Add(tx.TInputs[j].TxSrc); } /* reject if missing outputs */ @@ -299,7 +316,7 @@ private Exception validate(bool many = false) { try { - tinVals[j] = dataView.GetPubOutput(tx.TInputs[j]); + tinVals[j] = blockBuffer.GetPubOutput(tx.TInputs[j]); } catch { @@ -331,12 +348,16 @@ private Exception validate(bool many = false) for (int j = 0; j < tx.NumPInputs; j++) { /* verify no duplicate spends in pool or main branch */ - if (!dataView.CheckSpentKey(tx.PInputs[j].KeyImage) || spentKeys.Contains(tx.PInputs[j].KeyImage)) return new VerifyException("Block", $"Private input's key image ({tx.PInputs[j].KeyImage.ToHexShort()}) already spent"); + if (!blockBuffer.CheckSpentKey(tx.PInputs[j].KeyImage) || spentKeys.Contains(tx.PInputs[j].KeyImage)) + { + return new VerifyException("Block", $"Private input's key image ({tx.PInputs[j].KeyImage.ToHexShort()}) already spent"); + } /* verify existence of all mixins */ if (many) { mixins[j] = new TXOutput[64]; + for (int k = 0; k < 64; k++) { bool mixinSuccess = outputsCache.TryGetValue(tx.PInputs[j].Offsets[k], out mixins[j][k]); @@ -344,7 +365,7 @@ private Exception validate(bool many = false) { try { - mixins[j][k] = dataView.GetOutput(tx.PInputs[j].Offsets[k]); + mixins[j][k] = blockBuffer.GetOutput(tx.PInputs[j].Offsets[k]); } catch { @@ -357,7 +378,7 @@ private Exception validate(bool many = false) { try { - mixins[j] = dataView.GetMixins(tx.PInputs[j].Offsets); + mixins[j] = blockBuffer.GetMixins(tx.PInputs[j].Offsets); } catch { @@ -463,7 +484,6 @@ private Exception validate(bool many = false) Array.Copy(tx.SigningHash.Bytes, data, 32); Array.Copy(tx.TInputs[j].Hash(tinVals[j]).Bytes, 0, data, 32, 32); Cipher.SHA256 checkSig = Cipher.SHA256.HashData(data); - if (!tx.TSignatures[j].Verify(checkSig)) return new VerifyException("Block", $"Transparent input at index {j} has invalid signature"); } @@ -596,28 +616,63 @@ private Exception validate(bool many = false) return null; } - public void Flush() + public async Task Flush(List goodBlocks = null, bool force = false) { - dataView.Flush(updates); + // We no longer flush updates here; TODO: remove updates from ValidationCache + //dataView.Flush(updates); + if (blocks != null) + { + if (goodBlocks != null) + { + foreach (var blk in goodBlocks) + { + BlockBuffer.Instance.WriteToBuffer(blk); + } + } + else + { + foreach (var blk in blocks) + { + BlockBuffer.Instance.WriteToBuffer(blk); + } + } + } + else + { + BlockBuffer.Instance.WriteToBuffer(block); + } + if (blocks != null) { - foreach (var block in blocks) + if (goodBlocks != null) { - Daemon.TXPool.GetTXPool().UpdatePool(block.Transactions); + foreach (var block in goodBlocks) + { + Daemon.TXPool.GetTXPool().UpdatePool(block.Transactions); + } + } + else + { + foreach (var block in blocks) + { + Daemon.TXPool.GetTXPool().UpdatePool(block.Transactions); + } } } else { Daemon.TXPool.GetTXPool().UpdatePool(block.Transactions); } + + if (force) BlockBuffer.Instance.ForceFlush(); } - public static void Process(Block block) + public static async Task Process(Block block) { ValidationCache vCache = new ValidationCache(block); var exc = vCache.Validate(); if (exc != null) throw exc; - vCache.Flush(); + await vCache.Flush(); } } } diff --git a/Discreet/Daemon/BlockAuth/AuthKeys.cs b/Discreet/Daemon/BlockAuth/AuthKeys.cs index 58c5f2f..324f10d 100644 --- a/Discreet/Daemon/BlockAuth/AuthKeys.cs +++ b/Discreet/Daemon/BlockAuth/AuthKeys.cs @@ -14,22 +14,22 @@ public class AuthKeys public static List Defaults = new List { - Key.FromHex("60f13a18c379061795fd094be1e4043513fe00c94b3a9b5c0502949a1515970d"), - Key.FromHex("554fdfa82121ac55400fdbfde3403f608843bfb7e2ae17f05eca6e2f70dda038"), - Key.FromHex("d485ff6a2268e0bd2dc0c6b5d190044f8a84491b653b2b603d072ed6f13d1376"), - Key.FromHex("ef72d2be9afe943b03f7eabe17fe53c327db6dc4012c4910daa196961b379eee"), - Key.FromHex("6e4f9047ce3425ed2d11d495dd2cabf2ef6662e3bf77076c97a54a49480a3474"), - Key.FromHex("8fcea78a412b004a76c32d2933f4a43b8f52592c9ca2508b63d0861a786568d9"), - Key.FromHex("d51518eb9295241bd1452bcc5bedd526f480017feb5c62c05b4f895a447020a5"), - Key.FromHex("b8c5314628de3ba604934ac7a7ed3347a3050de2fa94e3f531a9073f821d2f9d"), - Key.FromHex("513fdd96d370d543613cdcee3560c042d0a99db1101d6b95686e1770abefabf8"), - Key.FromHex("ee761e6a2d89329bd4ea86effd65b117b9721fd267924e475a084f14cdab985a"), - Key.FromHex("21ce2b2ea5692e5a0654540ae2dcc698504d7c91c9c7510d899d7025e8191805"), - Key.FromHex("a2dbfee8f2baf02ca05d7d9f145c281556e2da5bbf680463d02de502aef25533"), - Key.FromHex("fae2f381da4ff1ab150e6f1cede5eec7dbd7ed97f049fca80fe0d8d2b02b5036"), - Key.FromHex("e76bcc50c1cac70b6be1cce37139fd9badf884826eed1c8963426ca33175664a"), - Key.FromHex("9195313d0a9240c221b3be02fc979ae1c754abf7471299283d58577d9f6c20f5"), - Key.FromHex("11240f594192a1cbe2736dd6381d6a3abb51bd34c3edd34ba5c675c8c5238c65") + Key.FromHex("278e97176dcded4f2ae1737d94cdf89a912d35605ba75bb9405229277be6a571"), + Key.FromHex("93d27ebf14c0b49bcc3dcc7a2043c709af899f3fdb8e5b30f4476eb4460f2c9f"), + Key.FromHex("6622d9a0f291ea3b082a1d6f371ec33bd22e943f56e5f4cbada3d6c0c5de7a3c"), + Key.FromHex("06f9dd3d77ca9273fec225da076b0c167e2fbc8b2ce3bb8a714c83c7217ee632"), + Key.FromHex("f88785b89df395b182819b0c4ad4690d37491410902458f262d166c58af8fe0e"), + Key.FromHex("b1edd92e26ea5211c14df50cee75d28c747285a1b761d6d9ceaedbdf6570b9cd"), + Key.FromHex("6b4273b92ffbde6d8bec5a9f232b9051aadd36acd43bbbc8a950e0d725d19f49"), + Key.FromHex("ab815fd16a44964e9b50367cc44dcf5f860e49050d3afeacf685168120d18f72"), + Key.FromHex("16fef0a95c0d26c84591c5d182d7fd684b8631a38a439e5ce128427804b3baa1"), + Key.FromHex("f4e02d87a0342a4b7ad6cbe3d06c033d6081c00301e28e89a76eeb5075454d78"), + Key.FromHex("b8a0e039a64fc9757d54c0034224596895d437a7500566e87e90eadd983de827"), + Key.FromHex("d829da97b9a8304801dfd1ad625f4e6a294372eb9cb1125d37745fc7780c2a93"), + Key.FromHex("a3c624de57d508c7828184e9a96ad0ee483dd85b40b9aa7553b65419cacb21f9"), + Key.FromHex("8e3903e795c4731d6e7fdada6968dc2f3514394c05c6b40b3b3fde2afdd27de2"), + Key.FromHex("b5b635a3aa5bc52253520ea30725f7f5e939004600c577c3f7b66f4ba885ab03"), + Key.FromHex("dd49a46a00d59025b2c247aad1898adf38bdcf8109b9b979bb7e50fdcd795da0") }; public AuthKeys(DaemonConfig conf) diff --git a/Discreet/Daemon/Daemon.cs b/Discreet/Daemon/Daemon.cs index 538bb42..a4fa51a 100644 --- a/Discreet/Daemon/Daemon.cs +++ b/Discreet/Daemon/Daemon.cs @@ -17,6 +17,7 @@ using Discreet.Common.Serialize; using Discreet.Daemon.BlockAuth; using System.Threading.Channels; +using Discreet.DB; namespace Discreet.Daemon { @@ -37,7 +38,7 @@ public class Daemon Network.Handler handler; Network.Peerbloom.Network network; Network.MessageCache messageCache; - DB.DataView dataView; + IView dataView; DaemonConfig config; SQLiteWallet emissionsWallet; @@ -58,14 +59,14 @@ public class Daemon public bool RPCLive { get; set; } = false; - private object MintLocker = new(); + private SemaphoreSlim MintLocker = new(1, 1); public Daemon() { txpool = TXPool.GetTXPool(); network = Network.Peerbloom.Network.GetNetwork(); messageCache = Network.MessageCache.GetMessageCache(); - dataView = DB.DataView.GetView(); + dataView = BlockBuffer.Instance; config = DaemonConfig.GetConfig(); @@ -96,7 +97,7 @@ public async Task Restart() txpool = TXPool.GetTXPool(); network = Network.Peerbloom.Network.GetNetwork(); messageCache = Network.MessageCache.GetMessageCache(); - dataView = DB.DataView.GetView(); + dataView = BlockBuffer.Instance; config = DaemonConfig.GetConfig(); @@ -161,7 +162,7 @@ public async Task Start() } else { - BuildGenesis(); + await BuildGenesis(); } } @@ -170,6 +171,9 @@ public async Task Start() _ = _rpcServer.Start(); _ = Task.Factory.StartNew(async () => await ZMQ.Publisher.Instance.Start(DaemonConfig.GetConfig().ZMQPort.Value)); + Logger.Info($"Starting Block Buffer..."); + _ = Task.Factory.StartNew(async () => await DB.BlockBuffer.Instance.Start()); + await network.Start(); await network.Bootstrap(); if (syncFromPeers && IsBlockAuthority) @@ -433,13 +437,17 @@ public async Task Start() { var vcache = new DB.ValidationCache(messageCache.GetAllCachedBlocks(_beginHeight, _newHeight)); (exc, _beginHeight, var goodBlocks, var reget) = vcache.ValidateReturnFailures(); - + if (exc is AlreadyPresentException apex) + { + // ignore this + exc = null; + } if (exc != null && reget != null) { // first, flush valid blocks Logger.Error(exc.Message, exc); Logger.Error($"An invalid block was found during syncing (height {_beginHeight}). Re-requesting all future blocks (up to height {_newHeight}) and flushing valid blocks to disk"); - vcache.Flush(); + await vcache.Flush(goodBlocks, true); // publish to ZMQ and syncer queues if (goodBlocks != null && goodBlocks.Count > 0) @@ -490,7 +498,7 @@ public async Task Start() } else { - vcache.Flush(); + await vcache.Flush(); } } while (exc != null); } @@ -568,7 +576,7 @@ public async Task Start() beginningHeight++; }*/ - beginningHeight = dataView.GetChainHeight() + 1; + beginningHeight = BlockBuffer.Instance.GetChainHeight() + 1; while (!messageCache.BlockCache.IsEmpty) { @@ -581,7 +589,7 @@ public async Task Start() // re-request all blocks up to current minimum height var minheight = messageCache.BlockCache.Keys.Min(); - handler.LastSeenHeight = dataView.GetChainHeight(); + handler.LastSeenHeight = BlockBuffer.Instance.GetChainHeight(); toBeFulfilled = 0; missedItems.Clear(); usablePeers.Clear(); @@ -642,7 +650,7 @@ public async Task Start() try { - vCache.Flush(); + await vCache.Flush(); } catch (Exception e) { @@ -659,7 +667,8 @@ public async Task Start() ZMQ.Publisher.Instance.Publish(ZMQ_DAEMON_SYNC, zmqSyncF); Logger.Info("Fetching TXPool..."); - network.Send(messageCache.Versions.Keys.First(), new Network.Core.Packet(Network.Core.PacketType.GETPOOL, new Network.Core.Packets.GetPoolPacket())); + var conn2fetch = messageCache.Versions.Keys.FirstOrDefault() ?? network.OutboundConnectedPeers.Keys.FirstOrDefault(); + network.Send(conn2fetch, new Network.Core.Packet(Network.Core.PacketType.GETPOOL, new Network.Core.Packets.GetPoolPacket())); } if (IsBlockAuthority) @@ -710,7 +719,7 @@ public async Task Start() if (DaemonConfig.GetConfig().DbgConfig.CheckBlockchain.Value) { Logger.Debug("Checking for missing blocks..."); - var blockchainHeight = dataView.GetChainHeight(); + var blockchainHeight = BlockBuffer.Instance.GetChainHeight(); List missingBlocks = new(); for (long i = 0; i < blockchainHeight; i++) { @@ -754,6 +763,8 @@ public async Task TestnetMinter(TimeSpan interval, int n, int pid, ChannelReader bool paused = true; bool reloop = false; + HashSet produced = new HashSet(); + // we start off paused, waiting for the first call to us while (paused) { @@ -816,10 +827,9 @@ public async Task TestnetMinter(TimeSpan interval, int n, int pid, ChannelReader if (numProduced % n == pid) { - lock (MintLocker) - { - MintTestnetBlock(); - } + await MintLocker.WaitAsync(); + await MintTestnetBlock(produced); + MintLocker.Release(); } numProduced++; @@ -841,14 +851,13 @@ public async Task Minter() // // Mint(); //} - lock (MintLocker) - { - MintTestnet(); - } + await MintLocker.WaitAsync(); + await MintTestnet(); + MintLocker.Release(); } } - public void Mint() + public async Task Mint() { /*if (wallet.Addresses[0].Type != 0) { @@ -872,7 +881,7 @@ public void Mint() { Logger.Error($"Discreet.Mint: validating minted block resulted in error: {vErr.Message}", vErr); } - vCache.Flush(); + await vCache.Flush(); } catch (Exception e) { @@ -887,7 +896,7 @@ public void Mint() } } - public void MintTestnet() + public async Task MintTestnet() { try { @@ -904,7 +913,7 @@ public void MintTestnet() { Logger.Error($"Discreet.Mint: validating minted block resulted in error: {vErr.Message}", vErr); } - vCache.Flush(); + await vCache.Flush(); } catch (Exception e) { @@ -923,7 +932,7 @@ public void MintTestnet() } } - public void MintTestnetBlock() + public async Task MintTestnetBlock(HashSet debugProduced = null) { try { @@ -933,6 +942,18 @@ public void MintTestnetBlock() var txs = txpool.GetTransactionsForBlock(); var blk = Block.Build(txs, new StealthAddress(SQLiteWallet.Wallets["TESTNET_EMISSIONS"].Accounts[0].Address), sigKey); + if (debugProduced != null) + { + if (debugProduced.Contains(blk.Header.Height)) + { + Logger.Critical($"Discreet.Daemon: produced a block at a previously seen height!"); + } + else + { + debugProduced.Add(blk.Header.Height); + } + } + try { DB.ValidationCache vCache = new(blk); @@ -941,7 +962,7 @@ public void MintTestnetBlock() { Logger.Error($"Discreet.Mint: validating minted block resulted in error: {vErr.Message}", vErr); } - vCache.Flush(); + await vCache.Flush(); } catch (Exception e) { @@ -960,7 +981,7 @@ public void MintTestnetBlock() } } - public void BuildGenesis() + public async Task BuildGenesis() { /* time to build the megablock */ //Console.WriteLine("Please, enter the wallets and amounts (input stop token EXIT when finished)"); @@ -996,31 +1017,39 @@ public void BuildGenesis() }*/ //addresses.Add(new StealthAddress(wallet.Addresses[0].Address)); - addresses.Add(new StealthAddress(wallet.Accounts[0].Address)); - coins.Add(45_000_000_0_000_000_000UL); + try + { + addresses.Add(new StealthAddress(wallet.Accounts[0].Address)); + coins.Add(45_000_000_0_000_000_000UL); - Logger.Info("Creating genesis block..."); + Logger.Info("Creating genesis block..."); - var block = Block.BuildGenesis(addresses.ToArray(), coins.ToArray(), 4096, DefaultBlockAuth.Instance.Keyring.SigningKeys.First()); - DB.ValidationCache vCache = new DB.ValidationCache(block); - var exc = vCache.Validate(); - if (exc == null) - Logger.Info("Genesis block successfully created."); - else - throw new Exception($"Could not create genesis block: {exc}"); + var block = Block.BuildGenesis(addresses.ToArray(), coins.ToArray(), 4096, DefaultBlockAuth.Instance.Keyring.SigningKeys.First()); + DB.ValidationCache vCache = new DB.ValidationCache(block); + var exc = vCache.Validate(); + if (exc == null) + Logger.Info("Genesis block successfully created."); + else + throw new Exception($"Could not create genesis block: {exc}"); - try - { - vCache.Flush(); + try + { + await vCache.Flush(); + } + catch (Exception e) + { + Logger.Error(new DatabaseException("Discreet.Daemon.Daemon.ProcessBlock", e.Message).Message, e); + } + + ProcessBlock(block); + + Logger.Info("Successfully created the genesis block."); } catch (Exception e) { - Logger.Error(new DatabaseException("Discreet.Daemon.Daemon.ProcessBlock", e.Message).Message, e); + await Console.Out.WriteLineAsync($"{e.GetType().Name}: {e.Message}\n{e.StackTrace}"); + Environment.Exit(1); } - - ProcessBlock(block); - - Logger.Info("Successfully created the genesis block."); } public async Task BlockReceiver() diff --git a/Discreet/Daemon/TXPool.cs b/Discreet/Daemon/TXPool.cs index 591b5e2..18428de 100644 --- a/Discreet/Daemon/TXPool.cs +++ b/Discreet/Daemon/TXPool.cs @@ -7,6 +7,7 @@ using Discreet.Coin.Models; using Discreet.Common.Exceptions; using Discreet.Common.Serialize; +using Discreet.DB; namespace Discreet.Daemon { @@ -70,7 +71,7 @@ public void Deserialize(byte[] data) private ConcurrentDictionary orphanTxs; - private DB.DataView view; + private DB.IView view; public TXPool() { @@ -83,7 +84,7 @@ public TXPool() newOutputs = new(); updateNewOutputs = new(); - view = DB.DataView.GetView(); + view = BlockBuffer.Instance; //DB.DisDB db = DB.DisDB.GetDB(); diff --git a/Discreet/Discreet.csproj b/Discreet/Discreet.csproj index 252562e..3f9c396 100644 --- a/Discreet/Discreet.csproj +++ b/Discreet/Discreet.csproj @@ -6,7 +6,7 @@ AnyCPU;x64 x64 win-x64;linux-x64;osx-x64 - 0.2.48 + 0.2.57 Daemon_icon.ico diff --git a/Discreet/Entry.cs b/Discreet/Entry.cs index 22bda32..8eb0a3f 100644 --- a/Discreet/Entry.cs +++ b/Discreet/Entry.cs @@ -22,6 +22,13 @@ using Discreet.Common.Serialize; using Discreet.Network; using Discreet.Network.Core.Packets.Peerbloom; +using Discreet.Coin.Models; +using System.Text.Json; +using Discreet.Coin.Converters; +using Discreet.Common.Converters; +using System.Text.Json.Serialization; +using Discreet.Wallets; +using System.Collections.Concurrent; namespace Discreet { diff --git a/Discreet/Network/Handler.cs b/Discreet/Network/Handler.cs index 995a9cf..50c25c3 100644 --- a/Discreet/Network/Handler.cs +++ b/Discreet/Network/Handler.cs @@ -7,8 +7,10 @@ using System.Threading; using System.Threading.Tasks; using Discreet.Coin.Models; +using Discreet.DB; using Discreet.Network.Core; using Discreet.Network.Core.Packets; +using RocksDbSharp; namespace Discreet.Network { @@ -87,6 +89,8 @@ public class Handler public ConcurrentQueue<(Core.Packet, Peerbloom.Connection)> InboundPacketQueue = new(); + //private ConcurrentQueue<> _seenPropagation = new ConcurrentQueue(); + /* back reference to the Daemon */ public Daemon.Daemon daemon; @@ -123,7 +127,7 @@ public async Task NeededInventoryStart(CancellationToken token) } } - public void RegisterNeeded(IPacketBody packet, IPEndPoint req, long durMilliseconds = 0, Action callback = null) + public bool RegisterNeeded(IPacketBody packet, IPEndPoint req, long durMilliseconds = 0, Action callback = null) { bool success = NeededInventory.TryGetValue(req, out var reqset); if (!success || reqset == null) @@ -138,37 +142,65 @@ public void RegisterNeeded(IPacketBody packet, IPEndPoint req, long durMilliseco var timestamp = DateTime.UtcNow.Ticks; var durTicks = durMilliseconds * 10_000L; + bool needed = false; if (packet.GetType() == typeof(GetTransactionsPacket)) { var gettx = packet as GetTransactionsPacket; + var newGettxs = new List(); foreach (var tx in gettx.Transactions) { var ivref = new InventoryVectorRef(new InventoryVector(ObjectType.Transaction, tx), reqset, callback, req); - reqset.Add(ivref); - InventoryTimeouts.TryAdd(ivref, (timestamp, durTicks)); + if (!reqset.Contains(ivref)) + { + newGettxs.Add(tx); + needed = true; + reqset.Add(ivref); + InventoryTimeouts.TryAdd(ivref, (timestamp, durTicks)); + } } + + gettx.Transactions = newGettxs.ToArray(); } else if (packet.GetType() == typeof(GetBlocksPacket)) { var gettx = packet as GetBlocksPacket; + var newGettxs = new List(); foreach (var block in gettx.Blocks) { var ivref = new InventoryVectorRef(new InventoryVector(ObjectType.Block, block), reqset, callback, req); - reqset.Add(ivref); - InventoryTimeouts.TryAdd(ivref, (timestamp, durTicks)); + if (!reqset.Contains(ivref)) + { + newGettxs.Add(block); + needed = true; + reqset.Add(ivref); + InventoryTimeouts.TryAdd(ivref, (timestamp, durTicks)); + } } + + gettx.Blocks = newGettxs.ToArray(); } else if (packet.GetType() == typeof(GetHeadersPacket)) { var gettx = packet as GetHeadersPacket; + var newGettxs = new List(); if (gettx.Headers == null) { for (long i = gettx.StartingHeight; i < gettx.StartingHeight + gettx.Count; i++) { var ivref = new InventoryVectorRef(new InventoryVector(ObjectType.BlockHeader, new Cipher.SHA256(i)), reqset, callback, req); - reqset.Add(ivref); - InventoryTimeouts.TryAdd(ivref, (timestamp, durTicks)); + if (!reqset.Contains(ivref)) + { + newGettxs.Add(new Cipher.SHA256(i)); + needed = true; + reqset.Add(ivref); + InventoryTimeouts.TryAdd(ivref, (timestamp, durTicks)); + } + } + + if (newGettxs.Count == gettx.Count) + { + newGettxs = null; } } else @@ -176,15 +208,27 @@ public void RegisterNeeded(IPacketBody packet, IPEndPoint req, long durMilliseco foreach (var header in gettx.Headers) { var ivref = new InventoryVectorRef(new InventoryVector(ObjectType.BlockHeader, header), reqset, callback, req); - reqset.Add(ivref); - InventoryTimeouts.TryAdd(ivref, (timestamp, durTicks)); + if (!reqset.Contains(ivref)) + { + newGettxs.Add(header); + needed = true; + reqset.Add(ivref); + InventoryTimeouts.TryAdd(ivref, (timestamp, durTicks)); + } } } + + if (newGettxs != null) + { + gettx.Headers = newGettxs.ToArray(); + } } else { Daemon.Logger.Error($"Handler.RegisterNeeded: cannot accept packet of type {packet.GetType()}"); } + + return needed; } internal (bool, List) CheckFulfillment(IPacketBody packet, IPEndPoint resp) @@ -280,6 +324,25 @@ public void RegisterNeeded(IPacketBody packet, IPEndPoint req, long durMilliseco }); return (true, fulfilled); } + else if (packet is NotFoundPacket p) + { + List notFounds = new List(); + foreach (var vec in p.Inventory) + { + var binv = new InventoryVectorRef(vec); + bool remsuccess = reqset.TryGetValue(binv, out var trueinv); + if (!remsuccess || trueinv == null) return (false, null); + notFounds.Add(trueinv); + } + + notFounds.ForEach(x => + { + reqset.Remove(x); + InventoryTimeouts.Remove(x, out _); + }); + + return (true, notFounds); + } else { Daemon.Logger.Error($"Handler.CheckFulfillment: cannot check packet of type {packet.GetType()}"); @@ -466,14 +529,14 @@ public bool IsParallel(Packet p) case PacketType.SENDTX: case PacketType.GETTXS: case PacketType.TXS: - case PacketType.BLOCKS: case PacketType.GETHEADERS: - case PacketType.HEADERS: case PacketType.GETPOOL: case PacketType.POOL: return true; case PacketType.SENDBLOCK: case PacketType.SENDBLOCKS: + case PacketType.BLOCKS: + case PacketType.HEADERS: case PacketType.ALERT: case PacketType.NONE: case PacketType.INVENTORY: @@ -716,7 +779,7 @@ public Core.Packets.Peerbloom.VersionPacket MakeVersionPacket() Version = Daemon.DaemonConfig.GetConfig().NetworkVersion.Value, Services = Services, Timestamp = DateTime.UtcNow.Ticks, - Height = DB.DataView.GetView().GetChainHeight(), + Height = BlockBuffer.Instance.GetChainHeight(), Port = Daemon.DaemonConfig.GetConfig().Port.Value, Syncing = State == PeerState.Syncing }; @@ -821,7 +884,7 @@ public async Task HandleVerAck(Core.Packets.Peerbloom.VerAck p, Peerbloom.Connec public async Task HandleGetBlocks(GetBlocksPacket p, IPEndPoint senderEndpoint) { - DB.DataView dataView = DB.DataView.GetView(); + IView dataView = BlockBuffer.Instance; List blocks = new(); List notFound = new(); @@ -891,7 +954,7 @@ public async Task HandleGetBlocks(GetBlocksPacket p, IPEndPoint senderEndpoint) public async Task HandleGetTxs(GetTransactionsPacket p, IPEndPoint senderEndpoint) { - DB.DataView dataView = DB.DataView.GetView(); + IView dataView = BlockBuffer.Instance; List txs = new(); List notFound = new(); @@ -1034,7 +1097,7 @@ public async Task HandleSendTx(SendTransactionPacket p, IPEndPoint senderEndpoin var txhash = p.Tx.TxID; /* sometimes a SendTx can occur as propagation for a recently added block */ - if (DB.DataView.GetView().ContainsTransaction(txhash)) + if (BlockBuffer.Instance.ContainsTransaction(txhash)) { Daemon.Logger.Debug($"HandleSendTx: Transaction received was in a previous block"); return; @@ -1155,7 +1218,9 @@ public async Task HandleSendBlock(SendBlockPacket p, Peerbloom.Connection conn, } else { - if (!DB.DataView.GetView().BlockExists(p.Block.Header.BlockHash)) + Daemon.Logger.Info($"HandleSendBlock: received block with height {p.Block.Header.Height}", verbose: 99999); + + if (!BlockBuffer.Instance.BlockExists(p.Block.Header.BlockHash)) { /* create validation cache and perform check */ DB.ValidationCache vCache = new DB.ValidationCache(p.Block); @@ -1165,17 +1230,26 @@ public async Task HandleSendBlock(SendBlockPacket p, Peerbloom.Connection conn, { if (!MessageCache.GetMessageCache().OrphanBlockParents.ContainsKey(p.Block.Header.PreviousBlock)) { + await MessageCache.GetMessageCache().OrphanLock.WaitAsync(); + Daemon.Logger.Warn($"HandleSendBlock: orphan block ({p.Block.Header.BlockHash.ToHexShort()}, height {p.Block.Header.Height}) added; querying {conn.Receiver} for previous block", verbose: 3); MessageCache.GetMessageCache().OrphanBlocks[p.Block.Header.PreviousBlock] = p.Block; - MessageCache.GetMessageCache().OrphanBlockParents[p.Block.Header.BlockHash] = 0; + MessageCache.GetMessageCache().OrphanBlockParents[p.Block.Header.BlockHash] = p.Block.Header.PreviousBlock; Peerbloom.Network.GetNetwork().SendRequest(conn, new Packet(PacketType.GETBLOCKS, new GetBlocksPacket { Blocks = new Cipher.SHA256[] { p.Block.Header.PreviousBlock } }), durationMilliseconds: 60000); + + MessageCache.GetMessageCache().OrphanLock.Release(); return; } else { + await MessageCache.GetMessageCache().OrphanLock.WaitAsync(); + Daemon.Logger.Warn($"HandleSendBlock: orphan block ({p.Block.Header.BlockHash.ToHexShort()}, height {p.Block.Header.Height}) added", verbose: 3); MessageCache.GetMessageCache().OrphanBlocks[p.Block.Header.PreviousBlock] = p.Block; - MessageCache.GetMessageCache().OrphanBlockParents[p.Block.Header.BlockHash] = 0; + MessageCache.GetMessageCache().OrphanBlockParents[p.Block.Header.BlockHash] = p.Block.Header.PreviousBlock; + CheckRoot(p.Block, conn); + + MessageCache.GetMessageCache().OrphanLock.Release(); return; } } @@ -1198,7 +1272,7 @@ public async Task HandleSendBlock(SendBlockPacket p, Peerbloom.Connection conn, /* accept block and propagate */ try { - vCache.Flush(); + await vCache.Flush(); } catch (Exception e) { @@ -1231,7 +1305,7 @@ public async Task HandleSendBlock(SendBlockPacket p, Peerbloom.Connection conn, OnTransactionReceived?.Invoke(new TransactionReceivedEventArgs { Tx = tx, Success = true }); } /* check orphan data and process accordingly */ - AcceptOrphans(p.Block.Header.BlockHash); + await AcceptOrphans(p.Block.Header.BlockHash); } else { @@ -1249,13 +1323,19 @@ public async Task HandleSendBlocks(SendBlocksPacket p, Peerbloom.Connection conn return; } - var propagate = p.Blocks.Any(x => !DB.DataView.GetView().BlockExists(x.Hash())); + // don't propagate endlessly + var propagate = p.Blocks.Where(x => !BlockBuffer.Instance.BlockExists(x.Hash())); + if (propagate.Any()) + { + propagate = propagate.Where(x => !MessageCache.GetMessageCache().OrphanBlocks.ContainsKey(x.Header.BlockHash)); + } + foreach (var block in p.Blocks.OrderBy(x => x.Header.Height)) { await HandleSendBlock(new SendBlockPacket { Block = block }, conn, false); } - if (propagate) Peerbloom.Network.GetNetwork().Broadcast(new Packet(PacketType.SENDBLOCKS, p)); + if (propagate.Any()) Peerbloom.Network.GetNetwork().Broadcast(new Packet(PacketType.SENDBLOCKS, p)); } public async Task HandleBlocks(BlocksPacket p, Peerbloom.Connection conn) @@ -1269,7 +1349,7 @@ public async Task HandleBlocks(BlocksPacket p, Peerbloom.Connection conn) } if (State == PeerState.Syncing) { - DB.DataView dataView = DB.DataView.GetView(); + IView dataView = BlockBuffer.Instance; fulfilled.Sort((x, y) => x.block.Header.Height.CompareTo(y.block.Header.Height)); foreach (var ivref in fulfilled) @@ -1307,7 +1387,10 @@ public async Task HandleBlocks(BlocksPacket p, Peerbloom.Connection conn) Daemon.Logger.Error("HandleBlocks: queried peer returned more than one block (not syncing; orphan block most likely)"); return; } - if (DB.DataView.GetView().BlockExists(p.Blocks[0].Header.BlockHash)) + + Daemon.Logger.Info($"HandleBlocks: received block with height {p.Blocks[0].Header.Height}", verbose: 99999); + + if (BlockBuffer.Instance.BlockExists(p.Blocks[0].Header.BlockHash)) { Daemon.Logger.Error("HandleBlocks: queried peer returned an existing block; ignoring"); } @@ -1323,16 +1406,28 @@ public async Task HandleBlocks(BlocksPacket p, Peerbloom.Connection conn) { Daemon.Logger.Warn($"HandleBlocks: orphan block ({p.Blocks[0].Header.BlockHash.ToHexShort()}, height {p.Blocks[0].Header.Height}) added; querying {conn.Receiver} for previous block", verbose: 3); + await MessageCache.GetMessageCache().OrphanLock.WaitAsync(); + MessageCache.GetMessageCache().OrphanBlocks[p.Blocks[0].Header.PreviousBlock] = p.Blocks[0]; - MessageCache.GetMessageCache().OrphanBlockParents[p.Blocks[0].Header.BlockHash] = 0; + MessageCache.GetMessageCache().OrphanBlockParents[p.Blocks[0].Header.BlockHash] = p.Blocks[0].Header.PreviousBlock; Peerbloom.Network.GetNetwork().SendRequest(conn, new Packet(PacketType.GETBLOCKS, new GetBlocksPacket { Blocks = new Cipher.SHA256[] { p.Blocks[0].Header.PreviousBlock } }), durationMilliseconds: 60000); + + MessageCache.GetMessageCache().OrphanLock.Release(); + return; } else { Daemon.Logger.Warn($"HandleBlocks: orphan block ({p.Blocks[0].Header.BlockHash.ToHexShort()}, height {p.Blocks[0].Header.Height}) added", verbose: 1); + + await MessageCache.GetMessageCache().OrphanLock.WaitAsync(); + MessageCache.GetMessageCache().OrphanBlocks[p.Blocks[0].Header.PreviousBlock] = p.Blocks[0]; - MessageCache.GetMessageCache().OrphanBlockParents[p.Blocks[0].Header.BlockHash] = 0; + MessageCache.GetMessageCache().OrphanBlockParents[p.Blocks[0].Header.BlockHash] = p.Blocks[0].Header.PreviousBlock; + CheckRoot(p.Blocks[0], conn); + + MessageCache.GetMessageCache().OrphanLock.Release(); + return; } } @@ -1341,15 +1436,21 @@ public async Task HandleBlocks(BlocksPacket p, Peerbloom.Connection conn) Daemon.Logger.Error($"HandleBlocks: Malformed or invalid block received from peer {conn.Receiver}: {err.Message} (bogus block for orphan requirement)", err); /* for now assume invalid root always has invalid leaves */ + + await MessageCache.GetMessageCache().OrphanLock.WaitAsync(); TossOrphans(p.Blocks[0].Header.BlockHash); + MessageCache.GetMessageCache().OrphanLock.Release(); + return; } /* orphan data is valid; validate branch and publish changes */ Daemon.Logger.Info($"HandleBlocks: Root found for orphan branch beginning with block {p.Blocks[0].Header.BlockHash.ToHexShort()}", verbose: 1); + MessageCache.GetMessageCache().OrphanBlockParents.Remove(p.Blocks[0].Header.BlockHash, out _); + try { - vCache.Flush(); + await vCache.Flush(); } catch (Exception e) { @@ -1368,7 +1469,7 @@ public async Task HandleBlocks(BlocksPacket p, Peerbloom.Connection conn) } /* recursively accept orphan blocks from message cache */ - AcceptOrphans(p.Blocks[0].Header.BlockHash); + await AcceptOrphans(p.Blocks[0].Header.BlockHash); //success fulfilled.ForEach(x => x.callback?.Invoke(x.peer, x.vector, true, RequestCallbackContext.SUCCESS)); @@ -1378,7 +1479,7 @@ public async Task HandleBlocks(BlocksPacket p, Peerbloom.Connection conn) public async Task HandleGetHeaders(GetHeadersPacket p, Peerbloom.Connection conn) { - DB.DataView dataView = DB.DataView.GetView(); + IView dataView = BlockBuffer.Instance; List headers = new(); List notFound = new(); @@ -1457,8 +1558,6 @@ public async Task HandleHeaders(HeadersPacket p, Peerbloom.Connection conn) } if (State == PeerState.Syncing) { - DB.DataView dataView = DB.DataView.GetView(); - fulfilled.Sort((x,y) => x.header.Height.CompareTo(y.header.Height)); foreach (var ivref in fulfilled) @@ -1530,46 +1629,77 @@ public void TossOrphans(Cipher.SHA256 bHash) } } + public void CheckRoot(Block block, Peerbloom.Connection conn) + { + MessageCache mCache = MessageCache.GetMessageCache(); + var hash = block.Header.PreviousBlock; + while (mCache.OrphanBlockParents.ContainsKey(hash)) + { + hash = mCache.OrphanBlockParents[hash]; + } + + // request previous block + Peerbloom.Network.GetNetwork().SendRequest(conn, new Packet(PacketType.GETBLOCKS, new GetBlocksPacket { Blocks = new Cipher.SHA256[] { hash } }), durationMilliseconds: 60000); + } + /// /// Accepts all blocks on an orphan branch, if any. /// /// - public void AcceptOrphans(Cipher.SHA256 bHash) + public async Task AcceptOrphans(Cipher.SHA256 bHash) { MessageCache mCache = MessageCache.GetMessageCache(); - while (mCache.OrphanBlocks.ContainsKey(bHash)) + + await mCache.OrphanLock.WaitAsync(); + + try { - mCache.OrphanBlocks.Remove(bHash, out var block); - DB.ValidationCache vCache = new DB.ValidationCache(block); - var err = vCache.Validate(); - if (err != null) + while (mCache.OrphanBlocks.ContainsKey(bHash)) { - Daemon.Logger.Error($"AcceptOrphans: Malformed or invalid block in orphan branch {bHash.ToHexShort()} (height {block.Header.Height}): {err.Message}; tossing branch", err); - TossOrphans(bHash); - return; - } + mCache.OrphanBlocks.Remove(bHash, out var block); + DB.ValidationCache vCache = new DB.ValidationCache(block); + var err = vCache.Validate(); + if (err is OrphanBlockException) + { + // simply return + Daemon.Logger.Info($"AcceptOrphans: orphan block remains at height {block.Header.Height}", verbose: 3); + return; + } + if (err != null) + { + Daemon.Logger.Error($"AcceptOrphans: Malformed or invalid block in orphan branch {bHash.ToHexShort()} (height {block.Header.Height}): {err.Message}; tossing branch", err); + TossOrphans(bHash); + return; + } - try - { - vCache.Flush(); - } - catch (Exception e) - { - Daemon.Logger.Error($"AcceptOrphans: an error was encountered while flushing validation cache for block at height {block.Header.Height}: {e.Message}", e); - } + Daemon.Logger.Info($"AcceptOrphans: accepted block with height {block.Header.Height}", verbose: 99999); - try - { - daemon.ProcessBlock(block); - } - catch (Exception e) - { - Daemon.Logger.Error($"AcceptOrphans: an error was encountered while processing block at height {block.Header.Height}: {e.Message}", e); - } + try + { + await vCache.Flush(); + } + catch (Exception e) + { + Daemon.Logger.Error($"AcceptOrphans: an error was encountered while flushing validation cache for block at height {block.Header.Height}: {e.Message}", e); + } - OnBlockSuccess?.Invoke(new BlockSuccessEventArgs { Block = block }); - bHash = block.Header.BlockHash; - mCache.OrphanBlockParents.Remove(bHash, out _); + try + { + daemon.ProcessBlock(block); + } + catch (Exception e) + { + Daemon.Logger.Error($"AcceptOrphans: an error was encountered while processing block at height {block.Header.Height}: {e.Message}", e); + } + + OnBlockSuccess?.Invoke(new BlockSuccessEventArgs { Block = block }); + bHash = block.Header.BlockHash; + mCache.OrphanBlockParents.Remove(bHash, out _); + } + } + finally + { + mCache.OrphanLock.Release(); } } } diff --git a/Discreet/Network/MessageCache.cs b/Discreet/Network/MessageCache.cs index a7fc308..aa5b157 100644 --- a/Discreet/Network/MessageCache.cs +++ b/Discreet/Network/MessageCache.cs @@ -7,6 +7,8 @@ using System.Net; using System.Collections.Concurrent; using Discreet.Coin.Models; +using Discreet.DB; +using System.Threading; namespace Discreet.Network { @@ -25,6 +27,7 @@ public static MessageCache GetMessageCache() } } + // TODO: February 27 2024 9:30 PM - look into LRU caching for these; simple finite size ConcurrentQueue would work. public ConcurrentBag Messages; public ConcurrentBag Rejections; public HashSet Alerts; @@ -36,7 +39,8 @@ public static MessageCache GetMessageCache() private long _headerMax = -1; public ConcurrentDictionary OrphanBlocks; - public ConcurrentDictionary OrphanBlockParents = new(new Cipher.SHA256EqualityComparer()); + public ConcurrentDictionary OrphanBlockParents = new(new Cipher.SHA256EqualityComparer()); + public readonly SemaphoreSlim OrphanLock = new SemaphoreSlim(1, 1); public MessageCache() { @@ -46,13 +50,13 @@ public MessageCache() Versions = new ConcurrentDictionary(); BadVersions = new ConcurrentDictionary(); BlockCache = new ConcurrentDictionary(); - OrphanBlocks = new ConcurrentDictionary(); + OrphanBlocks = new ConcurrentDictionary(new Cipher.SHA256EqualityComparer()); HeaderCache = new ConcurrentDictionary(); } public bool AddHeaderToCache(BlockHeader header) { - var dataView = DB.DataView.GetView(); + var dataView = BlockBuffer.Instance; var _curHeight = dataView.GetChainHeight(); if (header == null) return false; @@ -106,19 +110,19 @@ public bool AddHeaderToCache(BlockHeader header) { if (BlockCache.ContainsKey(blk.Header.Height)) { - Daemon.Logger.Error($"AddBlockToCache: Block {blk.Header.BlockHash.ToHexShort()} (height {blk.Header.Height}) already in database!"); + Daemon.Logger.Error($"AddBlockToCache: Block {blk.Header.BlockHash.ToHexShort()} (height {blk.Header.Height}) already in database!", verbose: 3); return (true, ""); } if (blk.Transactions == null || blk.Transactions.Length == 0 || blk.Header.NumTXs == 0) { - Daemon.Logger.Error($"AddBlockToCache: Block {blk.Header.BlockHash.ToHexShort()} has no transactions!"); + Daemon.Logger.Error($"AddBlockToCache: Block {blk.Header.BlockHash.ToHexShort()} has no transactions!", verbose: 2); return (false, "block has no transactions"); } if ((long)blk.Header.Timestamp > DateTime.UtcNow.AddHours(2).Ticks) { - Daemon.Logger.Error($"AddBlockToCache: Block {blk.Header.BlockHash.ToHexShort()} from too far in the future!"); + Daemon.Logger.Error($"AddBlockToCache: Block {blk.Header.BlockHash.ToHexShort()} from too far in the future!", verbose: 1); return (false, "block too far from future"); } @@ -127,14 +131,14 @@ public bool AddHeaderToCache(BlockHeader header) { if ((!tx.HasInputs() || !tx.HasOutputs()) && (tx.Version != 0)) { - Daemon.Logger.Error($"AddBlockToCache: Block {blk.Header.BlockHash.ToHexShort()} has a transaction without inputs or outputs!"); + Daemon.Logger.Error($"AddBlockToCache: Block {blk.Header.BlockHash.ToHexShort()} has a transaction without inputs or outputs!", verbose: 1); return (false, "invalid transactions"); } } if (blk.GetMerkleRoot() != blk.Header.MerkleRoot) { - Daemon.Logger.Error($"AddBlockToCache: Block {blk.Header.BlockHash.ToHexShort()} has invalid Merkle root"); + Daemon.Logger.Error($"AddBlockToCache: Block {blk.Header.BlockHash.ToHexShort()} has invalid Merkle root", verbose: 1); return (false, "invalid merkle root"); } diff --git a/Discreet/Network/Peerbloom/Network.cs b/Discreet/Network/Peerbloom/Network.cs index 15a700c..994d5fa 100644 --- a/Discreet/Network/Peerbloom/Network.cs +++ b/Discreet/Network/Peerbloom/Network.cs @@ -749,22 +749,23 @@ public bool SendRequest(Connection conn, Core.Packet packet, long durationMillis Daemon.Logger.Info($"Network.SendRequest: Sending request {packet.Header.Command} to {conn.Receiver}", verbose: 2); // hacky; make specific functions for sending packets which call this instead (in the future) + bool success = false; if (packet.Header.Command == Core.PacketType.GETBLOCKS) { - handler.RegisterNeeded((Core.Packets.GetBlocksPacket)packet.Body, conn.Receiver, durationMilliseconds, callback); + success = handler.RegisterNeeded((Core.Packets.GetBlocksPacket)packet.Body, conn.Receiver, durationMilliseconds, callback); } else if (packet.Header.Command == Core.PacketType.GETTXS) { - handler.RegisterNeeded((Core.Packets.GetTransactionsPacket)packet.Body, conn.Receiver, durationMilliseconds, callback); + success = handler.RegisterNeeded((Core.Packets.GetTransactionsPacket)packet.Body, conn.Receiver, durationMilliseconds, callback); } else if (packet.Header.Command == Core.PacketType.GETHEADERS) { - handler.RegisterNeeded((Core.Packets.GetHeadersPacket)packet.Body, conn.Receiver, durationMilliseconds, callback); + success = handler.RegisterNeeded((Core.Packets.GetHeadersPacket)packet.Body, conn.Receiver, durationMilliseconds, callback); } - conn.Send(packet); + if (success) conn.Send(packet); - return true; + return success; } public bool Send(IPEndPoint endpoint, Core.Packet packet)