Skip to content

Commit

Permalink
✨ add graceful shutdown for service. #14
Browse files Browse the repository at this point in the history
  • Loading branch information
perillaroc committed May 6, 2022
1 parent 20c50ed commit 91756f6
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 2 deletions.
8 changes: 7 additions & 1 deletion takler/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ class TaklerServer:
def __init__(self, host: str = None, port: int = None):
self.bunch = Bunch() # type: Bunch
self.scheduler = Scheduler(bunch=self.bunch)
self.network_service = TaklerService(scheduler=self.scheduler, host=host, port=port)
self.network_service = TaklerService(
scheduler=self.scheduler, host=host, port=port
)

async def start(self):
logger.info("start server...")
Expand All @@ -34,3 +36,7 @@ async def run(self):
loop.create_task(self.network_service.run())

await self.scheduler.run()

async def stop(self):
await self.network_service.stop()
await self.scheduler.stop()
5 changes: 5 additions & 0 deletions takler/server/network_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ async def run(self):
"""
await self.grpc_server.wait_for_termination()

async def stop(self):
logger.info("service shutting down..")
await self.grpc_server.stop(5)
logger.info("service shutting down..done")

async def RunInitCommand(self, request, context):
node_path = request.child_options.node_path
task_id = request.task_id
Expand Down
15 changes: 14 additions & 1 deletion takler/server/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(self, bunch: Bunch):
self.bunch = bunch # type: Bunch
self.interval_main_loop = 10.0 # type: float
self.command_queue = Queue() # type: Queue
self.should_stop = False # type: bool

async def start(self):
pass
Expand All @@ -38,12 +39,16 @@ async def run(self):
Start main loop.
"""
await self.main_loop()
await self.shutdown()

async def shutdown(self):
self.should_stop = False

async def main_loop(self):
"""
Main loop of scheduler.
"""
while True:
while not self.should_stop:
logger.info("main loop...")
start_time = time.time()

Expand All @@ -57,6 +62,14 @@ async def main_loop(self):

await asyncio.sleep(duration)

async def stop(self):
logger.info("scheduler shutting down...")
self.should_stop = True

while self.should_stop:
await asyncio.sleep(0.1)
logger.info("scheduler shutting down...done")

def travel_bunch(self):
"""
Travel all flows in bunch to resolve dependencies.
Expand Down

0 comments on commit 91756f6

Please sign in to comment.