diff --git a/android/com/mw/beam/beamwallet/core/entities/dto/NotificationDTO.java b/android/com/mw/beam/beamwallet/core/entities/dto/NotificationDTO.java index e899fa140..46cbf11f2 100644 --- a/android/com/mw/beam/beamwallet/core/entities/dto/NotificationDTO.java +++ b/android/com/mw/beam/beamwallet/core/entities/dto/NotificationDTO.java @@ -29,9 +29,9 @@ public enum State public enum Type // used to point Notification::Type in Wallet::switchOnOffNotifications() { - SoftwareUpdateAvailable, // 0 + Unused, // 0 AddressStatusChanged, - Unused, + WalletImplUpdateAvailable, BeamNews, TransactionFailed, TransactionCompleted diff --git a/android/jni.cpp b/android/jni.cpp index a7a615154..ee07409a9 100644 --- a/android/jni.cpp +++ b/android/jni.cpp @@ -56,15 +56,6 @@ namespace { static auto logger = Logger::create(LOG_LEVEL_DEBUG, LOG_LEVEL_DEBUG, LOG_LEVEL_DEBUG, "wallet_", (fs::path(appData) / fs::path("logs")).string()); - Rules::get().pForks[1].m_Height = 10; - Rules::get().pForks[2].m_Height = 20; - Rules::get().MaxRollback = 10; - Rules::get().CA.LockPeriod = 10; - Rules::get().Shielded.m_ProofMax.n = 4; - Rules::get().Shielded.m_ProofMax.M = 3; - Rules::get().Shielded.m_ProofMin.n = 4; - Rules::get().Shielded.m_ProofMin.M = 2; - Rules::get().Shielded.MaxWindowBacklog = 150; Rules::get().UpdateChecksum(); LOG_INFO() << "Beam Mobile Wallet " << appVersion << " (" << BRANCH_NAME << ") library: " << PROJECT_VERSION; LOG_INFO() << "Rules signature: " << Rules::get().get_SignatureStr(); @@ -73,12 +64,12 @@ namespace std::map initNotifications(bool initialValue) { return std::map { - { Notification::Type::SoftwareUpdateAvailable, initialValue }, - { Notification::Type::BeamNews, initialValue }, - { Notification::Type::WalletImplUpdateAvailable, false }, - { Notification::Type::TransactionCompleted, initialValue }, - { Notification::Type::TransactionFailed, initialValue }, - { Notification::Type::AddressStatusChanged, initialValue } + { Notification::Type::SoftwareUpdateAvailable, false }, + { Notification::Type::BeamNews, initialValue }, + { Notification::Type::WalletImplUpdateAvailable, initialValue }, + { Notification::Type::TransactionCompleted, initialValue }, + { Notification::Type::TransactionFailed, initialValue }, + { Notification::Type::AddressStatusChanged, initialValue } }; } @@ -601,7 +592,7 @@ JNIEXPORT void JNICALL BEAM_JAVA_WALLET_INTERFACE(switchOnOffExchangeRates)(JNIE JNIEXPORT void JNICALL BEAM_JAVA_WALLET_INTERFACE(switchOnOffNotifications)(JNIEnv *env, jobject thiz, jint notificationTypeEnum, jboolean isActive) { - if (notificationTypeEnum < static_cast(Notification::Type::SoftwareUpdateAvailable) + if (notificationTypeEnum <= static_cast(Notification::Type::SoftwareUpdateAvailable) || notificationTypeEnum > static_cast(Notification::Type::TransactionCompleted)) { LOG_ERROR() << "Notification type is not valid!!!"; diff --git a/android/wallet_model.cpp b/android/wallet_model.cpp index 41096670e..fea1fe5b8 100644 --- a/android/wallet_model.cpp +++ b/android/wallet_model.cpp @@ -163,23 +163,26 @@ namespace void callSoftwareUpdateNotification(JNIEnv* env, const Notification& notification, ChangeAction action) { - VersionInfo versionInfo; + WalletImplVerInfo walletVersionInfo; - if (fromByteBuffer(notification.m_content, versionInfo)) + if (fromByteBuffer(notification.m_content, walletVersionInfo)) { jobject jNotificationInfo = fillNotificationInfo(env, notification); jobject jVersionInfo = env->AllocObject(VersionInfoClass); { - setIntField(env, VersionInfoClass, jVersionInfo, "application", beam::underlying_cast(versionInfo.m_application)); - setLongField(env, VersionInfoClass, jVersionInfo, "versionMajor", versionInfo.m_version.m_major); - setLongField(env, VersionInfoClass, jVersionInfo, "versionMinor", versionInfo.m_version.m_minor); - setLongField(env, VersionInfoClass, jVersionInfo, "versionRevision", versionInfo.m_version.m_revision); + setIntField(env, VersionInfoClass, jVersionInfo, "application", beam::underlying_cast(walletVersionInfo.m_application)); + setLongField(env, VersionInfoClass, jVersionInfo, "versionMajor", walletVersionInfo.m_version.m_major); + setLongField(env, VersionInfoClass, jVersionInfo, "versionMinor", walletVersionInfo.m_version.m_minor); + setLongField(env, VersionInfoClass, jVersionInfo, "versionRevision", walletVersionInfo.m_UIrevision); } - jmethodID callback = env->GetStaticMethodID(WalletListenerClass, "onNewVersionNotification", "(IL" BEAM_JAVA_PATH "/entities/dto/NotificationDTO;L" BEAM_JAVA_PATH "/entities/dto/VersionInfoDTO;)V"); + if (walletVersionInfo.m_application == VersionInfo::Application::AndroidWallet) + { + jmethodID callback = env->GetStaticMethodID(WalletListenerClass, "onNewVersionNotification", "(IL" BEAM_JAVA_PATH "/entities/dto/NotificationDTO;L" BEAM_JAVA_PATH "/entities/dto/VersionInfoDTO;)V"); - env->CallStaticVoidMethod(WalletListenerClass, callback, action, jNotificationInfo, jVersionInfo); + env->CallStaticVoidMethod(WalletListenerClass, callback, action, jNotificationInfo, jVersionInfo); + } env->DeleteLocalRef(jNotificationInfo); env->DeleteLocalRef(jVersionInfo); @@ -569,8 +572,10 @@ void WalletModel::onNotificationsChanged(ChangeAction action, const std::vector< switch(notification.m_type) { case Notification::Type::SoftwareUpdateAvailable: - callSoftwareUpdateNotification(env, notification, action); - break; + break; + case Notification::Type::WalletImplUpdateAvailable: + callSoftwareUpdateNotification(env, notification, action); + break; case Notification::Type::AddressStatusChanged: callAddressStatusNotification(env, notification, action); break; diff --git a/beam/cli.cpp b/beam/cli.cpp index 43ec83864..fc6c11a0d 100644 --- a/beam/cli.cpp +++ b/beam/cli.cpp @@ -383,6 +383,17 @@ int main_impl(int argc, char* argv[]) }); } + if (vm.count(cli::MANUAL_ROLLBACK)) + { + Height h = vm[cli::MANUAL_ROLLBACK].as(); + if (h >= Rules::HeightGenesis) + node.get_Processor().ManualRollbackTo(h); + else + node.get_Processor().ForbidActiveAt(0); + + node.RefreshCongestions(); + } + reactor->run(); } } diff --git a/core/block_crypt.cpp b/core/block_crypt.cpp index 611ff9b77..aa518d697 100644 --- a/core/block_crypt.cpp +++ b/core/block_crypt.cpp @@ -2612,26 +2612,25 @@ namespace beam uint32_t N = Rules::get().CA.m_ProofCfg.get_N(); assert(N); - ECC::Hash::Value hv; - ECC::Hash::Processor() << skGen >> hv; + if (aid > N / 2) + { + ECC::Hash::Value hv; + ECC::Hash::Processor() << skGen >> hv; - uint32_t nPos; - hv.ExportWord<0>(nPos); - nPos %= N; // the position of this element in the list + uint32_t nPos; + hv.ExportWord<0>(nPos); + nPos %= N; // the position of this element in the list - if (aid > nPos) - { - // TODO: don't exceed the max current asset count, for this we must query it - m_Begin = aid - nPos; - } - else - { - m_Begin = 0; - nPos = aid; + if (aid > nPos) + { + // TODO: don't exceed the max current asset count, for this we must query it + m_Begin = aid - nPos; + return nPos; + } } - assert(m_Begin + nPos == aid); - return nPos; + m_Begin = 0; + return aid; } bool Asset::Proof::IsValid(ECC::Point::Native& hGen, ECC::InnerProduct::BatchContext& bc, ECC::Scalar::Native* pKs) const diff --git a/core/fly_client.cpp b/core/fly_client.cpp index 3cd13210c..643984f7d 100644 --- a/core/fly_client.cpp +++ b/core/fly_client.cpp @@ -756,6 +756,10 @@ bool FlyClient::NetworkStd::Connection::IsSupported(RequestEvents& req) void FlyClient::NetworkStd::Connection::OnRequestData(RequestEvents& req) { + req.m_Max = (LoginFlags::Extension::get(m_LoginFlags) >= 5) ? + proto::Event::s_Max : + proto::Event::s_Max0; + } bool FlyClient::NetworkStd::Connection::IsSupported(RequestTransaction& req) @@ -915,5 +919,14 @@ void FlyClient::NetworkStd::Connection::OnMsg(BbsMsg&& msg) } } +void FlyClient::NetworkStd::Connection::OnMsg(EventsSerif&& msg) +{ + if (!(Flags::Owned & m_Flags)) + ThrowUnexpected(); + + // TODO: handle complex situation, where multiple owned nodes are connected + m_This.m_Client.OnEventsSerif(msg.m_Value, msg.m_Height); +} + } // namespace proto } // namespace beam diff --git a/core/fly_client.h b/core/fly_client.h index 8c384cf73..1a1cbda2d 100644 --- a/core/fly_client.h +++ b/core/fly_client.h @@ -22,6 +22,16 @@ namespace beam { namespace proto { + namespace details { + + template struct ExtraData { + }; + + template <> struct ExtraData { + uint32_t m_Max = proto::Event::s_Max; + }; + } + struct FlyClient { #define REQUEST_TYPES_All(macro) \ @@ -67,7 +77,10 @@ namespace proto { }; #define THE_MACRO(type, msgOut, msgIn) \ - struct Request##type :public Request { \ + struct Request##type \ + :public Request \ + ,public details::ExtraData \ + { \ typedef boost::intrusive_ptr Ptr; \ Request##type() :m_Msg(Zero), m_Res(Zero) {} \ virtual ~Request##type() {} \ @@ -87,6 +100,7 @@ namespace proto { virtual void get_OwnerKdf(Key::IPKdf::Ptr&) {} // get the owner kdf. Optional virtual Block::SystemState::IHistory& get_History() = 0; virtual void OnOwnedNode(const PeerID&, bool bUp) {} + virtual void OnEventsSerif(const ECC::Hash::Value&, Height) {} struct IBbsReceiver { @@ -216,6 +230,7 @@ namespace proto { virtual void OnMsg(proto::ProofCommonState&& msg) override; virtual void OnMsg(proto::ProofChainWork&& msg) override; virtual void OnMsg(proto::BbsMsg&& msg) override; + virtual void OnMsg(proto::EventsSerif&& msg) override; #define THE_MACRO(type, msgOut, msgIn) \ virtual void OnMsg(proto::msgIn&&) override; \ bool IsSupported(Request##type&); \ diff --git a/core/proto.cpp b/core/proto.cpp index 590d896f1..4ca65b5a0 100644 --- a/core/proto.cpp +++ b/core/proto.cpp @@ -608,10 +608,39 @@ void NodeConnection::OnMsg(SChannelInitiate&& msg) OnConnectedSecure(); } +void LoginFlags::Extension::set(uint32_t& nFlags, uint32_t nExt) +{ + assert(!(nFlags & Msk)); + + if (nExt < nBitsLegacy) + nExt = (1 << nExt) - 1; + else + nExt = ((nExt - nBitsLegacy + 1) << nBitsLegacy) - 1; + + nFlags |= nExt << nShift; +} + +uint32_t LoginFlags::Extension::get(uint32_t nFlags) +{ + uint32_t val = (Msk & nFlags) >> nShift; + + const uint32_t nLegacyVal = (1 << nBitsLegacy) - 1; + if (nLegacyVal == (val & nLegacyVal)) + return ((val - nLegacyVal) >> nBitsLegacy) + nBitsLegacy; + + // find 1st zero bit + uint32_t iBit = 0; + for (; iBit < nBitsLegacy - 1; iBit++) + if (!(1 & (val >> iBit))) + break; + + return iBit; +} + void NodeConnection::SendLogin() { Login msg; - msg.m_Flags = LoginFlags::ExtensionsAll; + LoginFlags::Extension::set(msg.m_Flags, LoginFlags::Extension::Maximum); SetupLogin(msg); const Rules& r = Rules::get(); @@ -730,23 +759,15 @@ void NodeConnection::OnMsg(Login&& msg) void NodeConnection::OnLoginInternal(Login&& msg) { - if (LoginFlags::ExtensionsBeforeHF1 != (LoginFlags::ExtensionsBeforeHF1 & msg.m_Flags)) - { - LOG_WARNING() << "Peer " << m_Connection->peer_address() << " uses legacy protocol"; - ThrowUnexpected("Legacy", NodeProcessingException::Type::Incompatible); - } + uint32_t nExt = LoginFlags::Extension::get(msg.m_Flags); + if (LoginFlags::Extension::Maximum != nExt) + { + bool bNewer = (nExt > LoginFlags::Extension::Maximum); + LOG_WARNING() << "Peer " << m_Connection->peer_address() << " uses " << (bNewer ? "newer" : "older") << " ext: " << nExt; - if ((~LoginFlags::Recognized) & msg.m_Flags) { - LOG_WARNING() << "Peer " << m_Connection->peer_address() << " Uses newer protocol."; - } - else - { - const uint32_t nMask = LoginFlags::ExtensionsAll; - uint32_t nFlags2 = nMask & msg.m_Flags; - if (nFlags2 != nMask) { - LOG_WARNING() << "Peer " << m_Connection->peer_address() << " Uses older protocol: " << nFlags2; - } - } + if (nExt < LoginFlags::Extension::Minimum) + ThrowUnexpected("Legacy", NodeProcessingException::Type::Incompatible); + } OnLogin(std::move(msg)); } diff --git a/core/proto.h b/core/proto.h index 7cdf96947..b269f443b 100644 --- a/core/proto.h +++ b/core/proto.h @@ -220,6 +220,10 @@ namespace proto { #define BeamNodeMsg_Events(macro) \ macro(ByteBuffer, Events) +#define BeamNodeMsg_EventsSerif(macro) \ + macro(ECC::Hash::Value, Value) \ + macro(Height, Height) \ + #define BeamNodeMsg_GetBlockFinalization(macro) \ macro(Height, Height) \ macro(Amount, Fees) @@ -293,6 +297,7 @@ namespace proto { macro(0x2c, GetEvents) \ macro(0x2d, EventsLegacy) \ macro(0x34, Events) \ + macro(0x37, EventsSerif) \ macro(0x2e, GetBlockFinalization) \ macro(0x2f, BlockFinalization) \ /* tx broadcast and replication */ \ @@ -317,21 +322,27 @@ namespace proto { static const uint32_t Bbs = 0x2; // I'm spreading bbs messages static const uint32_t SendPeers = 0x4; // Please send me periodically peers recommendations static const uint32_t MiningFinalization = 0x8; // I want to finalize block construction for my owned node - static const uint32_t Extension1 = 0x10; // Supports Bbs with POW, more advanced proof/disproof scheme for SPV clients (?) - static const uint32_t Extension2 = 0x20; // Supports large HdrPack, BlockPack with parameters - static const uint32_t Extension3 = 0x40; // Supports Login1, Status (former Boolean) for NewTransaction result, compatible with Fork H1 - static const uint32_t Extension4 = 0x80; // Supports proto::Events (replaces proto::EventsLegacy) - static const uint32_t Recognized = 0xff; + struct Extension + { + static const uint32_t nShift = 4; // 1st 4 bits are occupied by flags specified above + static const uint32_t nBitsLegacy = 4; // 1st 4 bits are set consequently for each new version + static const uint32_t nBitsExtra = 8; + + static const uint32_t Msk = ((1 << (nBitsLegacy + nBitsExtra)) - 1) << nShift; + + // 1 - Supports Bbs with POW, more advanced proof/disproof scheme for SPV clients (?) + // 2 - Supports large HdrPack, BlockPack with parameters + // 3 - Supports Login1, Status (former Boolean) for NewTransaction result, compatible with Fork H1 + // 4 - Supports proto::Events (replaces proto::EventsLegacy) + // 5 - Supports Events serif, max num of events per message increased from 64 to 1024 - static const uint32_t ExtensionsBeforeHF1 = - Extension1 | - Extension2 | - Extension3; + static const uint32_t Minimum = 3; + static const uint32_t Maximum = 5; - static const uint32_t ExtensionsAll = - ExtensionsBeforeHF1 | - Extension4; + static void set(uint32_t& nFlags, uint32_t nExt); + static uint32_t get(uint32_t nFlags); + }; }; struct IDType @@ -345,7 +356,8 @@ namespace proto { struct Event { - static const uint32_t s_Max = 64; // will send more, if the remaining events are on the same height + static const uint32_t s_Max0 = 64; + static const uint32_t s_Max = 1024; // will send more, if the remaining events are on the same height #define BeamEventsAll(macro) \ macro(1, Utxo) \ diff --git a/core/shielded.cpp b/core/shielded.cpp index 889cbcd71..87d648eca 100644 --- a/core/shielded.cpp +++ b/core/shielded.cpp @@ -334,7 +334,7 @@ namespace beam set_kG(hvShared, kTmp); } - void ShieldedTxo::Data::OutputParams::Generate(ShieldedTxo& txo, const ECC::Hash::Value& hvShared, ECC::Oracle& oracle) + void ShieldedTxo::Data::OutputParams::Generate(ShieldedTxo& txo, const ECC::Hash::Value& hvShared, ECC::Oracle& oracle, bool bHideAssetAlways /* = false */) { ECC::Scalar::Native pExtra[2]; @@ -361,7 +361,7 @@ namespace beam cp.m_Blob.n = sizeof(p); ECC::Scalar::Native skSign = m_k; - if (m_AssetID) + if (m_AssetID || bHideAssetAlways) { ECC::Scalar::Native skGen; get_skGen(skGen, hvShared); @@ -448,9 +448,9 @@ namespace beam ///////////// // Params (both) - void ShieldedTxo::Data::Params::GenerateOutp(ShieldedTxo& txo, ECC::Oracle& oracle) + void ShieldedTxo::Data::Params::GenerateOutp(ShieldedTxo& txo, ECC::Oracle& oracle, bool bHideAssetAlways /* = false */) { - m_Output.Generate(txo, m_Ticket.m_SharedSecret, oracle); + m_Output.Generate(txo, m_Ticket.m_SharedSecret, oracle, bHideAssetAlways); } bool ShieldedTxo::Data::Params::Recover(const ShieldedTxo& txo, ECC::Oracle& oracle, const Viewer& v) { diff --git a/core/shielded.h b/core/shielded.h index 3618bf281..e5d175ded 100644 --- a/core/shielded.h +++ b/core/shielded.h @@ -74,7 +74,7 @@ namespace beam ECC::Scalar::Native m_k; User m_User; - void Generate(ShieldedTxo&, const ECC::Hash::Value& hvShared, ECC::Oracle&); + void Generate(ShieldedTxo&, const ECC::Hash::Value& hvShared, ECC::Oracle&, bool bHideAssetAlways = false); bool Recover(const ShieldedTxo&, const ECC::Hash::Value& hvShared, ECC::Oracle&); void Restore_kG(const ECC::Hash::Value& hvShared); // restores m_k, all other members must be set @@ -94,7 +94,7 @@ namespace beam TicketParams m_Ticket; OutputParams m_Output; - void GenerateOutp(ShieldedTxo&, ECC::Oracle&); + void GenerateOutp(ShieldedTxo&, ECC::Oracle&, bool bHideAssetAlways = false); bool Recover(const ShieldedTxo&, ECC::Oracle&, const Viewer&); }; diff --git a/core/unittest/ecc_test.cpp b/core/unittest/ecc_test.cpp index 330d5fb77..eddc8f4c9 100644 --- a/core/unittest/ecc_test.cpp +++ b/core/unittest/ecc_test.cpp @@ -1620,6 +1620,30 @@ void TestDifficulty() } } +void TestProtoVer() +{ + using namespace beam; + + const uint32_t nMskFlags = ~proto::LoginFlags::Extension::Msk; + + for (uint32_t nVer = 0; nVer < 200; nVer++) + { + uint32_t nFlags = 0; + proto::LoginFlags::Extension::set(nFlags, nVer); + verify_test(!(nFlags & nMskFlags)); // should not leak + + uint32_t nVer2 = proto::LoginFlags::Extension::get(nFlags); + verify_test(nVer == nVer2); + + nFlags = nMskFlags; + proto::LoginFlags::Extension::set(nFlags, nVer); + verify_test((nFlags & nMskFlags) == nMskFlags); // should not loose flags + + nVer2 = proto::LoginFlags::Extension::get(nFlags); + verify_test(nVer == nVer2); + } +} + void TestRandom() { PseudoRandomGenerator::Scope scopePrg(nullptr); // restore std @@ -1834,22 +1858,8 @@ void TestLelantus(bool bWithAsset) for (uint32_t iCycle = 0; iCycle < 3; iCycle++) { - struct MyExec - :public beam::ExecutorMT - { - uint32_t m_Threads; - - virtual uint32_t get_Threads() override { return m_Threads; } - - virtual void RunThread(uint32_t iThread) override - { - ExecutorMT::Context ctx; - ctx.m_iThread = iThread; - RunThreadCtx(ctx); - } - } ex; - - ex.m_Threads = 1 << iCycle; + beam::ExecutorMT ex; + ex.set_Threads(1 << iCycle); beam::Executor::Scope scope(ex); @@ -1861,7 +1871,7 @@ void TestLelantus(bool bWithAsset) p.Generate(Zero, oracle, &hGen); if (!bWithAsset) - printf("\tProof time = %u ms, Threads=%u\n", beam::GetTime_ms() - t, ex.m_Threads); + printf("\tProof time = %u ms, Threads=%u\n", beam::GetTime_ms() - t, ex.get_Threads()); // serialization beam::Serializer ser_; @@ -2128,6 +2138,7 @@ void TestAll() TestKdf(); TestBbs(); TestDifficulty(); + TestProtoVer(); TestRandom(); TestFourCC(); TestTreasury(); diff --git a/explorer/explorer_node.cpp b/explorer/explorer_node.cpp index a14089161..fae3ed666 100644 --- a/explorer/explorer_node.cpp +++ b/explorer/explorer_node.cpp @@ -201,7 +201,7 @@ void setup_node(Node& node, const Options& o) { node.m_Cfg.m_Listen.port(o.nodeListenTo.port()); node.m_Cfg.m_Listen.ip(o.nodeListenTo.ip()); node.m_Cfg.m_MiningThreads = 0; - node.m_Cfg.m_VerificationThreads = 1; + node.m_Cfg.m_VerificationThreads = -1; node.m_Keys.m_pOwner = o.ownerKey; diff --git a/node/db.h b/node/db.h index 3c1625923..e102175b6 100644 --- a/node/db.h +++ b/node/db.h @@ -49,7 +49,7 @@ class NodeDB SyncTarget, // deprecated Deprecated_2, Treasury, - DummyID, // hash of keys used to create UTXOs (owner key, dummy key) + EventsOwnerID, // hash of keys used to scan and record events HeightTxoLo, // Height starting from which and below Txo info is totally erased. HeightTxoHi, // Height starting from which and below Txo infi is compacted, only the commitment is left SyncData, @@ -59,6 +59,8 @@ class NodeDB ShieldedInputs, AssetsCount, // Including unused. The last element is guaranteed to be used. AssetsCountUsed, // num of 'live' assets + EventsSerif, // pseudo-random, reset each time the events are rescanned. + ForbiddenState, }; }; diff --git a/node/node.cpp b/node/node.cpp index 24abcc350..1f5a89c93 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -401,6 +401,9 @@ bool Node::TryAssignTask(Task& t, Peer& p) t.m_nCount = std::min(static_cast(msg.m_CountExtra), m_Cfg.m_BandwidthCtl.m_MaxBodyPackCount) + 1; // just an estimate, the actual num of blocks can be smaller m_nTasksPackBody += t.m_nCount; + + t.m_h0 = m_Processor.m_SyncData.m_h0; + t.m_hTxoLo = m_Processor.m_SyncData.m_TxoLo; } else { @@ -549,15 +552,31 @@ void Node::Processor::FlushInsanePeers() void Node::Processor::DeleteOutdated() { TxPool::Fluff& txp = get_ParentObj().m_TxPool; - for (TxPool::Fluff::Queue::iterator it = txp.m_Queue.begin(); txp.m_Queue.end() != it; ) + + Height h = get_ParentObj().m_Cfg.m_RollbackLimit.m_Max; + std::setmin(h, Rules::get().MaxRollback); + + if (m_Cursor.m_ID.m_Height > h) + { + h = m_Cursor.m_ID.m_Height - h; + + while (!txp.m_setOutdated.empty()) + { + TxPool::Fluff::Element& x = txp.m_setOutdated.begin()->get_ParentObj(); + if (x.m_Outdated.m_Height > h) + break; + + txp.Delete(x); + } + } + + for (TxPool::Fluff::ProfitSet::iterator it = txp.m_setProfit.begin(); txp.m_setProfit.end() != it; ) { TxPool::Fluff::Element& x = (it++)->get_ParentObj(); - if (!x.m_pValue) - continue; Transaction& tx = *x.m_pValue; - if (proto::TxStatus::Ok != ValidateTxContextEx(tx, x.m_Threshold.m_Height, true)) - txp.Delete(x); + if (proto::TxStatus::Ok != ValidateTxContextEx(tx, x.m_Height, true)) + txp.SetOutdated(x, m_Cursor.m_ID.m_Height); } } @@ -615,6 +634,24 @@ void Node::Processor::OnNewState() get_ParentObj().MaybeGenerateRecovery(); } +void Node::Processor::OnFastSyncSucceeded() +{ + // update Events serif + ECC::Hash::Value hv; + Blob blob(hv); + if (!get_DB().ParamGet(NodeDB::ParamID::EventsSerif, nullptr, &blob)) + return; //?! + + get_DB().ParamSet(NodeDB::ParamID::EventsSerif, &m_Extra.m_TxoHi, &blob); + + for (PeerList::iterator it = get_ParentObj().m_lstPeers.begin(); get_ParentObj().m_lstPeers.end() != it; it++) + { + Peer& peer = *it; + peer.m_Flags &= ~Peer::Flags::SerifSent; + peer.MaybeSendSerif(); + } +} + void Node::MaybeGenerateRecovery() { if (!m_PostStartSynced || m_Cfg.m_Recovery.m_sPathOutput.empty() || !m_Cfg.m_Recovery.m_Granularity) @@ -665,13 +702,25 @@ void Node::Processor::OnRolledBack() { LOG_INFO() << "Rolled back to: " << m_Cursor.m_ID; - // Delete shielded txs which referenced shielded outputs which were reverted TxPool::Fluff& txp = get_ParentObj().m_TxPool; - for (TxPool::Fluff::Queue::iterator it = txp.m_Queue.begin(); txp.m_Queue.end() != it; ) + while (!txp.m_setOutdated.empty()) + { + TxPool::Fluff::Element& x = txp.m_setOutdated.rbegin()->get_ParentObj(); + if (x.m_Outdated.m_Height <= m_Cursor.m_ID.m_Height) + break; + + txp.SetOutdated(x, MaxHeight); // may be deferred by the next loop + } + + // Shielded txs that referenced shielded outputs which were reverted - must be reprocessed + for (TxPool::Fluff::ProfitSet::iterator it = txp.m_setProfit.begin(); txp.m_setProfit.end() != it; ) { TxPool::Fluff::Element& x = (it++)->get_ParentObj(); - if (x.m_pValue && !IsShieldedInPool(*x.m_pValue)) - txp.Delete(x); + if (!IsShieldedInPool(*x.m_pValue)) + { + get_ParentObj().OnTransactionDeferred(std::move(x.m_pValue), nullptr, true); + get_ParentObj().m_TxPool.DeleteEmpty(x); + } } TxPool::Stem& txps = get_ParentObj().m_Dandelion; @@ -689,18 +738,6 @@ void Node::Processor::OnRolledBack() pObserver->OnRolledBack(m_Cursor.m_ID); } -uint32_t Node::Processor::MyExecutorMT::get_Threads() -{ - Config& cfg = get_ParentObj().get_ParentObj().m_Cfg; // alias - - if (cfg.m_VerificationThreads < 0) - // use all the cores, don't subtract 'mining threads'. Verification has higher priority - cfg.m_VerificationThreads = std::thread::hardware_concurrency(); - - uint32_t nThreads = cfg.m_VerificationThreads; - return std::max(nThreads, 1U); -} - void Node::Processor::MyExecutorMT::RunThread(uint32_t iThread) { MyExecutor::MyContext ctx; @@ -770,6 +807,18 @@ void Node::Processor::get_ViewerKeys(ViewerKeys& vk) vk.m_pSh = &get_ParentObj().m_Keys.m_vSh.front(); } +Height Node::Processor::get_MaxAutoRollback() +{ + Height h = NodeProcessor::get_MaxAutoRollback(); + if (m_Cursor.m_Full.m_Height >= Rules::HeightGenesis) + { + Timestamp ts_s = getTimestamp(); + if (ts_s < m_Cursor.m_Full.m_TimeStamp + get_ParentObj().m_Cfg.m_RollbackLimit.m_TimeoutSinceTip_s) + std::setmin(h, get_ParentObj().m_Cfg.m_RollbackLimit.m_Max); + } + return h; +} + void Node::Processor::OnEvent(Height h, const proto::Event::Base& evt) { if (get_ParentObj().m_Cfg.m_LogEvents) @@ -857,6 +906,12 @@ void Node::Keys::SetSingleKey(const Key::IKdf::Ptr& pKdf) void Node::Initialize(IExternalPOW* externalPOW) { + if (m_Cfg.m_VerificationThreads < 0) + // use all the cores, don't subtract 'mining threads'. Verification has higher priority + m_Cfg.m_VerificationThreads = m_Processor.m_ExecutorMT.get_Threads(); + + m_Processor.m_ExecutorMT.set_Threads(std::max(m_Cfg.m_VerificationThreads, 1U)); + m_Processor.m_Horizon = m_Cfg.m_Horizon; m_Processor.Initialize(m_Cfg.m_sPathLocal.c_str(), m_Cfg.m_ProcessorParams); @@ -994,15 +1049,24 @@ void Node::RefreshOwnedUtxos() hp >> hv0; Blob blob(hv1); - m_Processor.get_DB().ParamGet(NodeDB::ParamID::DummyID, NULL, &blob); + m_Processor.get_DB().ParamGet(NodeDB::ParamID::EventsOwnerID, NULL, &blob); - if (hv0 == hv1) - return; // unchanged + bool bChanged = (hv0 != hv1); + if (bChanged) + { + // changed + m_Processor.RescanOwnedTxos(); - m_Processor.RescanOwnedTxos(); + blob = Blob(hv0); + m_Processor.get_DB().ParamSet(NodeDB::ParamID::EventsOwnerID, NULL, &blob); + } - blob = Blob(hv0); - m_Processor.get_DB().ParamSet(NodeDB::ParamID::DummyID, NULL, &blob); + if (bChanged || !m_Processor.get_DB().ParamGet(NodeDB::ParamID::EventsSerif, nullptr, &blob)) + { + hv0 = NextNonce(); + blob = Blob(hv0); + m_Processor.get_DB().ParamSet(NodeDB::ParamID::EventsSerif, &m_Processor.m_Extra.m_TxoHi, &blob); + } } bool Node::Bbs::IsInLimits() const @@ -1206,6 +1270,8 @@ void Node::Peer::OnMsg(proto::Authentication&& msg) } } + MaybeSendSerif(); + if (proto::IDType::Node != msg.m_IDType) return; @@ -1925,6 +1991,13 @@ bool Node::Peer::GetBlock(proto::BodyBuffers& out, const NodeDB::StateID& sid, c return true; } +bool Node::Peer::ShouldAcceptBodyPack() +{ + Task& t = get_FirstTask(); + const NodeProcessor::SyncData& d = m_This.m_Processor.m_SyncData; // alias + return (t.m_h0 == d.m_h0) && (t.m_hTxoLo == d.m_TxoLo); +} + void Node::Peer::OnMsg(proto::Body&& msg) { Task& t = get_FirstTask(); @@ -1940,7 +2013,9 @@ void Node::Peer::OnMsg(proto::Body&& msg) Processor& p = m_This.m_Processor; // alias NodeProcessor::DataStatus::Enum eStatus = h ? - p.OnBlock(id, msg.m_Body.m_Perishable, msg.m_Body.m_Eternal, m_pInfo->m_ID.m_Key) : + ShouldAcceptBodyPack() ? + p.OnBlock(id, msg.m_Body.m_Perishable, msg.m_Body.m_Eternal, m_pInfo->m_ID.m_Key) : + NodeProcessor::DataStatus::Rejected : p.OnTreasury(msg.m_Body.m_Eternal); p.TryGoUpAsync(); @@ -1973,7 +2048,7 @@ void Node::Peer::OnMsg(proto::BodyPack&& msg) ModifyRatingWrtData(nSize); NodeProcessor::DataStatus::Enum eStatus = NodeProcessor::DataStatus::Rejected; - if (!msg.m_Bodies.empty()) + if (!msg.m_Bodies.empty() && ShouldAcceptBodyPack()) { const uint64_t* pPtr = p.get_CachedRows(t.m_sidTrg, hCountExtra); if (pPtr) @@ -2019,15 +2094,75 @@ void Node::Peer::OnMsg(proto::NewTransaction&& msg) ThrowUnexpected(); // our deserialization permits NULL Ptrs. // However the transaction body must have already been checked for NULLs - if (msg.m_Fluff) - m_This.OnTransactionFluff(std::move(msg.m_Transaction), this, NULL); - else + // Change in protocol! + // Historically the node replied with proto::Status message for transactions in stem phase only, for fluff phase no reply was necessary. + // Now we reply regardless to phase, but ONLY to non-node client (means - FlyClient). + // For other cases the tx verification is deferred, until the node is idle. + bool bReply = !(Flags::PiRcvd & m_Flags); + + const PeerID* pSender = m_pInfo ? &m_pInfo->m_ID.m_Key : nullptr; + + if (bReply) { proto::Status msgOut; - msgOut.m_Value = m_This.OnTransactionStem(std::move(msg.m_Transaction), this); - + msgOut.m_Value = m_This.OnTransaction(std::move(msg.m_Transaction), pSender, msg.m_Fluff); Send(msgOut); } + else + { + m_This.OnTransactionDeferred(std::move(msg.m_Transaction), pSender, msg.m_Fluff); + } +} + +void Node::OnTransactionDeferred(Transaction::Ptr&& pTx, const PeerID* pSender, bool bFluff) +{ + TxDeferred::Element txd; + txd.m_pTx = std::move(pTx); + txd.m_Fluff = bFluff; + + if (pSender) + txd.m_Sender = *pSender; + else + txd.m_Sender = Zero; + + //if (m_Cfg.m_LogTxFluff) + //{ + // Transaction::KeyType key; + // txd.m_pTx->get_Key(key); + + // LOG_INFO() << "Tx " << key << " deferred"; + //} + + if (m_TxDeferred.m_lst.empty()) + m_TxDeferred.start(); + else + { + while (m_TxDeferred.m_lst.size() > m_Cfg.m_MaxDeferredTransactions) + m_TxDeferred.m_lst.pop_front(); + } + + m_TxDeferred.m_lst.push_back(std::move(txd)); +} + +void Node::TxDeferred::OnSchedule() +{ + if (!m_lst.empty()) + { + TxDeferred::Element& x = m_lst.front(); + get_ParentObj().OnTransaction(std::move(x.m_pTx), &x.m_Sender, x.m_Fluff); + m_lst.pop_front(); + } + + if (m_lst.empty()) + cancel(); + +} + +uint8_t Node::OnTransaction(Transaction::Ptr&& pTx, const PeerID* pSender, bool bFluff) +{ + return bFluff ? + OnTransactionFluff(std::move(pTx), pSender, nullptr) : + OnTransactionStem(std::move(pTx)); } uint8_t Node::ValidateTx(Transaction::Context& ctx, const Transaction& tx) @@ -2142,7 +2277,7 @@ uint32_t Node::RandomUInt32(uint32_t threshold) return threshold; } -uint8_t Node::OnTransactionStem(Transaction::Ptr&& ptx, const Peer* pPeer) +uint8_t Node::OnTransactionStem(Transaction::Ptr&& ptx) { TxStats s; ptx->get_Reader().AddStats(s); @@ -2498,7 +2633,7 @@ Height Node::SampleDummySpentHeight() return h; } -bool Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, const Peer* pPeer, TxPool::Stem::Element* pElem) +uint8_t Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, const PeerID* pSender, TxPool::Stem::Element* pElem) { Transaction::Ptr ptx; ptx.swap(ptxArg); @@ -2514,7 +2649,7 @@ bool Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, const Peer* pPeer, TxPo m_Dandelion.Delete(*pElem); if (!bValid) - return false; + return proto::TxStatus::InvalidContext; } else { @@ -2535,7 +2670,7 @@ bool Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, const Peer* pPeer, TxPo TxPool::Fluff::TxSet::iterator it = m_TxPool.m_setTxs.find(key); if (m_TxPool.m_setTxs.end() != it) - return true; + return proto::TxStatus::Ok; const Transaction& tx = *ptx; @@ -2546,14 +2681,17 @@ bool Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, const Peer* pPeer, TxPo LogTx(tx, nCode, key.m_Key); if (proto::TxStatus::Ok != nCode) { - return false; // stupid compiler insists on parentheses here! + return nCode; // stupid compiler insists on parentheses here! } TxPool::Fluff::Element* pNewTxElem = m_TxPool.AddValidTx(std::move(ptx), ctx, key.m_Key); - while (m_TxPool.m_setProfit.size() > m_Cfg.m_MaxPoolTransactions) + while (m_TxPool.m_setProfit.size() + m_TxPool.m_setOutdated.size() > m_Cfg.m_MaxPoolTransactions) { - TxPool::Fluff::Element& txDel = m_TxPool.m_setProfit.rbegin()->get_ParentObj(); + TxPool::Fluff::Element& txDel = m_TxPool.m_setOutdated.empty() ? + m_TxPool.m_setProfit.rbegin()->get_ParentObj() : + m_TxPool.m_setOutdated.begin()->get_ParentObj(); + if (&txDel == pNewTxElem) pNewTxElem = nullptr; // Anti-spam protection: in case the maximum pool capacity is reached - ensure this tx is any better BEFORE broadcasting ti @@ -2561,7 +2699,7 @@ bool Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, const Peer* pPeer, TxPo } if (!pNewTxElem) - return false; + return nCode; // though the tx is dropped, we return status ok. proto::HaveTransaction msgOut; msgOut.m_ID = key.m_Key; @@ -2569,7 +2707,7 @@ bool Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, const Peer* pPeer, TxPo for (PeerList::iterator it2 = m_lstPeers.begin(); m_lstPeers.end() != it2; it2++) { Peer& peer = *it2; - if (&peer == pPeer) + if (pSender && peer.m_pInfo && (peer.m_pInfo->m_ID.m_Key == *pSender)) continue; if (!(peer.m_LoginFlags & proto::LoginFlags::SpreadingTransactions) || peer.IsChocking()) continue; @@ -2581,7 +2719,7 @@ bool Node::OnTransactionFluff(Transaction::Ptr&& ptxArg, const Peer* pPeer, TxPo if (m_Miner.IsEnabled() && !m_Miner.m_pTaskToFinalize) m_Miner.SetTimer(m_Cfg.m_Timeout.m_MiningSoftRestart_ms, false); - return true; + return nCode; } void Node::Dandelion::OnTimedOut(Element& x) @@ -2642,6 +2780,7 @@ void Node::Peer::OnLogin(proto::Login&& msg) } m_LoginFlags = msg.m_Flags; + MaybeSendSerif(); if (b != ShouldFinalizeMining()) { // stupid compiler insists on parentheses! @@ -2710,7 +2849,7 @@ void Node::Peer::BroadcastTxs() SetTxCursor(&itNext->get_ParentObj()); - if (!m_pCursorTx->m_pValue) + if (!m_pCursorTx->m_pValue || m_pCursorTx->IsOutdated()) continue; // already deleted proto::HaveTransaction msgOut; @@ -2752,6 +2891,23 @@ void Node::Peer::BroadcastBbs() m_CursorBbs = wlk.m_ID; } +void Node::Peer::MaybeSendSerif() +{ + if (!(Flags::Viewer & m_Flags) || (Flags::SerifSent & m_Flags)) + return; + + if (proto::LoginFlags::Extension::get(m_LoginFlags) < 5) + return; + + proto::EventsSerif msg; + + Blob blob(msg.m_Value); + m_This.m_Processor.get_DB().ParamGet(NodeDB::ParamID::EventsSerif, &msg.m_Height, &blob); + + Send(msg); + m_Flags |= Flags::SerifSent; +} + void Node::Peer::OnMsg(proto::HaveTransaction&& msg) { TxPool::Fluff::Element::Tx key; @@ -3355,6 +3511,9 @@ void Node::Peer::OnMsg(proto::GetEvents&& msg) Height hLast = 0; uint32_t nCount = 0; + // we'll send up to s_Max num of events, even to older clients, they won't complain + static_assert(proto::Event::s_Max > proto::Event::s_Max0); + Serializer ser; for (db.EnumEvents(wlk, msg.m_HeightMin); wlk.MoveNext(); hLast = wlk.m_Height) @@ -3368,7 +3527,7 @@ void Node::Peer::OnMsg(proto::GetEvents&& msg) ser & wlk.m_Height; ser.WriteRaw(wlk.m_Body.p, wlk.m_Body.n); - nCount++; + nCount++; } ser.swap_buf(msgOut.m_Events); @@ -3376,7 +3535,7 @@ void Node::Peer::OnMsg(proto::GetEvents&& msg) else LOG_WARNING() << "Peer " << m_RemoteAddr << " Unauthorized Utxo events request."; - if (proto::LoginFlags::Extension4 & m_LoginFlags) + if (proto::LoginFlags::Extension::get(m_LoginFlags) >= 4) { Send(msgOut); } @@ -4375,8 +4534,9 @@ void Node::PrintTxos() if (m_Processor.IsFastSync()) os << "Note: Fast-sync is in progress. Data is preliminary and not fully verified yet." << std::endl; - if (m_Processor.m_Extra.m_TxoHi >= Rules::HeightGenesis) - os << "Note: Cut-through up to Height=" << m_Processor.m_Extra.m_TxoHi << ", Txos spent earlier may be missing. To recover them too please make full sync." << std::endl; + Height hSerif0 = m_Processor.get_DB().ParamIntGetDef(NodeDB::ParamID::EventsSerif); + if (hSerif0 >= Rules::HeightGenesis) + os << "Note: Cut-through up to Height=" << hSerif0 << ", Txos spent earlier may be missing. To recover them too please make full sync." << std::endl; NodeDB::WalkerEvent wlk; for (m_Processor.get_DB().EnumEvents(wlk, Rules::HeightGenesis - 1); wlk.MoveNext(); ) diff --git a/node/node.h b/node/node.h index f58a93d92..c68490a2d 100644 --- a/node/node.h +++ b/node/node.h @@ -72,6 +72,7 @@ struct Node uint32_t m_MaxConcurrentBlocksRequest = 18; uint32_t m_MaxPoolTransactions = 100 * 1000; + uint32_t m_MaxDeferredTransactions = 100 * 1000; uint32_t m_MiningThreads = 0; // by default disabled bool m_LogEvents = false; // may be insecure. Off by default. @@ -83,6 +84,14 @@ struct Node // negative: number of cores minus number of mining threads. int m_VerificationThreads = 0; + struct RollbackLimit + { + Height m_Max = 60; // artificial restriction on how much the node will rollback automatically + uint32_t m_TimeoutSinceTip_s = 3600; // further rollback is possible after this timeout since the current tip + // in either case it's no more than Rules::MaxRollback + + } m_RollbackLimit; + struct Bbs { uint32_t m_MessageTimeout_s = 3600 * 12; // 1/2 day @@ -197,6 +206,8 @@ struct Node bool GenerateRecoveryInfo(const char*); void PrintTxos(); + void RefreshCongestions(); // call explicitly if manual rollback or forbidden state is modified + bool DecodeAndCheckHdrs(std::vector&, const proto::HdrPack&); private: @@ -210,16 +221,17 @@ struct Node void OnNewState() override; void OnRolledBack() override; void OnModified() override; + void OnFastSyncSucceeded() override; void get_ViewerKeys(ViewerKeys&) override; void OnEvent(Height, const proto::Event::Base&) override; void OnDummy(const CoinID&, Height) override; void InitializeUtxosProgress(uint64_t done, uint64_t total) override; + Height get_MaxAutoRollback() override; void Stop(); struct MyExecutorMT :public ExecutorMT { - virtual uint32_t get_Threads() override; virtual void RunThread(uint32_t) override; ~MyExecutorMT() { Stop(); } @@ -270,6 +282,8 @@ struct Node uint32_t m_nCount; uint32_t m_TimeAssigned_ms; NodeDB::StateID m_sidTrg; + Height m_h0; // those 2 are fast-sync params at the moment of task assignment + Height m_hTxoLo; Peer* m_pOwner; bool operator < (const Task& t) const { return (m_Key < t.m_Key); } @@ -349,7 +363,27 @@ struct Node IMPLEMENT_GET_PARENT_OBJ(Node, m_Dandelion) } m_Dandelion; - uint8_t OnTransactionStem(Transaction::Ptr&&, const Peer*); + struct TxDeferred + :public io::IdleEvt + { + struct Element + { + Transaction::Ptr m_pTx; + PeerID m_Sender; + bool m_Fluff; + }; + + std::list m_lst; + + virtual void OnSchedule() override; + + IMPLEMENT_GET_PARENT_OBJ(Node, m_TxDeferred) + } m_TxDeferred; + + uint8_t OnTransaction(Transaction::Ptr&&, const PeerID*, bool bFluff); + void OnTransactionDeferred(Transaction::Ptr&&, const PeerID*, bool bFluff); + uint8_t OnTransactionStem(Transaction::Ptr&&); + uint8_t OnTransactionFluff(Transaction::Ptr&&, const PeerID*, Dandelion::Element*); void OnTransactionAggregated(Dandelion::Element&); void PerformAggregation(Dandelion::Element&); void AddDummyInputs(Transaction&); @@ -357,7 +391,6 @@ struct Node bool AddDummyInputEx(Transaction& tx, const CoinID&); void AddDummyOutputs(Transaction&); Height SampleDummySpentHeight(); - bool OnTransactionFluff(Transaction::Ptr&&, const Peer*, Dandelion::Element*); uint8_t ValidateTx(Transaction::Context&, const Transaction&); // complete validation void LogTx(const Transaction&, uint8_t nStatus, const Transaction::KeyType&); @@ -463,6 +496,7 @@ struct Node static const uint16_t PiRcvd = 0x002; static const uint16_t Owner = 0x004; static const uint16_t Probe = 0x008; + static const uint16_t SerifSent = 0x010; static const uint16_t Finalizing = 0x080; static const uint16_t HasTreasury = 0x100; static const uint16_t Chocking = 0x200; @@ -503,6 +537,7 @@ struct Node void BroadcastTxs(); void BroadcastBbs(); void BroadcastBbs(Bbs::Subscription&); + void MaybeSendSerif(); void OnChocking(); void SetTxCursor(TxPool::Fluff::Element*); bool GetBlock(proto::BodyBuffers&, const NodeDB::StateID&, const proto::GetBodyPack&, bool bActive); @@ -511,6 +546,7 @@ struct Node bool ShouldAssignTasks(); bool ShouldFinalizeMining(); Task& get_FirstTask(); + bool ShouldAcceptBodyPack(); void OnFirstTaskDone(); void OnFirstTaskDone(NodeProcessor::DataStatus::Enum); void ModifyRatingWrtData(size_t nSize); @@ -578,8 +614,6 @@ struct Node Peer* AllocPeer(const beam::io::Address&); - void RefreshCongestions(); - struct Server :public proto::NodeConnection::Server { diff --git a/node/processor.cpp b/node/processor.cpp index d558d8ab3..16812576c 100644 --- a/node/processor.cpp +++ b/node/processor.cpp @@ -177,6 +177,12 @@ void NodeProcessor::Initialize(const char* szPath, const StartParams& sp) if (sp.m_Vacuum) Vacuum(); + blob = m_sidForbidden.m_Hash; + if (m_DB.ParamGet(NodeDB::ParamID::ForbiddenState, &m_sidForbidden.m_Height, &blob)) + LogForbiddenState(); + else + ResetForbiddenStateVar(); + TryGoUp(); } @@ -275,6 +281,17 @@ void NodeProcessor::LogSyncData() LOG_INFO() << "Fast-sync mode up to height " << m_SyncData.m_Target.m_Height; } +void NodeProcessor::LogForbiddenState() +{ + LOG_INFO() << "Forbidden state: " << m_sidForbidden; +} + +void NodeProcessor::ResetForbiddenStateVar() +{ + m_sidForbidden.m_Height = MaxHeight; // don't set it to 0, it may interfer with treasury in RequestData() + m_sidForbidden.m_Hash = Zero; +} + void NodeProcessor::SaveSyncData() { if (IsFastSync()) @@ -693,12 +710,17 @@ const uint64_t* NodeProcessor::get_CachedRows(const NodeDB::StateID& sid, Height return nullptr; } -Height NodeProcessor::get_LowestReturnHeight() const +Height NodeProcessor::get_MaxAutoRollback() { - Height hRet = m_Extra.m_TxoHi; + return Rules::get().MaxRollback; +} + +Height NodeProcessor::get_LowestReturnHeight() +{ + Height hRet = std::max(m_Extra.m_TxoHi, m_Extra.m_Fossil); Height h0 = IsFastSync() ? m_SyncData.m_h0 : m_Cursor.m_ID.m_Height; - Height hMaxRollback = Rules::get().MaxRollback; + Height hMaxRollback = get_MaxAutoRollback(); if (h0 > hMaxRollback) { @@ -711,14 +733,17 @@ Height NodeProcessor::get_LowestReturnHeight() const void NodeProcessor::RequestDataInternal(const Block::SystemState::ID& id, uint64_t row, bool bBlock, const NodeDB::StateID& sidTrg) { - if (id.m_Height >= get_LowestReturnHeight()) - { - RequestData(id, bBlock, sidTrg); + if (id.m_Height < get_LowestReturnHeight()) { + LOG_WARNING() << id << " State unreachable"; // probably will pollute the log, but it's a critical situation anyway + return; } - else - { - LOG_WARNING() << id << " State unreachable!"; // probably will pollute the log, but it's a critical situation anyway + + if (id == m_sidForbidden) { + LOG_WARNING() << id << " State forbidden"; + return; } + + RequestData(id, bBlock, sidTrg); } struct NodeProcessor::MultiSigmaContext @@ -1252,8 +1277,10 @@ struct NodeProcessor::MultiblockContext nTasks = ex.Flush(nTasks - 1); } - m_InProgress.m_Max++; - assert(m_InProgress.m_Max == pShared->m_Ctx.m_Height.m_Min); + // The following won't hold if some blocks in the current range were already verified in the past, and omitted from the current verification + // m_InProgress.m_Max++; + // assert(m_InProgress.m_Max == pShared->m_Ctx.m_Height.m_Min); + m_InProgress.m_Max = pShared->m_Ctx.m_Height.m_Min; bool bFull = (pShared->m_Ctx.m_Height.m_Min > m_This.m_SyncData.m_Target.m_Height); @@ -1595,6 +1622,8 @@ void NodeProcessor::OnFastSyncOver(MultiblockContext& mbc, bool& bContextFail) ZeroObject(m_SyncData); SaveSyncData(); + + OnFastSyncSucceeded(); } } @@ -2120,6 +2149,17 @@ struct NodeProcessor::KrnFlyMmr bool NodeProcessor::HandleBlock(const NodeDB::StateID& sid, const Block::SystemState::Full& s, MultiblockContext& mbc) { + if (s.m_Height == m_sidForbidden.m_Height) + { + Merkle::Hash hv; + s.get_Hash(hv); + if (hv == m_sidForbidden.m_Hash) + { + LOG_WARNING() << LogSid(m_DB, sid) << " Forbidden"; + return false; + } + } + ByteBuffer bbP, bbE; m_DB.GetStateBlock(sid.m_Row, &bbP, &bbE, nullptr); @@ -3677,6 +3717,71 @@ void NodeProcessor::RollbackTo(Height h) OnRolledBack(); } +bool NodeProcessor::ForbidActiveAt(Height h) +{ + if (h >= Rules::HeightGenesis) + { + if (m_Cursor.m_Sid.m_Height < h) + { + LOG_WARNING() << "Can't forbid a state above cursor"; + return false; + } + + NodeDB::StateID sid; + sid.m_Height = h; + sid.m_Row = FindActiveAtStrict(sid.m_Height); + m_DB.get_StateID(sid, m_sidForbidden); + + Blob blob = m_sidForbidden.m_Hash; + m_DB.ParamSet(NodeDB::ParamID::ForbiddenState, &m_sidForbidden.m_Height, &blob); + LogForbiddenState(); + } + else + { + LOG_INFO() << "Forbidden state reset"; + m_DB.ParamSet(NodeDB::ParamID::ForbiddenState, nullptr, nullptr); + ResetForbiddenStateVar(); + } + + return true; +} + +void NodeProcessor::ManualRollbackTo(Height h) +{ + LOG_INFO() << "Manual rollback to " << h << "..."; + + bool bChanged = false; + + if (IsFastSync() && (m_SyncData.m_Target.m_Height > h)) + { + LOG_INFO() << "Fast-sync abort..."; + + RollbackTo(m_SyncData.m_h0); + DeleteBlocksInRange(m_SyncData.m_Target, m_SyncData.m_h0); + + ZeroObject(m_SyncData); + SaveSyncData(); + + bChanged = true; + } + + if (h < m_Extra.m_TxoHi) + { + LOG_INFO() << "Can't go below Height " << m_Extra.m_TxoHi; + h = m_Extra.m_TxoHi; + } + + if (m_Cursor.m_ID.m_Height > h) + { + ForbidActiveAt(h + 1); + RollbackTo(h); + bChanged = true; + } + + if (bChanged) + OnNewState(); +} + NodeProcessor::DataStatus::Enum NodeProcessor::OnStateInternal(const Block::SystemState::Full& s, Block::SystemState::ID& id, bool bAlreadyChecked) { s.get_ID(id); @@ -4164,7 +4269,7 @@ size_t NodeProcessor::GenerateNewBlockInternal(BlockContext& bc, BlockInterpretC Transaction& tx = *x.m_pValue; - bool bDelete = !x.m_Threshold.m_Height.IsInRange(bic.m_Height); + bool bDelete = !x.m_Height.IsInRange(bic.m_Height); if (!bDelete) { assert(!bic.m_LimitExceeded); @@ -4187,7 +4292,7 @@ size_t NodeProcessor::GenerateNewBlockInternal(BlockContext& bc, BlockInterpretC } if (bDelete) - bc.m_TxPool.Delete(x); // isn't available in this context + bc.m_TxPool.SetOutdated(x, h); // isn't available in this context } LOG_INFO() << "GenerateNewBlock: size of block = " << ssc.m_Counter.m_Value << "; amount of tx = " << nTxNum; diff --git a/node/processor.h b/node/processor.h index e32c31d0c..fdba91cee 100644 --- a/node/processor.h +++ b/node/processor.h @@ -181,6 +181,9 @@ class NodeProcessor NodeProcessor(); virtual ~NodeProcessor(); + bool ForbidActiveAt(Height); + void ManualRollbackTo(Height); + struct Horizon { // branches behind this are pruned @@ -237,6 +240,10 @@ class NodeProcessor } m_SyncData; + Block::SystemState::ID m_sidForbidden; + void LogForbiddenState(); + void ResetForbiddenStateVar(); + bool IsFastSync() const { return m_SyncData.m_Target.m_Row != 0; } void SaveSyncData(); @@ -316,7 +323,7 @@ class NodeProcessor void OnFastSyncOver(MultiblockContext&, bool& bContextFail); // Lowest height to which it's possible to rollback. - Height get_LowestReturnHeight() const; + Height get_LowestReturnHeight(); static bool IsRemoteTipNeeded(const Block::SystemState::Full& sTipRemote, const Block::SystemState::Full& sTipMy); @@ -326,6 +333,8 @@ class NodeProcessor virtual void OnRolledBack() {} virtual void OnModified() {} virtual void InitializeUtxosProgress(uint64_t done, uint64_t total) {} + virtual void OnFastSyncSucceeded() {} + virtual Height get_MaxAutoRollback(); struct MyExecutor :public Executor diff --git a/node/txpool.cpp b/node/txpool.cpp index e37bf3b51..593da50e3 100644 --- a/node/txpool.cpp +++ b/node/txpool.cpp @@ -48,14 +48,14 @@ TxPool::Fluff::Element* TxPool::Fluff::AddValidTx(Transaction::Ptr&& pValue, con Element* p = new Element; p->m_pValue = std::move(pValue); - p->m_Threshold.m_Height = ctx.m_Height; + p->m_Height = ctx.m_Height; p->m_Profit.m_Fee = ctx.m_Stats.m_Fee; p->m_Profit.SetSize(*p->m_pValue); p->m_Tx.m_Key = key; + p->m_Outdated.m_Height = MaxHeight; + assert(!p->IsOutdated()); - m_setThreshold.insert(p->m_Threshold); - m_setProfit.insert(p->m_Profit); - m_setTxs.insert(p->m_Tx); + InternalInsert(*p); p->m_Queue.m_Refs = 1; m_Queue.push_back(p->m_Queue); @@ -63,15 +63,46 @@ TxPool::Fluff::Element* TxPool::Fluff::AddValidTx(Transaction::Ptr&& pValue, con return p; } +void TxPool::Fluff::SetOutdated(Element& x, Height h) +{ + InternalErase(x); + x.m_Outdated.m_Height = h; + InternalInsert(x); +} + +void TxPool::Fluff::InternalInsert(Element& x) +{ + if (x.IsOutdated()) + m_setOutdated.insert(x.m_Outdated); + else + { + m_setTxs.insert(x.m_Tx); + m_setProfit.insert(x.m_Profit); + } +} + +void TxPool::Fluff::InternalErase(Element& x) +{ + if (x.IsOutdated()) + m_setOutdated.erase(OutdatedSet::s_iterator_to(x.m_Outdated)); + else + { + m_setTxs.erase(TxSet::s_iterator_to(x.m_Tx)); + m_setProfit.erase(ProfitSet::s_iterator_to(x.m_Profit)); + } +} + void TxPool::Fluff::Delete(Element& x) { assert(x.m_pValue); x.m_pValue.reset(); + DeleteEmpty(x); +} - m_setThreshold.erase(ThresholdSet::s_iterator_to(x.m_Threshold)); - m_setProfit.erase(ProfitSet::s_iterator_to(x.m_Profit)); - m_setTxs.erase(TxSet::s_iterator_to(x.m_Tx)); - +void TxPool::Fluff::DeleteEmpty(Element& x) +{ + assert(!x.m_pValue); + InternalErase(x); Release(x); } @@ -88,8 +119,11 @@ void TxPool::Fluff::Release(Element& x) void TxPool::Fluff::Clear() { - while (!m_setThreshold.empty()) - Delete(m_setThreshold.begin()->get_ParentObj()); + while (!m_setProfit.empty()) + Delete(m_setProfit.begin()->get_ParentObj()); + + while (!m_setOutdated.empty()) + Delete(m_setOutdated.begin()->get_ParentObj()); } ///////////////////////////// diff --git a/node/txpool.h b/node/txpool.h index ef2223efc..a2844fd68 100644 --- a/node/txpool.h +++ b/node/txpool.h @@ -55,15 +55,16 @@ struct TxPool IMPLEMENT_GET_PARENT_OBJ(Element, m_Profit) } m_Profit; - struct Threshold + HeightRange m_Height; + + struct Outdated :public boost::intrusive::set_base_hook<> { - HeightRange m_Height; - - bool operator < (const Threshold& t) const { return m_Height.m_Max < t.m_Height.m_Max; } + Height m_Height; - IMPLEMENT_GET_PARENT_OBJ(Element, m_Threshold) - } m_Threshold; + bool operator < (const Outdated& t) const { return m_Height < t.m_Height; } + IMPLEMENT_GET_PARENT_OBJ(Element, m_Outdated) + } m_Outdated; struct Queue :public boost::intrusive::list_base_hook<> @@ -71,24 +72,32 @@ struct TxPool uint32_t m_Refs = 0; IMPLEMENT_GET_PARENT_OBJ(Element, m_Queue) } m_Queue; + + bool IsOutdated() const { return MaxHeight != m_Outdated.m_Height; } }; typedef boost::intrusive::multiset TxSet; typedef boost::intrusive::multiset ProfitSet; - typedef boost::intrusive::multiset ThresholdSet; + typedef boost::intrusive::multiset OutdatedSet; typedef boost::intrusive::list Queue; TxSet m_setTxs; ProfitSet m_setProfit; - ThresholdSet m_setThreshold; + OutdatedSet m_setOutdated; Queue m_Queue; Element* AddValidTx(Transaction::Ptr&&, const Transaction::Context&, const Transaction::KeyType&); + void SetOutdated(Element&, Height); void Delete(Element&); + void DeleteEmpty(Element&); void Release(Element&); void Clear(); ~Fluff() { Clear(); } + + private: + void InternalInsert(Element&); + void InternalErase(Element&); }; struct Stem diff --git a/node/unittests/node_test.cpp b/node/unittests/node_test.cpp index d688d227c..21b83095c 100644 --- a/node/unittests/node_test.cpp +++ b/node/unittests/node_test.cpp @@ -2649,6 +2649,11 @@ namespace beam auto logger = beam::Logger::create(LOG_LEVEL_DEBUG, LOG_LEVEL_DEBUG); node.PrintTxos(); + + NodeProcessor& proc = node.get_Processor(); + proc.ManualRollbackTo(3); + verify_test(proc.m_Cursor.m_ID.m_Height >= 3); // it won't necessarily reach 3 + verify_test(proc.m_sidForbidden.m_Height > Rules::HeightGenesis); // some rollback with forbidden state update must take place } diff --git a/node/utils/node_net_sim.cpp b/node/utils/node_net_sim.cpp index c8dc99524..8a586d177 100644 --- a/node/utils/node_net_sim.cpp +++ b/node/utils/node_net_sim.cpp @@ -339,7 +339,7 @@ struct Context Height hTip = get_ParentObj().m_FlyClient.get_Height(); - if (nCount < proto::Event::s_Max) + if (nCount < r.m_Max) m_hEvents = hTip + 1; else { @@ -432,7 +432,6 @@ struct Context Context() :m_Network(m_FlyClient) { - m_Exec.m_Threads = std::thread::hardware_concurrency(); } struct Cfg @@ -452,20 +451,7 @@ struct Context } m_Cfg; - struct Exec - :public beam::ExecutorMT - { - uint32_t m_Threads; - - virtual uint32_t get_Threads() override { return m_Threads; } - - virtual void RunThread(uint32_t iThread) override - { - ExecutorMT::Context ctx; - ctx.m_iThread = iThread; - RunThreadCtx(ctx); - } - } m_Exec; + beam::ExecutorMT m_Exec; void OnRolledBack() @@ -546,6 +532,8 @@ struct Context if (h < Rules::get().pForks[2].m_Height) return; + std::cout << "\tTotal shielded in/outs: " << (m_pProc->m_Mmr.m_Shielded.m_Count - m_pProc->m_Extra.m_ShieldedOutputs) << " / " << m_pProc->m_Extra.m_ShieldedOutputs << std::endl; + m_TxosMW.HandleTxs(m_setSplit, h); uint32_t nDone = m_TxosMW.HandleTxs(m_setTxsOut, h); @@ -749,7 +737,7 @@ struct Context ECC::GenRandom(nonce); sdp.m_Ticket.Generate(pKrn->m_Txo.m_Ticket, v, nonce); - sdp.GenerateOutp(pKrn->m_Txo, oracle); + sdp.GenerateOutp(pKrn->m_Txo, oracle, true); pKrn->MsgToID(); //ECC::Point::Native pt; @@ -873,7 +861,7 @@ struct Context { beam::Executor::Scope scope(m_Exec); - pKrn->Sign(p, 0); + pKrn->Sign(p, 0, true); }; pTx->m_vKernels.push_back(std::move(pKrn)); @@ -1041,6 +1029,7 @@ int main_Guarded(int argc, char* argv[]) node.m_Cfg.m_Dandelion.m_FluffProbability = 0xFFFF; node.m_Keys.SetSingleKey(pKdf); + node.m_Cfg.m_Horizon.SetStdFastSync(); node.Initialize(); if (!bLocalMode && !node.m_PostStartSynced) @@ -1069,8 +1058,8 @@ int main_Guarded(int argc, char* argv[]) else node.m_PostStartSynced = true; - if (ctx.m_Cfg.m_BulletValue < (ctx.m_Cfg.m_Fees.m_Kernel + ctx.m_Cfg.m_Fees.m_Output) * 2 + ctx.m_Cfg.m_Fees.m_ShieldedOutput + ctx.m_Cfg.m_Fees.m_ShieldedInput) - throw std::runtime_error("Bullet/Fee settings not consistent"); + Amount nMinInOut = (ctx.m_Cfg.m_Fees.m_Kernel + ctx.m_Cfg.m_Fees.m_Output) * 2 + ctx.m_Cfg.m_Fees.m_ShieldedOutput + ctx.m_Cfg.m_Fees.m_ShieldedInput; + std::setmax(ctx.m_Cfg.m_BulletValue, nMinInOut + 10); ctx.m_pKdf = pKdf; ctx.m_Network.m_Cfg.m_vNodes.push_back(io::Address(INADDR_LOOPBACK, g_LocalNodePort)); diff --git a/utility/cli/options.cpp b/utility/cli/options.cpp index 1728464f3..5d86af540 100644 --- a/utility/cli/options.cpp +++ b/utility/cli/options.cpp @@ -165,6 +165,7 @@ namespace beam const char* RESET_ID = "reset_id"; const char* ERASE_ID = "erase_id"; const char* PRINT_TXO = "print_txo"; + const char* MANUAL_ROLLBACK = "manual_rollback"; const char* CHECKDB = "check_db"; const char* VACUUM = "vacuum"; const char* CRASH = "crash"; @@ -354,6 +355,7 @@ namespace beam (cli::RESET_ID, po::value()->default_value(false), "Reset self ID (used for network authentication). Must do if the node is cloned") (cli::ERASE_ID, po::value()->default_value(false), "Reset self ID (used for network authentication) and stop before re-creating the new one.") (cli::PRINT_TXO, po::value()->default_value(false), "Print TXO movements (create/spend) recognized by the owner key.") + (cli::MANUAL_ROLLBACK, po::value(), "Explicit rollback to height. The current consequent state will be forbidden (no automatic going up the same path)") (cli::CHECKDB, po::value()->default_value(false), "DB integrity check") (cli::VACUUM, po::value()->default_value(false), "DB vacuum (compact)") (cli::BBS_ENABLE, po::value()->default_value(true), "Enable SBBS messaging") diff --git a/utility/cli/options.h b/utility/cli/options.h index c8c5d24e7..d2cc83def 100644 --- a/utility/cli/options.h +++ b/utility/cli/options.h @@ -66,6 +66,7 @@ namespace beam extern const char* RESET_ID; extern const char* ERASE_ID; extern const char* PRINT_TXO; + extern const char* MANUAL_ROLLBACK; extern const char* CHECKDB; extern const char* VACUUM; extern const char* CRASH; diff --git a/utility/common.cpp b/utility/common.cpp index 67663f8f7..07b73b697 100644 --- a/utility/common.cpp +++ b/utility/common.cpp @@ -136,6 +136,22 @@ namespace beam return static_cast(val); } + ExecutorMT::ExecutorMT() + { + m_Threads = std::thread::hardware_concurrency(); + } + + void ExecutorMT::set_Threads(uint32_t nThreads) + { + Stop(); + m_Threads = nThreads; + } + + uint32_t ExecutorMT::get_Threads() + { + return m_Threads; + } + void ExecutorMT::InitSafe() { if (!m_vThreads.empty()) @@ -227,6 +243,13 @@ namespace beam } } + void ExecutorMT::RunThread(uint32_t iThread) + { + Context ctx; + ctx.m_iThread = iThread; + RunThreadCtx(ctx); + } + void ExecutorMT::RunThreadCtx(Context& ctx) { ctx.m_pThis = this; diff --git a/utility/executor.h b/utility/executor.h index 23fae8793..2ff0e8139 100644 --- a/utility/executor.h +++ b/utility/executor.h @@ -73,16 +73,22 @@ namespace beam struct ExecutorMT :public Executor { + virtual uint32_t get_Threads() override; virtual void Push(TaskAsync::Ptr&&) override; virtual uint32_t Flush(uint32_t nMaxTasks) override; virtual void ExecAll(TaskSync&) override; + ExecutorMT(); ~ExecutorMT() { Stop(); } void Stop(); + void set_Threads(uint32_t); + protected: - virtual void RunThread(uint32_t) = 0; // override this, create the appropriate context, and call the next + uint32_t m_Threads; // set at c'tor to num of cores. + + virtual void RunThread(uint32_t); // optionally override this, create the appropriate context, and call the next void RunThreadCtx(Context&); private: diff --git a/utility/io/timer.cpp b/utility/io/timer.cpp index 3114751e9..6cdcd6236 100644 --- a/utility/io/timer.cpp +++ b/utility/io/timer.cpp @@ -61,5 +61,35 @@ void Timer::cancel() { _callback = []{}; } + + + +void IdleEvt::cancel() +{ + if (m_Set) + { + m_Set = false; + uv_idle_stop(&m_Handle); + uv_close(reinterpret_cast(&Cast::Down(m_Handle)), nullptr); + } +} + +void IdleEvt::start() +{ + if (!m_Set) + { + m_Set = true; + uv_idle_init(&Reactor::get_Current().get_UvLoop(), &m_Handle); + uv_idle_start(&m_Handle, Handle::CallbackRaw); + } +} + +void IdleEvt::Handle::CallbackRaw(uv_idle_t* p) +{ + static_cast(p)->get_ParentObj().OnSchedule(); +} + + + }} //namespaces diff --git a/utility/io/timer.h b/utility/io/timer.h index 590a89371..2691a629b 100644 --- a/utility/io/timer.h +++ b/utility/io/timer.h @@ -14,6 +14,7 @@ #pragma once #include "reactor.h" +#include "../common.h" // IMPLEMENT_GET_PARENT_OBJ namespace beam { namespace io { @@ -40,5 +41,27 @@ class Timer : protected Reactor::Object { Callback _callback; }; +struct IdleEvt +{ + ~IdleEvt() { cancel(); } + + void start(); + void cancel(); + + virtual void OnSchedule() {} + +private: + + struct Handle + :public uv_idle_t + { + static void CallbackRaw(uv_idle_t*); + + IMPLEMENT_GET_PARENT_OBJ(IdleEvt, m_Handle) + } m_Handle; + + bool m_Set = false; +}; + }} //namespaces diff --git a/wallet/broadcaster/broadcaster.cpp b/wallet/broadcaster/broadcaster.cpp index 4654ebb7a..77102567b 100644 --- a/wallet/broadcaster/broadcaster.cpp +++ b/wallet/broadcaster/broadcaster.cpp @@ -73,24 +73,53 @@ bool parseUpdateInfo(const std::string& versionString, const std::string& typeSt bool parseWalletUpdateInfo(const std::string& versionString, const std::string& typeString, WalletImplVerInfo& walletVersionInfo) { VersionInfo::Application appType = VersionInfo::from_string(typeString); - if (appType != VersionInfo::Application::DesktopWallet) return false; + if (appType == VersionInfo::Application::Unknown) return false; - auto lastDot = versionString.find_last_of("."); - if (lastDot == std::string::npos) return false; - - Version libVersion; - if (bool res = libVersion.from_string(versionString.substr(0, lastDot)); !res) return false; - - auto uiRevisionStr = versionString.substr(lastDot + 1, versionString.length()); - size_t processed = 0; - uint32_t uiRevision = std::stoul(uiRevisionStr, &processed); - if (processed != uiRevisionStr.length()) return false; + Version libVersion = {0,0,0}; + uint32_t uiRevision = 0; + if (appType == VersionInfo::Application::DesktopWallet) + { + auto lastDot = versionString.find_last_of("."); + if (lastDot == std::string::npos) return false; + if (bool res = libVersion.from_string(versionString.substr(0, lastDot)); !res) return false; + auto uiRevisionStr = versionString.substr(lastDot + 1, versionString.length()); + + size_t processed = 0; + uiRevision = std::stoul(uiRevisionStr, &processed); + if (processed != uiRevisionStr.length()) return false; + } + else if (appType == VersionInfo::Application::AndroidWallet || + appType == VersionInfo::Application::IOSWallet) + { + if (bool res = libVersion.from_string(versionString); !res) return false; + uiRevision = libVersion.m_revision; + libVersion.m_revision = 0; + } + else + { + return false; + } walletVersionInfo.m_application = appType; walletVersionInfo.m_version = libVersion; walletVersionInfo.m_UIrevision = uiRevision; walletVersionInfo.m_title = "New version"; - walletVersionInfo.m_message = "Beam Wallet UI " + versionString; + + switch (appType) + { + case VersionInfo::Application::DesktopWallet: + walletVersionInfo.m_message = "Beam Wallet UI " + versionString; + break; + case VersionInfo::Application::AndroidWallet: + walletVersionInfo.m_message = "Android Wallet " + versionString; + break; + case VersionInfo::Application::IOSWallet: + walletVersionInfo.m_message = "IOS Wallet " + versionString; + break; + default: + assert(false); + } + return true; } @@ -327,7 +356,9 @@ namespace BroadcastMsg msg = BroadcastMsgCreator::createSignedMessage(rawMessage, key); broadcastRouter.sendMessage(contentType, msg); - if (contentType == BroadcastContentType::WalletUpdates) + // broadcast for 4.2 desktop ui // deprecated + if (contentType == BroadcastContentType::WalletUpdates && + walletVersionInfo.m_application == VersionInfo::Application::DesktopWallet) { VersionInfo versionInfo; versionInfo.m_application = walletVersionInfo.m_application; @@ -376,7 +407,7 @@ int main_impl(int argc, char* argv[]) messageDesc.add_options() (cli::PRIVATE_KEY, po::value(&options.privateKey), "private key to sign message") (cli::MESSAGE_TYPE, po::value(&options.messageType), "type of message: 'update' - info about available software updates, 'exchange' - info about current exchange rates") - (cli::UPDATE_VERSION, po::value(&options.walletUpdateInfo.version), "available software version in format 'x.x.x.x'") + (cli::UPDATE_VERSION, po::value(&options.walletUpdateInfo.version), "available software version in format 'x.x.x.x' for desktop or 'x.x.x' for IOS and Android") (cli::UPDATE_TYPE, po::value(&options.walletUpdateInfo.walletType), "updated software: 'desktop', 'android', 'ios'") (cli::EXCHANGE_CURR, po::value(&options.exchangeRate.currency), "currency: 'beam', 'btc', 'ltc', 'qtum'") (cli::EXCHANGE_RATE, po::value(&options.exchangeRate.rate), "exchange rate in decimal format: 100,000,000 = 1 usd") diff --git a/wallet/client/extensions/notifications/notification.h b/wallet/client/extensions/notifications/notification.h index f8a773a85..7bb3e6bd9 100644 --- a/wallet/client/extensions/notifications/notification.h +++ b/wallet/client/extensions/notifications/notification.h @@ -22,7 +22,7 @@ namespace beam::wallet { enum class Type : uint32_t { - SoftwareUpdateAvailable, + SoftwareUpdateAvailable, // deprecated AddressStatusChanged, WalletImplUpdateAvailable, BeamNews, diff --git a/wallet/client/extensions/notifications/notification_center.cpp b/wallet/client/extensions/notifications/notification_center.cpp index 2e92d2eba..f7662e7d2 100644 --- a/wallet/client/extensions/notifications/notification_center.cpp +++ b/wallet/client/extensions/notifications/notification_center.cpp @@ -68,31 +68,9 @@ namespace beam::wallet } size_t NotificationCenter::getUnreadCount( - VersionInfo::Application app, const Version& currentLibVersion, uint32_t currentClientRevision) const + std::function counter) const { - return std::count_if(m_cache.begin(), m_cache.end(), - [app, ¤tLibVersion, ¤tClientRevision](const auto& p) - { - if (p.second.m_state == Notification::State::Unread) - { - if (p.second.m_type == Notification::Type::WalletImplUpdateAvailable) - { - WalletImplVerInfo info; - if (fromByteBuffer(p.second.m_content, info) && - app == VersionInfo::Application::DesktopWallet && - (currentLibVersion < info.m_version || - (currentLibVersion == info.m_version && currentClientRevision < info.m_UIrevision))) - { - return true; - } - } - if (p.second.m_type == Notification::Type::TransactionFailed) - { - return true; - } - } - return false; - }); + return counter(m_cache.begin(), m_cache.end()); } void NotificationCenter::createNotification(const Notification& notification) diff --git a/wallet/client/extensions/notifications/notification_center.h b/wallet/client/extensions/notifications/notification_center.h index c8541bec0..cc50d7d1e 100644 --- a/wallet/client/extensions/notifications/notification_center.h +++ b/wallet/client/extensions/notifications/notification_center.h @@ -30,6 +30,7 @@ namespace beam::wallet // TODO dh unittests of address notifications public: + using Cache = std::unordered_map; NotificationCenter( IWalletDB& storage, const std::map& activeNotifications, io::Reactor::Ptr reactor); @@ -39,7 +40,7 @@ namespace beam::wallet void switchOnOffNotifications(Notification::Type, bool); size_t getUnreadCount( - VersionInfo::Application app, const Version& currentLibVersion, uint32_t currentClientRevision) const; + std::function counter) const; void Subscribe(INotificationsObserver* observer); void Unsubscribe(INotificationsObserver* observer); @@ -64,7 +65,7 @@ namespace beam::wallet IWalletDB& m_storage; std::map m_activeNotifications; - std::unordered_map m_cache; + Cache m_cache; std::vector m_subscribers; std::vector m_myAddresses; diff --git a/wallet/client/wallet_client.cpp b/wallet/client/wallet_client.cpp index ab8ec88e6..1ad373200 100644 --- a/wallet/client/wallet_client.cpp +++ b/wallet/client/wallet_client.cpp @@ -18,7 +18,6 @@ #include "core/block_rw.h" //#include "keykeeper/trezor_key_keeper.h" #include "extensions/broadcast_gateway/broadcast_router.h" -#include "extensions/news_channels/updates_provider.h" #include "extensions/news_channels/wallet_updates_provider.h" #include "extensions/news_channels/exchange_rate_provider.h" @@ -402,18 +401,13 @@ namespace beam::wallet } // Other content providers using broadcast messages - auto updatesProvider = make_shared(*broadcastRouter, *broadcastValidator); auto walletUpdatesProvider = make_shared(*broadcastRouter, *broadcastValidator); auto exchangeRateProvider = make_shared( *broadcastRouter, *broadcastValidator, *m_walletDB, isSecondCurrencyEnabled); - m_updatesProvider = updatesProvider; m_exchangeRateProvider = exchangeRateProvider; m_walletUpdatesProvider = walletUpdatesProvider; - using NewsSubscriber = ScopedSubscriber; using WalletUpdatesSubscriber = ScopedSubscriber; using ExchangeRatesSubscriber = ScopedSubscriber; - auto newsSubscriber = make_unique(static_cast( - m_notificationCenter.get()), updatesProvider); auto walletUpdatesSubscriber = make_unique(static_cast( m_notificationCenter.get()), walletUpdatesProvider); auto ratesSubscriber = make_unique( @@ -433,6 +427,8 @@ namespace beam::wallet nodeNetworkSubscriber.reset(); assert(nodeNetwork.use_count() == 1); nodeNetwork.reset(); + + m_DeferredBalanceUpdate.cancel(); // for more safety, while we see the same reactor } catch (const runtime_error& ex) { @@ -503,9 +499,13 @@ namespace beam::wallet void WalletClient::onCoinsChanged(ChangeAction action, const std::vector& items) { m_CoinChangesCollector.CollectItems(action, items); - // TODO: refactor this - // We should call getStatus to update balances - onStatus(getStatus()); + m_DeferredBalanceUpdate.start(); + } + + void WalletClient::DeferredBalanceUpdate::OnSchedule() + { + cancel(); + get_ParentObj().onStatus(get_ParentObj().getStatus()); } void WalletClient::onTransactionChanged(ChangeAction action, const std::vector& items) @@ -1149,9 +1149,35 @@ namespace beam::wallet void WalletClient::updateNotifications() { - size_t count = m_notificationCenter->getUnreadCount( - VersionInfo::Application::DesktopWallet, getLibVersion(), getClientRevision()); + [this] (NotificationCenter::Cache::const_iterator first, NotificationCenter::Cache::const_iterator last) + { + auto currentLibVersion = getLibVersion(); + auto currentClientRevision = getClientRevision(); + return std::count_if(first, last, + [¤tLibVersion, ¤tClientRevision](const auto& p) + { + if (p.second.m_state == Notification::State::Unread) + { + if (p.second.m_type == Notification::Type::WalletImplUpdateAvailable) + { + WalletImplVerInfo info; + if (fromByteBuffer(p.second.m_content, info) && + VersionInfo::Application::DesktopWallet == info.m_application && + (currentLibVersion < info.m_version || + (currentLibVersion == info.m_version && currentClientRevision < info.m_UIrevision))) + { + return true; + } + } + if (p.second.m_type == Notification::Type::TransactionFailed) + { + return true; + } + } + return false; + }); + }); postFunctionToClientContext([this, count]() { m_unreadNotificationsCount = count; diff --git a/wallet/client/wallet_client.h b/wallet/client/wallet_client.h index 9e73834dd..f6f7ed659 100644 --- a/wallet/client/wallet_client.h +++ b/wallet/client/wallet_client.h @@ -101,6 +101,13 @@ namespace beam::wallet // use this function to post function call to client's main loop void postFunctionToClientContext(MessageFunction&& func); + struct DeferredBalanceUpdate + :public io::IdleEvt + { + virtual void OnSchedule() override; + IMPLEMENT_GET_PARENT_OBJ(WalletClient, m_DeferredBalanceUpdate) + } m_DeferredBalanceUpdate; + // Callbacks virtual void onStatus(const WalletStatus& status) {} virtual void onTxStatus(ChangeAction, const std::vector& items) {} diff --git a/wallet/core/wallet.cpp b/wallet/core/wallet.cpp index 76738186c..380b7ebc9 100644 --- a/wallet/core/wallet.cpp +++ b/wallet/core/wallet.cpp @@ -164,6 +164,8 @@ namespace beam::wallet // Rescan the blockchain for UTXOs void Wallet::Rescan() { + AbortEvents(); + // We save all Incoming coins of active transactions and // restore them after clearing db. This will save our outgoing & available amounts std::vector ocoins; @@ -919,7 +921,7 @@ namespace beam::wallet uint32_t nCount = p.Proceed(r.m_Res.m_Events); - if (nCount < proto::Event::s_Max) + if (nCount < r.m_Max) { Block::SystemState::Full sTip; m_WalletDB->get_History().get_Tip(sTip); @@ -1057,7 +1059,7 @@ namespace beam::wallet { // Reconstruct tx with reset parameters and add it to the active list auto pTx = ConstructTransaction(tx.m_txId, tx.m_txType); - if (pTx->Rollback(sTip.m_Height)) + if (pTx && pTx->Rollback(sTip.m_Height)) { m_ActiveTransactions.emplace(tx.m_txId, pTx); UpdateOnSynced(pTx); @@ -1072,6 +1074,52 @@ namespace beam::wallet } } + void Wallet::OnEventsSerif(const Hash::Value& hv, Height h) + { + static const char szEvtSerif[] = "EventsSerif"; + + HeightHash hh; + if (!storage::getVar(*m_WalletDB, szEvtSerif, hh)) + { + hh.m_Hash = Zero; + // If this is the 1st time we received the serif - we do NOT assume the wallet was synced ok. It can potentially be already out-of-sync. + // Hence once node and wallet are both upgraded from older version - there will always be initial rescan. + hh.m_Height = MaxHeight; + } + + bool bHashChanged = (hh.m_Hash != hv); + bool bMustRescan = bHashChanged; + if (bHashChanged) + { + // Node Serif has changed (either connected to different node, or it rescanned the blockchain). The events stream may not be consistent with ours. + Height h0 = GetEventsHeightNext(); + if (!h0) + bMustRescan = false; // nothing to rescan atm + else + { + if (h0 > hh.m_Height) + { + // Our events are consistent and full up to height h0-1. + bMustRescan = (h >= h0); + } + } + + hh.m_Hash = hv; + } + + if (bHashChanged || (h != hh.m_Height)) + { + LOG_INFO() << "Events Serif changed: " << (bHashChanged ? "new Hash, " : "") << "Height=" << h << (bMustRescan ? ", Resyncing" : ""); + + hh.m_Height = h; + storage::setVar(*m_WalletDB, szEvtSerif, hh); + } + + if (bMustRescan) + Rescan(); + + } + void Wallet::OnNewTip() { m_WalletDB->ShrinkHistory(); diff --git a/wallet/core/wallet.h b/wallet/core/wallet.h index dea1278c8..4bc0e3608 100644 --- a/wallet/core/wallet.h +++ b/wallet/core/wallet.h @@ -167,6 +167,7 @@ namespace beam::wallet void get_OwnerKdf(Key::IPKdf::Ptr&) override; Block::SystemState::IHistory& get_History() override; void OnOwnedNode(const PeerID&, bool bUp) override; + void OnEventsSerif(const ECC::Hash::Value&, Height) override; struct RequestHandler : public proto::FlyClient::Request::IHandler diff --git a/wallet/core/wallet_db.cpp b/wallet/core/wallet_db.cpp index 78a53e040..f419da8a8 100644 --- a/wallet/core/wallet_db.cpp +++ b/wallet/core/wallet_db.cpp @@ -2103,6 +2103,60 @@ namespace beam::wallet return true; } + + typedef std::map ShieldedSpendKeyMap; + ShieldedSpendKeyMap m_mapShielded; + + virtual bool OnAssetRecognized(Asset::Full&) override + { + // TODO + return true; + } + + virtual bool OnShieldedOutRecognized(const ShieldedTxo::DescriptionOutp& dout, const ShieldedTxo::DataParams& pars, Key::Index nIdx) override + { + ShieldedCoin sc; + + sc.m_Key.m_nIdx = nIdx; + sc.m_Key.m_IsCreatedByViewer = pars.m_Ticket.m_IsCreatedByViewer; + sc.m_Key.m_kSerG = pars.m_Ticket.m_pK[0]; + + sc.m_User = pars.m_Output.m_User; + sc.m_ID = dout.m_ID; + sc.m_assetID = pars.m_Output.m_AssetID; + sc.m_value = pars.m_Output.m_Value; + sc.m_confirmHeight = dout.m_Height; + sc.m_spentHeight = MaxHeight; + + m_This.saveShieldedCoin(sc); + + LOG_INFO() << "Shielded output, ID: " << dout.m_ID << " Confirmed, Height=" << dout.m_Height; + + m_mapShielded[pars.m_Ticket.m_SpendPk] = sc.m_Key; + + return true; + } + + virtual bool OnShieldedIn(const ShieldedTxo::DescriptionInp& dinp) override + { + ShieldedSpendKeyMap::iterator it = m_mapShielded.find(dinp.m_SpendPk); + if (m_mapShielded.end() != it) + { + auto shieldedCoin = m_This.getShieldedCoin(it->second); + if (shieldedCoin) + { + shieldedCoin->m_spentHeight = dinp.m_Height; + m_This.saveShieldedCoin(*shieldedCoin); + + LOG_INFO() << "Shielded input, ID: " << shieldedCoin->m_ID << " Spent, Height=" << dinp.m_Height; + } + + m_mapShielded.erase(it); + } + + return true; + } + }; MyParser p(*this, prog); @@ -2535,7 +2589,7 @@ namespace beam::wallet void WalletDB::visitShieldedCoins(std::function func) { - sqlite::Statement stm(this, "SELECT " SHIELDED_COIN_FIELDS " FROM " SHIELDED_COINS_NAME " ORDER BY ID;"); + sqlite::Statement stm(this, "SELECT " SHIELDED_COIN_FIELDS " FROM " SHIELDED_COINS_NAME " ORDER BY Key;"); // the order is not importantt, but at least it should be by indexed field while (stm.step()) { ShieldedCoin coin; diff --git a/wallet/transactions/lelantus/pull_tx_builder.cpp b/wallet/transactions/lelantus/pull_tx_builder.cpp index ceeb3313e..33cac9516 100644 --- a/wallet/transactions/lelantus/pull_tx_builder.cpp +++ b/wallet/transactions/lelantus/pull_tx_builder.cpp @@ -15,6 +15,7 @@ #include "pull_tx_builder.h" #include "core/shielded.h" +#include "utility/executor.h" #include @@ -176,7 +177,12 @@ namespace beam::wallet::lelantus prover.m_Witness.V.m_V = IsAssetTx() ? GetAmount() : GetAmount() + GetFee(); } - pKrn->Sign(prover, GetAssetId()); + { + ExecutorMT exec; + Executor::Scope scope(exec); + pKrn->Sign(prover, GetAssetId()); + } + m_Tx.SetParameter(TxParameterID::KernelID, pKrn->m_Internal.m_ID); LOG_INFO() << m_Tx.GetTxID() << "[" << m_SubTxID << "]"