Skip to content

Commit

Permalink
Merge pull request #3 from pgorecki/async
Browse files Browse the repository at this point in the history
Add async support. Closes #2
  • Loading branch information
pgorecki authored Mar 17, 2024
2 parents 1c03603 + 633f76a commit d8095d0
Show file tree
Hide file tree
Showing 15 changed files with 703 additions and 98 deletions.
12 changes: 11 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
# Change Log

## [0.10.0] - 2024-03-17

### Added

- Add async support in `Application`. New methods: `call_async`, `execute_async`, `publish_async`
- Add async support in `TransactionContext`. New methods: `call_async`, `execute_async`, `publish_async`

## [0.9.0] - 2024-02-28

### Added

- Add Sphinx docs

### Changed

- Rename `Task` to `Command`
Expand All @@ -12,4 +22,4 @@

## [0.8.0] - 2024-01-08

No history.
Early release.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
format:
pre-commit run --all-files

test:
pytest
mypy lato
pytest --doctest-modules lato

docs_autobuild:
sphinx-autobuild --watch lato -E docs docs/_build/html
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ Based on dependency injection and Python 3.6+ type hints.

- **Minimalistic**: Intuitive and lean API for rapid development without the bloat.

- **Async Support**: Concurrency and async / await is supported.


## Installation

Expand Down
22 changes: 22 additions & 0 deletions examples/async_example/example1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import asyncio

from lato import Application, TransactionContext


async def add_async(a, b):
await asyncio.sleep(1)
return a + b


if __name__ == "__main__":
with TransactionContext() as ctx:
result = ctx.call(add_async, 1, 2)
print(result)

result = asyncio.run(result)

print("got result from asyncio.run", result)

app = Application("async")
result = app.call(add_async, 1, 2)
print(result)
21 changes: 21 additions & 0 deletions examples/async_example/example2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import asyncio

from lato import Application, Command


class MultiplyCommand(Command):
a: int
b: int


app = Application("async")


@app.handler(MultiplyCommand)
async def multiply_async(command: MultiplyCommand):
await asyncio.sleep(1)
return command.a * command.b


coroutine = app.execute(MultiplyCommand(a=10, b=20))
print("execution result", coroutine)
57 changes: 57 additions & 0 deletions examples/async_example/toy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import asyncio
import logging

from lato import Application, TransactionContext

logging.basicConfig(level=logging.DEBUG)
root_logger = logging.getLogger("toy")

app = Application()


class Counter:
def __init__(self):
self.value = 0

def next(self):
self.value += 1
return self.value


counter = Counter()


@app.on_enter_transaction_context
async def on_enter_transaction_context(ctx: TransactionContext):
correlation_id = str(counter.next())
logger = root_logger.getChild(correlation_id)
ctx.set_dependencies(logger=logger)
logger.info("Connecting to database")
await asyncio.sleep(0.001)
logger.info("Connected")


@app.on_exit_transaction_context
async def on_exit_transaction_context(ctx: TransactionContext, exception=None):
logger = ctx["logger"]
logger.info("Disconnecting from database")
await asyncio.sleep(0.001)
logger.info("Disconnected from database")


@app.handler("foo")
async def handle_foo(x, logger):
logger.info(f"Starting foo, x={x}")
await asyncio.sleep(0.001)
logger.info("Finished foo")


async def main() -> None:
await asyncio.gather(
app.call_async("foo", x=1),
app.call_async("foo", x=2),
app.call_async("foo", x=3),
)


asyncio.run(main())
150 changes: 131 additions & 19 deletions lato/application.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import logging
from collections.abc import Callable
from typing import Any, Optional, Union, List
from lato.types import DependencyIdentifier
from collections.abc import Awaitable, Callable
from typing import Any, Optional, Union

from lato.application_module import ApplicationModule
from lato.dependency_provider import BasicDependencyProvider, DependencyProvider
from lato.message import Command, Event, Message
from lato.transaction_context import TransactionContext
from lato.message import Event, Message
from lato.transaction_context import (
ComposerFunction,
MiddlewareFunction,
OnEnterTransactionContextCallback,
OnExitTransactionContextCallback,
TransactionContext,
)
from lato.types import DependencyIdentifier

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -39,11 +46,15 @@ def __init__(
self.dependency_provider = (
dependency_provider or self.dependency_provider_factory(**kwargs)
)
self._transaction_context_factory = None
self._on_enter_transaction_context = lambda ctx: None
self._on_exit_transaction_context = lambda ctx, exception=None: None
self._transaction_middlewares: List[Callable] = []
self._composers: dict[Union[Message, str], Callable] = {}
self._transaction_context_factory: Optional[Callable] = None
self._on_enter_transaction_context: Optional[
OnEnterTransactionContextCallback
] = None
self._on_exit_transaction_context: Optional[
OnExitTransactionContextCallback
] = None
self._transaction_middlewares: list[MiddlewareFunction] = []
self._composers: dict[Union[Message, str], ComposerFunction] = {}

def get_dependency(self, identifier: DependencyIdentifier) -> Any:
"""Gets a dependency from the dependency provider. Dependencies can be resolved either by name or by type.
Expand All @@ -57,7 +68,7 @@ def get_dependency(self, identifier: DependencyIdentifier) -> Any:
def __getitem__(self, identifier: DependencyIdentifier) -> Any:
return self.get_dependency(identifier)

def call(self, func: Union[Callable, str], *args, **kwargs):
def call(self, func: Union[Callable[..., Any], str], *args, **kwargs) -> Any:
"""Invokes a function with `args` and `kwargs` within the :class:`TransactionContext`.
If `func` is a string, then it is an alias, and the corresponding handler for the alias is retrieved.
Any missing arguments are provided by the dependency provider of a transaction context,
Expand All @@ -81,6 +92,32 @@ def call(self, func: Union[Callable, str], *args, **kwargs):
result = ctx.call(func, *args, **kwargs)
return result

async def call_async(
self, func: Union[Callable[..., Awaitable[Any]], str], *args, **kwargs
) -> Any:
"""Invokes an async function with `args` and `kwargs` within the :class:`TransactionContext`.
If `func` is a string, then it is an alias, and the corresponding handler for the alias is retrieved.
Any missing arguments are provided by the dependency provider of a transaction context,
and args and kwargs parameters.
:param func: The async function to invoke, or an alias.
:param args: Arguments to pass to the function.
:param kwargs: Keyword arguments to pass to the function.
:return: The result of the invoked function.
:raises ValueError: If an alias is provided, but no corresponding handler is found.
"""
if isinstance(func, str):
try:
func = next(self.iterate_handlers_for(alias=func))
except StopIteration:
raise ValueError(f"Handler not found", func)

async with self.transaction_context() as ctx:
result = await ctx.call_async(func, *args, **kwargs)
return result

def execute(self, message: Message) -> Any:
"""Executes a command within the :class:`TransactionContext`.
Use :func:`handler` decorator to register a handler for the command.
Expand All @@ -96,28 +133,88 @@ def execute(self, message: Message) -> Any:
result = ctx.execute(message)
return result

async def execute_async(self, message: Message) -> Any:
"""Asynchronously executes a command within the :class:`TransactionContext`.
Use :func:`handler` decorator to register a handler for the command.
If a command is handled by multiple handlers, then the final result is
composed to a single return value using :func:`compose` decorator.
:param message: The message to be executed (usually, a :class:`Command` or :class:`Query` subclass).
:return: The result of the invoked message handler.
:raises: ValueError: If no handlers are found for the message.
"""
async with self.transaction_context() as ctx:
result = await ctx.execute(message)
return result

def emit(self, event: Event) -> dict[Callable, Any]:
"""Deprecated. Use `publish()` instead."""
return self.publish(event)

def publish(self, event: Event) -> dict[Callable, Any]:
"""
Publish an event by calling all handlers for that event.
:param event: The event to publish, or an alias of an event handler to call.
:return: A dictionary mapping handlers to their results.
"""
with self.transaction_context() as ctx:
result = ctx.emit(event)
result = ctx.publish(event)
return result

async def publish_async(self, event: Event) -> dict[Callable, Any]:
"""
Asynchronously publish an event by calling all handlers for that event.
:param event: The event to publish, or an alias of an event handler to call.
:return: A dictionary mapping handlers to their results.
"""
async with self.transaction_context() as ctx:
result = await ctx.publish_async(event)
return result

def on_enter_transaction_context(self, func):
"""
Decorator for registering a function to be called when entering a transaction context
:param func:
:return:
:param func: callback to be called when entering a transaction context
:return: the decorated function
**Example:**
>>> from lato import Application, TransactionContext
>>> app = Application()
>>> @app.on_enter_transaction_context
... def on_enter_transaction_context(ctx: TransactionContext):
... print('entering transaction context')
... ctx.set_dependencies(foo="foo")
>>> app.call(lambda foo: print(foo))
entering transaction context
foo
"""

self._on_enter_transaction_context = func
return func

def on_exit_transaction_context(self, func):
"""
Decorator for registering a function to be called when exiting a transaction context
:param func:
:return:
:param func: callback to be called when exiting a transaction context
:return: the decorated function
**Example:**
>>> from lato import Application, TransactionContext
>>> app = Application()
>>>
>>> @app.on_exit_transaction_context
... def on_exit_transaction_context(ctx: TransactionContext, exception=None):
... print("exiting context")
>>> app.call(lambda: print("calling"))
calling
exiting context
"""
self._on_exit_transaction_context = func
return func
Expand All @@ -126,8 +223,23 @@ def on_create_transaction_context(self, func):
"""
Decorator for overriding default transaction context creation
:param func:
:return:
:param func: callback to be called when creating a transaction context
:return: the decorated function
**Example:**
>>> from lato import Application, TransactionContext
>>> app = Application()
>>>
>>> class CustomTransactionContext(TransactionContext):
... pass
>>>
>>> @app.on_create_transaction_context
... def create_transaction_context(**kwargs):
... return CustomTransactionContext(**kwargs)
>>>
>>> print(app.transaction_context(foo="bar").__class__.__name__)
CustomTransactionContext
"""
self._transaction_context_factory = func
return func
Expand All @@ -136,7 +248,7 @@ def transaction_middleware(self, middleware_func):
"""
Decorator for registering a middleware function to be called when executing a function in a transaction context
:param middleware_func:
:return:
:return: the decorated function
"""
self._transaction_middlewares.insert(0, middleware_func)
return middleware_func
Expand Down
Loading

0 comments on commit d8095d0

Please sign in to comment.