
Run AsyncConsumer tasks concurrently #1933

Open

wants to merge 2 commits into base: main
Conversation

@primal100 commented Oct 14, 2022

For #1924.

Exploratory PR which uses asyncio.create_task for dispatching tasks in a consumer, and add_done_callback to raise exceptions, rather than awaiting each task one by one.
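To make the idea concrete, here is a minimal, self-contained sketch of the create_task + add_done_callback pattern described above. The `dispatch` handler, the message values, and the `consumer_loop` are all illustrative stand-ins, not the actual Channels code.

```python
import asyncio

results = []
errors = []

async def dispatch(message):
    # Toy handler standing in for the consumer's dispatch method.
    if message == "bad":
        raise ValueError("invalid message")
    await asyncio.sleep(0)
    results.append(message)

def _on_done(task: asyncio.Task) -> None:
    # Surface any uncaught exception from the finished dispatch task.
    # Recording it here (or logging it) means the parent loop keeps
    # running instead of being killed by the failure.
    if not task.cancelled() and task.exception() is not None:
        errors.append(task.exception())

async def consumer_loop(messages):
    tasks = []
    for message in messages:
        task = asyncio.create_task(dispatch(message))
        task.add_done_callback(_on_done)
        tasks.append(task)
    # Wait for outstanding tasks before shutting down; return_exceptions
    # stops gather itself from re-raising.
    await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(consumer_loop(["a", "bad", "b"]))
```

After this runs, "a" and "b" have both been handled despite the failure in between, and the ValueError is available in `errors` rather than having propagated to the parent task.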

A couple of notes:

  1. The existing behaviour is that if one of the dispatch tasks raises an uncaught exception, that exception is propagated to the parent task running the consumer. As far as I can see, this kills the consumer, meaning no more tasks will be processed by it. With create_task and add_done_callback as in this PR, the exception is still raised by the dispatch task and appears in the console, but it does not interrupt the parent task, which continues processing new requests.
    To make the existing tests pass, I added some extra code to preserve the existing behaviour, which is why the code is a bit messier. But I am wondering which behaviour is more desirable. At the moment, if a user sends invalid data that causes an unhandled exception in one of the dispatch tasks, the entire consumer stops working. There may be other reasons why the existing behaviour is desirable, though: it forces the programmer to handle any possible exception caused by user input, which is a good thing.

  2. I did try using the Python 3.11 TaskGroup backport for earlier versions, which actually implements the existing behaviour from point 1 and would also make the code cleaner, but I discovered it's not exactly plug-and-play on earlier Python versions, so it's not really an option. I could implement a similar but much simpler TaskGroup to make the code a bit cleaner, if you think it's needed.

  3. One backward-compatibility consideration, as raised in the issue, is that tasks will finish out of order. Personally I think this is fine, as it is a feature of asynchronous programming which async protocols and implementations should be able to handle. What I didn't know when I created the issue is that SyncConsumer inherits from AsyncConsumer, so I think a change in this behaviour would also affect SyncConsumer. Still, I don't think it should be an issue, as network protocols should be able to deal with this. That said, I am mostly used to working with Channels from a runworker perspective, so someone with more experience implementing HTTP protocols may have a better idea of whether this is a problem from a backward-compatibility point of view. One option would be to add a class boolean attribute which controls either the old or the new behaviour. Overall I think the new behaviour is worth it, as it allows the consumer to run in a truly asynchronous way, which is faster.

  4. Everything is OK with the existing tests. Do you want me to add profiling tests to measure the performance improvement?
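For contrast with point 1, the existing sequential behaviour can be sketched as follows: each dispatch is awaited directly, so an uncaught exception propagates into the consumer task and stops it from processing any further messages. Again, `dispatch` and `consumer_loop` are illustrative names, not the Channels API.

```python
import asyncio

handled = []

async def dispatch(message):
    # Toy handler: "bad" simulates invalid user input causing an
    # unhandled exception.
    if message == "bad":
        raise ValueError("invalid message")
    handled.append(message)

async def consumer_loop(messages):
    for message in messages:
        await dispatch(message)  # an exception here kills the whole loop

try:
    asyncio.run(consumer_loop(["a", "bad", "b"]))
except ValueError:
    pass  # the consumer is dead; "b" was never processed
```

Only "a" ends up in `handled`: the failure on the second message prevents the third from ever being dispatched, which is the consumer-killing behaviour point 1 describes.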
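The "much simpler TaskGroup" floated in point 2 might look something like this hypothetical sketch. It only tracks spawned tasks and re-raises the first failure when the group is drained; it is deliberately far less capable than Python 3.11's asyncio.TaskGroup (no cancellation of siblings, no context-manager protocol).

```python
import asyncio

class SimpleTaskGroup:
    """A minimal task group: collect tasks, then wait and re-raise."""

    def __init__(self):
        self._tasks = []

    def create_task(self, coro):
        task = asyncio.create_task(coro)
        self._tasks.append(task)
        return task

    async def wait(self):
        # Wait for every task; re-raise the first exception, if any.
        results = await asyncio.gather(*self._tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, BaseException):
                raise result

out = []

async def work(n):
    await asyncio.sleep(0)
    out.append(n)

async def main():
    group = SimpleTaskGroup()
    for n in (1, 2, 3):
        group.create_task(work(n))
    await group.wait()

asyncio.run(main())
```

All three tasks run concurrently and complete before `wait()` returns; if any of them raised, the exception would surface in the parent, matching the existing consumer behaviour.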

@carltongibson (Member)

Hi @primal100 — thanks for this — interesting!

Ref point 3: e.g. AsyncHttpConsumer.http_request waits for the whole body to be available before passing off to the handle method, so should be OK. (That was the only obvious case that came to mind... 🤔)

One option would be to add a class boolean attribute which controls either the old or the new behaviour.

Fancy putting that in a commit (maybe just a diff 🤔) to have a look at too?
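As a rough sketch of what that flag could look like (the attribute name `dispatch_concurrently` and the consumer class here are hypothetical, not part of the Channels API):

```python
import asyncio

class SketchConsumer:
    # Old, sequential behaviour by default; subclasses opt in to the
    # new concurrent dispatch by flipping this class attribute.
    dispatch_concurrently = False

    def __init__(self):
        self.handled = []

    async def dispatch(self, message):
        # Stand-in for the real dispatch method.
        await asyncio.sleep(0)
        self.handled.append(message)

    async def run(self, messages):
        if self.dispatch_concurrently:
            # New behaviour: every message becomes its own task.
            tasks = [asyncio.create_task(self.dispatch(m)) for m in messages]
            await asyncio.gather(*tasks)
        else:
            # Old behaviour: await each dispatch in turn.
            for m in messages:
                await self.dispatch(m)

class ConcurrentConsumer(SketchConsumer):
    dispatch_concurrently = True

seq = SketchConsumer()
asyncio.run(seq.run(["a", "b"]))
conc = ConcurrentConsumer()
asyncio.run(conc.run(["a", "b"]))
```

Sequential dispatch preserves message order; concurrent dispatch handles every message but may complete them out of order, which is the compatibility trade-off discussed in point 3.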

@carltongibson (Member)

Do you want me to add profiling...

Always happy to see some numbers 🙂
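For what it's worth, a micro-benchmark along the lines of point 4 could look like the following. The 50 ms sleep stands in for I/O-bound handler work and is purely illustrative; this is not a Channels test.

```python
import asyncio
import time

async def handler():
    await asyncio.sleep(0.05)  # stand-in for I/O-bound work

async def sequential():
    # Existing behaviour: await each handler one by one (~0.5 s total).
    for _ in range(10):
        await handler()

async def concurrent():
    # New behaviour: run all handlers as tasks (~0.05 s total).
    await asyncio.gather(*(asyncio.create_task(handler()) for _ in range(10)))

start = time.perf_counter()
asyncio.run(sequential())
seq_time = time.perf_counter() - start

start = time.perf_counter()
asyncio.run(concurrent())
conc_time = time.perf_counter() - start

print(f"sequential: {seq_time:.3f}s, concurrent: {conc_time:.3f}s")
```

For I/O-bound handlers like these, the concurrent version finishes in roughly the time of the single slowest handler rather than the sum of all of them.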

Successfully merging this pull request may close these issues.

AsyncConsumer runs tasks sequentially (should be in parallel?)