diff --git a/src/common/utxobased/engine/makeUtxoEngineState.ts b/src/common/utxobased/engine/makeUtxoEngineState.ts index e8acbc42..6e162608 100644 --- a/src/common/utxobased/engine/makeUtxoEngineState.ts +++ b/src/common/utxobased/engine/makeUtxoEngineState.ts @@ -14,7 +14,6 @@ import { EngineInfo, PluginInfo } from '../../plugin/types' -import { removeItem } from '../../plugin/utils' import { Processor } from '../db/makeProcessor' import { toEdgeTransaction } from '../db/Models/ProcessorTransaction' import { @@ -49,6 +48,7 @@ import AwaitLock from './await-lock' import { BLOCKBOOK_TXS_PER_PAGE, CACHE_THROTTLE } from './constants' import { makeServerStates, ServerStates } from './makeServerStates' import { UTXOPluginWalletTools } from './makeUtxoWalletTools' +import { makeTaskState, TaskState } from './taskState' import { currencyFormatToPurposeType, getCurrencyFormatFromPurposeType, @@ -98,27 +98,7 @@ export function makeUtxoEngineState( const { supportedFormats } = walletInfo.keys - const taskCache: TaskCache = { - addressWatching: false, - blockWatching: false, - addressSubscribeCache: {}, - transactionsCache: {}, - utxosCache: {}, - rawUtxosCache: {}, - processedUtxosCache: {}, - updateTransactionsCache: {} - } - - const clearTaskCache = (): void => { - taskCache.addressWatching = false - taskCache.blockWatching = false - taskCache.addressSubscribeCache = {} - taskCache.transactionsCache = {} - taskCache.utxosCache = {} - taskCache.rawUtxosCache = {} - taskCache.processedUtxosCache = {} - taskCache.updateTransactionsCache = {} - } + const taskState = makeTaskState({ walletTools, processor }) let processedCount = 0 let processedPercent = 0 @@ -156,7 +136,7 @@ export function makeUtxoEngineState( walletTools, processor, emitter, - taskCache, + taskState, onAddressChecked, io: config.io, log, @@ -191,7 +171,7 @@ export function makeUtxoEngineState( }) for (const tx of txs) { if (tx == null) continue - taskCache.updateTransactionsCache[tx.txid] = { processing: false } + taskState.updateTransactionTasks.add(tx.txid, { processing: false }) } } ) @@ -199,23 +179,20 @@ export function makeUtxoEngineState( emitter.on( EngineEvent.NEW_ADDRESS_TRANSACTION, async (_uri: string, response: INewTransactionResponse): Promise => { - const state = taskCache.addressSubscribeCache[response.address] - if (state != null) { - const { path } = state - taskCache.utxosCache[response.address] = { + const task = taskState.addressSubscribeTasks.get(response.address) + if (task != null) { + const { path } = task + taskState.utxoTasks.add(response.address, { processing: false, path - } - addToTransactionCache( - commonArgs, + }) + + await taskState.addTransactionTask( response.address, path.format, path.branch, - 0, - taskCache.transactionsCache - ).catch(() => { - throw new Error('failed to add to transaction cache') - }) + 0 + ) setLookAhead(commonArgs).catch(e => { log(e) }) @@ -223,7 +200,7 @@ export function makeUtxoEngineState( } ) - // Initialize the addressSubscribeCache with the existing addresses already + // Initialize the addressSubscribeTasks with the existing addresses already // processed by the processor. This happens only once before any call to // setLookAhead. const initializeAddressSubscriptions = async (): Promise => { @@ -232,13 +209,10 @@ export function makeUtxoEngineState( processor ) - if ( - Object.keys(taskCache.addressSubscribeCache).length < totalAddressCount - ) { + if (taskState.addressSubscribeTasks.size < totalAddressCount) { for (const format of supportedFormats) { const branches = getFormatSupportedBranches(format) for (const branch of branches) { - const addressesToSubscribe = new Set() const branchAddressCount = processor.numAddressesByFormatPath({ format, changeIndex: branch @@ -256,16 +230,14 @@ export function makeUtxoEngineState( changeIndex: branch, format }) - addressesToSubscribe.add(address) + taskState.addressSubscribeTasks.add(address, { + path: { + format, + branch + }, + processing: false + }) } - addToAddressSubscribeCache( - commonArgs.taskCache, - addressesToSubscribe, - { - format, - branch - } - ) } } } @@ -283,7 +255,7 @@ export function makeUtxoEngineState( async stop(): Promise { serverStates.stop() - clearTaskCache() + taskState.clearTaskState() running = false }, @@ -336,7 +308,7 @@ export function makeUtxoEngineState( walletTools: commonArgs.walletTools, engineInfo: commonArgs.pluginInfo.engineInfo, processor: commonArgs.processor, - taskCache: commonArgs.taskCache, + taskState: commonArgs.taskState, format: walletInfo.keys.format, script }) @@ -396,7 +368,7 @@ interface CommonArgs { walletTools: UTXOPluginWalletTools processor: Processor emitter: EngineEmitter - taskCache: TaskCache + taskState: TaskState onAddressChecked: () => void io: EdgeIo log: EdgeLog @@ -409,50 +381,6 @@ interface ShortPath { format: CurrencyFormat branch: number } -interface TaskCache { - addressWatching: boolean - blockWatching: boolean - addressSubscribeCache: AddressSubscribeCache - utxosCache: UtxosCache - rawUtxosCache: RawUtxoCache - processedUtxosCache: ProcessedUtxoCache - transactionsCache: AddressTransactionCache - updateTransactionsCache: UpdateTransactionCache -} - -interface UpdateTransactionCache { - [key: string]: { processing: boolean } -} -interface AddressSubscribeCache { - [key: string]: { processing: boolean; path: ShortPath } -} -interface UtxosCache { - [key: string]: { processing: boolean; path: ShortPath } -} -interface ProcessedUtxoCache { - [key: string]: { - processing: boolean - full: boolean - utxos: Set - path: ShortPath - } -} -interface RawUtxoCache { - [key: string]: { - processing: boolean - path: ShortPath - address: IAddress - requiredCount: number - } -} -interface AddressTransactionCache { - [key: string]: { - processing: boolean - path: ShortPath - page: number - blockHeight: number - } -} interface FormatArgs extends CommonArgs, ShortPath {} @@ -462,6 +390,7 @@ const setLookAhead = async (common: CommonArgs): Promise => { lock, processor, supportedFormats, + taskState, walletTools } = common @@ -481,7 +410,6 @@ const setLookAhead = async (common: CommonArgs): Promise => { } async function deriveKeys(shortPath: ShortPath): Promise { - const addressesToSubscribe = new Set() const formatPath: Omit = { format: shortPath.format, changeIndex: shortPath.branch @@ -511,8 +439,11 @@ const setLookAhead = async (common: CommonArgs): Promise => { makeIAddress({ scriptPubkey, redeemScript, path }) ) - // Add the displayAddress to the set of addresses to subscribe to after loop - addressesToSubscribe.add(address) + // Add the address to the set of addresses to subscribe to after loop + taskState.addressSubscribeTasks.add(address, { + path: shortPath, + processing: false + }) // Update the state for the loop lastUsedIndex = await processor.lastUsedIndexByFormatPath({ @@ -521,56 +452,6 @@ const setLookAhead = async (common: CommonArgs): Promise => { lookAheadIndex = lastUsedIndex + engineInfo.gapLimit nextAddressIndex = processor.numAddressesByFormatPath(formatPath) } - - // Add all the addresses to the subscribe cache for registering subscriptions later - addToAddressSubscribeCache( - common.taskCache, - addressesToSubscribe, - shortPath - ) - } -} - -const addToAddressSubscribeCache = ( - taskCache: TaskCache, - addresses: Set, - path: ShortPath -): void => { - addresses.forEach(address => { - taskCache.addressSubscribeCache[address] = { - path, - processing: false - } - taskCache.addressWatching = false - }) -} - -const addToTransactionCache = async ( - args: CommonArgs, - address: string, - format: CurrencyFormat, - branch: number, - blockHeight: number, - transactions: AddressTransactionCache -): Promise => { - const { walletTools, processor } = args - // Fetch the blockHeight for the address from the database - const scriptPubkey = walletTools.addressToScriptPubkey(address) - - if (blockHeight === 0) { - const { lastQueriedBlockHeight = 0 } = - (await processor.fetchAddress(scriptPubkey)) ?? {} - blockHeight = lastQueriedBlockHeight - } - - transactions[address] = { - processing: false, - path: { - format, - branch - }, - page: 1, // Page starts on 1 - blockHeight } } @@ -610,16 +491,16 @@ export const pickNextTask = async ( args: NextTaskArgs // eslint-disable-next-line @typescript-eslint/no-explicit-any ): Promise | undefined | boolean> => { - const { taskCache, uri, serverStates } = args + const { taskState, uri, serverStates } = args const { - addressSubscribeCache, - utxosCache, - rawUtxosCache, - processedUtxosCache, - transactionsCache, - updateTransactionsCache - } = taskCache + addressSubscribeTasks, + utxoTasks, + rawUtxoTasks, + processedUtxoTasks, + transactionTasks, + updateTransactionTasks + } = taskState const serverState = serverStates.getServerState(uri) if (serverState == null) return @@ -641,32 +522,30 @@ export const pickNextTask = async ( } // Loop processed utxos, these are just database ops, triggers setLookAhead - if (Object.keys(processedUtxosCache).length > 0) { - for (const scriptPubkey of Object.keys(processedUtxosCache)) { + if (processedUtxoTasks.size > 0) { + for (const [scriptPubkey, task] of processedUtxoTasks.entries) { // Only process when all utxos for a specific address have been gathered - const state = processedUtxosCache[scriptPubkey] - if (!state.processing && state.full) { - state.processing = true + if (!task.processing && task.full) { + task.processing = true await processUtxoTransactions({ ...args, scriptPubkey, - utxos: state.utxos, - path: state.path + utxos: task.utxos, + path: task.path }) - removeItem(processedUtxosCache, scriptPubkey) + processedUtxoTasks.remove(scriptPubkey) return true } } } // Loop unparsed utxos, some require a network call to get the full tx data - for (const utxoString of Object.keys(rawUtxosCache)) { - const state = rawUtxosCache[utxoString] + for (const [utxoString, task] of rawUtxoTasks.entries) { const utxo: IAccountUTXO = JSON.parse(utxoString) if (utxo == null) continue - if (!state.processing) { + if (!task.processing) { // check if we need to fetch additional network content for legacy purpose type - const purposeType = currencyFormatToPurposeType(state.path.format) + const purposeType = currencyFormatToPurposeType(task.path.format) if ( purposeType === BIP43PurposeTypeEnum.Airbitz || purposeType === BIP43PurposeTypeEnum.Legacy @@ -674,13 +553,13 @@ export const pickNextTask = async ( // if we do need to make a network call, check with the serverState if (!serverStates.serverCanGetTx(uri, utxo.txid)) return } - state.processing = true - removeItem(rawUtxosCache, utxoString) + task.processing = true + rawUtxoTasks.remove(utxoString) const wsTask = await processRawUtxo({ ...args, - ...state, - ...state.path, - address: state.address, + ...task, + ...task.path, + address: task.address, utxo, id: `${utxo.txid}_${utxo.vout}` }) @@ -689,18 +568,17 @@ export const pickNextTask = async ( } // Loop to process addresses to utxos - for (const address of Object.keys(utxosCache)) { - const state = utxosCache[address] + for (const [address, task] of utxoTasks.entries) { // Check if we need to fetch address UTXOs - if (!state.processing && serverStates.serverCanGetAddress(uri, address)) { - state.processing = true + if (!task.processing && serverStates.serverCanGetAddress(uri, address)) { + task.processing = true - removeItem(utxosCache, address) + utxoTasks.remove(address) // Fetch and process address UTXOs const wsTask = await processAddressUtxos({ ...args, - ...state, + ...task, address }) wsTask.deferred.promise @@ -715,67 +593,58 @@ export const pickNextTask = async ( } // Check if there are any addresses pending to be subscribed - if ( - Object.keys(addressSubscribeCache).length > 0 && - !taskCache.addressWatching - ) { + if (addressSubscribeTasks.size > 0 && !taskState.addressWatching) { const blockHeight = serverStates.getBlockHeight(uri) // Loop each address that needs to be subscribed - for (const address of Object.keys(addressSubscribeCache)) { - const state = addressSubscribeCache[address] + for (const [address, task] of addressSubscribeTasks.entries) { // Add address in the cache to the set of addresses to watch - const { path, processing: subscribed } = state // only process newly watched addresses - if (subscribed) continue - if (path != null) { + if (task.processing) continue + if (task.path != null) { // Add the newly watched addresses to the UTXO cache - utxosCache[address] = { + utxoTasks.add(address, { processing: false, - path - } - await addToTransactionCache( - args, + path: task.path + }) + await taskState.addTransactionTask( address, - path.format, - path.branch, - blockHeight, - transactionsCache + task.path.format, + task.path.branch, + blockHeight ) } - state.processing = true + task.processing = true } - taskCache.addressWatching = true + taskState.addressWatching = true const queryTime = Date.now() const deferredAddressSub = new Deferred() deferredAddressSub.promise .then(() => { serverStates.serverScoreUp(uri, Date.now() - queryTime) + taskState.addressWatching = false }) .catch(() => { - taskCache.addressWatching = false + taskState.addressWatching = false }) deferredAddressSub.promise.catch(() => { - taskCache.addressWatching = false + taskState.addressWatching = false }) serverStates.watchAddresses( uri, - Array.from(Object.keys(addressSubscribeCache)), + addressSubscribeTasks.keys, deferredAddressSub ) return true } // filled when transactions potentially changed (e.g. through new block notification) - if (Object.keys(updateTransactionsCache).length > 0) { - for (const txId of Object.keys(updateTransactionsCache)) { - if ( - !updateTransactionsCache[txId].processing && - serverStates.serverCanGetTx(uri, txId) - ) { - updateTransactionsCache[txId].processing = true - removeItem(updateTransactionsCache, txId) + if (updateTransactionTasks.size > 0) { + for (const [txId, task] of updateTransactionTasks.entries) { + if (!task.processing && serverStates.serverCanGetTx(uri, txId)) { + task.processing = true + updateTransactionTasks.remove(txId) const updateTransactionTask = updateTransactions({ ...args, txId }) // once resolved, add the txid to the server cache updateTransactionTask.deferred.promise @@ -792,17 +661,16 @@ export const pickNextTask = async ( } // loop to get and process transaction history of single addresses, triggers setLookAhead - for (const address of Object.keys(transactionsCache)) { - const state = transactionsCache[address] - if (!state.processing && serverStates.serverCanGetAddress(uri, address)) { - state.processing = true + for (const [address, task] of transactionTasks.entries) { + if (!task.processing && serverStates.serverCanGetAddress(uri, address)) { + task.processing = true - removeItem(transactionsCache, address) + transactionTasks.remove(address) // Fetch and process address UTXOs const wsTask = await processAddressTransactions({ ...args, - ...state, + ...task, address }) wsTask.deferred.promise @@ -824,7 +692,7 @@ interface UpdateTransactionsArgs extends CommonArgs { const updateTransactions = ( args: UpdateTransactionsArgs ): WsTask => { - const { emitter, walletTools, txId, pluginInfo, processor, taskCache } = args + const { emitter, walletTools, txId, pluginInfo, processor, taskState } = args const deferredITransaction = new Deferred() deferredITransaction.promise .then(async (rawTx: ITransaction) => { @@ -849,7 +717,7 @@ const updateTransactions = ( }) }) .catch(() => { - taskCache.updateTransactionsCache[txId] = { processing: false } + taskState.updateTransactionTasks.add(txId, { processing: false }) }) return { ...transactionMessage(txId), @@ -881,7 +749,7 @@ interface DeriveScriptAddressArgs { engineInfo: EngineInfo processor: Processor format: CurrencyFormat - taskCache: TaskCache + taskState: TaskState script: string } @@ -896,7 +764,7 @@ const internalDeriveScriptAddress = async ({ engineInfo, processor, format, - taskCache, + taskState, script }: DeriveScriptAddressArgs): Promise => { if (engineInfo.scriptTemplates == null) { @@ -921,11 +789,12 @@ const internalDeriveScriptAddress = async ({ await processor.saveAddress( makeIAddress({ scriptPubkey, redeemScript, path }) ) - const addresses = new Set() - addresses.add(address) - addToAddressSubscribeCache(taskCache, addresses, { - format: path.format, - branch: path.changeIndex + taskState.addressSubscribeTasks.add(address, { + path: { + format: path.format, + branch: path.changeIndex + }, + processing: false }) return { address, scriptPubkey, redeemScript } } @@ -992,11 +861,10 @@ const processAddressTransactions = async ( processor, walletTools, path, - taskCache, + taskState, serverStates, uri } = args - const transactionsCache = taskCache.transactionsCache const scriptPubkey = walletTools.addressToScriptPubkey(address) const addressData = await processor.fetchAddress(scriptPubkey) @@ -1037,12 +905,12 @@ const processAddressTransactions = async ( // we have progressed through all of the blockbook pages if (page < totalPages) { // Add the address back to the cache, incrementing the page - transactionsCache[address] = { + taskState.transactionTasks.add(address, { path, processing: false, blockHeight, page: page + 1 - } + }) return } @@ -1129,12 +997,12 @@ const processAddressUtxos = async ( address, walletTools, processor, - taskCache, + taskState, path, serverStates, uri } = args - const { utxosCache, rawUtxosCache } = taskCache + const { utxoTasks, rawUtxoTasks } = taskState const queryTime = Date.now() const deferredIAccountUTXOs = new Deferred() deferredIAccountUTXOs.promise @@ -1146,21 +1014,21 @@ const processAddressUtxos = async ( return } for (const utxo of utxos) { - rawUtxosCache[JSON.stringify(utxo)] = { + rawUtxoTasks.add(JSON.stringify(utxo), { processing: false, requiredCount: utxos.length, path, // TypeScript yells otherwise address: { ...addressData, path: addressData.path } - } + }) } }) .catch(() => { args.processing = false - utxosCache[address] = { + utxoTasks.add(address, { processing: args.processing, path - } + }) }) return { ...addressUtxosMessage(address), @@ -1253,37 +1121,31 @@ const processRawUtxo = async ( format, processor, path, - taskCache, + taskState, requiredCount, serverStates, uri, log } = args - const { rawUtxosCache, processedUtxosCache } = taskCache + const { rawUtxoTasks } = taskState let scriptType: ScriptTypeEnum let script: string let redeemScript: string | undefined // Function to call once we are finished const done = (): void => - addToProcessedUtxosCache( - processedUtxosCache, - path, - address.scriptPubkey, - requiredCount, - { - id, - txid: utxo.txid, - vout: utxo.vout, - value: utxo.value, - scriptPubkey: address.scriptPubkey, - script, - redeemScript, - scriptType, - blockHeight: utxo.height ?? -1, - spent: false - } - ) + taskState.addProcessedUtxoTask(path, address.scriptPubkey, requiredCount, { + id, + txid: utxo.txid, + vout: utxo.vout, + value: utxo.value, + scriptPubkey: address.scriptPubkey, + script, + redeemScript, + scriptType, + blockHeight: utxo.height ?? -1, + spent: false + }) switch (currencyFormatToPurposeType(format)) { case BIP43PurposeTypeEnum.Airbitz: @@ -1312,12 +1174,12 @@ const processRawUtxo = async ( .catch(e => { // If something went wrong, add the UTXO back to the queue log('error in processed utxos cache, re-adding utxo to cache:', e) - rawUtxosCache[JSON.stringify(utxo)] = { + rawUtxoTasks.add(JSON.stringify(utxo), { processing: false, path, address, requiredCount - } + }) }) return { ...transactionMessage(utxo.txid), @@ -1350,21 +1212,3 @@ const processRawUtxo = async ( // Since we have everything, call done done() } - -const addToProcessedUtxosCache = ( - processedUtxosCache: ProcessedUtxoCache, - path: ShortPath, - scriptPubkey: string, - requiredCount: number, - utxo: IUTXO -): void => { - const processedUtxos = processedUtxosCache[scriptPubkey] ?? { - utxos: new Set(), - processing: false, - path, - full: false - } - processedUtxos.utxos.add(utxo) - processedUtxosCache[scriptPubkey] = processedUtxos - processedUtxos.full = processedUtxos.utxos.size >= requiredCount -} diff --git a/src/common/utxobased/engine/taskCache.ts b/src/common/utxobased/engine/taskCache.ts new file mode 100644 index 00000000..676766a2 --- /dev/null +++ b/src/common/utxobased/engine/taskCache.ts @@ -0,0 +1,41 @@ +export interface TaskCache { + add: (key: string, value: T) => void + clear: () => void + get: (key: string) => T | undefined + keys: string[] + entries: Array<[string, T]> + remove: (key: string) => void + size: number +} + +export const makeTaskCache = (): TaskCache => { + let cache: { [key: string]: T } = {} + let size = 0 + + return { + add: (key: string, value: T) => { + cache[key] = value + ++size + }, + clear: () => { + cache = {} + }, + get: (key: string) => { + return cache[key] + }, + get keys() { + return Object.keys(cache) + }, + get entries() { + return Object.entries(cache) + }, + remove: (key: string) => { + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete + delete cache[key] + --size + }, + get size() { + return size + } + } +} diff --git a/src/common/utxobased/engine/taskState.ts b/src/common/utxobased/engine/taskState.ts new file mode 100644 index 00000000..668d3932 --- /dev/null +++ b/src/common/utxobased/engine/taskState.ts @@ -0,0 +1,125 @@ +import { CurrencyFormat } from '../../plugin/types' +import { Processor } from '../db/makeProcessor' +import { IAddress, IUTXO } from '../db/types' +import { UTXOPluginWalletTools } from './makeUtxoWalletTools' +import { makeTaskCache, TaskCache } from './taskCache' + +export interface TaskState { + addressWatching: boolean + addressSubscribeTasks: TaskCache + utxoTasks: TaskCache + rawUtxoTasks: TaskCache + processedUtxoTasks: TaskCache + transactionTasks: TaskCache + updateTransactionTasks: TaskCache + clearTaskState: () => void + addProcessedUtxoTask: ( + path: ShortPath, + scriptPubkey: string, + requiredCount: number, + utxo: IUTXO + ) => void + addTransactionTask: ( + address: string, + format: CurrencyFormat, + branch: number, + blockHeight: number + ) => Promise +} + +export interface UpdateTransactionTask { + processing: boolean +} +export interface AddressSubscribeTask { + processing: boolean + path: ShortPath +} + +export interface UtxosTask { + processing: boolean + path: ShortPath +} + +export interface ProcessedUtxoTask { + processing: boolean + full: boolean + utxos: Set + path: ShortPath +} +export interface RawUtxoTask { + processing: boolean + path: ShortPath + address: IAddress + requiredCount: number +} +export interface TransactionTask { + processing: boolean + path: ShortPath + page: number + blockHeight: number +} + +export interface ShortPath { + format: CurrencyFormat + branch: number +} + +interface TaskStateConfig { + walletTools: UTXOPluginWalletTools + processor: Processor +} + +export const makeTaskState = ({ + walletTools, + processor +}: TaskStateConfig): TaskState => { + return { + addressWatching: false, + addressSubscribeTasks: makeTaskCache(), + transactionTasks: makeTaskCache(), + utxoTasks: makeTaskCache(), + rawUtxoTasks: makeTaskCache(), + processedUtxoTasks: makeTaskCache(), + updateTransactionTasks: makeTaskCache(), + clearTaskState: function () { + this.addressWatching = false + this.addressSubscribeTasks.clear() + this.transactionTasks.clear() + this.utxoTasks.clear() + this.rawUtxoTasks.clear() + this.processedUtxoTasks.clear() + this.updateTransactionTasks.clear() + }, + addProcessedUtxoTask(path, scriptPubkey, requiredCount, utxo) { + const processedUtxos = this.processedUtxoTasks.get(scriptPubkey) ?? { + utxos: new Set(), + processing: false, + path, + full: false + } + processedUtxos.utxos.add(utxo) + this.processedUtxoTasks.add(scriptPubkey, processedUtxos) + processedUtxos.full = processedUtxos.utxos.size >= requiredCount + }, + async addTransactionTask(address, format, branch, blockHeight) { + // Fetch the blockHeight for the address from the database + const scriptPubkey = walletTools.addressToScriptPubkey(address) + + if (blockHeight === 0) { + const { lastQueriedBlockHeight = 0 } = + (await processor.fetchAddress(scriptPubkey)) ?? {} + blockHeight = lastQueriedBlockHeight + } + + this.transactionTasks.add(address, { + processing: false, + path: { + format, + branch + }, + page: 1, // Page starts on 1 + blockHeight + }) + } + } +}