
poll_pending_deposits concurrency #516

Closed
yuriescl opened this issue Sep 4, 2021 · 4 comments
yuriescl commented Sep 4, 2021

I'd like to poll deposits in parallel, but it seems Polaris doesn't allow it.
I suspect that's why I get so many "database is locked" errors, though I have no concrete evidence yet.
Looking at this code section from polaris/management/commands/process_pending_deposits.py:

        pending_deposits = (
            Transaction.objects.filter(
                status__in=[
                    Transaction.STATUS.pending_user_transfer_start,
                    Transaction.STATUS.pending_external,
                ],
                kind=Transaction.KIND.deposit,
                pending_execution_attempt=False,
            )
            .select_related("asset")
            .select_for_update()
        )
        with django.db.transaction.atomic():
            ready_transactions = rri.poll_pending_deposits(pending_deposits)
            Transaction.objects.filter(
                id__in=[t.id for t in ready_transactions]
            ).update(pending_execution_attempt=True)

I can see that Polaris selects the rows for update, then passes the queryset inside an atomic block.
But if I try, for example, to create multiple threads inside rri.poll_pending_deposits, one to poll each deposit, and call transaction.save() inside each thread, I get some kind of deadlock; the code just freezes in save():

from multiprocessing.pool import ThreadPool
from typing import Dict, List

from django.db.models import QuerySet
from polaris.models import Transaction

def poll_pending_deposits(
    self, pending_deposits: QuerySet, *args: List, **kwargs: Dict
) -> List[Transaction]:
    if pending_deposits.count() == 0:
        return []

    # one thread per pending deposit; map() blocks until every thread returns
    with ThreadPool(pending_deposits.count()) as pool:
        rets = pool.map(poll_pending_deposit, pending_deposits)
        pool.close()
        pool.join()

I might be doing it wrong, and maybe I shouldn't be using threads, but since this runs in a management command rather than a web worker process, spawning threads doesn't affect the web server.

How should I poll deposits in parallel?
Thanks

yuriescl commented Sep 4, 2021

I tested with both SQLite and PostgreSQL. Both deadlock at save():

def poll_pending_deposit(transaction: Transaction):
    transaction.save()  # freezes here when called from a pool thread
    return None

yuriescl commented Sep 4, 2021

Maybe process_pending_deposits shouldn't lock the entire queryset and shouldn't pass a queryset to the integration function.
Each transaction should be handled individually. We could maybe call the integration function asynchronously for each transaction.
I feel very limited not being able to fetch in parallel. Something that could take 1s takes 60s (60 transactions and 1 second to poll each).
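The claimed speedup is easy to demonstrate with a self-contained asyncio sketch. Nothing here is Polaris or Django; poll_one is a hypothetical stand-in that just sleeps for the duration of one poll:

```python
import asyncio
import time

async def poll_one(tx_id: int) -> bool:
    # stand-in for a 0.1 s network poll; a real check would hit the anchor's API
    await asyncio.sleep(0.1)
    return tx_id % 2 == 0  # pretend even-numbered deposits are ready

async def poll_all(tx_ids):
    # all polls run concurrently, so total time is ~one poll, not the sum
    return await asyncio.gather(*[poll_one(t) for t in tx_ids])

start = time.monotonic()
results = asyncio.run(poll_all(range(10)))
elapsed = time.monotonic() - start
ready = [t for t, ok in zip(range(10), results) if ok]
print(ready)    # [0, 2, 4, 6, 8]
print(elapsed)  # roughly 0.1 s, not 10 * 0.1 s
```

Ten simulated 1-per-0.1 s polls finish in about the time of one, which is exactly the 60 s → ~1 s improvement described above.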

JakeUrban commented
Hi @yuriescl I didn't find the exact reason why you're getting deadlock errors but I suspect it has to do with using rows locked in one connection in another. Django's docs mention that it creates a different DB connection per thread, but the connection that collects the pending_deposits is locking the rows (or in SQLite's case, the entire table) so other connections may not be able to write to them. Depending on your DB's isolation level they may not be able to read either.

Maybe process_pending_deposits shouldn't lock the entire queryset and shouldn't pass a queryset to the integration function. Each transaction should be handled individually. We could maybe call the integration function asynchronously for each transaction.

I've considered doing this, but felt that changing existing integration functions to be async def's could potentially require significant refactoring of an anchor's implementation. For example, all DB queries would need to be wrapped in sync_to_async().

That doesn't mean you cannot use async, though. Polaris calls poll_pending_deposits() from a synchronous context by wrapping the function it's called from in an async_to_sync() call. You could still run each of your transaction polling jobs in the event loop for concurrent processing, doing something like this:

import asyncio
from typing import List

from asgiref.sync import async_to_sync
from django.db.models import QuerySet
from polaris.models import Transaction

def poll_pending_deposits(pending_deposits: QuerySet):
    pending_deposits = list(pending_deposits)  # execute query
    poll_results = async_to_sync(poll_transactions)(pending_deposits)
    return [
        deposit
        for idx, deposit in enumerate(pending_deposits)
        if poll_results[idx] is True
    ]


async def poll_transactions(transactions: List[Transaction]) -> List[bool]:
    # return_exceptions=True keeps one failed poll from cancelling the rest;
    # failed entries come back as exception objects, which the `is True`
    # check above filters out
    return await asyncio.gather(
        *[poll_transaction(t) for t in transactions], return_exceptions=True
    )


async def poll_transaction(transaction: Transaction) -> bool:
    <polling code>
    return is_ready
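The <polling code> body is elided above. As a hypothetical sketch of one way to fill it in: if the real check is a blocking call (an HTTP request, or a Django ORM query), it can be handed to a worker thread with asyncio.to_thread() (Python 3.9+) so it doesn't stall the event loop that gather() is driving. blocking_check and the tx_id values here are invented placeholders:

```python
import asyncio

def blocking_check(tx_id: int) -> bool:
    # invented stand-in for a blocking HTTP/DB call that a real
    # poll_transaction() might make
    return tx_id != 0

async def poll_transaction(tx_id: int) -> bool:
    # run the blocking call in a worker thread instead of on the event loop
    return await asyncio.to_thread(blocking_check, tx_id)

async def main():
    return await asyncio.gather(*[poll_transaction(t) for t in range(3)])

print(asyncio.run(main()))  # [False, True, True]
```

Note that any DB writes made inside those worker threads run on per-thread connections, so the row-locking caveat from the earlier comment still applies.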

yuriescl commented Sep 8, 2021

Ok, I appreciate the suggestion, will fiddle around with async and see if I can get more speed on the polling. Thanks!

@yuriescl yuriescl closed this as completed Sep 8, 2021