Skip to content

Commit

Permalink
Be careful about freeing callback trampolines
Browse files Browse the repository at this point in the history
Our approach to handling Source and Slot objects is fairly clever: we
tie the call trampoline and closure to the same object that holds a
reference to the source object on the C side.  When we are about to
`__del__()` that object, we unref the source, preventing any further
events from being dispatched.  In this way, we can be completely sure
that systemd will never call our trampoline after it's been freed.

Unfortunately, this isn't good enough: we have a lot of cases where we
free a Source while it is currently being dispatched.  Until now we've
never noticed a problem, but Cockpit recently added a stress-test for
inotify (`test_fsinfo_watch_identity_changes`) which dispatches thousand
of events and runs long enough that garbage collection gets invoked,
freeing trampolines while they are currently running.  Python does not
hold a reference to the data, and this causes crashes on some
architectures.

Let's give Source and Slot a common base class (Trampoline) that models
their common behaviour.  This helper class also changes the `__del__()`
behaviour: in case some external caller has requested deferral of the
destruction of trampolines, we add them to a list just before we get
deleted, to prevent the FFI wrapper from being destroyed with us.

We know that the problem described above is only a problem if we're
dispatching from systemd's event loop, so setup deferral on entry to the
loop and drop the deferred objects on exit.

Closes #63
  • Loading branch information
allisonkarlitskaya committed Feb 1, 2024
1 parent 3b44156 commit 5788fa2
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 18 deletions.
13 changes: 4 additions & 9 deletions src/systemd_ctypes/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,7 @@ class Slot(libsystemd.sd_bus_slot):
def __init__(self, callback: Callable[[BusMessage], bool]):
def handler(message: WeakReference, _data: object, _err: object) -> int:
return 1 if callback(BusMessage.ref(message)) else 0
self.callback = libsystemd.sd_bus_message_handler_t(handler)
self.userdata = None

def cancel(self) -> None:
self._unref()
self.value = None
self.trampoline = libsystemd.sd_bus_message_handler_t(handler)


if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -363,7 +358,7 @@ async def call_async(
timeout: Optional[int] = None
) -> BusMessage:
pending = PendingCall()
self._call_async(byref(pending), message, pending.callback, pending.userdata, timeout or 0)
self._call_async(byref(pending), message, pending.trampoline, pending.userdata, timeout or 0)
return await pending.future

async def call_method_async(
Expand All @@ -384,12 +379,12 @@ async def call_method_async(

def add_match(self, rule: str, handler: Callable[[BusMessage], bool]) -> Slot:
slot = Slot(handler)
self._add_match(byref(slot), rule, slot.callback, slot.userdata)
self._add_match(byref(slot), rule, slot.trampoline, slot.userdata)
return slot

def add_object(self, path: str, obj: 'BaseObject') -> Slot:
slot = Slot(obj.message_received)
self._add_object(byref(slot), path, slot.callback, slot.userdata)
self._add_object(byref(slot), path, slot.trampoline, slot.userdata)
obj.registered_on_bus(self, path)
return slot

Expand Down
21 changes: 15 additions & 6 deletions src/systemd_ctypes/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@
from typing import Callable, ClassVar, Coroutine, List, Optional, Tuple

from . import inotify, libsystemd
from .librarywrapper import Callback, Reference, UserData, byref
from .librarywrapper import Reference, UserData, byref


class Event(libsystemd.sd_event):
class Source(libsystemd.sd_event_source):
callback: Callback
userdata: UserData = None

def cancel(self) -> None:
self._unref()
self.value = None
Expand All @@ -52,11 +49,11 @@ def callback(source: libsystemd.sd_event_source,
event = _event.contents
handler(inotify.Event(event.mask), event.cookie, event.name)
return 0
self.callback = libsystemd.sd_event_inotify_handler_t(callback)
self.trampoline = libsystemd.sd_event_inotify_handler_t(callback)

def add_inotify(self, path: str, mask: inotify.Event, handler: InotifyHandler) -> InotifySource:
source = Event.InotifySource(handler)
self._add_inotify(byref(source), path, mask, source.callback, source.userdata)
self._add_inotify(byref(source), path, mask, source.trampoline, source.userdata)
return source

def add_inotify_fd(self, fd: int, mask: inotify.Event, handler: InotifyHandler) -> InotifySource:
Expand All @@ -78,6 +75,14 @@ def __init__(self, event: Optional[Event] = None) -> None:
def select(
self, timeout: Optional[float] = None
) -> List[Tuple[selectors.SelectorKey, int]]:
# It's common to drop the last reference to a Source or Slot object on
# a dispatch of that same source/slot from the main loop. If we happen
# to garbage collect before returning, the trampoline could be
# destroyed before we're done using it. Provide a mechanism to defer
# the destruction of trampolines for as long as we might be
# dispatching. This gets cleared again at the bottom, before return.
libsystemd.Trampoline.deferred = []

while self.sd_event.prepare():
self.sd_event.dispatch()
ready = super().select(timeout)
Expand All @@ -87,6 +92,10 @@ def select(
self.sd_event.dispatch()
while self.sd_event.prepare():
self.sd_event.dispatch()

# We can be sure we're not dispatching callbacks anymore
libsystemd.Trampoline.deferred = None

# This could return zero events with infinite timeout, but nobody seems to mind.
return [(key, events) for (key, events) in ready if key != self.key]

Expand Down
25 changes: 22 additions & 3 deletions src/systemd_ctypes/libsystemd.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import ctypes
import os
import sys
from typing import List, Optional, Tuple, Union
from typing import ClassVar, List, Optional, Tuple, Union

from .inotify import inotify_event
from .librarywrapper import (
Expand All @@ -33,6 +33,25 @@
from .typing import Annotated


class Trampoline(ReferenceType):
deferred: 'ClassVar[list[Callback] | None]' = None
trampoline: Callback
userdata: UserData = None

def cancel(self) -> None:
self._unref()
self.value = None

def __del__(self) -> None:
# This might be the currently-dispatching callback — make sure we don't
# destroy the trampoline before we return. We drop the deferred list
# from the event loop when we're sure we're not doing any dispatches.
if self.deferred is not None:
self.deferred.append(self.trampoline)
if self.value is not None:
self._unref()


class sd_bus_error(ctypes.Structure):
# This is ABI, so we are safe to assume it doesn't change.
# Unfortunately, we lack anything like sd_bus_error_new().
Expand Down Expand Up @@ -65,7 +84,7 @@ class sd_id128(ctypes.Structure):
)


class sd_event_source(ReferenceType):
class sd_event_source(Trampoline):
...


Expand Down Expand Up @@ -105,7 +124,7 @@ def _default(ret: Reference['sd_event']) -> Union[None, Errno]:
...


class sd_bus_slot(ReferenceType):
class sd_bus_slot(Trampoline):
...


Expand Down

0 comments on commit 5788fa2

Please sign in to comment.