Conversation

@kun6fup4nd4 (Contributor) commented May 30, 2025

PR Type

enhancement


Description

  • Refactored large Data.ts into modular files for maintainability

  • Introduced new modules: dataSync, cycleData, nodeSubscription, etc.

  • Updated imports and function calls throughout the codebase

  • Improved data synchronization, validation, and subscription logic


Changes walkthrough 📝

Relevant files

Enhancement (14 files)
  • dataSync.ts (+802/-0): Core data synchronization logic extracted from Data.ts
  • cycleData.ts (+554/-0): Cycle data collection and validation logic modularized
  • missingFunctions.ts (+356/-0): Utility and network sync functions separated from Data.ts
  • nodeSubscription.ts (+208/-0): Node subscription and data sender management refactored
  • socketClient.ts (+225/-0): Socket client logic for data transfer modularized
  • dataComparison.ts (+154/-0): Data comparison utilities for receipts, cycles, and txs
  • networkConfig.ts (+122/-0): Network configuration sync and consensus radius logic
  • types.ts (+132/-0): Type definitions for the modular Data layer
  • dataRequests.ts (+42/-0): Data request sending utilities for node communication
  • accountData.ts (+41/-0): Combined accounts data management extracted from Data.ts
  • Collector.ts (+3/-1): Updated to use new modular account data management
  • Cycles.ts (+10/-10): Updated to use modular subscription and validation logic
  • API.ts (+2/-2): Updated API routes to use modular Data layer functions
  • server.ts (+3/-3): Updated server startup to use new subscription logic

Additional files (2 files)
  • Data.ts (+213/-2924)
  • Data_old.ts (+2996/-0)

Need help?
  • Type /help how to ... in the comments thread for any questions about PR-Agent usage.
  • Check out the documentation for more information.

@github-actions (Contributor) commented:

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 5 🔵🔵🔵🔵🔵
    🏅 Score: 83
    🧪 No relevant tests
    🔒 No security concerns identified
    ⚡ Recommended focus areas for review

    Possible Data Consistency Issue

    The logic in collectCycleData for selecting the "best" cycle marker and handling multiple markers is complex and may lead to inconsistent state if not all edge cases are handled (e.g., race conditions, marker selection when cache is empty, or marker scoring ties). This should be carefully validated for correctness and determinism.

    export function collectCycleData(
      cycleData: subscriptionCycleData[] | P2PTypes.CycleCreatorTypes.CycleData[],
      senderInfo: string,
      source: string,
      dataSenders?: Map<NodeList.ConsensusNodeInfo['publicKey'], DataSender>
    ): void {
      const startTime = Date.now()
      const operationId = ArchiverLogging.generateOperationId()
    
      Logger.mainLogger.debug(
        `collectCycleData: Processing ${cycleData.length} cycles from ${senderInfo}, source: ${source}`
      )
    
      nestedCountersInstance.countEvent('collectCycleData', 'cycles_received', cycleData.length)
      nestedCountersInstance.countEvent('collectCycleData', 'source_' + source, 1)
    
      ArchiverLogging.logDataSync({
        sourceArchiver: senderInfo,
        targetArchiver: config.ARCHIVER_IP,
        cycle: 0,
        dataType: 'CYCLE_RECORD',
        dataHash: '',
        status: 'STARTED',
        operationId,
        metrics: {
          duration: 0,
          dataSize: StringUtils.safeStringify(cycleData).length,
        },
      })
    
      if (NodeList.activeListByIdSorted.length > 0) {
        const [ip, port] = senderInfo.split(':')
        const isInActiveNodes = NodeList.activeListByIdSorted.some(
          (node) => node.ip === ip && node.port.toString() === port
        )
        const isInActiveArchivers = State.activeArchivers.some(
          (archiver) => archiver.ip === ip && archiver.port.toString() === port
        )
        if (!isInActiveNodes && !isInActiveArchivers) {
          nestedCountersInstance.countEvent('collectCycleData', 'sender_not_active', 1)
          Logger.mainLogger.warn(`collectCycleData: Ignoring cycle data from non-active node: ${senderInfo}`)
          ArchiverLogging.logDataSync({
            sourceArchiver: senderInfo,
            targetArchiver: config.ARCHIVER_IP,
            cycle: 0,
            dataType: 'CYCLE_RECORD',
            dataHash: '',
            status: 'ERROR',
            operationId,
            metrics: {
              duration: Date.now() - startTime,
              dataSize: StringUtils.safeStringify(cycleData).length,
            },
            error: 'Sender not in active nodes or archivers',
          })
          return
        }
      }
    
      for (const cycle of cycleData) {
        Logger.mainLogger.debug(`collectCycleData: Processing cycle ${cycle.counter}, marker: ${cycle.marker}`)
    
        if (receivedCycleTracker[cycle.counter]?.saved === true) {
          nestedCountersInstance.countEvent('collectCycleData', 'cycle_already_saved_' + cycle.mode, 1)
          Logger.mainLogger.debug(`collectCycleData: Cycle ${cycle.counter} already saved, skipping`)
          ArchiverLogging.logDataSync({
            sourceArchiver: senderInfo,
            targetArchiver: config.ARCHIVER_IP,
            cycle: cycle.counter,
            dataType: 'CYCLE_RECORD',
            dataHash: cycle.marker,
            status: 'COMPLETE',
            operationId,
            metrics: {
              duration: Date.now() - startTime,
              dataSize: StringUtils.safeStringify(cycle).length,
            },
          })
          break
        }
    
        nestedCountersInstance.countEvent('collectCycleData', 'process_cycle_' + cycle.mode, 1)
    
        if (source === 'archiver') {
          nestedCountersInstance.countEvent('collectCycleData', 'direct_process_from_archiver', 1)
          Logger.mainLogger.debug(`collectCycleData: Processing cycle ${cycle.counter} from archiver directly`)
          processCycles([cycle as P2PTypes.CycleCreatorTypes.CycleData])
          continue
        }
    
        let receivedCertSigners = []
        if (NodeList.activeListByIdSorted.length > 0) {
          const certSigners = receivedCycleTracker[cycle.counter]?.[cycle.marker]?.['certSigners'] ?? new Set()
    
          try {
            Logger.mainLogger.debug(`collectCycleData: Original cycle data: ${UtilsTypes.safeStringify(cycle)}`)
            const cycleCopy = getRecordWithoutPostQ3Changes(cycle)
            const computedMarker = Cycles.computeCycleMarker(cycleCopy)
            Logger.mainLogger.debug(`collectCycleData: cycle copy ${UtilsTypes.safeStringify(cycleCopy)}`)
            Logger.mainLogger.debug(
              `collectCycleData: Computed marker for cycle ${cycle.counter}: ${computedMarker}, original marker: ${cycle.marker}`
            )
            Logger.mainLogger.debug(
              `collectCycleData: Validating ${(cycle as subscriptionCycleData).certificates?.length || 0} certificates for cycle ${cycle.counter}`
            )
    
            const validateCertsResult = validateCerts(
              (cycle as subscriptionCycleData).certificates,
              certSigners,
              computedMarker,
              cycleCopy as P2PTypes.CycleCreatorTypes.CycleData
            )
    
            if (validateCertsResult === false) {
              nestedCountersInstance.countEvent('collectCycleData', 'certificate_validation_failed_' + cycle.mode, 1)
              Logger.mainLogger.warn(
                `collectCycleData: Certificate validation failed for cycle ${cycle.counter} from ${senderInfo} in ${cycle.mode} mode`
              )
              ArchiverLogging.logDataSync({
                sourceArchiver: senderInfo,
                targetArchiver: config.ARCHIVER_IP,
                cycle: cycle.counter,
                dataType: 'CYCLE_RECORD',
                dataHash: cycle.marker,
                status: 'ERROR',
                operationId,
                metrics: {
                  duration: Date.now() - startTime,
                  dataSize: StringUtils.safeStringify(cycle).length,
                },
                error: 'Certificate validation failed',
              })
              break
            }
    
            nestedCountersInstance.countEvent('collectCycleData', 'certificate_validation_success_' + cycle.mode, 1)
            Logger.mainLogger.debug(`collectCycleData: Certificate validation successful for cycle ${cycle.counter}`)
          } catch (error) {
            nestedCountersInstance.countEvent('collectCycleData', 'certificate_validation_error_' + cycle.mode, 1)
            Logger.mainLogger.error(
              `collectCycleData: Error during certificate validation for cycle ${cycle.counter}: ${error}`
            )
            ArchiverLogging.logDataSync({
              sourceArchiver: senderInfo,
              targetArchiver: config.ARCHIVER_IP,
              cycle: cycle.counter,
              dataType: 'CYCLE_RECORD',
              dataHash: cycle.marker,
              status: 'ERROR',
              operationId,
              metrics: {
                duration: Date.now() - startTime,
                dataSize: StringUtils.safeStringify(cycle).length,
              },
              error: `Certificate validation error: ${error.message}`,
            })
            break
          }
        }
    
        receivedCertSigners = (cycle as subscriptionCycleData).certificates.map((cert) => cert.sign.owner)
        Logger.mainLogger.debug(
          `collectCycleData: Received ${receivedCertSigners.length} certificate signers for cycle ${cycle.counter}`
        )
        delete (cycle as subscriptionCycleData).certificates
    
        if (receivedCycleTracker[cycle.counter]) {
          if (receivedCycleTracker[cycle.counter][cycle.marker]) {
            nestedCountersInstance.countEvent('collectCycleData', 'add_signers_to_existing_marker_' + cycle.mode, 1)
            Logger.mainLogger.debug(`collectCycleData: Adding signers to existing marker for cycle ${cycle.counter}`)
            for (const signer of receivedCertSigners)
              receivedCycleTracker[cycle.counter][cycle.marker]['certSigners'].add(signer)
          } else {
            if (!validateCycleData(cycle)) {
              nestedCountersInstance.countEvent('collectCycleData', 'cycle_data_validation_failed_' + cycle.mode, 1)
              Logger.mainLogger.warn(
                `collectCycleData: Cycle data validation failed for cycle ${cycle.counter} with marker ${cycle.marker}`
              )
              ArchiverLogging.logDataSync({
                sourceArchiver: senderInfo,
                targetArchiver: config.ARCHIVER_IP,
                cycle: cycle.counter,
                dataType: 'CYCLE_RECORD',
                dataHash: cycle.marker,
                status: 'ERROR',
                operationId,
                metrics: {
                  duration: Date.now() - startTime,
                  dataSize: StringUtils.safeStringify(cycle).length,
                },
                error: 'Cycle data validation failed',
              })
              continue
            }
            nestedCountersInstance.countEvent('collectCycleData', 'create_new_marker_entry_' + cycle.mode, 1)
            Logger.mainLogger.debug(
              `collectCycleData: Creating new marker entry for cycle ${cycle.counter} with marker ${cycle.marker}`
            )
            receivedCycleTracker[cycle.counter][cycle.marker] = {
              cycleInfo: cycle,
              certSigners: new Set(receivedCertSigners),
            }
            Logger.mainLogger.debug('Different Cycle Record received', cycle.counter)
          }
          receivedCycleTracker[cycle.counter]['received']++
          Logger.mainLogger.debug(
            `collectCycleData: Cycle ${cycle.counter} received count: ${receivedCycleTracker[cycle.counter]['received']}`
          )
        } else {
          if (!validateCycleData(cycle)) {
            nestedCountersInstance.countEvent('collectCycleData', 'cycle_data_validation_failed_' + cycle.mode, 1)
            Logger.mainLogger.warn(
              `collectCycleData: Cycle data validation failed for cycle ${cycle.counter} with marker ${cycle.marker}`
            )
            ArchiverLogging.logDataSync({
              sourceArchiver: senderInfo,
              targetArchiver: config.ARCHIVER_IP,
              cycle: cycle.counter,
              dataType: 'CYCLE_RECORD',
              dataHash: cycle.marker,
              status: 'ERROR',
              operationId,
              metrics: {
                duration: Date.now() - startTime,
                dataSize: StringUtils.safeStringify(cycle).length,
              },
              error: 'Cycle data validation failed',
            })
            continue
          }
          nestedCountersInstance.countEvent('collectCycleData', 'create_new_cycle_tracker_' + cycle.mode, 1)
          Logger.mainLogger.debug(`collectCycleData: Creating new cycle tracker entry for cycle ${cycle.counter}`)
          const cycleEntry = {} as ReceivedCycleEntry & { received?: number; saved?: boolean }
          cycleEntry[cycle.marker] = {
            cycleInfo: cycle,
            certSigners: new Set(receivedCertSigners),
          }
          cycleEntry.received = 1
          cycleEntry.saved = false
          receivedCycleTracker[cycle.counter] = cycleEntry
        }
        if (config.VERBOSE) Logger.mainLogger.debug('Cycle received', cycle.counter, receivedCycleTracker[cycle.counter])
    
        if (NodeList.activeListByIdSorted.length === 0) {
          nestedCountersInstance.countEvent('collectCycleData', 'no_active_nodes_direct_process_' + cycle.mode, 1)
          Logger.mainLogger.debug(`collectCycleData: No active nodes, processing cycle ${cycle.counter} directly`)
          processCycles([receivedCycleTracker[cycle.counter][cycle.marker].cycleInfo as P2PTypes.CycleCreatorTypes.CycleData])
          continue
        }
    
        const requiredSenders = dataSenders && dataSenders.size ? Math.ceil(dataSenders.size / 2) : 1
        Logger.mainLogger.debug(
          `collectCycleData: Cycle ${cycle.counter} requires ${requiredSenders} senders, current count: ${receivedCycleTracker[cycle.counter]['received']}`
        )
    
        if (receivedCycleTracker[cycle.counter]['received'] >= requiredSenders) {
          nestedCountersInstance.countEvent('collectCycleData', 'enough_senders_process_' + cycle.mode, 1)
          Logger.mainLogger.debug(`collectCycleData: Cycle ${cycle.counter} has enough senders, processing`)
    
          let bestScore = 0
          let bestMarker = ''
          let prevMarker = ''
    
          if (cachedCycleRecords.length === 0) {
            updateCacheFromDB()
              .then(() => {
                if (cachedCycleRecords.length > 0 && cycle.counter - cachedCycleRecords[0].counter > 1) {
                  Logger.mainLogger.debug(`updateCacheFromDB: No previous marker found for cycle ${cycle.counter}`)
                }
                processCycleWithPrevMarker()
              })
              .catch((error) => {
                Logger.mainLogger.error(`updateCacheFromDB: Error updating cache from db: ${error}`)
              })
          } else {
            processCycleWithPrevMarker()
          }
    
          function processCycleWithPrevMarker() {
            if (cachedCycleRecords.length > 0 && cycle.counter - cachedCycleRecords[0].counter === 1) {
              prevMarker = cachedCycleRecords[0].marker
              Logger.mainLogger.debug(`collectCycleData: Previous marker for scoring: ${prevMarker}`)
            } else {
              Logger.mainLogger.debug(`collectCycleData: No previous marker found for cycle ${cycle.counter}`)
              return
            }
            const markers = Object.entries(receivedCycleTracker[cycle.counter])
              .filter(([key]) => key !== 'saved' && key !== 'received')
              .map(([, value]) => value)
    
            Logger.mainLogger.debug(
              `collectCycleData: Found ${markers.length} different markers for cycle ${cycle.counter}`
            )
    
            for (const marker of markers) {
              const scores = []
              for (const signer of marker['certSigners']) {
                const score = scoreCert(signer as string, prevMarker)
                scores.push(score)
                Logger.mainLogger.debug(`collectCycleData: Cert from ${signer} scored ${score}`)
              }
              const sum = scores
                .sort((a, b) => b - a)
                .slice(0, 3)
                .reduce((sum, score) => (sum += score), 0)
    
              Logger.mainLogger.debug(`collectCycleData: Marker ${marker['cycleInfo'].marker} scored ${sum}`)
    
              if (sum > bestScore) {
                bestScore = sum
                bestMarker = marker['cycleInfo'].marker
                Logger.mainLogger.debug(`collectCycleData: New best marker: ${bestMarker} with score ${bestScore}`)
              }
            }
    
            Logger.mainLogger.debug(
              `collectCycleData: Processing cycle ${cycle.counter} with best marker ${bestMarker}, score: ${bestScore}`
            )
            processCycles([receivedCycleTracker[cycle.counter][bestMarker].cycleInfo as P2PTypes.CycleCreatorTypes.CycleData])
            receivedCycleTracker[cycle.counter]['saved'] = true
    
            nestedCountersInstance.countEvent('collectCycleData', 'cycle_processed_successfully_' + cycle.mode, 1)
    
            ArchiverLogging.logDataSync({
              sourceArchiver: senderInfo,
              targetArchiver: config.ARCHIVER_IP,
              cycle: cycle.counter,
              dataType: 'CYCLE_RECORD',
              dataHash: bestMarker,
              status: 'COMPLETE',
              operationId,
              metrics: {
                duration: Date.now() - startTime,
                dataSize: StringUtils.safeStringify(receivedCycleTracker[cycle.counter][bestMarker].cycleInfo).length,
              },
            })
          }
        }
      }
    
      if (Object.keys(receivedCycleTracker).length > maxCyclesInCycleTracker) {
        nestedCountersInstance.countEvent('collectCycleData', 'cleanup_old_cycles', 1)
        Logger.mainLogger.debug(
          `collectCycleData: Cleaning up old cycles, current count: ${Object.keys(receivedCycleTracker).length}`
        )
        for (const counter of Object.keys(receivedCycleTracker)) {
          if (parseInt(counter) < getCurrentCycleCounter() - maxCyclesInCycleTracker) {
            let totalTimes = receivedCycleTracker[counter]['received']
            let logCycle = false
    
            const markers = Object.entries(receivedCycleTracker[counter])
              .filter(([key]) => key !== 'saved' && key !== 'received')
              .map(([, value]) => value)
    
            if (markers.length > 1) {
              logCycle = true
              nestedCountersInstance.countEvent('collectCycleData', 'multiple_markers_for_cycle', 1)
            }
    
            for (const marker of markers) {
              Logger.mainLogger.debug(
                'Cycle',
                counter,
                marker,
                logCycle ? StringUtils.safeStringify([...receivedCycleTracker[counter][marker]['certSigners']]) : '',
                logCycle ? receivedCycleTracker[counter][marker] : ''
              )
            }
            if (logCycle) Logger.mainLogger.debug(`Cycle ${counter} has ${markers.length} different markers!`)
            Logger.mainLogger.debug(`Received ${totalTimes} times for cycle counter ${counter}`)
            delete receivedCycleTracker[counter]
          }
        }
      }
    }
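
    The tie-handling concern above can be illustrated with a small sketch (not part of the PR): when two markers reach the same top-3 certificate score, sorting with an explicit secondary key keeps the selection deterministic across archivers. The MarkerEntry shape and pickBestMarker helper below are hypothetical names, not the PR's code.

    // Sketch only: deterministic best-marker selection with an explicit tie-break.
    interface MarkerEntry {
      marker: string
      score: number
    }

    function pickBestMarker(entries: MarkerEntry[]): string | undefined {
      if (entries.length === 0) return undefined
      // Sort by score descending; equal scores fall back to lexicographic marker
      // order so the result never depends on object/Map iteration order.
      const sorted = [...entries].sort((a, b) => b.score - a.score || a.marker.localeCompare(b.marker))
      return sorted[0].marker
    }

    A stable tie-break of this kind would keep independent archivers converging on the same cycle record even when certificate scores tie.
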
    Error Handling Robustness

    Several sync and download functions (e.g., syncCyclesAndNodeList, syncReceiptsByCycle, downloadOldCycles) rely on throwing or logging errors but may not always propagate failures or handle partial data gracefully. Review error handling to ensure partial syncs do not leave the system in an inconsistent state.

    export async function syncCyclesAndNodeList(lastStoredCycleCount = 0): Promise<void> {
      Logger.mainLogger.debug('Getting newest cycle...')
      const cycleToSyncTo = await getNewestCycleFromArchivers()
      Logger.mainLogger.debug('cycleToSyncTo', cycleToSyncTo)
      Logger.mainLogger.debug(`Syncing till cycle ${cycleToSyncTo.counter}...`)
    
      const cyclesToGet = 2 * Math.floor(Math.sqrt(cycleToSyncTo.active)) + 2
      Logger.mainLogger.debug(`Cycles to get is ${cyclesToGet}`)
    
      const CycleChain = []
      const squasher = new ChangeSquasher()
    
      CycleChain.unshift(cycleToSyncTo)
      squasher.addChange(parse(CycleChain[0]))
    
      do {
        let end: number = CycleChain[0].counter - 1
        let start: number = end - cyclesToGet
        if (start < 0) start = 0
        if (end < start) end = start
        Logger.mainLogger.debug(`Getting cycles ${start} - ${end}...`)
        const prevCycles = await fetchCycleRecords(start, end)
    
        if (prevCycles.length < 1) throw new Error('Got empty previous cycles')
    
        prevCycles.sort((a, b) => (a.counter > b.counter ? -1 : 1))
    
        let prepended = 0
        for (const prevCycle of prevCycles) {
          if (validateCycle(prevCycle, CycleChain[0]) === false) {
            Logger.mainLogger.error(`Record ${prevCycle.counter} failed validation`)
            break
          }
          CycleChain.unshift(prevCycle)
          squasher.addChange(parse(prevCycle))
          prepended++
    
          if (
            squasher.final.updated.length >= activeNodeCount(cycleToSyncTo) &&
            squasher.final.added.length >= totalNodeCount(cycleToSyncTo)
          ) {
            break
          }
        }
    
        Logger.mainLogger.debug(`Got ${squasher.final.updated.length} active nodes, need ${activeNodeCount(cycleToSyncTo)}`)
        Logger.mainLogger.debug(`Got ${squasher.final.added.length} total nodes, need ${totalNodeCount(cycleToSyncTo)}`)
        if (squasher.final.added.length < totalNodeCount(cycleToSyncTo))
          Logger.mainLogger.debug('Short on nodes. Need to get more cycles. Cycle:' + cycleToSyncTo.counter)
    
        if (prepended < 1) throw new Error('Unable to prepend any previous cycles')
      } while (
        squasher.final.updated.length < activeNodeCount(cycleToSyncTo) ||
        squasher.final.added.length < totalNodeCount(cycleToSyncTo)
      )
    
      applyNodeListChange(squasher.final)
      Logger.mainLogger.debug('NodeList after sync', NodeList.getActiveList())
    
      for (let i = 0; i < CycleChain.length; i++) {
        const record = CycleChain[i]
        Cycles.CycleChain.set(record.counter, { ...record })
        if (i === CycleChain.length - 1) await storeCycleData(CycleChain)
        Cycles.setCurrentCycleCounter(record.counter)
        Cycles.setCurrentCycleMarker(record.marker)
      }
      Logger.mainLogger.debug('Cycle chain is synced. Size of CycleChain', Cycles.CycleChain.size)
    
      let endCycle = CycleChain[0].counter - 1
      Logger.mainLogger.debug('endCycle counter', endCycle, 'lastStoredCycleCount', lastStoredCycleCount)
      if (endCycle > lastStoredCycleCount) {
        Logger.mainLogger.debug(`Downloading old cycles from cycles ${lastStoredCycleCount} to cycle ${endCycle}!`)
      }
      let savedCycleRecord = CycleChain[0]
      while (endCycle > lastStoredCycleCount) {
        let nextEnd: number = endCycle - MAX_CYCLES_PER_REQUEST
        if (nextEnd < 0) nextEnd = 0
        Logger.mainLogger.debug(`Getting cycles ${nextEnd} - ${endCycle} ...`)
        const prevCycles = await fetchCycleRecords(nextEnd, endCycle)
    
        if (!prevCycles || prevCycles.length < 1) throw new Error('Got empty previous cycles')
        prevCycles.sort((a, b) => (a.counter > b.counter ? -1 : 1))
    
        const combineCycles = []
        for (const prevCycle of prevCycles) {
          if (validateCycle(prevCycle, savedCycleRecord) === false) {
            Logger.mainLogger.error(`Record ${prevCycle.counter} failed validation`)
            Logger.mainLogger.debug('fail', prevCycle, savedCycleRecord)
            break
          }
          savedCycleRecord = prevCycle
          combineCycles.push(prevCycle)
        }
        await storeCycleData(combineCycles)
        endCycle = nextEnd - 1
      }
    }
    
    export async function syncCyclesAndNodeListV2(
      activeArchivers: State.ArchiverNodeInfo[],
      lastStoredCycleCount = 0
    ): Promise<boolean> {
      Logger.mainLogger.debug('Syncing validators and latest cycle...')
      const syncResult = await syncV2(activeArchivers)
      let cycleToSyncTo: P2PTypes.CycleCreatorTypes.CycleData
      if (syncResult.isOk()) {
        cycleToSyncTo = syncResult.value
      } else {
        throw syncResult.error
      }
    
      Logger.mainLogger.debug('cycleToSyncTo', cycleToSyncTo)
      Logger.mainLogger.debug(`Syncing till cycle ${cycleToSyncTo.counter}...`)
    
      await processCycles([cycleToSyncTo])
    
      await downloadOldCycles(cycleToSyncTo, lastStoredCycleCount)
    
      return true
    }
    
    export async function syncCyclesBetweenCycles(lastStoredCycle = 0, cycleToSyncTo = 0): Promise<boolean> {
      const MAX_RETRIES = 3
      let retryCount = 0
    
      let startCycle = lastStoredCycle
      let endCycle = startCycle + MAX_CYCLES_PER_REQUEST
    
      while (cycleToSyncTo > startCycle) {
        if (endCycle > cycleToSyncTo) endCycle = cycleToSyncTo
        Logger.mainLogger.debug(`Downloading cycles from ${startCycle} to ${endCycle}`)
    
        let success = false
        retryCount = 0
    
        while (!success && retryCount < MAX_RETRIES) {
          const res = (await queryFromArchivers(
            RequestDataType.CYCLE,
            {
              start: startCycle,
              end: endCycle,
            },
            QUERY_TIMEOUT_MAX
          )) as ArchiverCycleResponse
    
          if (res && res.cycleInfo) {
            const cycles = res.cycleInfo as P2PTypes.CycleCreatorTypes.CycleData[]
            Logger.mainLogger.debug(`Downloaded cycles`, cycles.length)
    
            let validCyclesCount = 0
            for (const cycle of cycles) {
              if (!validateCycleData(cycle)) {
                Logger.mainLogger.debug('Found invalid cycle data')
                continue
              }
              await processCycles([cycle])
              validCyclesCount++
            }
    
            success = true
    
            if (cycles.length < MAX_CYCLES_PER_REQUEST || validCyclesCount === 0) {
              startCycle += Math.max(cycles.length, 1)
              endCycle = startCycle + MAX_CYCLES_PER_REQUEST
              if (startCycle >= cycleToSyncTo) {
                Logger.mainLogger.debug('Sync cycles completed!')
                return true
              }
              break
            }
          } else {
            Logger.mainLogger.debug(`Invalid cycle download response, attempt ${retryCount + 1} of ${MAX_RETRIES}`)
            retryCount++
            if (retryCount >= MAX_RETRIES) {
              Logger.mainLogger.error('Max retries reached for cycle download')
              return false
            }
          }
        }
    
        if (success) {
          startCycle = endCycle + 1
          endCycle += MAX_CYCLES_PER_REQUEST
        }
      }
    
      return true
    }
    
    export async function syncReceipts(): Promise<void> {
      const MAX_RETRIES = 3
      let retryCount = 0
    
      const lastUpdatedCycle = getLastUpdatedCycle()
      Logger.mainLogger.debug(`[syncReceipts] Last updated cycle from tracker: ${lastUpdatedCycle}`)
    
      let startCycle = 0
      if (lastUpdatedCycle > 0) {
        Logger.mainLogger.info(`[syncReceipts] Starting receipt sync from last updated cycle: ${lastUpdatedCycle}`)
        startCycle = Math.max(lastUpdatedCycle - config.checkpoint.syncCycleBuffer, 0)
        await syncReceiptsByCycle(startCycle)
        return
      }
    
      let response: ArchiverTotalDataResponse = await getTotalDataFromArchivers()
      if (!response || response.totalReceipts < 0) {
        return
      }
    
      let { totalReceipts } = response
      if (totalReceipts < 1) return
    
      let complete = false
      let start = 0
      let end = start + MAX_RECEIPTS_PER_REQUEST
    
      while (!complete) {
        if (end >= totalReceipts) {
          response = await getTotalDataFromArchivers()
          if (response && response.totalReceipts > 0) {
            if (response.totalReceipts > totalReceipts) totalReceipts = response.totalReceipts
            Logger.mainLogger.debug('totalReceiptsToSync', totalReceipts)
          }
        }
    
        Logger.mainLogger.debug(`Downloading receipts from ${start} to ${end}`)
        let success = false
        retryCount = 0
    
        while (!success && retryCount < MAX_RETRIES) {
          const res = (await queryFromArchivers(
            RequestDataType.RECEIPT,
            {
              start: start,
              end: end,
            },
            QUERY_TIMEOUT_MAX
          )) as ArchiverReceiptResponse
    
          if (res && res.receipts) {
            const downloadedReceipts = res.receipts as ReceiptDB.Receipt[]
            Logger.mainLogger.debug(`Downloaded receipts`, downloadedReceipts.length)
            await storeReceiptData(downloadedReceipts, '', false, false, true)
            success = true
    
            if (downloadedReceipts.length < MAX_RECEIPTS_PER_REQUEST) {
              start += downloadedReceipts.length
              end = start + MAX_RECEIPTS_PER_REQUEST
              response = await getTotalDataFromArchivers()
              if (response && response.totalReceipts > 0) {
                if (response.totalReceipts > totalReceipts) totalReceipts = response.totalReceipts
                if (start >= totalReceipts) {
                  complete = true
                  Logger.mainLogger.debug('Download receipts completed')
                }
              }
            }
          } else {
            Logger.mainLogger.debug(`Invalid download response, attempt ${retryCount + 1} of ${MAX_RETRIES}`)
            retryCount++
            if (retryCount >= MAX_RETRIES) {
              Logger.mainLogger.error('Max retries reached for receipt download')
              start = end + 1
              end += MAX_RECEIPTS_PER_REQUEST
              if (start >= totalReceipts) {
                complete = true
              }
            }
          }
        }
    
        if (success) {
          start = end + 1
          end += MAX_RECEIPTS_PER_REQUEST
        }
      }
    
      Logger.mainLogger.debug('Sync receipts data completed!')
    }
    
    class ArchiverSelector {
      private archivers: ArchiverWithRetries[]
      private currentIndex: number = 0
      private readonly maxRetries: number = 3
    
      constructor() {
        this.archivers = State.otherArchivers.map((archiver) => ({
          archiver,
          retriesLeft: this.maxRetries,
        }))
        Utils.shuffleArray(this.archivers)
      }
    
      getCurrentArchiver(): State.ArchiverNodeInfo | null {
        if (this.currentIndex >= this.archivers.length) {
          return null
        }
        return this.archivers[this.currentIndex].archiver
      }
    
      markCurrentArchiverFailed(): State.ArchiverNodeInfo | null {
        if (this.currentIndex >= this.archivers.length) {
          return null
        }
    
        this.archivers[this.currentIndex].retriesLeft--
    
        if (this.archivers[this.currentIndex].retriesLeft <= 0) {
          this.currentIndex++
        }
    
        return this.getCurrentArchiver()
      }
    
      hasMoreArchivers(): boolean {
        return this.currentIndex < this.archivers.length
      }
    }
    
    export async function syncReceiptsByCycle(lastStoredReceiptCycle = 0, cycleToSyncTo = 0): Promise<boolean> {
      if (lastStoredReceiptCycle === 0) {
        const trackedCycle = getLastUpdatedCycle()
        if (trackedCycle > 0) {
          Logger.mainLogger.info(`[syncReceiptsByCycle] Using last updated cycle from tracker: ${trackedCycle}`)
          lastStoredReceiptCycle = Math.max(trackedCycle - config.checkpoint.syncCycleBuffer, 0)
        }
      }
    
      let totalCycles = cycleToSyncTo
      let totalReceipts = 0
      if (cycleToSyncTo === 0) {
        const response: ArchiverTotalDataResponse = await getTotalDataFromArchivers()
        if (!response || response.totalReceipts < 0) {
          return false
        }
        totalCycles = response.totalCycles
        totalReceipts = response.totalReceipts
      }
      let startCycle = lastStoredReceiptCycle
      let endCycle = startCycle + MAX_BETWEEN_CYCLES_PER_REQUEST
      let receiptsCountToSyncBetweenCycles = 0
      let savedReceiptsCountBetweenCycles = 0
      let totalSavedReceiptsCount = 0
      let archiverSelector = new ArchiverSelector()
    
      while (true) {
        if (endCycle > totalCycles) {
          endCycle = totalCycles
          totalSavedReceiptsCount = await ReceiptDB.queryReceiptCount()
        }
        if (cycleToSyncTo > 0) {
          if (startCycle > cycleToSyncTo) {
            Logger.mainLogger.debug(`Sync receipts data completed!`)
            return true
          }
          if (endCycle > cycleToSyncTo) endCycle = cycleToSyncTo
        }
        Logger.mainLogger.debug(`Downloading receipts between cycles ${startCycle} to ${endCycle}`)
    
        const currentArchiver = archiverSelector.getCurrentArchiver()
        if (!currentArchiver) {
          Logger.mainLogger.error('No more archivers available for syncing receipts')
          return false
        }
    
        const res = (await P2P.getJson(
          `http://${currentArchiver.ip}:${currentArchiver.port}/receipt?startCycle=${startCycle}&endCycle=${endCycle}&type=tally`,
          QUERY_TIMEOUT_MAX
        )) as ArchiverReceiptResponse
    
        if (res && res.receipts) {
          const downloadedReceiptsByCycles = res.receipts as ReceiptDB.ReceiptCount[]
          for (const receiptData of downloadedReceiptsByCycles) {
            receiptsCountToSyncBetweenCycles += receiptData.receiptCount
          }
          Logger.mainLogger.debug(
            `Total receipts to sync between cycles ${startCycle} to ${endCycle}`,
            receiptsCountToSyncBetweenCycles
          )
          startCycle = endCycle + 1
          endCycle += MAX_BETWEEN_CYCLES_PER_REQUEST
        } else {
          Logger.mainLogger.error(
            `Failed to download receipts tally between cycles ${startCycle} to ${endCycle} from archiver ${currentArchiver.ip}:${currentArchiver.port}`
          )
          archiverSelector.markCurrentArchiverFailed()
          if (!archiverSelector.hasMoreArchivers()) {
            Logger.mainLogger.error('All archivers failed for syncing receipts')
            return false
          }
          continue
        }
    
        Logger.mainLogger.debug(
          `Downloading receipts between cycles ${lastStoredReceiptCycle} to ${startCycle - 1}`,
          receiptsCountToSyncBetweenCycles
        )
    
        const MAX_RETRIES = 3
        let start = 0
        let end = start + MAX_RECEIPTS_PER_REQUEST
        let hasMoreReceiptsToDownload = receiptsCountToSyncBetweenCycles > 0
    
        while (hasMoreReceiptsToDownload) {
          let success = false
          let retryCount = 0
    
          while (!success && retryCount < MAX_RETRIES) {
            Logger.mainLogger.debug(`Downloading receipts from index ${start} to ${end}`)
            const res2 = (await P2P.getJson(
              `http://${currentArchiver.ip}:${currentArchiver.port}/receipt?startCycle=${lastStoredReceiptCycle}&endCycle=${
                startCycle - 1
              }&start=${start}&end=${end}`,
              QUERY_TIMEOUT_MAX
            )) as ArchiverReceiptResponse
    
            if (res2 && res2.receipts) {
              const downloadedReceipts = res2.receipts as ReceiptDB.Receipt[]
              Logger.mainLogger.debug(`Downloaded receipts`, downloadedReceipts.length)
              const storageResult = await storeReceiptData(downloadedReceipts, '', false, false, true) as StoredReceiptObject
              savedReceiptsCountBetweenCycles += storageResult.receipts.length
              success = true
    
              if (downloadedReceipts.length === 0 || downloadedReceipts.length < MAX_RECEIPTS_PER_REQUEST) {
                updateLastUpdatedCycle(startCycle - 1)
                Logger.mainLogger.debug(`[syncReceiptsByCycle] Updated cycle tracker to cycle ${startCycle - 1}`)
                hasMoreReceiptsToDownload = false
                receiptsCountToSyncBetweenCycles = 0
                savedReceiptsCountBetweenCycles = 0
                break
              }
            } else {
              Logger.mainLogger.debug(`Invalid download response, attempt ${retryCount + 1} of ${MAX_RETRIES}`)
              retryCount++
              if (retryCount >= MAX_RETRIES) {
                Logger.mainLogger.error('Max retries reached for receipt download')
                archiverSelector.markCurrentArchiverFailed()
                if (!archiverSelector.hasMoreArchivers()) {
                  Logger.mainLogger.error('All archivers failed for syncing receipts')
                  return false
                }
              }
            }
          }
          if (success) {
            start = end + 1
            end += MAX_RECEIPTS_PER_REQUEST
          }
        }
    
        if (cycleToSyncTo === 0 && totalSavedReceiptsCount >= totalReceipts) {
          Logger.mainLogger.debug('Sync receipts data by cycle completed!')
          return true
        }
      }
    }
    
    export async function syncCyclesAndTxsData(lastStoredCycle = 0, cycleToSyncTo = 0): Promise<void> {
      let response: ArchiverTotalDataResponse = await getTotalDataFromArchivers()
      if (!response || response.totalCycles < 0) {
        return
      }
    
      let { totalCycles, totalReceipts } = response
      if (totalCycles < 1) return
    
      let completeForCycle = false
      let startCycle = lastStoredCycle
      let endCycle = startCycle + MAX_CYCLES_PER_REQUEST
      const MAX_RETRIES = 3
      let retryCount = 0
    
      while (!completeForCycle || startCycle < endCycle) {
        if (endCycle >= totalCycles) {
          endCycle = totalCycles
          completeForCycle = true
          response = await getTotalDataFromArchivers()
          if (response && response.totalCycles > 0) {
            if (response.totalCycles > totalCycles) totalCycles = response.totalCycles
            if (response.totalReceipts > totalReceipts) totalReceipts = response.totalReceipts
            Logger.mainLogger.debug('totalCyclesToSync', totalCycles, 'totalReceiptsToSync', totalReceipts)
          }
        }
    
        if (!completeForCycle) {
          Logger.mainLogger.debug(`Downloading cycles from ${startCycle} to ${endCycle}`)
          let success = false
          retryCount = 0
    
          while (!success && retryCount < MAX_RETRIES) {
            const res = (await queryFromArchivers(
              RequestDataType.CYCLE,
              {
                start: startCycle,
                end: endCycle,
              },
              QUERY_TIMEOUT_MAX
            )) as ArchiverCycleResponse
            if (res && res.cycleInfo) {
              const cycles = res.cycleInfo
              Logger.mainLogger.debug(`Downloaded cycles`, cycles.length)
              for (const cycle of cycles) {
                if (!validateCycleData(cycle)) {
                  Logger.mainLogger.debug('Found invalid cycle data')
                  continue
                }
                await processCycles([cycle])
              }
              success = true
    
              const highestCycle = cycles.reduce((max, cycle) => Math.max(max, cycle.counter), 0)
              if (highestCycle > 0) {
                updateLastUpdatedCycle(highestCycle)
                Logger.mainLogger.debug(`[syncCyclesAndTxsData] Updated cycle tracker to cycle ${highestCycle}`)
              }
    
              if (cycles.length < MAX_CYCLES_PER_REQUEST) {
                startCycle += cycles.length + 1
                endCycle += cycles.length + MAX_CYCLES_PER_REQUEST
              }
            } else {
              Logger.mainLogger.debug(`Invalid cycle download response, attempt ${retryCount + 1} of ${MAX_RETRIES}`)
              retryCount++
              if (retryCount >= MAX_RETRIES) {
                Logger.mainLogger.error('Max retries reached for cycle download')
              }
            }
          }
          if (success) {
            startCycle = endCycle + 1
            endCycle += MAX_CYCLES_PER_REQUEST
          }
        }
      }
      Logger.mainLogger.debug('Sync Cycle, Receipt & Original-Tx data completed!')
    }
    
    export const syncCyclesAndTxsDataBetweenCycles = async (lastStoredCycle = 0, cycleToSyncTo = 0): Promise<void> => {
      Logger.mainLogger.debug(`Syncing cycles and txs data between cycles ${lastStoredCycle} and ${cycleToSyncTo}`)
      await syncCyclesBetweenCycles(lastStoredCycle, cycleToSyncTo)
      await syncReceiptsByCycle(lastStoredCycle, cycleToSyncTo)
    }
    
    async function downloadOldCycles(
      cycleToSyncTo: P2PTypes.CycleCreatorTypes.CycleData,
      lastStoredCycleCount: number
    ): Promise<void> {
      let endCycle = cycleToSyncTo.counter - 1
      Logger.mainLogger.debug('endCycle counter', endCycle, 'lastStoredCycleCount', lastStoredCycleCount)
      if (endCycle > lastStoredCycleCount) {
        Logger.mainLogger.debug(`Downloading old cycles from cycles ${lastStoredCycleCount} to cycle ${endCycle}!`)
      }
    
      let savedCycleRecord = cycleToSyncTo
      const MAX_RETRY_COUNT = 3
      let retryCount = 0
      while (endCycle > lastStoredCycleCount) {
        let startCycle: number = endCycle - MAX_CYCLES_PER_REQUEST
        if (startCycle < 0) startCycle = 0
        if (startCycle < lastStoredCycleCount) startCycle = lastStoredCycleCount
        Logger.mainLogger.debug(`Getting cycles ${startCycle} - ${endCycle} ...`)
        const res = (await queryFromArchivers(
          RequestDataType.CYCLE,
          {
            start: startCycle,
            end: endCycle,
          },
          QUERY_TIMEOUT_MAX
        )) as ArchiverCycleResponse
        if (!res || !res.cycleInfo || !Array.isArray(res.cycleInfo) || res.cycleInfo.length === 0) {
          Logger.mainLogger.error(`Can't fetch data from cycle ${startCycle} to cycle ${endCycle}  from archivers`)
          if (retryCount < MAX_RETRY_COUNT) {
            retryCount++
            continue
          } else {
            endCycle = startCycle - 1
            retryCount = 0
          }
        }
    
        const prevCycles = res.cycleInfo as P2PTypes.CycleCreatorTypes.CycleData[]
        if (prevCycles) prevCycles.sort((a, b) => (a.counter > b.counter ? -1 : 1))
    
        const combineCycles: P2PTypes.CycleCreatorTypes.CycleData[] = []
        for (const prevCycle of prevCycles) {
          if (validateCycle(prevCycle, savedCycleRecord) === false) {
            Logger.mainLogger.error(`Record ${prevCycle.counter} failed validation`)
            Logger.mainLogger.debug('fail', prevCycle, savedCycleRecord)
          }
          savedCycleRecord = prevCycle
          combineCycles.push(prevCycle)
        }
        await storeCycleData(combineCycles)
        endCycle = startCycle - 1
      }
    }
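
    On the partial-sync concern above, one possible pattern (a sketch only, not the PR's approach) is to return a structured outcome from each range sync so callers can resume from the last persisted cycle instead of inferring progress from logs. SyncOutcome and syncRange are hypothetical names; the PR's functions mostly return void or boolean and log failures internally.

    // Sketch only: surface partial-sync progress to the caller.
    interface SyncOutcome {
      requestedStart: number
      requestedEnd: number
      lastStoredCycle: number // highest cycle actually persisted
      complete: boolean
      error?: string
    }

    async function syncRange(
      start: number,
      end: number,
      fetchAndStore: (s: number, e: number) => Promise<number> // returns highest cycle stored in the batch
    ): Promise<SyncOutcome> {
      let lastStored = start - 1
      try {
        for (let s = start; s <= end; ) {
          const stored = await fetchAndStore(s, end)
          if (stored < s) {
            // No forward progress: report how far the sync actually got.
            return { requestedStart: start, requestedEnd: end, lastStoredCycle: lastStored, complete: false, error: 'no progress' }
          }
          lastStored = stored
          s = stored + 1
        }
        return { requestedStart: start, requestedEnd: end, lastStoredCycle: lastStored, complete: true }
      } catch (err) {
        return { requestedStart: start, requestedEnd: end, lastStoredCycle: lastStored, complete: false, error: String(err) }
      }
    }

    A caller could persist lastStoredCycle and retry only the unfinished tail, rather than restarting the whole sync after a mid-range failure.
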
    Type Safety and Interface Consistency

    The new type definitions (e.g., subscriptionCycleData, DataSender, response interfaces) are critical for data integrity. Ensure that all usages across the refactored modules are consistent and that type mismatches or missing fields are caught by TypeScript.

    import { P2P as P2PTypes } from '@shardeum-foundation/lib-types'
    import { Socket as SocketIOSocket } from 'socket.io-client'
    import * as NodeList from '../NodeList'
    import * as ReceiptDB from '../dbstore/receipts'
    import * as OriginalTxDB from '../dbstore/originalTxsData'
    import * as State from '../State'
    
    export interface CombinedAccountsData {
      accounts: any[]
      receipts: any[]
    }
    
    export interface ValidatorColletor {
      nodeId: string
      signedMessage: {
        cycleRecord: {
          counter: number
          mode: string
          archiversAtShutdown?: string[]
        }
      }
    }
    
    export interface ValidatorCycle {
      node: string
      cycle: P2PTypes.CycleCreatorTypes.CycleData
    }
    
    export interface Signer {
      owner: string
      sig: string
    }
    
    export interface DataRequestTypes {
      SUBSCRIBE: 'SUBSCRIBE'
      UNSUBSCRIBE: 'UNSUBSCRIBE'
    }
    
    export const DataRequestTypes: DataRequestTypes = {
      SUBSCRIBE: 'SUBSCRIBE',
      UNSUBSCRIBE: 'UNSUBSCRIBE',
    }
    
    export interface subscriptionCycleData extends Omit<P2PTypes.CycleCreatorTypes.CycleData, 'certificate'> {
      certificate?: P2PTypes.CycleCreatorTypes.CycleCert
      certificates?: P2PTypes.CycleCreatorTypes.CycleCert[]
    }
    
    export interface DataRequest<T extends P2PTypes.SnapshotTypes.ValidTypes> {
      type: P2PTypes.SnapshotTypes.TypeName<T>
      lastData: P2PTypes.SnapshotTypes.TypeIndex<T>
    }
    
    export interface DataResponse<T> {
      publicKey?: NodeList.ConsensusNodeInfo['publicKey']
      recipient: NodeList.ConsensusNodeInfo['publicKey']
      responses: { [name: string]: T }
    }
    
    export interface StoredReceiptObject {
      receipts: ReceiptDB.Receipt[]
      success: boolean
    }
    
    export interface CountResponse {
      receipts?: ArchiverReceiptCountResponse
      originalTxs?: ArchiverOriginalTxsCountResponse
      cycles?: number
      accounts?: number
    }
    
    export interface ArchiverAccountResponse {
      accounts: any[]
      transactions?: any[]
      receipts?: any[]
      totalAccounts?: number
    }
    
    export interface ArchiverCycleResponse {
      cycleInfo: P2PTypes.CycleCreatorTypes.CycleData[]
    }
    
    export interface ArchiverReceiptResponse {
      receipts: ReceiptDB.Receipt[] | ReceiptDB.ReceiptCount[]
    }
    
    export interface ArchiverOriginalTxResponse {
      originalTxs: OriginalTxDB.OriginalTxData[] | OriginalTxDB.OriginalTxDataCount[]
    }
    
    export interface ArchiverReceiptCountResponse {
      countByCycles: { cycle: number; count: number }[]
      startCycle: number
      endCycle: number
    }
    
    export interface ArchiverOriginalTxsCountResponse {
      countByCycles: { cycle: number; count: number }[]
      startCycle: number
      endCycle: number
    }
    
    export interface ArchiverTotalDataResponse {
      totalReceipts: number
      totalCycles: number
      totalAccounts: number
      totalOriginalTxs: number
    }
    
    // Re-export RequestDataType from API.ts to avoid duplication
    export { RequestDataType } from '../API'
    
    export interface RequestDataCountType {
      type: 'tally'
    }
    
    export interface DataSender {
      nodeInfo: NodeList.ConsensusNodeInfo
      types: (keyof typeof P2PTypes.SnapshotTypes.TypeNames)[]
      contactTimeout?: NodeJS.Timeout | null
      replaceTimeout?: NodeJS.Timeout | null
    }
    
    export interface CompareResponse {
      success: boolean
      matchedCycle: number
    }
    
    export interface ArchiverWithRetries {
      archiver: State.ArchiverNodeInfo
      retriesLeft: number
    }
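
    As a concrete illustration of the consistency point above (a sketch, not from the PR), a user-defined type guard lets TypeScript enforce that certificates is only read after a runtime check, instead of the cast-and-access pattern used in collectCycleData. The guard name hasCertificates is hypothetical; the interfaces are the PR's own.

    // Sketch only: narrow to a cycle that definitely carries certificates.
    function hasCertificates(
      cycle: subscriptionCycleData | P2PTypes.CycleCreatorTypes.CycleData
    ): cycle is subscriptionCycleData & { certificates: P2PTypes.CycleCreatorTypes.CycleCert[] } {
      return Array.isArray((cycle as subscriptionCycleData).certificates)
    }

    // Usage: inside the guarded branch, no cast or non-null assertion is needed.
    // if (hasCertificates(cycle)) {
    //   const signers = cycle.certificates.map((cert) => cert.sign.owner)
    // }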

    @github-advanced-security (bot) left a comment:

    CodeQL found more than 20 potential problems in the proposed changes. Check the Files changed tab for more details.

    @github-actions (Contributor) commented:

    Failed to generate code suggestions for PR
