
Database concurrency problem #2782

Closed
diegocorradini opened this issue Sep 12, 2019 · 11 comments

@diegocorradini

Hi all,

I am developing a pipeline in Django where different workers read from and write to a database.
Everything works fine if I use only one worker.
Otherwise I get random errors in the tasks, such as "pop from empty list", "EOF detected", or "database is locked", depending on the database backend. I have tried Postgres, MySQL, and SQLite.

This is the line where I configure the workers:
luigi.build([Workflow(id=session_id)], workers=4)

Any help would be really appreciated since I am stuck on this problem.

@kwilcox
Contributor

kwilcox commented Sep 13, 2019

Not much to go on, but I'll bite. What context are you calling luigi.build from? Are you calling it from the same process that runs the Django app? A backend worker? What spawns that worker?

@diegocorradini
Author

Thanks @kwilcox. I am calling luigi.build from a view, in a separate Celery task.

from celery import shared_task

@shared_task(bind=True)
def luigi_pipeline_task(self, name, id, session_name):
    # update_state must be called on the bound task instance
    self.update_state(state='PROGRESS')
    pipeline = LuigiPipeline()
    pipeline.run(name, id, session_name)

def run_analysis_view(request, session_name):
    id = 1
    if Session.objects.all().count() > 0:
        id = id + int(Session.objects.all().order_by("-id")[0].pk)
    luigi_pipeline_task.apply_async([id])

Hope it helps.

@kwilcox
Contributor

kwilcox commented Sep 16, 2019

You may want to try abstracting the luigi portions of your code into something you can run and test outside of Django / Celery. There are just too many variables/unknowns here for someone to be able to give you a solution. At the very least you would need to post the contents of your LuigiPipeline class, since it seems to be what calls luigi.build and might have a red flag in it.

@diegocorradini
Author

Hi again, sorry for the delay but I was out of the office.
I tried running the pipeline outside the Celery environment, and the error happened again:
DatabaseError: error with status PGRES_TUPLES_OK and no message from the libpq

Therefore it is not a problem raised by Celery.
It seems related to the Django ORM, but I think most people use this combination.

@diegocorradini
Author

It seems that Django and Postgres have problems running queries from different processes.

Is it possible to spawn workers as threads?
Otherwise, could you suggest a workaround?

@kwilcox
Contributor

kwilcox commented Oct 21, 2019

Database connections are not typically shared across different processes. You'll need to create the database connection inside the luigi tasks/targets that you are using, and not rely on any Django ORM connection objects passed into your worker. Try setting force_multiprocessing=True in your worker config; I'd guess you'll see the same errors when running with workers=1. If you can post your LuigiPipeline class it will be easier to help you out!
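
For reference, a minimal luigi.cfg sketch of that setting (force_multiprocessing is an option in luigi's [worker] section):

[worker]
force_multiprocessing=true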

@diegocorradini
Author

luigi.build([LuigiPipeline()], workers=3, no_lock=False)

class LuigiPipeline(sl.WorkflowTask):
    def workflow(self):
        tasks = []
        from django.db import connection
        connection.close()
        # Analyse all samples uploaded
        analyses = Analysis.objects.filter(status='UPLOAD')
        for analysis in analyses:
            wf = self.new_task('Foo', FooWorkflow, analysis_id=analysis.pk)
            tasks.append(wf)
        return tasks

While this is an example of Task class:

class SamToSortedIndexedBam(sl.Task):
    # the input
    in_sam = None
    analysis_id = sl.Parameter()

    def reads_count(self):
        # store the mapped-read count on the Analysis row
        mapped_reads = SamTools.viewCount(self.in_sam().path)
        if mapped_reads:
            obj = Analysis.objects.get(pk=self.analysis_id)
            obj.mapped_reads = int(mapped_reads)
            obj.save()

    def out_sorted_indexed_bam(self):
        out_bam = os.path.splitext(self.in_sam().path)[0] + '.sorted.bam'
        return sl.TargetInfo(self, out_bam)

    def run(self):
        self.reads_count()
        bam_file = os.path.splitext(self.in_sam().path)[0] + '.bam'
        SamTools.sam_to_bam('1', self.in_sam().path, bam_file)
        sorted_bam_file = os.path.splitext(bam_file)[0] + '.sorted.bam'
        SamTools.sort_bam('1', bam_file, sorted_bam_file)
        SamTools.bam_index(sorted_bam_file)

@diegocorradini
Author

For anyone interested, I found a solution.

When using multiprocessing in Python, you should close all database connections every time a process is spawned. In Luigi, that means inside every Task:

from django.db import connection
connection.close()
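
A minimal sketch of where that close fits, assuming a plain Luigi task and a hypothetical Analysis model (the task name and import path below are illustrative, not from the pipeline above):

import luigi
from django.db import connection

class UpdateAnalysisStatus(luigi.Task):  # hypothetical task name
    analysis_id = luigi.IntParameter()

    def run(self):
        # The forked worker inherits the parent's connection object, which is
        # not safe to reuse here; closing it makes Django lazily open a fresh
        # connection on the next ORM query.
        connection.close()
        from myapp.models import Analysis  # hypothetical import path
        obj = Analysis.objects.get(pk=self.analysis_id)
        obj.status = 'DONE'  # illustrative value
        obj.save()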

@jevhen-ponomarenko

@diegocorradini Hi, could you please share how you set up Django and Luigi? I am trying to use django-luigi to access the Django ORM but keep running into ModuleNotFoundError: No module named ...

@stale

stale bot commented Mar 28, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. If closed, you may revisit when your time allows and reopen! Thank you for your contributions.

@stale stale bot added the wontfix label Mar 28, 2020
@stale stale bot closed this as completed Apr 11, 2020
@simonpicard

Hello, I am facing a similar issue with Luigi and SQLAlchemy.
I have tasks which modify database objects via the SQLAlchemy ORM.
As per the SQLAlchemy documentation, I should dispose of the database engine at the start of a new process: https://docs.sqlalchemy.org/en/20/core/connections.html#engine-disposal

How can I achieve this "on worker start" engine disposal?

I would like to avoid disposing of the database engine more than once per worker, hence I do not want to add the dispose call to each of my Luigi tasks.

Many thanks for the help.
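
One possible approach, assuming a module-level engine: the linked docs recommend disposing of the inherited pool in each child process, and Python's os.register_at_fork hook can run that exactly once per forked worker, so individual tasks don't have to:

import os
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://user:pass@localhost/db")  # placeholder URL

# Runs once in every child process created via fork (which is how luigi
# spawns its workers on POSIX). dispose(close=False) drops the inherited
# connection pool without closing sockets the parent process still owns.
os.register_at_fork(after_in_child=lambda: engine.dispose(close=False))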
