Refactor account processing implementation to be more efficient #552

Closed
4 tasks
nullpointer0x00 opened this issue Oct 1, 2024 · 2 comments · Fixed by #553
Labels
enhancement New feature or request

Comments

@nullpointer0x00
Contributor

Summary

Refactor the current account processing implementation to be single-threaded and sequential, which removes the aggressive polling and the unnecessary load it puts on the database.

Problem Definition

The current implementation spawns 5 coroutines that continuously poll the ProcessQueueRecord table, resulting in up to 330 requests per second to that table. This seems too aggressive for the work it is doing.

Once this is released, we need to confirm that the single-threaded solution can keep up with the queue.
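One rough way to reason about whether a single thread keeps up is a back-of-the-envelope model. The sketch below assumes records arrive at a steady rate, each record takes a fixed time to process, and the scheduler waits `fixedDelay` after a run completes before starting the next one. The function name and all parameters are hypothetical and not part of the project. Under those assumptions the batch size settles at `rate * delay / (1 - rate * timePerRecord)`, and the queue grows without bound once `rate * timePerRecord` reaches 1.

```kotlin
// Hypothetical helper: estimates the steady-state batch size of a
// sequential drain that runs, then idles for a fixed delay.
// Returns null when the single thread cannot keep up.
fun steadyStateBatchSize(
    arrivalsPerSecond: Double,
    secondsPerRecord: Double,
    fixedDelaySeconds: Double = 5.0, // assumption: mirrors fixedDelay = 5000L
): Double? {
    // Fraction of wall-clock time needed just to process arrivals.
    val utilization = arrivalsPerSecond * secondsPerRecord
    if (utilization >= 1.0) return null
    // Fixed point of N = rate * (N * timePerRecord + delay).
    return arrivalsPerSecond * fixedDelaySeconds / (1.0 - utilization)
}
```

For example, 10 arrivals per second at 50 ms per record gives a utilization of 0.5 and a steady batch of about 100 records per run. Real measurements of queue depth after release are still the only way to confirm this.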

Proposal

Current code, which starts a producer loop and 5 consumer coroutines that never terminate:

@Scheduled(initialDelay = 5000L, fixedDelay = 5000L)
fun startAccountProcess() = runBlocking {
    ProcessQueueRecord.reset(ProcessQueueType.ACCOUNT)
    val producer = startAccountProcess()
    repeat(5) { accountProcessor(producer) }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.startAccountProcess() = produce {
    while (true) {
        ProcessQueueRecord.findByType(ProcessQueueType.ACCOUNT).firstOrNull()?.let {
            try {
                transaction { it.apply { this.processing = true } }
                send(it.processValue)
            } catch (_: Exception) {
            }
        }
    }
}

fun CoroutineScope.accountProcessor(channel: ReceiveChannel<String>) = launch(Dispatchers.IO) {
    for (msg in channel) {
        accountService.updateTokenCounts(msg)
        ProcessQueueRecord.delete(ProcessQueueType.ACCOUNT, msg)
    }
}

Proposed implementation:

@Scheduled(initialDelay = 5000L, fixedDelay = 5000L)
fun startAccountProcess() {
    processAccountRecords()
}

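fun processAccountRecords() {
    ProcessQueueRecord.reset(ProcessQueueType.ACCOUNT)
    // Fetch the queued account records once per scheduled run.
    val records = ProcessQueueRecord.findByType(ProcessQueueType.ACCOUNT)
    for (record in records) {
        try {
            // Flag the record as in progress, update the account, then remove it from the queue.
            transaction { record.apply { this.processing = true } }
            accountService.updateTokenCounts(record.processValue)
            ProcessQueueRecord.delete(ProcessQueueType.ACCOUNT, record.processValue)
        } catch (_: Exception) {
            // Exceptions are swallowed; if updateTokenCounts throws, the delete above is skipped.
        }
    }
}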
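The sequential pattern can be tried out without a database. The following is a minimal sketch, assuming an in-memory stand-in for the queue table. `QueueRecord`, `InMemoryQueue`, and `drainOnce` are hypothetical names, not project code. Unlike the proposal, it returns the failed values instead of swallowing them, purely so the behaviour is observable.

```kotlin
// Hypothetical in-memory stand-in for the ProcessQueueRecord table.
data class QueueRecord(val processValue: String, var processing: Boolean = false)

class InMemoryQueue(values: List<String>) {
    private val records = values.map { QueueRecord(it) }.toMutableList()

    // Copy of the current records, safe to iterate while deleting.
    fun snapshot(): List<QueueRecord> = records.toList()

    fun delete(processValue: String) {
        records.removeAll { it.processValue == processValue }
    }

    fun size(): Int = records.size
}

// One sequential pass over the queue. A failure on one record does not
// stop the others; failed values are returned and stay in the queue.
fun drainOnce(queue: InMemoryQueue, update: (String) -> Unit): List<String> {
    val failed = mutableListOf<String>()
    for (record in queue.snapshot()) {
        try {
            record.processing = true
            update(record.processValue)
            queue.delete(record.processValue)
        } catch (e: Exception) {
            record.processing = false
            failed += record.processValue
        }
    }
    return failed
}

fun main() {
    val queue = InMemoryQueue(listOf("addr1", "addr2", "addr3"))
    val failed = drainOnce(queue) { address ->
        if (address == "addr2") error("simulated failure")
    }
    println("failed=$failed remaining=${queue.size()}")
}
```

Running `main` processes the first and third addresses and leaves the second in the queue for the next scheduled run.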

For Admin Use

  • Not duplicate issue
  • Appropriate labels applied
  • Appropriate contributors tagged
  • Contributor assigned/self-assigned
@nullpointer0x00 nullpointer0x00 added the enhancement New feature or request label Oct 1, 2024
@iramiller
Member

It seems like this code is approaching the accounts wrong. Instead of starting with the large number of accounts and looking for changes, why does it not focus on the small number of changes and apply them to the impacted accounts?

@nullpointer0x00
Contributor Author

I think it is trying to update accounts when they do change. For every transaction, it tries to extract all account addresses associated with it, adds them to a processing queue, and has another thread process them.
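The flow described above can be sketched roughly as follows. This is an illustration only: the `Tx` shape, `addressesOf`, and `AccountQueue` are hypothetical names, and de-duplicating addresses that are already queued is an assumption, not something confirmed about the existing code.

```kotlin
// Hypothetical transaction shape; field names are illustrative only.
data class Tx(val hash: String, val signers: List<String>, val recipients: List<String>)

// Collects every address a transaction touches, without duplicates.
fun addressesOf(tx: Tx): Set<String> = (tx.signers + tx.recipients).toSet()

class AccountQueue {
    // LinkedHashSet keeps insertion order and ignores addresses already queued.
    private val pending = LinkedHashSet<String>()

    fun enqueue(tx: Tx) {
        pending.addAll(addressesOf(tx))
    }

    fun pendingAddresses(): List<String> = pending.toList()
}
```

With this shape, only accounts touched by a transaction are ever queued, so the processor's work scales with the number of changes and not with the total number of accounts.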
