Cannot defer a task from another task #609

Open
chrisrink10 opened this issue May 11, 2022 · 3 comments
Labels
Issue contains: Exploration & Design decisions 🤯 We don't know how this will be implemented yet Issue contains: Some Python 🐍 This issue involves writing some Python code Issue type: Bug 🐞 Something isn't working

Comments

@chrisrink10

Hi there! I am having an issue trying to .defer() a task from another task.

Issue

Here's a minimum example:

from typing import Optional

from procrastinate import AiopgConnector, App

app = App(connector=AiopgConnector())
app.open()


@app.task
def other_task():
    print("Hey! I'm here.")


@app.periodic(cron="* * * * *")
@app.task
def periodic_task(timestamp: Optional[int] = None):
    other_task.defer()


@app.task
def primary_task():
    other_task.defer()

The other_task.defer() fails in my worker process (started as procrastinate --verbose --app=exampleapp.app.app worker) with the following stack trace regardless of whether I defer from the Python shell (app.primary_task.defer()) or when the Periodic tick occurs:

Traceback (most recent call last):
  File "/Users/christopher/Library/Caches/pypoetry/virtualenvs/exampleapp-AzECwUBP-py3.10/lib/python3.10/site-packages/procrastinate/worker.py", line 229, in run_job
    task_result = task(*job_args, **job.task_kwargs)
  File "/Users/christopher/Library/Caches/pypoetry/virtualenvs/exampleapp-AzECwUBP-py3.10/lib/python3.10/site-packages/procrastinate/tasks.py", line 109, in __call__
    return self.func(*args, **kwargs)
  File "/Users/christopher/Projects/exampleapp/src/exampleapp/app.py", line 273, in periodic_task
    other_task.defer()
  File "/Users/christopher/Library/Caches/pypoetry/virtualenvs/exampleapp-AzECwUBP-py3.10/lib/python3.10/site-packages/procrastinate/tasks.py", line 129, in defer
    return self.configure().defer(**task_kwargs)
  File "/Users/christopher/Library/Caches/pypoetry/virtualenvs/exampleapp-AzECwUBP-py3.10/lib/python3.10/site-packages/procrastinate/jobs.py", line 163, in defer
    job = self.job_manager.defer_job(job=job)
  File "/Users/christopher/Library/Caches/pypoetry/virtualenvs/exampleapp-AzECwUBP-py3.10/lib/python3.10/site-packages/procrastinate/manager.py", line 52, in defer_job
    result = self.connector.execute_query_one(
  File "/Users/christopher/Library/Caches/pypoetry/virtualenvs/exampleapp-AzECwUBP-py3.10/lib/python3.10/site-packages/procrastinate/utils.py", line 149, in wrapper
    return sync_await(awaitable=awaitable)
  File "/Users/christopher/Library/Caches/pypoetry/virtualenvs/exampleapp-AzECwUBP-py3.10/lib/python3.10/site-packages/procrastinate/utils.py", line 200, in sync_await
    return loop.run_until_complete(awaitable)
  File "/Users/christopher/.pyenv/versions/3.10.1/lib/python3.10/asyncio/base_events.py", line 617, in run_until_complete
    self._check_running()
  File "/Users/christopher/.pyenv/versions/3.10.1/lib/python3.10/asyncio/base_events.py", line 577, in _check_running
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
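
The same error can be reproduced without procrastinate or a database. The sketch below is only an illustration: `enqueue`, `sync_defer`, and `worker` are hypothetical names, and it assumes (going by the traceback above) that the synchronous `defer()` path ends in `loop.run_until_complete()` while the worker's event loop is already running.

```python
import asyncio


async def enqueue():
    # Hypothetical stand-in for the asynchronous database call behind defer().
    return "enqueued"


def sync_defer(loop):
    # Mimics a synchronous wrapper that drives a coroutine to completion
    # by calling run_until_complete() on the given loop.
    coro = enqueue()
    try:
        return loop.run_until_complete(coro)
    except RuntimeError:
        coro.close()  # the coroutine never started; close it to avoid a warning
        raise


async def worker():
    # The worker is itself running inside the event loop, so the sync wrapper
    # is asked to run a loop that is already running.
    loop = asyncio.get_running_loop()
    try:
        sync_defer(loop)
    except RuntimeError as exc:
        return str(exc)
    return None


message = asyncio.run(worker())
print(message)
```

Calling `sync_defer` from plain synchronous code, with a loop that is not running, would not hit this check; the problem only appears when the caller is already executing on that loop.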

I could probably work around this with how I structure my code -- I could probably move the logic I wish to defer into a shared function that's called from 2 places, but ideally I'm not forced to do that.

The reason I've structured things this way is that it doesn't seem to be possible to defer a periodic task directly (which may be a separate issue I'm happy to file as well). Obviously in normal use cases you probably wouldn't do this, but while I'm testing things out locally and iterating on development I'd like to be able to invoke a Periodic task directly (rather than waiting for it to run on whatever Cron schedule I've set).

Given the example above:

>>> from exampleapp import app
>>> app.periodic_task.defer()
2022-05-11 12:26:06,444 [DEBUG] procrastinate.jobs :: About to defer job exampleapp.app.periodic_task[None]()
2022-05-11 12:26:06,449 [INFO] procrastinate.jobs :: Deferred job exampleapp.app.periodic_task[87]()
87
>>> app.periodic_task.defer(15)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: Task.defer() takes 1 positional argument but 2 were given

The invocation for job number 87 fails with the same RuntimeError given above.

Expected

Without using project-specific jargon, what I'd like is to enqueue task B from within periodic task A. I do not need to wait for the result of B, and I would like control to return to A once B is enqueued.

For context, I am coming from running a large application on Celery and Rabbit, so I may simply be applying Celery paradigms to Procrastinate incorrectly. Please feel free to let me know if that's the case.

Comments

This project is neat, and I decided to use it for a small side project I'm working on. I especially like that, for a small project, I am not required to set up a separate task queue for running jobs asynchronously and on a schedule. Thanks for the work here!

@ewjoachim
Member

Damn, that's the sync / async compatibility biting us again :/

As a "small" step, I'd say defining your task like this will likely solve the issue:

@app.periodic(cron="* * * * *")
@app.task
async def periodic_task(timestamp: Optional[int] = None):
    await other_task.defer_async()

But I agree it's not ideal :/
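
To see why the async variant avoids the error, here is a minimal sketch that runs without procrastinate. `FakeTask` is a hypothetical stand-in, not the library's task class; the only thing it borrows from the suggestion above is the `defer_async()` name. The assumption is that awaiting the deferral keeps everything on the loop that is already running, so nothing tries to start the loop a second time.

```python
import asyncio


class FakeTask:
    """Hypothetical stand-in for a task object; not the procrastinate class."""

    def __init__(self):
        self.queue = []

    async def defer_async(self, **task_kwargs):
        # Stand-in for the asynchronous insert into the jobs table.
        await asyncio.sleep(0)
        self.queue.append(task_kwargs)
        return len(self.queue)  # pretend this is the new job id


other_task = FakeTask()


async def periodic_task(timestamp=None):
    # Awaiting the deferral uses the loop that is already running this task.
    return await other_task.defer_async()


job_id = asyncio.run(periodic_task(timestamp=0))
print(job_id, other_task.queue)
```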

@chrisrink10
Author

Thanks for the reply @ewjoachim. I'd already figured out the Optional[int] trick with timestamp, which worked reasonably well, though I was still running into the later issue with the async/sync mismatch.

For now, I just ended up splitting up my logic into a "private" function that I can call both from a non-periodic task for testing and periodic task. I don't think I can go full async yet since I've got some other dependencies which don't support it, but thank you for the tip on a possible workaround.
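
The split described above might look roughly like the following. This is a sketch under assumptions: `_do_work` and the `calls` list are hypothetical, and the task functions are left undecorated so the snippet runs on its own. In the real app they would carry the `@app.periodic(...)` and `@app.task` decorators from the original example.

```python
calls = []


def _do_work(source):
    # Hypothetical "private" function holding the shared logic.
    calls.append(source)


def periodic_task(timestamp=None):
    # Would be decorated with @app.periodic(...) and @app.task in the real app.
    _do_work("periodic")


def primary_task():
    # Would be decorated with @app.task; deferred by hand while testing.
    _do_work("manual")


periodic_task(timestamp=0)
primary_task()
print(calls)
```

Neither task defers the other, so the sync/async mismatch never comes into play. The trade-off is that the shared logic runs inside the calling job rather than as its own job.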

@ewjoachim
Member

Oh sorry if it wasn't clear. My comment was solely about the async/await stuff, the Optional[int] is copied/pasted from your own snippet above :)

I could be wrong, but I think it shouldn't be too impactful if you declare your function as async but don't actually do async stuff in it. That's roughly what happens when procrastinate launches your sync task. The consequence will be that the event loop will not get control back until the end of your task, but if you're doing sync things, I think the event loop doesn't actually need to have control back.
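
The consequence described above can be observed with plain asyncio. In this sketch, `blocking_task` and `heartbeat` are hypothetical names; the first is declared `async` but only does synchronous work, so the loop cannot run anything else until it returns.

```python
import asyncio
import time

events = []


async def blocking_task():
    # Declared async, but contains no await: the loop gets no chance to
    # switch to another coroutine until this function returns.
    events.append("task start")
    time.sleep(0.05)  # synchronous, blocking call
    events.append("task end")


async def heartbeat():
    # Anything else scheduled on the loop has to wait its turn.
    events.append("heartbeat")


async def main():
    await asyncio.gather(blocking_task(), heartbeat())


asyncio.run(main())
print(events)
```

The heartbeat is recorded only after the blocking task has finished. Whether that matters depends on what else needs the loop while the task runs.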

@ewjoachim ewjoachim added Issue type: Bug 🐞 Something isn't working Issue contains: Exploration & Design decisions 🤯 We don't know how this will be implemented yet Issue contains: Some Python 🐍 This issue involves writing some Python code labels Aug 31, 2022