
Task_id must not be empty. Got None instead #117

Open
sindhujit1 opened this issue Jun 16, 2024 · 7 comments
Labels
wontfix This will not be worked on

Comments

@sindhujit1

sindhujit1 commented Jun 16, 2024

My Celery progress bars normally work fine, but when I try to use one inside a function run by a ThreadPoolExecutor, it fails. I pass self into that function. Inside the threaded function, whenever I set the progress with:

progress_recorder.set_progress(8, seconds)

I get an error message:

Task_id must not be empty. Got None instead

on that line.

Any help would be appreciated!

@czue
Owner

czue commented Jun 16, 2024

Can you provide sample code for reproducing this?

@sindhujit1
Author

sindhujit1 commented Jun 16, 2024

Sure. In my main function I call:

result = basic_config_lb_task.delay(20, new_existing_fabric, new_fabric_check, fabricName)

and the task and the threaded helper look like this:

import concurrent.futures

from celery import shared_task


@shared_task(bind=True)
def basic_config_lb_task(self, seconds, new_existing_fabric, new_fabric_check, fabricName):
    # progress_recorder and vxlan_fabric_type are created earlier (not shown)
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        progress_recorder.set_progress(6, seconds)  # this works
        futures.append(executor.submit(views_lb_dns_helper.base_config_helper_commands, fabricName, vxlan_fabric_type, progress_recorder, seconds))


# in views_lb_dns_helper
def base_config_helper_commands(self, fabricName, vxlan_fabric_type, progress_recorder, seconds):
    progress_recorder.set_progress(8, seconds)  # this does not work

@sindhujit1
Author

sindhujit1 commented Jun 16, 2024

That futures.append line is actually inside the with block; the indentation got lost when I pasted it. Versions:
celery==5.4.0
celery-progress==0.3
django-celery-beat==2.5.0

@sindhujit1
Copy link
Author

Let me know if you need any more info. It seems to me that I somehow need to pass the task_id into the threaded function, but I'm not sure how.

@sindhujit1
Copy link
Author

Also, is there any significance to .delay in this line:

result = my_task.delay(10)

Sometimes my task won't start and I get an error on the .delay line:

campus_networkinventory.tasks.my_task

It works 50% of the time.

@czue
Owner

czue commented Jun 19, 2024

I was able to reproduce this with the following code:

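import concurrent.futures

from celery import shared_task
from celery_progress.backend import ProgressRecorder


class LoggingProgressRecorder(ProgressRecorder):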

    def set_progress(self, current, total, description=""):
        state, meta = super().set_progress(current, total, description)
        print(self.task, state, meta)
        return state, meta

@shared_task(bind=True)
def main_task(self, seconds):
    progress_recorder = LoggingProgressRecorder(self)
    print(self.request.id)
    print(progress_recorder.task.request.id)
    # import pdb; pdb.set_trace()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        progress_recorder.set_progress(6, 10, "main task")
        futures.append(executor.submit(sub_task, self, seconds))


def sub_task(task, seconds):
    print(task)
    print(task.request.id)
    progress_recorder = LoggingProgressRecorder(task)
    progress_recorder.set_progress(8, 10)

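Why the sub-task sees no id: Celery keeps the request context of the running task in thread-local storage, so self.request.id is only populated in the worker thread that is executing the task. A thread started by ThreadPoolExecutor gets an empty default request whose id is None, and update_state() falls back to that id when it is not given an explicit task_id, so the result backend rejects it with the error above. The stdlib-only sketch below simulates this behaviour; FakeTask, run_as_worker, and request_id are hypothetical stand-ins, not Celery's API.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeTask:
    """Hypothetical stand-in for a bound Celery task; not Celery's real class."""

    def __init__(self):
        # Per-thread storage, similar in spirit to Celery's thread-local request context.
        self._local = threading.local()
        self.states = []

    @property
    def request_id(self):
        # Any thread that never received the request sees None.
        return getattr(self._local, "request_id", None)

    def run_as_worker(self, request_id, body):
        # Simulate the worker setting up the request before running the task body.
        self._local.request_id = request_id
        try:
            return body()
        finally:
            del self._local.request_id

    def update_state(self, task_id=None, state=None, meta=None):
        # Mirror Celery: fall back to the current request's id when none is passed.
        if task_id is None:
            task_id = self.request_id
        if not task_id:
            raise ValueError(f"task_id must not be empty. Got {task_id!r} instead.")
        self.states.append((task_id, state, meta))


def sub_task(task, task_id=None):
    task.update_state(task_id=task_id, state="PROGRESS", meta={"current": 8})


def main_task_body(task):
    task.update_state(state="PROGRESS", meta={"current": 6})  # task thread: works
    with ThreadPoolExecutor() as executor:
        # No explicit id: the pool thread has no request context.
        failing = executor.submit(sub_task, task)
        # Id read here, in the task's own thread, then passed along: works.
        working = executor.submit(sub_task, task, task.request_id)
    errors = []
    try:
        failing.result()
    except ValueError as exc:
        errors.append(str(exc))
    working.result()
    return errors


task = FakeTask()
errors = task.run_as_worker("abc-123", lambda: main_task_body(task))
print(errors)       # ['task_id must not be empty. Got None instead.']
print(task.states)  # two entries, both with task id 'abc-123'
```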
Here's what ChatGPT has to say about it:


You're trying to use your LoggingProgressRecorder within a thread and it's failing because the self object doesn't have the Celery context when inside a thread created by concurrent.futures.ThreadPoolExecutor, which is out of Celery's control and hence doesn't work as you expect.

Another point is that Celery is not built to work with threads inside a task. The usual practice is to divide complex jobs into small tasks and let Celery handle their execution.

However, if you want to continue using threading, a possible solution could be to share the progress of the task with all the threads by placing it somewhere they can all access, for example a shared variable, and update this from the main task. Here's a simplified example:

from concurrent.futures import ThreadPoolExecutor
from celery import shared_task


# shared variable
progress = {
    'current': 0,
    'total': 10,
}


@shared_task(bind=True)
def main_task(self, seconds):
    global progress  # access to shared variable
    print(self.request.id)
    
    with ThreadPoolExecutor() as executor:
        futures = []
        # setting some progress
        progress.update({'current': 6, 'description': 'Main task'})
        self.update_state(state='PROGRESS', meta=progress)

        futures.append(executor.submit(sub_task, seconds))


def sub_task(seconds):
    global progress  # access to shared variable
    print(progress['description'])

    # updating progress
    progress.update({'current': 8, 'description': 'Sub task work'})
    # Note: we don't update Celery task state here, because we are outside of the task 

In the above code, progress will be shared between the main_task and sub_task. However, you can update your task's state only from the main_task, because sub_task doesn't have access to the Celery task instance.
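A variant of the shared-variable approach, sketched with the standard library only: in the example above, the meta is stored once by update_state, so later changes to the global dict from sub_task never reach the result backend unless the main task calls update_state again. Instead, the worker threads below only return results, and the task's own thread reports progress as each future completes via concurrent.futures.as_completed. do_work and the report callback are hypothetical placeholders.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def do_work(item):
    # Hypothetical placeholder for the real per-item job.
    return item * item


def run_with_progress(items, report):
    """Run jobs in a thread pool; only the calling thread reports progress.

    `report(current, total)` is a hypothetical hook. Inside a bound Celery task it
    could wrap progress_recorder.set_progress(current, total), which works because
    this loop runs in the task's own thread, where the request context exists.
    """
    total = len(items)
    results = {}
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(do_work, item): item for item in items}
        # Workers only return values; progress is reported as each one finishes.
        for done, future in enumerate(as_completed(futures), start=1):
            results[futures[future]] = future.result()
            report(done, total)
    return results


calls = []
results = run_with_progress([1, 2, 3], lambda current, total: calls.append((current, total)))
print(sorted(results.items()))  # [(1, 1), (2, 4), (3, 9)]
print(calls)                    # [(1, 3), (2, 3), (3, 3)]
```

No lock is needed because no state is shared between threads: each worker's result travels back through its future.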


I'm inclined to agree that you should be using Celery primitives like subtasks or groups to achieve this, not the threading library. I think figuring out how to support this is beyond the scope of this project.

@czue czue added the wontfix This will not be worked on label Jun 19, 2024
@sindhujit1
Author

sindhujit1 commented Jun 19, 2024

The email notification I got had a different reply than what I see now, so I tried that first:

from concurrent.futures import ThreadPoolExecutor

from celery import shared_task


@shared_task(bind=True)
def main_task(self, seconds):
    print('Main task ID:', [self.request.id])
    with ThreadPoolExecutor() as executor:
        futures = []
        futures.append(executor.submit(sub_task, self.request.id, seconds))

def sub_task(task_id, seconds):
    print('Task ID:', task_id)

But then the code fails in sub_task, where I have:
progress_recorder = ProgressRecorder(self)

saying that self is not defined.

I tried passing in self, task_id, and seconds, and now it fails on this line (inside sub_task):

progress_recorder.set_progress(8, seconds)

task_id must not be empty. Got None instead.
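Passing task_id into the thread is not enough by itself: ProgressRecorder.set_progress() in celery-progress appears to call self.task.update_state(state=..., meta=...) without a task_id, so Celery still looks up request.id in the calling thread, where it is None. Celery's Task.update_state() does accept an explicit task_id keyword, so one possible workaround is a small adapter that pins the id captured in the task thread. This sketch assumes ProgressRecorder only ever calls update_state on the object it is given; TaskIdBoundTask and RecordingTask are hypothetical names, and the demo uses a stand-in instead of a real Celery task.

```python
class TaskIdBoundTask:
    """Hypothetical adapter that forwards update_state() with a fixed task_id.

    Assumes the recorder wrapping it only calls update_state(state=..., meta=...),
    which is what celery-progress's ProgressRecorder appears to do.
    """

    def __init__(self, task, task_id):
        if not task_id:
            raise ValueError("read task_id as self.request.id in the task's own thread")
        self._task = task
        self._task_id = task_id

    def update_state(self, state=None, meta=None, **kwargs):
        # An explicit task_id means the calling thread's request context is never consulted.
        return self._task.update_state(task_id=self._task_id, state=state, meta=meta, **kwargs)


# Intended usage inside the Celery task (shown as comments, not executed here):
#
#   @shared_task(bind=True)
#   def main_task(self, seconds):
#       task_id = self.request.id  # read in the task's own thread
#       with ThreadPoolExecutor() as executor:
#           executor.submit(sub_task, self, task_id, seconds)
#
#   def sub_task(task, task_id, seconds):
#       progress_recorder = ProgressRecorder(TaskIdBoundTask(task, task_id))
#       progress_recorder.set_progress(8, seconds)


class RecordingTask:
    # Minimal stand-in for a bound task, used only to show what gets forwarded.
    def __init__(self):
        self.calls = []

    def update_state(self, task_id=None, state=None, meta=None, **kwargs):
        self.calls.append((task_id, state, meta))


fake = RecordingTask()
TaskIdBoundTask(fake, "abc-123").update_state(state="PROGRESS", meta={"current": 8})
print(fake.calls)  # [('abc-123', 'PROGRESS', {'current': 8})]
```

This relies on an internal detail of celery-progress, so reporting progress only from the task's own thread, or splitting the work into Celery subtasks, remains the more robust option.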
