Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly stop importing blocks on system shutdown. #66

Merged
merged 16 commits into from
Apr 7, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions ImportBlocks/BlockImporter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text.RegularExpressions;
using Akka.Actor;
using Neo.IO;
using Neo.Ledger;
using Neo.Network.P2P.Payloads;

namespace Neo.Plugins
{
public class BlockImporter : UntypedActor
{
public class StartImport { public IActorRef BlockchainActorRef; public Action OnComplete; }
private IActorRef _blockchainActorRef;
private const int BlocksPerBatch = 10;
erikzhang marked this conversation as resolved.
Show resolved Hide resolved
private bool isImporting;
private IEnumerator<Block> blocksBeingImported;
jsolman marked this conversation as resolved.
Show resolved Hide resolved
private Action _doneAction;

protected override void OnReceive(object message)
{
switch (message)
{
case StartImport startImport:
if (isImporting) return;
isImporting = true;
_blockchainActorRef = startImport.BlockchainActorRef;
_doneAction = startImport.OnComplete;
blocksBeingImported = GetBlocksFromFile().GetEnumerator();
// Start the first import
Self.Tell(new Blockchain.ImportCompleted());
break;
case Blockchain.ImportCompleted _:
// Import the next batch
List<Block> blocksToImport = new List<Block>();
for (int i = 0; i < BlocksPerBatch; i++)
{
if (!blocksBeingImported.MoveNext())
break;
blocksToImport.Add(blocksBeingImported.Current);
}
if (blocksToImport.Count > 0)
_blockchainActorRef.Tell(new Blockchain.Import { Blocks = blocksToImport });
else
_doneAction();
break;
}
}

private static bool CheckMaxOnImportHeight(uint currentImportBlockHeight)
{
if (Settings.Default.MaxOnImportHeight == 0 || Settings.Default.MaxOnImportHeight >= currentImportBlockHeight)
return true;
return false;
}

private static IEnumerable<Block> GetBlocks(Stream stream, bool read_start = false)
{
using (BinaryReader r = new BinaryReader(stream))
{
uint start = read_start ? r.ReadUInt32() : 0;
uint count = r.ReadUInt32();
uint end = start + count - 1;
if (end <= Blockchain.Singleton.Height) yield break;
for (uint height = start; height <= end; height++)
{
byte[] array = r.ReadBytes(r.ReadInt32());
if (!CheckMaxOnImportHeight(height)) yield break;
if (height > Blockchain.Singleton.Height)
{
Block block = array.AsSerializable<Block>();
yield return block;
}
}
}
}

private IEnumerable<Block> GetBlocksFromFile()
{
const string pathAcc = "chain.acc";
if (File.Exists(pathAcc))
using (FileStream fs = new FileStream(pathAcc, FileMode.Open, FileAccess.Read, FileShare.Read))
foreach (var block in GetBlocks(fs))
yield return block;

const string pathAccZip = pathAcc + ".zip";
if (File.Exists(pathAccZip))
using (FileStream fs = new FileStream(pathAccZip, FileMode.Open, FileAccess.Read, FileShare.Read))
using (ZipArchive zip = new ZipArchive(fs, ZipArchiveMode.Read))
using (Stream zs = zip.GetEntry(pathAcc).Open())
foreach (var block in GetBlocks(zs))
yield return block;

var paths = Directory.EnumerateFiles(".", "chain.*.acc", SearchOption.TopDirectoryOnly).Concat(Directory.EnumerateFiles(".", "chain.*.acc.zip", SearchOption.TopDirectoryOnly)).Select(p => new
{
FileName = Path.GetFileName(p),
Start = uint.Parse(Regex.Match(p, @"\d+").Value),
IsCompressed = p.EndsWith(".zip")
}).OrderBy(p => p.Start);

foreach (var path in paths)
{
if (path.Start > Blockchain.Singleton.Height + 1) break;
if (path.IsCompressed)
using (FileStream fs = new FileStream(path.FileName, FileMode.Open, FileAccess.Read, FileShare.Read))
using (ZipArchive zip = new ZipArchive(fs, ZipArchiveMode.Read))
using (Stream zs = zip.GetEntry(Path.GetFileNameWithoutExtension(path.FileName)).Open())
foreach (var block in GetBlocks(zs, true))
yield return block;
else
using (FileStream fs = new FileStream(path.FileName, FileMode.Open, FileAccess.Read, FileShare.Read))
foreach (var block in GetBlocks(fs, true))
yield return block;
}
}

public static Props Props()
{
return Akka.Actor.Props.Create(() => new BlockImporter());
}
}
}
88 changes: 9 additions & 79 deletions ImportBlocks/ImportBlocks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,28 @@
using Neo.Network.P2P.Payloads;
using Neo.Persistence;
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text.RegularExpressions;

namespace Neo.Plugins
{
public class ImportBlocks : Plugin
{
public ImportBlocks()
{
OnImport();
}

private static bool CheckMaxOnImportHeight(uint currentImportBlockHeight)
private IActorRef _blockImporter;
public override void Configure()
{
if (Settings.Default.MaxOnImportHeight == 0 || Settings.Default.MaxOnImportHeight >= currentImportBlockHeight)
return true;
return false;
Settings.Load(GetConfiguration());
}

public override void Configure()
protected override void OnPluginsLoaded()
{
Settings.Load(GetConfiguration());
_blockImporter = System.ActorSystem.ActorOf(BlockImporter.Props());
SuspendNodeStartup();
_blockImporter.Tell(new BlockImporter.StartImport { BlockchainActorRef = System.Blockchain, OnComplete = OnImportComplete});
vncoelho marked this conversation as resolved.
Show resolved Hide resolved
}

private static IEnumerable<Block> GetBlocks(Stream stream, bool read_start = false)
private void OnImportComplete()
{
using (BinaryReader r = new BinaryReader(stream))
{
uint start = read_start ? r.ReadUInt32() : 0;
uint count = r.ReadUInt32();
uint end = start + count - 1;
if (end <= Blockchain.Singleton.Height) yield break;
for (uint height = start; height <= end; height++)
{
byte[] array = r.ReadBytes(r.ReadInt32());
if (!CheckMaxOnImportHeight(height)) yield break;
if (height > Blockchain.Singleton.Height)
{
Block block = array.AsSerializable<Block>();
yield return block;
}
}
}
ResumeNodeStartup();
}

private bool OnExport(string[] args)
Expand Down Expand Up @@ -138,52 +114,6 @@ private bool OnHelp(string[] args)
return true;
}

private async void OnImport()
{
SuspendNodeStartup();
const string path_acc = "chain.acc";
if (File.Exists(path_acc))
using (FileStream fs = new FileStream(path_acc, FileMode.Open, FileAccess.Read, FileShare.Read))
await System.Blockchain.Ask<Blockchain.ImportCompleted>(new Blockchain.Import
{
Blocks = GetBlocks(fs)
});
const string path_acc_zip = path_acc + ".zip";
if (File.Exists(path_acc_zip))
using (FileStream fs = new FileStream(path_acc_zip, FileMode.Open, FileAccess.Read, FileShare.Read))
using (ZipArchive zip = new ZipArchive(fs, ZipArchiveMode.Read))
using (Stream zs = zip.GetEntry(path_acc).Open())
await System.Blockchain.Ask<Blockchain.ImportCompleted>(new Blockchain.Import
{
Blocks = GetBlocks(zs)
});
var paths = Directory.EnumerateFiles(".", "chain.*.acc", SearchOption.TopDirectoryOnly).Concat(Directory.EnumerateFiles(".", "chain.*.acc.zip", SearchOption.TopDirectoryOnly)).Select(p => new
{
FileName = Path.GetFileName(p),
Start = uint.Parse(Regex.Match(p, @"\d+").Value),
IsCompressed = p.EndsWith(".zip")
}).OrderBy(p => p.Start);
foreach (var path in paths)
{
if (path.Start > Blockchain.Singleton.Height + 1) break;
if (path.IsCompressed)
using (FileStream fs = new FileStream(path.FileName, FileMode.Open, FileAccess.Read, FileShare.Read))
using (ZipArchive zip = new ZipArchive(fs, ZipArchiveMode.Read))
using (Stream zs = zip.GetEntry(Path.GetFileNameWithoutExtension(path.FileName)).Open())
await System.Blockchain.Ask<Blockchain.ImportCompleted>(new Blockchain.Import
{
Blocks = GetBlocks(zs, true)
});
else
using (FileStream fs = new FileStream(path.FileName, FileMode.Open, FileAccess.Read, FileShare.Read))
await System.Blockchain.Ask<Blockchain.ImportCompleted>(new Blockchain.Import
{
Blocks = GetBlocks(fs, true)
});
}
ResumeNodeStartup();
}

protected override bool OnMessage(object message)
{
if (!(message is string[] args)) return false;
Expand Down
4 changes: 2 additions & 2 deletions ImportBlocks/ImportBlocks.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<TargetFramework>netstandard2.0</TargetFramework>
<RootNamespace>Neo.Plugins</RootNamespace>
</PropertyGroup>

<ItemGroup>
<None Update="ImportBlocks\config.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
Expand All @@ -14,7 +14,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Neo" Version="2.10.0" />
<PackageReference Include="Neo" Version="2.10.1-CI00002" />
</ItemGroup>

</Project>