From 92166eb2c674ffd29f7b532b98285bf4e7867cbe Mon Sep 17 00:00:00 2001 From: Samuel Holmes Date: Thu, 18 Nov 2021 17:45:43 -0800 Subject: [PATCH 1/4] Rename TaskCache to TaskState --- .../utxobased/engine/makeUtxoEngineState.ts | 84 +++++++++---------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/src/common/utxobased/engine/makeUtxoEngineState.ts b/src/common/utxobased/engine/makeUtxoEngineState.ts index e8acbc42..00b8be92 100644 --- a/src/common/utxobased/engine/makeUtxoEngineState.ts +++ b/src/common/utxobased/engine/makeUtxoEngineState.ts @@ -98,7 +98,7 @@ export function makeUtxoEngineState( const { supportedFormats } = walletInfo.keys - const taskCache: TaskCache = { + const taskState: TaskState = { addressWatching: false, blockWatching: false, addressSubscribeCache: {}, @@ -109,15 +109,15 @@ export function makeUtxoEngineState( updateTransactionsCache: {} } - const clearTaskCache = (): void => { - taskCache.addressWatching = false - taskCache.blockWatching = false - taskCache.addressSubscribeCache = {} - taskCache.transactionsCache = {} - taskCache.utxosCache = {} - taskCache.rawUtxosCache = {} - taskCache.processedUtxosCache = {} - taskCache.updateTransactionsCache = {} + const clearTaskState = (): void => { + taskState.addressWatching = false + taskState.blockWatching = false + taskState.addressSubscribeCache = {} + taskState.transactionsCache = {} + taskState.utxosCache = {} + taskState.rawUtxosCache = {} + taskState.processedUtxosCache = {} + taskState.updateTransactionsCache = {} } let processedCount = 0 @@ -156,7 +156,7 @@ export function makeUtxoEngineState( walletTools, processor, emitter, - taskCache, + taskState, onAddressChecked, io: config.io, log, @@ -191,7 +191,7 @@ export function makeUtxoEngineState( }) for (const tx of txs) { if (tx == null) continue - taskCache.updateTransactionsCache[tx.txid] = { processing: false } + taskState.updateTransactionsCache[tx.txid] = { processing: false } } } ) @@ -199,10 +199,10 @@ export function makeUtxoEngineState( emitter.on( EngineEvent.NEW_ADDRESS_TRANSACTION, async (_uri: string, response: INewTransactionResponse): Promise => { - const state = taskCache.addressSubscribeCache[response.address] + const state = taskState.addressSubscribeCache[response.address] if (state != null) { const { path } = state - taskCache.utxosCache[response.address] = { + taskState.utxosCache[response.address] = { processing: false, path } @@ -212,7 +212,7 @@ export function makeUtxoEngineState( path.format, path.branch, 0, - taskCache.transactionsCache + taskState.transactionsCache ).catch(() => { throw new Error('failed to add to transaction cache') }) @@ -233,7 +233,7 @@ export function makeUtxoEngineState( ) if ( - Object.keys(taskCache.addressSubscribeCache).length < totalAddressCount + Object.keys(taskState.addressSubscribeCache).length < totalAddressCount ) { for (const format of supportedFormats) { const branches = getFormatSupportedBranches(format) @@ -259,7 +259,7 @@ export function makeUtxoEngineState( addressesToSubscribe.add(address) } addToAddressSubscribeCache( - commonArgs.taskCache, + commonArgs.taskState, addressesToSubscribe, { format, @@ -283,7 +283,7 @@ export function makeUtxoEngineState( async stop(): Promise { serverStates.stop() - clearTaskCache() + clearTaskState() running = false }, @@ -336,7 +336,7 @@ export function makeUtxoEngineState( walletTools: commonArgs.walletTools, engineInfo: commonArgs.pluginInfo.engineInfo, processor: commonArgs.processor, - taskCache: commonArgs.taskCache, + taskState: commonArgs.taskState, format: walletInfo.keys.format, script }) @@ -396,7 +396,7 @@ interface CommonArgs { walletTools: UTXOPluginWalletTools processor: Processor emitter: EngineEmitter - taskCache: TaskCache + taskState: TaskState onAddressChecked: () => void io: EdgeIo log: EdgeLog @@ -409,7 +409,7 @@ interface ShortPath { format: CurrencyFormat branch: number } -interface TaskCache { +interface TaskState { addressWatching: boolean blockWatching: boolean addressSubscribeCache: AddressSubscribeCache @@ -524,7 +524,7 @@ const setLookAhead = async (common: CommonArgs): Promise => { // Add all the addresses to the subscribe cache for registering subscriptions later addToAddressSubscribeCache( - common.taskCache, + common.taskState, addressesToSubscribe, shortPath ) @@ -532,16 +532,16 @@ const setLookAhead = async (common: CommonArgs): Promise => { } const addToAddressSubscribeCache = ( - taskCache: TaskCache, + taskState: TaskState, addresses: Set, path: ShortPath ): void => { addresses.forEach(address => { - taskCache.addressSubscribeCache[address] = { + taskState.addressSubscribeCache[address] = { path, processing: false } - taskCache.addressWatching = false + taskState.addressWatching = false }) } @@ -610,7 +610,7 @@ export const pickNextTask = async ( args: NextTaskArgs // eslint-disable-next-line @typescript-eslint/no-explicit-any ): Promise | undefined | boolean> => { - const { taskCache, uri, serverStates } = args + const { taskState, uri, serverStates } = args const { addressSubscribeCache, @@ -619,7 +619,7 @@ export const pickNextTask = async ( processedUtxosCache, transactionsCache, updateTransactionsCache - } = taskCache + } = taskState const serverState = serverStates.getServerState(uri) if (serverState == null) return @@ -717,7 +717,7 @@ export const pickNextTask = async ( // Check if there are any addresses pending to be subscribed if ( Object.keys(addressSubscribeCache).length > 0 && - !taskCache.addressWatching + !taskState.addressWatching ) { const blockHeight = serverStates.getBlockHeight(uri) // Loop each address that needs to be subscribed @@ -745,7 +745,7 @@ export const pickNextTask = async ( state.processing = true } - taskCache.addressWatching = true + taskState.addressWatching = true const queryTime = Date.now() const deferredAddressSub = new Deferred() @@ -754,10 +754,10 @@ export const pickNextTask = async ( serverStates.serverScoreUp(uri, Date.now() - queryTime) }) .catch(() => { - taskCache.addressWatching = false + taskState.addressWatching = false }) deferredAddressSub.promise.catch(() => { - taskCache.addressWatching = false + taskState.addressWatching = false }) serverStates.watchAddresses( uri, @@ -824,7 +824,7 @@ interface UpdateTransactionsArgs extends CommonArgs { const updateTransactions = ( args: UpdateTransactionsArgs ): WsTask => { - const { emitter, walletTools, txId, pluginInfo, processor, taskCache } = args + const { emitter, walletTools, txId, pluginInfo, processor, taskState } = args const deferredITransaction = new Deferred() deferredITransaction.promise .then(async (rawTx: ITransaction) => { @@ -849,7 +849,7 @@ const updateTransactions = ( }) }) .catch(() => { - taskCache.updateTransactionsCache[txId] = { processing: false } + taskState.updateTransactionsCache[txId] = { processing: false } }) return { ...transactionMessage(txId), @@ -881,7 +881,7 @@ interface DeriveScriptAddressArgs { engineInfo: EngineInfo processor: Processor format: CurrencyFormat - taskCache: TaskCache + taskState: TaskState script: string } @@ -896,7 +896,7 @@ const internalDeriveScriptAddress = async ({ engineInfo, processor, format, - taskCache, + taskState, script }: DeriveScriptAddressArgs): Promise => { if (engineInfo.scriptTemplates == null) { @@ -923,7 +923,7 @@ const internalDeriveScriptAddress = async ({ ) const addresses = new Set() addresses.add(address) - addToAddressSubscribeCache(taskCache, addresses, { + addToAddressSubscribeCache(taskState, addresses, { format: path.format, branch: path.changeIndex }) @@ -992,11 +992,11 @@ const processAddressTransactions = async ( processor, walletTools, path, - taskCache, + taskState, serverStates, uri } = args - const transactionsCache = taskCache.transactionsCache + const transactionsCache = taskState.transactionsCache const scriptPubkey = walletTools.addressToScriptPubkey(address) const addressData = await processor.fetchAddress(scriptPubkey) @@ -1129,12 +1129,12 @@ const processAddressUtxos = async ( address, walletTools, processor, - taskCache, + taskState, path, serverStates, uri } = args - const { utxosCache, rawUtxosCache } = taskCache + const { utxosCache, rawUtxosCache } = taskState const queryTime = Date.now() const deferredIAccountUTXOs = new Deferred() deferredIAccountUTXOs.promise @@ -1253,13 +1253,13 @@ const processRawUtxo = async ( format, processor, path, - taskCache, + taskState, requiredCount, serverStates, uri, log } = args - const { rawUtxosCache, processedUtxosCache } = taskCache + const { rawUtxosCache, processedUtxosCache } = taskState let scriptType: ScriptTypeEnum let script: string let redeemScript: string | undefined From 24b3cb402902a203e2b27cdbfd6275ab04d2d2d3 Mon Sep 17 00:00:00 2001 From: Samuel Holmes Date: Fri, 19 Nov 2021 10:54:05 -0800 Subject: [PATCH 2/4] Abstract TaskCache By having a TaskCache interface type, we maintain referential integrity, minimizing bug surface area. Also, it moves some code around for better isolation. --- .../utxobased/engine/makeUtxoEngineState.ts | 294 ++++++++---------- src/common/utxobased/engine/taskCache.ts | 41 +++ 2 files changed, 176 insertions(+), 159 deletions(-) create mode 100644 src/common/utxobased/engine/taskCache.ts diff --git a/src/common/utxobased/engine/makeUtxoEngineState.ts b/src/common/utxobased/engine/makeUtxoEngineState.ts index 00b8be92..b183d9d3 100644 --- a/src/common/utxobased/engine/makeUtxoEngineState.ts +++ b/src/common/utxobased/engine/makeUtxoEngineState.ts @@ -14,7 +14,6 @@ import { EngineInfo, PluginInfo } from '../../plugin/types' -import { removeItem } from '../../plugin/utils' import { Processor } from '../db/makeProcessor' import { toEdgeTransaction } from '../db/Models/ProcessorTransaction' import { @@ -49,6 +48,7 @@ import AwaitLock from './await-lock' import { BLOCKBOOK_TXS_PER_PAGE, CACHE_THROTTLE } from './constants' import { makeServerStates, ServerStates } from './makeServerStates' import { UTXOPluginWalletTools } from './makeUtxoWalletTools' +import { makeTaskCache, TaskCache } from './taskCache' import { currencyFormatToPurposeType, getCurrencyFormatFromPurposeType, @@ -101,23 +101,23 @@ export function makeUtxoEngineState( const taskState: TaskState = { addressWatching: false, blockWatching: false, - addressSubscribeCache: {}, - transactionsCache: {}, - utxosCache: {}, - rawUtxosCache: {}, - processedUtxosCache: {}, - updateTransactionsCache: {} + addressSubscribeTasks: makeTaskCache(), + transactionTasks: makeTaskCache(), + utxoTasks: makeTaskCache(), + rawUtxoTasks: makeTaskCache(), + processedUtxoTasks: makeTaskCache(), + updateTransactionTasks: makeTaskCache() } const clearTaskState = (): void => { taskState.addressWatching = false taskState.blockWatching = false - taskState.addressSubscribeCache = {} - taskState.transactionsCache = {} - taskState.utxosCache = {} - taskState.rawUtxosCache = {} - taskState.processedUtxosCache = {} - taskState.updateTransactionsCache = {} + taskState.addressSubscribeTasks.clear() + taskState.transactionTasks.clear() + taskState.utxoTasks.clear() + taskState.rawUtxoTasks.clear() + taskState.processedUtxoTasks.clear() + taskState.updateTransactionTasks.clear() } let processedCount = 0 @@ -191,7 +191,7 @@ export function makeUtxoEngineState( }) for (const tx of txs) { if (tx == null) continue - taskState.updateTransactionsCache[tx.txid] = { processing: false } + taskState.updateTransactionTasks.add(tx.txid, { processing: false }) } } ) @@ -199,20 +199,20 @@ export function makeUtxoEngineState( emitter.on( EngineEvent.NEW_ADDRESS_TRANSACTION, async (_uri: string, response: INewTransactionResponse): Promise => { - const state = taskState.addressSubscribeCache[response.address] - if (state != null) { - const { path } = state - taskState.utxosCache[response.address] = { + const task = taskState.addressSubscribeTasks.get(response.address) + if (task != null) { + const { path } = task + taskState.utxoTasks.add(response.address, { processing: false, path - } - addToTransactionCache( + }) + addTransactionTasks( commonArgs, response.address, path.format, path.branch, 0, - taskState.transactionsCache + taskState.transactionTasks ).catch(() => { throw new Error('failed to add to transaction cache') }) @@ -223,7 +223,7 @@ export function makeUtxoEngineState( } ) - // Initialize the addressSubscribeCache with the existing addresses already + // Initialize the addressSubscribeTasks with the existing addresses already // processed by the processor. This happens only once before any call to // setLookAhead. const initializeAddressSubscriptions = async (): Promise => { @@ -232,9 +232,7 @@ export function makeUtxoEngineState( processor ) - if ( - Object.keys(taskState.addressSubscribeCache).length < totalAddressCount - ) { + if (taskState.addressSubscribeTasks.size < totalAddressCount) { for (const format of supportedFormats) { const branches = getFormatSupportedBranches(format) for (const branch of branches) { @@ -258,14 +256,10 @@ export function makeUtxoEngineState( }) addressesToSubscribe.add(address) } - addToAddressSubscribeCache( - commonArgs.taskState, - addressesToSubscribe, - { - format, - branch - } - ) + addAddressSubscribeTasks(commonArgs.taskState, addressesToSubscribe, { + format, + branch + }) } } } @@ -412,46 +406,44 @@ interface ShortPath { interface TaskState { addressWatching: boolean blockWatching: boolean - addressSubscribeCache: AddressSubscribeCache - utxosCache: UtxosCache - rawUtxosCache: RawUtxoCache - processedUtxosCache: ProcessedUtxoCache - transactionsCache: AddressTransactionCache - updateTransactionsCache: UpdateTransactionCache + addressSubscribeTasks: TaskCache + utxoTasks: TaskCache + rawUtxoTasks: TaskCache + processedUtxoTasks: TaskCache + transactionTasks: TaskCache + updateTransactionTasks: TaskCache } -interface UpdateTransactionCache { - [key: string]: { processing: boolean } +interface UpdateTransactionTask { + processing: boolean } -interface AddressSubscribeCache { - [key: string]: { processing: boolean; path: ShortPath } +interface AddressSubscribeTask { + processing: boolean + path: ShortPath } -interface UtxosCache { - [key: string]: { processing: boolean; path: ShortPath } + +interface UtxosTask { + processing: boolean + path: ShortPath } -interface ProcessedUtxoCache { - [key: string]: { - processing: boolean - full: boolean - utxos: Set - path: ShortPath - } + +interface ProcessedUtxoTask { + processing: boolean + full: boolean + utxos: Set + path: ShortPath } -interface RawUtxoCache { - [key: string]: { - processing: boolean - path: ShortPath - address: IAddress - requiredCount: number - } +interface RawUtxoTask { + processing: boolean + path: ShortPath + address: IAddress + requiredCount: number } -interface AddressTransactionCache { - [key: string]: { - processing: boolean - path: ShortPath - page: number - blockHeight: number - } +interface TransactionTask { + processing: boolean + path: ShortPath + page: number + blockHeight: number } interface FormatArgs extends CommonArgs, ShortPath {} @@ -523,35 +515,31 @@ const setLookAhead = async (common: CommonArgs): Promise => { } // Add all the addresses to the subscribe cache for registering subscriptions later - addToAddressSubscribeCache( - common.taskState, - addressesToSubscribe, - shortPath - ) + addAddressSubscribeTasks(common.taskState, addressesToSubscribe, shortPath) } } -const addToAddressSubscribeCache = ( +const addAddressSubscribeTasks = ( taskState: TaskState, addresses: Set, path: ShortPath ): void => { addresses.forEach(address => { - taskState.addressSubscribeCache[address] = { + taskState.addressSubscribeTasks.add(address, { path, processing: false - } - taskState.addressWatching = false + }) }) + taskState.addressWatching = false } -const addToTransactionCache = async ( +const addTransactionTasks = async ( args: CommonArgs, address: string, format: CurrencyFormat, branch: number, blockHeight: number, - transactions: AddressTransactionCache + transactionTasks: TaskCache ): Promise => { const { walletTools, processor } = args // Fetch the blockHeight for the address from the database @@ -563,7 +551,7 @@ const addToTransactionCache = async ( blockHeight = lastQueriedBlockHeight } - transactions[address] = { + transactionTasks.add(address, { processing: false, path: { format, @@ -571,7 +559,7 @@ const addToTransactionCache = async ( }, page: 1, // Page starts on 1 blockHeight - } + }) } interface TransactionChangedArgs { @@ -613,12 +601,12 @@ export const pickNextTask = async ( const { taskState, uri, serverStates } = args const { - addressSubscribeCache, - utxosCache, - rawUtxosCache, - processedUtxosCache, - transactionsCache, - updateTransactionsCache + addressSubscribeTasks, + utxoTasks, + rawUtxoTasks, + processedUtxoTasks, + transactionTasks, + updateTransactionTasks } = taskState const serverState = serverStates.getServerState(uri) @@ -641,32 +629,30 @@ export const pickNextTask = async ( } // Loop processed utxos, these are just database ops, triggers setLookAhead - if (Object.keys(processedUtxosCache).length > 0) { - for (const scriptPubkey of Object.keys(processedUtxosCache)) { + if (processedUtxoTasks.size > 0) { + for (const [scriptPubkey, task] of processedUtxoTasks.entries) { // Only process when all utxos for a specific address have been gathered - const state = processedUtxosCache[scriptPubkey] - if (!state.processing && state.full) { - state.processing = true + if (!task.processing && task.full) { + task.processing = true await processUtxoTransactions({ ...args, scriptPubkey, - utxos: state.utxos, - path: state.path + utxos: task.utxos, + path: task.path }) - removeItem(processedUtxosCache, scriptPubkey) + processedUtxoTasks.remove(scriptPubkey) return true } } } // Loop unparsed utxos, some require a network call to get the full tx data - for (const utxoString of Object.keys(rawUtxosCache)) { - const state = rawUtxosCache[utxoString] + for (const [utxoString, task] of rawUtxoTasks.entries) { const utxo: IAccountUTXO = JSON.parse(utxoString) if (utxo == null) continue - if (!state.processing) { + if (!task.processing) { // check if we need to fetch additional network content for legacy purpose type - const purposeType = currencyFormatToPurposeType(state.path.format) + const purposeType = currencyFormatToPurposeType(task.path.format) if ( purposeType === BIP43PurposeTypeEnum.Airbitz || purposeType === BIP43PurposeTypeEnum.Legacy @@ -674,13 +660,13 @@ export const pickNextTask = async ( // if we do need to make a network call, check with the serverState if (!serverStates.serverCanGetTx(uri, utxo.txid)) return } - state.processing = true - removeItem(rawUtxosCache, utxoString) + task.processing = true + rawUtxoTasks.remove(utxoString) const wsTask = await processRawUtxo({ ...args, - ...state, - ...state.path, - address: state.address, + ...task, + ...task.path, + address: task.address, utxo, id: `${utxo.txid}_${utxo.vout}` }) @@ -689,18 +675,17 @@ export const pickNextTask = async ( } // Loop to process addresses to utxos - for (const address of Object.keys(utxosCache)) { - const state = utxosCache[address] + for (const [address, task] of utxoTasks.entries) { // Check if we need to fetch address UTXOs - if (!state.processing && serverStates.serverCanGetAddress(uri, address)) { - state.processing = true + if (!task.processing && serverStates.serverCanGetAddress(uri, address)) { + task.processing = true - removeItem(utxosCache, address) + utxoTasks.remove(address) // Fetch and process address UTXOs const wsTask = await processAddressUtxos({ ...args, - ...state, + ...task, address }) wsTask.deferred.promise @@ -715,34 +700,29 @@ export const pickNextTask = async ( } // Check if there are any addresses pending to be subscribed - if ( - Object.keys(addressSubscribeCache).length > 0 && - !taskState.addressWatching - ) { + if (addressSubscribeTasks.size > 0 && !taskState.addressWatching) { const blockHeight = serverStates.getBlockHeight(uri) // Loop each address that needs to be subscribed - for (const address of Object.keys(addressSubscribeCache)) { - const state = addressSubscribeCache[address] + for (const [address, task] of addressSubscribeTasks.entries) { // Add address in the cache to the set of addresses to watch - const { path, processing: subscribed } = state // only process newly watched addresses - if (subscribed) continue - if (path != null) { + if (task.processing) continue + if (task.path != null) { // Add the newly watched addresses to the UTXO cache - utxosCache[address] = { + utxoTasks.add(address, { processing: false, - path - } - await addToTransactionCache( + path: task.path + }) + await addTransactionTasks( args, address, - path.format, - path.branch, + task.path.format, + task.path.branch, blockHeight, - transactionsCache + transactionTasks ) } - state.processing = true + task.processing = true } taskState.addressWatching = true @@ -752,6 +732,7 @@ export const pickNextTask = async ( deferredAddressSub.promise .then(() => { serverStates.serverScoreUp(uri, Date.now() - queryTime) + taskState.addressWatching = false }) .catch(() => { taskState.addressWatching = false @@ -761,21 +742,18 @@ export const pickNextTask = async ( }) serverStates.watchAddresses( uri, - Array.from(Object.keys(addressSubscribeCache)), + addressSubscribeTasks.keys, deferredAddressSub ) return true } // filled when transactions potentially changed (e.g. through new block notification) - if (Object.keys(updateTransactionsCache).length > 0) { - for (const txId of Object.keys(updateTransactionsCache)) { - if ( - !updateTransactionsCache[txId].processing && - serverStates.serverCanGetTx(uri, txId) - ) { - updateTransactionsCache[txId].processing = true - removeItem(updateTransactionsCache, txId) + if (updateTransactionTasks.size > 0) { + for (const [txId, task] of updateTransactionTasks.entries) { + if (!task.processing && serverStates.serverCanGetTx(uri, txId)) { + task.processing = true + updateTransactionTasks.remove(txId) const updateTransactionTask = updateTransactions({ ...args, txId }) // once resolved, add the txid to the server cache updateTransactionTask.deferred.promise @@ -792,17 +770,16 @@ export const pickNextTask = async ( } // loop to get and process transaction history of single addresses, triggers setLookAhead - for (const address of Object.keys(transactionsCache)) { - const state = transactionsCache[address] - if (!state.processing && serverStates.serverCanGetAddress(uri, address)) { - state.processing = true + for (const [address, task] of transactionTasks.entries) { + if (!task.processing && serverStates.serverCanGetAddress(uri, address)) { + task.processing = true - removeItem(transactionsCache, address) + transactionTasks.remove(address) // Fetch and process address UTXOs const wsTask = await processAddressTransactions({ ...args, - ...state, + ...task, address }) wsTask.deferred.promise @@ -849,7 +826,7 @@ const updateTransactions = ( }) }) .catch(() => { - taskState.updateTransactionsCache[txId] = { processing: false } + taskState.updateTransactionTasks.add(txId, { processing: false }) }) return { ...transactionMessage(txId), @@ -923,7 +900,7 @@ const internalDeriveScriptAddress = async ({ ) const addresses = new Set() addresses.add(address) - addToAddressSubscribeCache(taskState, addresses, { + addAddressSubscribeTasks(taskState, addresses, { format: path.format, branch: path.changeIndex }) @@ -996,7 +973,6 @@ const processAddressTransactions = async ( serverStates, uri } = args - const transactionsCache = taskState.transactionsCache const scriptPubkey = walletTools.addressToScriptPubkey(address) const addressData = await processor.fetchAddress(scriptPubkey) @@ -1037,12 +1013,12 @@ const processAddressTransactions = async ( // we have progressed through all of the blockbook pages if (page < totalPages) { // Add the address back to the cache, incrementing the page - transactionsCache[address] = { + taskState.transactionTasks.add(address, { path, processing: false, blockHeight, page: page + 1 - } + }) return } @@ -1134,7 +1110,7 @@ const processAddressUtxos = async ( serverStates, uri } = args - const { utxosCache, rawUtxosCache } = taskState + const { utxoTasks, rawUtxoTasks } = taskState const queryTime = Date.now() const deferredIAccountUTXOs = new Deferred() deferredIAccountUTXOs.promise @@ -1146,21 +1122,21 @@ const processAddressUtxos = async ( return } for (const utxo of utxos) { - rawUtxosCache[JSON.stringify(utxo)] = { + rawUtxoTasks.add(JSON.stringify(utxo), { processing: false, requiredCount: utxos.length, path, // TypeScript yells otherwise address: { ...addressData, path: addressData.path } - } + }) } }) .catch(() => { args.processing = false - utxosCache[address] = { + utxoTasks.add(address, { processing: args.processing, path - } + }) }) return { ...addressUtxosMessage(address), @@ -1259,15 +1235,15 @@ const processRawUtxo = async ( uri, log } = args - const { rawUtxosCache, processedUtxosCache } = taskState + const { rawUtxoTasks, processedUtxoTasks } = taskState let scriptType: ScriptTypeEnum let script: string let redeemScript: string | undefined // Function to call once we are finished const done = (): void => - addToProcessedUtxosCache( - processedUtxosCache, + addProcessedUtxosTask( + processedUtxoTasks, path, address.scriptPubkey, requiredCount, @@ -1312,12 +1288,12 @@ const processRawUtxo = async ( .catch(e => { // If something went wrong, add the UTXO back to the queue log('error in processed utxos cache, re-adding utxo to cache:', e) - rawUtxosCache[JSON.stringify(utxo)] = { + rawUtxoTasks.add(JSON.stringify(utxo), { processing: false, path, address, requiredCount - } + }) }) return { ...transactionMessage(utxo.txid), @@ -1351,20 +1327,20 @@ const processRawUtxo = async ( done() } -const addToProcessedUtxosCache = ( - processedUtxosCache: ProcessedUtxoCache, +const addProcessedUtxosTask = ( + processedUtxoTasks: TaskCache, path: ShortPath, scriptPubkey: string, requiredCount: number, utxo: IUTXO ): void => { - const processedUtxos = processedUtxosCache[scriptPubkey] ?? { + const processedUtxos = processedUtxoTasks.get(scriptPubkey) ?? { utxos: new Set(), processing: false, path, full: false } processedUtxos.utxos.add(utxo) - processedUtxosCache[scriptPubkey] = processedUtxos + processedUtxoTasks.add(scriptPubkey, processedUtxos) processedUtxos.full = processedUtxos.utxos.size >= requiredCount } diff --git a/src/common/utxobased/engine/taskCache.ts b/src/common/utxobased/engine/taskCache.ts new file mode 100644 index 00000000..676766a2 --- /dev/null +++ b/src/common/utxobased/engine/taskCache.ts @@ -0,0 +1,41 @@ +export interface TaskCache { + add: (key: string, value: T) => void + clear: () => void + get: (key: string) => T | undefined + keys: string[] + entries: Array<[string, T]> + remove: (key: string) => void + size: number +} + +export const makeTaskCache = (): TaskCache => { + let cache: { [key: string]: T } = {} + let size = 0 + + return { + add: (key: string, value: T) => { + cache[key] = value + ++size + }, + clear: () => { + cache = {} + }, + get: (key: string) => { + return cache[key] + }, + get keys() { + return Object.keys(cache) + }, + get entries() { + return Object.entries(cache) + }, + remove: (key: string) => { + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete + delete cache[key] + --size + }, + get size() { + return size + } + } +} From 64869f8f7a33fff04594ecdd0ad4f653817df3ba Mon Sep 17 00:00:00 2001 From: Samuel Holmes Date: Thu, 18 Nov 2021 21:53:16 -0800 Subject: [PATCH 3/4] Abstract the TaskState into it's own file --- .../utxobased/engine/makeUtxoEngineState.ts | 214 ++++-------------- src/common/utxobased/engine/taskState.ts | 128 +++++++++++ 2 files changed, 169 insertions(+), 173 deletions(-) create mode 100644 src/common/utxobased/engine/taskState.ts diff --git a/src/common/utxobased/engine/makeUtxoEngineState.ts b/src/common/utxobased/engine/makeUtxoEngineState.ts index b183d9d3..6e162608 100644 --- a/src/common/utxobased/engine/makeUtxoEngineState.ts +++ b/src/common/utxobased/engine/makeUtxoEngineState.ts @@ -48,7 +48,7 @@ import AwaitLock from './await-lock' import { BLOCKBOOK_TXS_PER_PAGE, CACHE_THROTTLE } from './constants' import { makeServerStates, ServerStates } from './makeServerStates' import { UTXOPluginWalletTools } from './makeUtxoWalletTools' -import { makeTaskCache, TaskCache } from './taskCache' +import { makeTaskState, TaskState } from './taskState' import { currencyFormatToPurposeType, getCurrencyFormatFromPurposeType, @@ -98,27 +98,7 @@ export function makeUtxoEngineState( const { supportedFormats } = walletInfo.keys - const taskState: TaskState = { - addressWatching: false, - blockWatching: false, - addressSubscribeTasks: makeTaskCache(), - transactionTasks: makeTaskCache(), - utxoTasks: makeTaskCache(), - rawUtxoTasks: makeTaskCache(), - processedUtxoTasks: makeTaskCache(), - updateTransactionTasks: makeTaskCache() - } - - const clearTaskState = (): void => { - taskState.addressWatching = false - taskState.blockWatching = false - taskState.addressSubscribeTasks.clear() - taskState.transactionTasks.clear() - taskState.utxoTasks.clear() - taskState.rawUtxoTasks.clear() - taskState.processedUtxoTasks.clear() - taskState.updateTransactionTasks.clear() - } + const taskState = makeTaskState({ walletTools, processor }) let processedCount = 0 let processedPercent = 0 @@ -206,16 +186,13 @@ export function makeUtxoEngineState( processing: false, path }) - addTransactionTasks( - commonArgs, + + await taskState.addTransactionTask( response.address, path.format, path.branch, - 0, - taskState.transactionTasks - ).catch(() => { - throw new Error('failed to add to transaction cache') - }) + 0 + ) setLookAhead(commonArgs).catch(e => { log(e) }) @@ -236,7 +213,6 @@ export function makeUtxoEngineState( for (const format of supportedFormats) { const branches = getFormatSupportedBranches(format) for (const branch of branches) { - const addressesToSubscribe = new Set() const branchAddressCount = processor.numAddressesByFormatPath({ format, changeIndex: branch @@ -254,12 +230,14 @@ export function makeUtxoEngineState( changeIndex: branch, format }) - addressesToSubscribe.add(address) + taskState.addressSubscribeTasks.add(address, { + path: { + format, + branch + }, + processing: false + }) } - addAddressSubscribeTasks(commonArgs.taskState, addressesToSubscribe, { - format, - branch - }) } } } @@ -277,7 +255,7 @@ export function makeUtxoEngineState( async stop(): Promise { serverStates.stop() - clearTaskState() + taskState.clearTaskState() running = false }, @@ -403,48 +381,6 @@ interface ShortPath { format: CurrencyFormat branch: number } -interface TaskState { - addressWatching: boolean - blockWatching: boolean - addressSubscribeTasks: TaskCache - utxoTasks: TaskCache - rawUtxoTasks: TaskCache - processedUtxoTasks: TaskCache - transactionTasks: TaskCache - updateTransactionTasks: TaskCache -} - -interface UpdateTransactionTask { - processing: boolean -} -interface AddressSubscribeTask { - processing: boolean - path: ShortPath -} - -interface UtxosTask { - processing: boolean - path: ShortPath -} - -interface ProcessedUtxoTask { - processing: boolean - full: boolean - utxos: Set - path: ShortPath -} -interface RawUtxoTask { - processing: boolean - path: ShortPath - address: IAddress - requiredCount: number -} -interface TransactionTask { - processing: boolean - path: ShortPath - page: number - blockHeight: number -} interface FormatArgs extends CommonArgs, ShortPath {} @@ -454,6 +390,7 @@ const setLookAhead = async (common: CommonArgs): Promise => { lock, processor, supportedFormats, + taskState, walletTools } = common @@ -473,7 +410,6 @@ const setLookAhead = async (common: CommonArgs): Promise => { } async function deriveKeys(shortPath: ShortPath): Promise { - const addressesToSubscribe = new Set() const formatPath: Omit = { format: shortPath.format, changeIndex: shortPath.branch @@ -503,8 +439,11 @@ const setLookAhead = async (common: CommonArgs): Promise => { makeIAddress({ scriptPubkey, redeemScript, path }) ) - // Add the displayAddress to the set of addresses to subscribe to after loop - addressesToSubscribe.add(address) + // Add the address to the set of addresses to subscribe to after loop + taskState.addressSubscribeTasks.add(address, { + path: shortPath, + processing: false + }) // Update the state for the loop lastUsedIndex = await processor.lastUsedIndexByFormatPath({ @@ -513,53 +452,7 @@ const setLookAhead = async (common: CommonArgs): Promise => { lookAheadIndex = lastUsedIndex + engineInfo.gapLimit nextAddressIndex = processor.numAddressesByFormatPath(formatPath) } - - // Add all the addresses to the subscribe cache for registering subscriptions later - addAddressSubscribeTasks(common.taskState, addressesToSubscribe, shortPath) - } -} - -const addAddressSubscribeTasks = ( - taskState: TaskState, - addresses: Set, - path: ShortPath -): void => { - addresses.forEach(address => { - taskState.addressSubscribeTasks.add(address, { - path, - processing: false - }) - }) - taskState.addressWatching = false -} - -const addTransactionTasks = async ( - args: CommonArgs, - address: string, - format: CurrencyFormat, - branch: number, - blockHeight: number, - transactionTasks: TaskCache -): Promise => { - const { walletTools, processor } = args - // Fetch the blockHeight for the address from the database - const scriptPubkey = walletTools.addressToScriptPubkey(address) - - if (blockHeight === 0) { - const { lastQueriedBlockHeight = 0 } = - (await processor.fetchAddress(scriptPubkey)) ?? {} - blockHeight = lastQueriedBlockHeight } - - transactionTasks.add(address, { - processing: false, - path: { - format, - branch - }, - page: 1, // Page starts on 1 - blockHeight - }) } interface TransactionChangedArgs { @@ -713,13 +606,11 @@ export const pickNextTask = async ( processing: false, path: task.path }) - await addTransactionTasks( - args, + await taskState.addTransactionTask( address, task.path.format, task.path.branch, - blockHeight, - transactionTasks + blockHeight ) } task.processing = true @@ -898,11 +789,12 @@ const internalDeriveScriptAddress = async ({ await processor.saveAddress( makeIAddress({ scriptPubkey, redeemScript, path }) ) - const addresses = new Set() - addresses.add(address) - addAddressSubscribeTasks(taskState, addresses, { - format: path.format, - branch: path.changeIndex + taskState.addressSubscribeTasks.add(address, { + path: { + format: path.format, + branch: path.changeIndex + }, + processing: false }) return { address, scriptPubkey, redeemScript } } @@ -1235,31 +1127,25 @@ const processRawUtxo = async ( uri, log } = args - const { rawUtxoTasks, processedUtxoTasks } = taskState + const { rawUtxoTasks } = taskState let scriptType: ScriptTypeEnum let script: string let redeemScript: string | undefined // Function to call once we are finished const done = (): void => - addProcessedUtxosTask( - processedUtxoTasks, - path, - address.scriptPubkey, - requiredCount, - { - id, - txid: utxo.txid, - vout: utxo.vout, - value: utxo.value, - scriptPubkey: address.scriptPubkey, - script, - redeemScript, - scriptType, - blockHeight: utxo.height ?? -1, - spent: false - } - ) + taskState.addProcessedUtxoTask(path, address.scriptPubkey, requiredCount, { + id, + txid: utxo.txid, + vout: utxo.vout, + value: utxo.value, + scriptPubkey: address.scriptPubkey, + script, + redeemScript, + scriptType, + blockHeight: utxo.height ?? -1, + spent: false + }) switch (currencyFormatToPurposeType(format)) { case BIP43PurposeTypeEnum.Airbitz: @@ -1326,21 +1212,3 @@ const processRawUtxo = async ( // Since we have everything, call done done() } - -const addProcessedUtxosTask = ( - processedUtxoTasks: TaskCache, - path: ShortPath, - scriptPubkey: string, - requiredCount: number, - utxo: IUTXO -): void => { - const processedUtxos = processedUtxoTasks.get(scriptPubkey) ?? { - utxos: new Set(), - processing: false, - path, - full: false - } - processedUtxos.utxos.add(utxo) - processedUtxoTasks.add(scriptPubkey, processedUtxos) - processedUtxos.full = processedUtxos.utxos.size >= requiredCount -} diff --git a/src/common/utxobased/engine/taskState.ts b/src/common/utxobased/engine/taskState.ts new file mode 100644 index 00000000..86807e8f --- /dev/null +++ b/src/common/utxobased/engine/taskState.ts @@ -0,0 +1,128 @@ +import { CurrencyFormat } from '../../plugin/types' +import { Processor } from '../db/makeProcessor' +import { IAddress, IUTXO } from '../db/types' +import { UTXOPluginWalletTools } from './makeUtxoWalletTools' +import { makeTaskCache, TaskCache } from './taskCache' + +export interface TaskState { + addressWatching: boolean + blockWatching: boolean + addressSubscribeTasks: TaskCache + utxoTasks: TaskCache + rawUtxoTasks: TaskCache + processedUtxoTasks: TaskCache + transactionTasks: TaskCache + updateTransactionTasks: TaskCache + clearTaskState: () => void + addProcessedUtxoTask: ( + path: ShortPath, + scriptPubkey: string, + requiredCount: number, + utxo: IUTXO + ) => void + addTransactionTask: ( + address: string, + format: CurrencyFormat, + branch: number, + blockHeight: number + ) => Promise +} + +export interface UpdateTransactionTask { + processing: boolean +} +export interface AddressSubscribeTask { + processing: boolean + path: ShortPath +} + +export interface UtxosTask { + processing: boolean + path: ShortPath +} + +export interface ProcessedUtxoTask { + processing: boolean + full: boolean + utxos: Set + path: ShortPath +} +export interface RawUtxoTask { + processing: boolean + path: ShortPath + address: IAddress + requiredCount: number +} +export interface TransactionTask { + processing: boolean + path: ShortPath + page: number + blockHeight: number +} + +export interface ShortPath { + format: CurrencyFormat + branch: number +} + +interface TaskStateConfig { + walletTools: UTXOPluginWalletTools + processor: Processor +} + +export const makeTaskState = ({ + walletTools, + processor +}: TaskStateConfig): TaskState => { + return { + addressWatching: false, + blockWatching: false, + addressSubscribeTasks: makeTaskCache(), + transactionTasks: makeTaskCache(), + utxoTasks: makeTaskCache(), + rawUtxoTasks: makeTaskCache(), + processedUtxoTasks: makeTaskCache(), + updateTransactionTasks: makeTaskCache(), + clearTaskState: function () { + this.addressWatching = false + this.blockWatching = false + this.addressSubscribeTasks.clear() + this.transactionTasks.clear() + this.utxoTasks.clear() + this.rawUtxoTasks.clear() + this.processedUtxoTasks.clear() + this.updateTransactionTasks.clear() + }, + addProcessedUtxoTask(path, scriptPubkey, requiredCount, utxo) { + const processedUtxos = this.processedUtxoTasks.get(scriptPubkey) ?? { + utxos: new Set(), + processing: false, + path, + full: false + } + processedUtxos.utxos.add(utxo) + this.processedUtxoTasks.add(scriptPubkey, processedUtxos) + processedUtxos.full = processedUtxos.utxos.size >= requiredCount + }, + async addTransactionTask(address, format, branch, blockHeight) { + // Fetch the blockHeight for the address from the database + const scriptPubkey = walletTools.addressToScriptPubkey(address) + + if (blockHeight === 0) { + const { lastQueriedBlockHeight = 0 } = + (await processor.fetchAddress(scriptPubkey)) ?? {} + blockHeight = lastQueriedBlockHeight + } + + this.transactionTasks.add(address, { + processing: false, + path: { + format, + branch + }, + page: 1, // Page starts on 1 + blockHeight + }) + } + } +} From 465b13fe2e4d5dd5d877503b7c9620f45d0610a3 Mon Sep 17 00:00:00 2001 From: Samuel Holmes Date: Fri, 19 Nov 2021 19:45:23 -0800 Subject: [PATCH 4/4] Remove blockWatching from TaskState This is unused code. --- src/common/utxobased/engine/taskState.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/common/utxobased/engine/taskState.ts b/src/common/utxobased/engine/taskState.ts index 86807e8f..668d3932 100644 --- a/src/common/utxobased/engine/taskState.ts +++ b/src/common/utxobased/engine/taskState.ts @@ -6,7 +6,6 @@ import { makeTaskCache, TaskCache } from './taskCache' export interface TaskState { addressWatching: boolean - blockWatching: boolean addressSubscribeTasks: TaskCache utxoTasks: TaskCache rawUtxoTasks: TaskCache @@ -76,7 +75,6 @@ export const makeTaskState = ({ }: TaskStateConfig): TaskState => { return { addressWatching: false, - blockWatching: false, addressSubscribeTasks: makeTaskCache(), transactionTasks: makeTaskCache(), utxoTasks: makeTaskCache(), @@ -85,7 +83,6 @@ export const makeTaskState = ({ updateTransactionTasks: makeTaskCache(), clearTaskState: function () { this.addressWatching = false - this.blockWatching = false this.addressSubscribeTasks.clear() this.transactionTasks.clear() this.utxoTasks.clear()