Skip to content

Commit

Permalink
Add workers kill timeout option (#435)
Browse files Browse the repository at this point in the history
* implement a workers graceful timeout

* add parameter to readme

* fix wording / linting

* remove '.'

* ruff

* make graceful timeout an int range and disable it per default, add tests

* add runtime threading mode

* lint

* ruff

* skip graceful restart test on windows

* no restart on windows

* revert yielding the pid and separating restart tests

* improve code comment

* only sleep if necessary

* revert tests

* revert conftest from upstream master

* fix ruff error

* Minor refactor

---------

Co-authored-by: Giovanni Barillari <[email protected]>
  • Loading branch information
hendrikmuhs and gi0baro authored Dec 12, 2024
1 parent 6f3aa5c commit bde1255
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 2 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@ Options:
The maximum amount of time in seconds a
worker will be kept alive before respawn
[env var: GRANIAN_WORKERS_LIFETIME; x>=60]
--workers-kill-timeout INTEGER RANGE
The amount of time in seconds to wait for
killing workers that refused to gracefully
stop [env var:
GRANIAN_WORKERS_KILL_TIMEOUT; default:
(disabled); 1<=x<=1800]
--factory / --no-factory Treat target as a factory function, that
should be invoked to build the actual target
[env var: GRANIAN_FACTORY; default:
Expand Down
8 changes: 8 additions & 0 deletions granian/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ def option(*param_decls: str, cls: Optional[Type[click.Option]] = None, **attrs:
type=click.IntRange(60),
help='The maximum amount of time in seconds a worker will be kept alive before respawn',
)
@option(
'--workers-kill-timeout',
type=click.IntRange(1, 1800),
help='The amount of time in seconds to wait for killing workers that refused to gracefully stop',
show_default='disabled',
)
@option(
'--factory/--no-factory',
default=False,
Expand Down Expand Up @@ -281,6 +287,7 @@ def cli(
respawn_failed_workers: bool,
respawn_interval: float,
workers_lifetime: Optional[int],
workers_kill_timeout: Optional[int],
factory: bool,
reload: bool,
reload_paths: Optional[List[pathlib.Path]],
Expand Down Expand Up @@ -339,6 +346,7 @@ def cli(
respawn_failed_workers=respawn_failed_workers,
respawn_interval=respawn_interval,
workers_lifetime=workers_lifetime,
workers_kill_timeout=workers_kill_timeout,
factory=factory,
reload=reload,
reload_paths=reload_paths,
Expand Down
28 changes: 26 additions & 2 deletions granian/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ def terminate(self):
self.interrupt_by_parent = True
self.proc.terminate()

def kill(self):
self.interrupt_by_parent = True
self.proc.kill()

def join(self, timeout=None):
self.proc.join(timeout=timeout)

Expand Down Expand Up @@ -95,6 +99,7 @@ def __init__(
respawn_failed_workers: bool = False,
respawn_interval: float = 3.5,
workers_lifetime: Optional[int] = None,
workers_kill_timeout: Optional[int] = None,
factory: bool = False,
reload: bool = False,
reload_paths: Optional[Sequence[Path]] = None,
Expand Down Expand Up @@ -136,6 +141,7 @@ def __init__(
self.reload_on_changes = reload
self.respawn_interval = respawn_interval
self.workers_lifetime = workers_lifetime
self.workers_kill_timeout = workers_kill_timeout
self.factory = factory
self.reload_paths = reload_paths or [Path.cwd()]
self.reload_ignore_paths = reload_ignore_paths or ()
Expand Down Expand Up @@ -450,13 +456,31 @@ def socket_loader():
time.sleep(delay)
logger.info(f'Stopping old worker-{idx + 1}')
old_proc.terminate()
old_proc.join()
old_proc.join(self.workers_kill_timeout)
if self.workers_kill_timeout:
# the process might still be reported alive after `join`, let's context switch
if old_proc.proc.is_alive():
time.sleep(0.001)
if old_proc.proc.is_alive():
logger.warning(f'Killing old worker-{idx + 1} after it refused to gracefully stop')
old_proc.kill()
old_proc.join()

def _stop_workers(self):
for proc in self.procs:
proc.terminate()

for proc in self.procs:
proc.join()
proc.join(self.workers_kill_timeout)
if self.workers_kill_timeout:
# the process might still be reported after `join`, let's context switch
if proc.proc.is_alive():
time.sleep(0.001)
if proc.proc.is_alive():
logger.warning(f'Killing worker-{proc.idx} after it refused to gracefully stop')
proc.kill()
proc.join()

self.procs.clear()

def _workers_lifetime_watcher(self, ttl):
Expand Down

0 comments on commit bde1255

Please sign in to comment.