Skip to content

Commit

Permalink
Bugfix/77 wait for race (#82)
Browse files Browse the repository at this point in the history
* Optimised raw event dispatching to uncover bug.

It appears that, at least on my machine, asyncio runs anything you await immediately
rather than first switching to another task on the queue, unless the call performs raw
IO. I have confirmed this with the Epoll, Poll and Select selector implementations on
a non-debug asyncio SelectorEventLoop.

This means that the bulk of dispatching an event would currently occur as soon as
the event is dispatched rather than after another task runs, which could lead to
immediate slowdown if other tasks are queued.

Switching to synchronous dispatching and using `create_task` to invoke the callback
management "later" seems to speed up this implementation significantly. It also makes
other race conditions that we have not properly accounted for as part of #77
detectable with test scripts that saturate the event loop.

* Updated CLI script to show OS type as well.

* Added code to allow debugging of asyncio loop blocking incidents.

* Fixes #77 dispatcher wait_for race condition.

* Removed async predicate support from wait_for, which should remove the last parts of the race condition.

* Fixes #77 dispatcher wait_for race condition.
  • Loading branch information
Nekokatt authored Aug 24, 2020
1 parent 5fb11fd commit ad7f46f
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 156 deletions.
18 changes: 9 additions & 9 deletions hikari/api/event_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@

EventT_co = typing.TypeVar("EventT_co", bound=base_events.Event, covariant=True)
EventT_inv = typing.TypeVar("EventT_inv", bound=base_events.Event)
PredicateT = typing.Callable[[EventT_co], typing.Union[bool, typing.Coroutine[typing.Any, typing.Any, bool]]]
AsyncCallbackT = typing.Callable[[EventT_inv], typing.Coroutine[typing.Any, typing.Any, None]]
PredicateT = typing.Callable[[EventT_co], bool]
CallbackT = typing.Callable[[EventT_inv], typing.Coroutine[typing.Any, typing.Any, None]]


class EventDispatcher(abc.ABC):
Expand Down Expand Up @@ -134,9 +134,7 @@ async def on_everyone_mentioned(event):
# For the sake of UX, I will check this at runtime instead and let the
# user use a static type checker.
@abc.abstractmethod
def subscribe(
self, event_type: typing.Type[typing.Any], callback: AsyncCallbackT[typing.Any]
) -> AsyncCallbackT[typing.Any]:
def subscribe(self, event_type: typing.Type[typing.Any], callback: CallbackT[typing.Any]) -> CallbackT[typing.Any]:
"""Subscribe a given callback to a given event type.
Parameters
Expand Down Expand Up @@ -180,7 +178,7 @@ async def on_message(event):
# For the sake of UX, I will check this at runtime instead and let the
# user use a static type checker.
@abc.abstractmethod
def unsubscribe(self, event_type: typing.Type[typing.Any], callback: AsyncCallbackT[typing.Any]) -> None:
def unsubscribe(self, event_type: typing.Type[typing.Any], callback: CallbackT[typing.Any]) -> None:
"""Unsubscribe a given callback from a given event type, if present.
Parameters
Expand Down Expand Up @@ -210,7 +208,7 @@ async def on_message(event):
@abc.abstractmethod
def get_listeners(
self, event_type: typing.Type[EventT_co], *, polymorphic: bool = True,
) -> typing.Collection[AsyncCallbackT[EventT_co]]:
) -> typing.Collection[CallbackT[EventT_co]]:
"""Get the listeners for a given event type, if there are any.
Parameters
Expand Down Expand Up @@ -240,7 +238,7 @@ def get_listeners(
@abc.abstractmethod
def listen(
self, event_type: typing.Optional[typing.Type[EventT_co]] = None,
) -> typing.Callable[[AsyncCallbackT[EventT_co]], AsyncCallbackT[EventT_co]]:
) -> typing.Callable[[CallbackT[EventT_co]], CallbackT[EventT_co]]:
"""Generate a decorator to subscribe a callback to an event type.
This is a second-order decorator.
Expand Down Expand Up @@ -285,11 +283,13 @@ async def wait_for(
The event type to listen for. This will listen for subclasses of
this type additionally.
predicate
A function or coroutine taking the event as the single parameter.
A function taking the event as the single parameter.
This should return `builtins.True` if the event is one you want to
return, or `builtins.False` if the event should not be returned.
If left as `None` (the default), then the first matching event type
that the bot receives (or any subtype) will be the one returned.
ASYNC PREDICATES ARE NOT SUPPORTED.
timeout : typing.Optional[builtins.float or builtins.int]
The amount of time to wait before raising an `asyncio.TimeoutError`
and giving up instead. This is measured in seconds. If
Expand Down
1 change: 1 addition & 0 deletions hikari/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ def main() -> None:
sys.stderr.write(f"hikari v{version} {sha1}\n")
sys.stderr.write(f"located at {path}\n")
sys.stderr.write(f"{py_impl} {py_ver} {py_compiler}\n")
sys.stderr.write(" ".join(frag.strip() for frag in platform.uname() if frag and frag.strip()) + "\n")
37 changes: 22 additions & 15 deletions hikari/events/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,6 @@ class ExceptionEvent(Event, typing.Generic[FailedEventT]):
side-effects on the application runtime.
"""

app: traits.RESTAware = attr.ib(metadata={attr_extensions.SKIP_DEEP_COPY: True})
# <<inherited docstring from Event>>.

shard: typing.Optional[gateway_shard.GatewayShard] = attr.ib(metadata={attr_extensions.SKIP_DEEP_COPY: True})
"""Shard that received the event.
Returns
-------
hikari.api.shard.GatewayShard
Shard that raised this exception.
This may be `builtins.None` if no specific shard was the cause of this
exception (e.g. when starting up or shutting down).
"""

exception: Exception = attr.ib()
"""Exception that was raised.
Expand All @@ -201,6 +186,28 @@ class ExceptionEvent(Event, typing.Generic[FailedEventT]):
# for us to remove this effect. This functionally changes nothing but it helps MyPy.
_failed_callback: FailedCallbackT[FailedEventT] = attr.ib()

@property
def app(self) -> traits.RESTAware:
    # <<inherited docstring from Event>>.
    # The value is read straight from `failed_event` on every access.
    source_event = self.failed_event
    return source_event.app

@property
def shard(self) -> typing.Optional[gateway_shard.GatewayShard]:
    """Shard that received the event, if there was one associated.

    Returns
    -------
    typing.Optional[hikari.api.shard.GatewayShard]
        The `shard` attribute of the failed event when that attribute
        exists and is a gateway shard instance.

        Otherwise `builtins.None` is returned, meaning no specific shard
        was associated with this exception (e.g. when starting up or
        shutting down).
    """
    # Look the attribute up defensively: a missing `shard` attribute is
    # treated the same as one holding `None`.
    candidate = getattr(self.failed_event, "shard", None)
    # Anything that is not an actual gateway shard is reported as absent.
    return candidate if isinstance(candidate, gateway_shard.GatewayShard) else None

@property
def failed_callback(self) -> FailedCallbackT[FailedEventT]:
"""Event callback that threw an exception.
Expand Down
60 changes: 47 additions & 13 deletions hikari/impl/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from hikari.impl import stateless_event_manager
from hikari.impl import stateless_guild_chunker as stateless_guild_chunker_impl
from hikari.impl import voice
from hikari.utilities import aio
from hikari.utilities import art
from hikari.utilities import constants
from hikari.utilities import date
Expand Down Expand Up @@ -574,30 +575,30 @@ async def start(self) -> None:
def listen(
self, event_type: typing.Optional[typing.Type[event_dispatcher.EventT_co]] = None,
) -> typing.Callable[
[event_dispatcher.AsyncCallbackT[event_dispatcher.EventT_co]],
event_dispatcher.AsyncCallbackT[event_dispatcher.EventT_co],
[event_dispatcher.CallbackT[event_dispatcher.EventT_co]],
event_dispatcher.CallbackT[event_dispatcher.EventT_co],
]:
# <<inherited docstring from event_dispatcher.EventDispatcher>>
return self.dispatcher.listen(event_type)

def get_listeners(
self, event_type: typing.Type[event_dispatcher.EventT_co], *, polymorphic: bool = True,
) -> typing.Collection[event_dispatcher.AsyncCallbackT[event_dispatcher.EventT_co]]:
) -> typing.Collection[event_dispatcher.CallbackT[event_dispatcher.EventT_co]]:
# <<inherited docstring from event_dispatcher.EventDispatcher>>
return self.dispatcher.get_listeners(event_type, polymorphic=polymorphic)

def subscribe(
self,
event_type: typing.Type[event_dispatcher.EventT_co],
callback: event_dispatcher.AsyncCallbackT[event_dispatcher.EventT_co],
) -> event_dispatcher.AsyncCallbackT[event_dispatcher.EventT_co]:
callback: event_dispatcher.CallbackT[event_dispatcher.EventT_co],
) -> event_dispatcher.CallbackT[event_dispatcher.EventT_co]:
# <<inherited docstring from event_dispatcher.EventDispatcher>>
return self.dispatcher.subscribe(event_type, callback)

def unsubscribe(
self,
event_type: typing.Type[event_dispatcher.EventT_co],
callback: event_dispatcher.AsyncCallbackT[event_dispatcher.EventT_co],
callback: event_dispatcher.CallbackT[event_dispatcher.EventT_co],
) -> None:
# <<inherited docstring from event_dispatcher.EventDispatcher>>
return self.dispatcher.unsubscribe(event_type, callback)
Expand Down Expand Up @@ -642,7 +643,12 @@ async def close(self) -> None:
await self._connector_factory.close()
self._global_ratelimit.close()

def run(self) -> None:
def run(
self,
*,
loop: typing.Optional[asyncio.AbstractEventLoop] = None,
slow_callback_duration: typing.Optional[float] = None,
) -> None:
"""Run this application on the current thread in an event loop.
This will use the event loop that is set for the current thread, or
Expand All @@ -658,19 +664,47 @@ def run(self) -> None:
The application is always guaranteed to be shut down before this
function completes or propagates any exception.
Parameters
----------
loop : typing.Optional[asyncio.AbstractEventLoop]
Event loop to run on. This defaults to `builtins.None`.
If `builtins.None`, the event loop set for the current thread will
be used. If the thread does not have an event loop, then one will
be created first and registered to the running thread.
It is advisable to only have one event loop per thread. Generally
you should not have a need to specify this.
slow_callback_duration : typing.Optional[builtins.float]
How long a coroutine should block for in seconds before it shows a
warning.
This defaults to being `builtins.None`, which will disable the
feature (since it may cause a small increase in execution latency).
If specified as a number, it will be enabled.
"""
try:
loop = asyncio.get_event_loop()
except RuntimeError:
_LOGGER.debug("no event loop registered on this thread; now creating one...")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if loop is None:
try:
loop = asyncio.get_event_loop()
_LOGGER.debug("using default thread's event loop")
except RuntimeError:
_LOGGER.debug("no event loop registered on this thread; now creating one...")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# We always expect this to be populated by now.
loop: asyncio.AbstractEventLoop

if slow_callback_duration and slow_callback_duration > 0:
aio.patch_slow_callback_detection(slow_callback_duration)

try:
self._map_signal_handlers(
loop.add_signal_handler,
lambda *_: loop.create_task(self.close(), name="signal interrupt shutting down application"),
)
_LOGGER.debug("using default thread's event loop", loop)
loop.run_until_complete(self._shard_management_lifecycle())

except KeyboardInterrupt as ex:
Expand Down
Loading

0 comments on commit ad7f46f

Please sign in to comment.