Automatically handle parallel queries with an async Lock #852

Open
LiraNuna opened this issue Oct 2, 2022 · 4 comments
Comments

@LiraNuna

LiraNuna commented Oct 2, 2022

Is your feature request related to a problem?

When performing two "parallel" queries on the same connection via asyncio.gather, an error is raised:
RuntimeError: readexactly() called while another coroutine is already waiting for incoming data
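
For context, a minimal reproduction looks roughly like this (connection parameters are placeholders; the point is only that two queries share one connection):

import asyncio
import aiomysql

async def main():
    conn = await aiomysql.connect(host="127.0.0.1", user="root", password="", db="test")

    async def run(sql):
        async with conn.cursor() as cur:
            await cur.execute(sql)
            return await cur.fetchall()

    # Both coroutines try to read from the same socket concurrently, which
    # raises: RuntimeError: readexactly() called while another coroutine ...
    await asyncio.gather(run("SELECT 1"), run("SELECT 2"))
    conn.close()

asyncio.run(main())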

Describe the solution you'd like

I have implemented a simple yet effective way to remedy this issue. It is quite "gross", however, since it requires monkey patching; I am unable to make the required changes in a type-safe manner:

from asyncio import Lock
from typing import Any, Iterable, Optional

from aiomysql import Cursor


class LockedCursor(Cursor):
    def get_lock(self) -> Lock:
        # Lazily attach a single Lock to the underlying connection so that
        # every cursor created from it shares the same lock.
        if not hasattr(self.connection, 'lock'):
            self.connection.lock = Lock()

        return self.connection.lock

    async def execute(self, query_string: str, args: Optional[Iterable[Any]] = None):
        # Only one execute() per connection may be in flight at a time.
        async with self.get_lock():
            return await super().execute(query_string, args)
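
For illustration, in a plain aiomysql.connect setup the subclass can simply be passed as the cursor class (connection parameters are placeholders):

conn = await aiomysql.connect(
    host="127.0.0.1", user="root", password="", db="test",
    cursorclass=LockedCursor,
)
# With the shared lock, the asyncio.gather reproduction above completes
# instead of raising the readexactly() RuntimeError.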

I recommend making this behavior standard on the base cursor, as it is often impossible to avoid parallel queries being issued at the same time.

Describe alternatives you've considered

Another option would be for the connection to hold an asyncio.Queue that orchestrates outgoing parallel queries; however, I personally believe this is overkill and not as simple to understand as a lock.

Another option I considered is subclassing each of the Pool, Connection, and Cursor classes to inject LockedCursor when connection.cursor is called; however, that has proven very difficult and full of boilerplate, with repetitive overrides of the constructor and other methods just to add a lock.

Additional context

I am willing to help prepare a PR if this is a solution that the maintainers would approve of. I am also willing to discuss other potential solutions and approaches.

Code of Conduct

  • I agree to follow the aio-libs Code of Conduct
@Nothing4You
Collaborator

Hi,
the simplest solution for this, requiring no code changes, is to just acquire() a connection from the pool for each task.
By providing appropriate minsize and maxsize values, the pool will automatically take care of creating the necessary number of connections, and acquire() will not return until a connection is available.
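
For example (pool size and connection parameters are illustrative):

import asyncio
import aiomysql

async def fetch_one(pool, sql):
    # Each task takes its own connection; acquire() simply waits when the
    # pool has already handed out maxsize connections.
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(sql)
            return await cur.fetchone()

async def main():
    pool = await aiomysql.create_pool(
        host="127.0.0.1", user="root", password="", db="test",
        minsize=1, maxsize=10,
    )
    print(await asyncio.gather(fetch_one(pool, "SELECT 1"),
                               fetch_one(pool, "SELECT 2")))
    pool.close()
    await pool.wait_closed()

asyncio.run(main())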

@LiraNuna
Author

LiraNuna commented Oct 23, 2022

I don't think this solution will work if you need to work within the same connection's transaction, which is my requirement.
On our server, we use one connection per request, where each connection is also a transaction that we can roll back if the request fails (due to an exception or other error).
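
Roughly (simplified; the names and pool usage here are illustrative, not our actual code):

async def handle_request(pool, work):
    async with pool.acquire() as conn:
        await conn.begin()
        try:
            # work() may fan out several queries with asyncio.gather,
            # and they all must run inside this one transaction.
            result = await work(conn)
            await conn.commit()
            return result
        except Exception:
            await conn.rollback()
            raise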

@Nothing4You
Collaborator

Indeed, transactions will get lost with that.

@LiraNuna
Author

Any progress on this? An asyncio.Lock is an elegant solution that is easy to implement. I'm willing to contribute a PR if necessary; I just want to make sure it will be reviewed promptly if I spend the time on this.
