Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change BlockBuffer's buffer to ConcurrentQueue #109

Merged
merged 1 commit into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 29 additions & 42 deletions Discreet/DB/BlockBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ namespace Discreet.DB
/// </summary>
public class BlockBuffer : IView
{
private Channel<Block> _buffer;

public ChannelWriter<Block> Writer { get => _buffer.Writer; }
private ConcurrentQueue<Block> _buffer;

private static BlockBuffer _instance;

Expand Down Expand Up @@ -244,7 +242,7 @@ public TXOutput[] GetMixins(uint[] idxs)

public BlockBuffer()
{
_buffer = Channel.CreateUnbounded<Block>();
_buffer = new ConcurrentQueue<Block>();
_view = DataView.GetView();
}

Expand All @@ -257,9 +255,24 @@ public long GetChainHeight()
}
}

public async Task ForceFlush()
public void WriteToBuffer(Block blk)
{
_buffer.Enqueue(blk);
UpdateBuffers(blk);
}

public void ForceFlush()
{
await _buffer.Writer.WriteAsync(_signaler);
lock (buffer)
{
if (buffer.Count == 0)
{
return;
}

Flush(buffer);
buffer.Clear();
}
}

/// <summary>
Expand All @@ -270,49 +283,23 @@ public async Task Start()
{
_pIndex = DataView.GetView().GetOutputIndex();

_ = Task.Factory.StartNew(async () =>
{
var timer = new PeriodicTimer(_flushInterval);
while (await timer.WaitForNextTickAsync())
{
await _buffer.Writer.WriteAsync(_signaler);
}
});

await foreach(var block in _buffer.Reader.ReadAllAsync())
var timer = new PeriodicTimer(_flushInterval);
while (await timer.WaitForNextTickAsync())
{
if (block == _signaler)
lock (buffer)
{
lock (buffer)
while (_buffer.TryDequeue(out var block))
{
if (buffer.Count == 0)
{
continue;
}
buffer.Add(block);
}

Flush(buffer);

lock (buffer)
if (buffer.Count == 0)
{
buffer.Clear();
continue;
}
}
else
{
if (_flushEveryBlock)
{
Flush(new List<Block> { block });
}
else
{
lock (buffer)
{
buffer.Add(block);
}

UpdateBuffers(block);
}
Flush(buffer);
buffer.Clear();
}
}
}
Expand All @@ -325,7 +312,7 @@ public async Task Start()

public void ClearBlockCache() => _view.ClearBlockCache();

public void AddBlock(Block blk) => _buffer.Writer.TryWrite(blk);
public void AddBlock(Block blk) => _buffer.Enqueue(blk);

public Dictionary<long, Block> GetBlockCache() => _view.GetBlockCache();

Expand Down
8 changes: 4 additions & 4 deletions Discreet/DB/ValidationCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -626,20 +626,20 @@ public async Task Flush(List<Block> goodBlocks = null, bool force = false)
{
foreach (var blk in goodBlocks)
{
await BlockBuffer.Instance.Writer.WriteAsync(blk);
BlockBuffer.Instance.WriteToBuffer(blk);
}
}
else
{
foreach (var blk in blocks)
{
await BlockBuffer.Instance.Writer.WriteAsync(blk);
BlockBuffer.Instance.WriteToBuffer(blk);
}
}
}
else
{
await BlockBuffer.Instance.Writer.WriteAsync(block);
BlockBuffer.Instance.WriteToBuffer(block);
}

if (blocks != null)
Expand All @@ -664,7 +664,7 @@ public async Task Flush(List<Block> goodBlocks = null, bool force = false)
Daemon.TXPool.GetTXPool().UpdatePool(block.Transactions);
}

if (force) await BlockBuffer.Instance.ForceFlush();
if (force) BlockBuffer.Instance.ForceFlush();
}

public static async Task Process(Block block)
Expand Down
1 change: 1 addition & 0 deletions Discreet/Entry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
using Discreet.Common.Converters;
using System.Text.Json.Serialization;
using Discreet.Wallets;
using System.Collections.Concurrent;

namespace Discreet
{
Expand Down
2 changes: 1 addition & 1 deletion Discreet/Network/MessageCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public MessageCache()
Versions = new ConcurrentDictionary<IPEndPoint, Core.Packets.Peerbloom.VersionPacket>();
BadVersions = new ConcurrentDictionary<IPEndPoint, Core.Packets.Peerbloom.VersionPacket>();
BlockCache = new ConcurrentDictionary<long, Block>();
OrphanBlocks = new ConcurrentDictionary<Cipher.SHA256, Block>();
OrphanBlocks = new ConcurrentDictionary<Cipher.SHA256, Block>(new Cipher.SHA256EqualityComparer());
HeaderCache = new ConcurrentDictionary<long, BlockHeader>();
}

Expand Down
Loading