This proof-of-concept library builds on top of Python's standard asynchronous iterables, asynchronous generators, and asyncio
coroutines to offer reactive programming abstractions.
The intention is to offer a level of abstraction and compositional power similar to ReactiveX, while being directly compatible and composable with the Python standard library.
This library is not production-ready, so the examples will necessarily be toys for now.
Filtering and mapping:
import asyncio
import math

from reactive import aitertools
import reactive.operators as op


async def demo():
    floats = [1.2, float('nan'), 5.4, 6.3, float('inf')]
    strings = aitertools.from_iterable(floats) >= op.filter(math.isfinite) | op.map(str)
    async for s in strings:
        print(s)


asyncio.run(demo())
# Prints:
# 1.2
# 5.4
# 6.3
More example usage can be seen in the library's unit tests.
It's a design goal of this library to stay as close to standard Python concepts as possible, while achieving the desired expressivity. Most of the "key concepts" are therefore pre-existing parts of the language or standard library.
An asynchronous iterable (represented as typing.AsyncIterable) can be looped over with async for:
async for s in strings:
    print(s)
This is analogous to normal iterables (typing.Iterable) and for ... in ... loops.
In the context of the reactive library, asynchronous iterables are a useful abstraction for a stream of values, each of which may arrive at an indefinite point in time. Between iterations of the async for loop, execution of the containing coroutine is paused. When the next value is ready, possibly far in the future, the loop resumes.
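As a rough illustration of that pause-and-resume behavior, here is a minimal sketch that uses only the standard library. The names ticker and collect are made up for this example and are not part of the reactive library.

```python
import asyncio
from typing import AsyncIterator, List


async def ticker(count: int, delay: float) -> AsyncIterator[int]:
    """Hypothetical source: yields 0..count-1, sleeping before each value."""
    for i in range(count):
        # While this sleep is pending, the consuming `async for` loop is
        # suspended and the event loop is free to run other tasks.
        await asyncio.sleep(delay)
        yield i


async def collect() -> List[int]:
    received = []
    async for value in ticker(3, 0.01):
        received.append(value)
    return received


print(asyncio.run(collect()))  # prints [0, 1, 2]
```

Each value only shows up after its delay has elapsed, yet the consumer reads like an ordinary loop.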
An asynchronous generator (represented as typing.AsyncGenerator) is like an asynchronous iterable that can also be written to (just like a normal typing.Generator).
In the reactive library, operators are implemented as asynchronous generators, so that they can be used to build reactive pipelines. When data is written to an operator, like map, it is able to perform any number of transformations or side effects, then output some result.
Asynchronous generators can be tricky to wrap one's head around, but luckily they're mostly only important if you want to implement your own operators. Consuming a stream of data, or using the library's provided operators, shouldn't require intimate knowledge of how generators work.
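For the curious, "writing to" an asynchronous generator means calling its asend() method. The sketch below uses plain asyncio only; running_total and feed are hypothetical names for illustration, and this is not how the library's operators are necessarily written.

```python
import asyncio
from typing import AsyncGenerator, List


async def running_total() -> AsyncGenerator[int, int]:
    """Hypothetical generator: receives ints, yields the sum so far."""
    total = 0
    while True:
        # `yield` hands `total` to the caller, then waits for the next
        # value passed in through asend().
        received = yield total
        total += received


async def feed() -> List[int]:
    gen = running_total()
    await gen.asend(None)  # prime the generator: run it to its first yield
    results = []
    for n in (1, 2, 3):
        results.append(await gen.asend(n))
    await gen.aclose()  # shut the generator down cleanly
    return results


print(asyncio.run(feed()))  # prints [1, 3, 6]
```

The first asend(None) is standard Python behavior: a freshly created generator has to reach its first yield before it can accept a value.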
A stream generator (represented as reactive.StreamGenerator) is like a specialized asynchronous generator for use in the reactive library.
It's specialized in two ways:
- Stream generators always yield Iterables of values; StreamGenerator[OutputType, InputType] is always a subtype of AsyncGenerator[Iterable[OutputType], InputType]. This ensures that operators like reactive.operators.filter can output a different number of values than they receive as input.
- The StreamGenerator class comes with a couple of overridden binary operators, | and >=, that make building pipelines easier. These are just syntactic sugar, so not strictly necessary to use, but they assist readability.
See the examples above for how this comes together.
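To show why yielding an Iterable per input matters, here is a sketch of a filter-like asynchronous generator written against the standard library alone. It is only an assumption about the general shape of such an operator: filter_batches and run are hypothetical names, and none of this is the actual reactive.operators.filter implementation or the StreamGenerator class.

```python
import asyncio
import math
from typing import AsyncGenerator, Callable, Iterable, List


async def filter_batches(
    predicate: Callable[[float], bool],
) -> AsyncGenerator[Iterable[float], float]:
    """Hypothetical operator: yields () or (value,) for each input value."""
    outputs: Iterable[float] = ()
    while True:
        value = yield outputs
        # An empty tuple lets the operator emit nothing for this input.
        outputs = (value,) if predicate(value) else ()


async def run() -> List[float]:
    operator = filter_batches(math.isfinite)
    await operator.asend(None)  # prime: advance to the first yield
    kept: List[float] = []
    for value in [1.2, float("nan"), 5.4, 6.3, float("inf")]:
        kept.extend(await operator.asend(value))
    await operator.aclose()
    return kept


print(asyncio.run(run()))  # prints [1.2, 5.4, 6.3]
```

If the generator had to yield exactly one value per input, it would have no way to drop an element. Yielding a batch that may be empty (or hold several values) sidesteps that.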
- RxPY, the canonical ReactiveX implementation for Python, and much more complete than this project. The tradeoff is that the RxPY domain-specific language is completely orthogonal to existing streaming and concurrency abstractions in Python.
- aioreactive appears to be an effort by one of the RxPY maintainers to migrate it onto asyncio and "[integrate] more naturally with the Python language." Unfortunately, it seems to be abandoned.
- aiostream sounded appealing to me, but is licensed under the GPLv3. (Consequently, I haven't looked at the code at all and can't vouch for it one way or the other.)