Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add async extension in Python binding in order to use asyncio m… #91

Merged
merged 3 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"request": "launch",
"program": "${workspaceFolder}/out/linux/x64/tests/standalone/ten_runtime_unit_test",
"args": [
// "--gtest_filter=TenErrorTest.cpp_thread"
"--gtest_filter=TenErrorTest.cpp_thread"
],
"stopAtEntry": false,
"cwd": "${workspaceFolder}/out/linux/x64/",
Expand Down Expand Up @@ -379,9 +379,10 @@
"program": "${workspaceFolder}/out/linux/x64/ten_manager/bin/tman",
"cwd": "${workspaceFolder}/out/linux/x64/",
"args": [
"--verbose",
"dev-server",
"--base-dir=/home/wei/MyData/Temp/ASTRA.ai/agents",
"--port=49484"
"--base-dir=/home/sunxilin/ten_framework_internal_base/ten_framework/out/linux/x64/tests/ten_runtime/integration/python/two_async_extensions_in_different_groups_python/two_async_extensions_in_different_groups_python_app",
"--port=49483"
],
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ typedef struct ten_py_ten_env_t {

// Mark whether the gil state need to be released after 'on_deinit_done'.
bool need_to_release_gil_state;
PyThreadState* py_thread_state;
} ten_py_ten_env_t;

TEN_RUNTIME_PRIVATE_API bool ten_py_ten_env_check_integrity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#
from .app import App
from .extension import Extension
from .async_extension import AsyncExtension
from .async_ten_env import AsyncTenEnv
from .addon import Addon
from .decorator import (
register_addon_as_extension,
Expand All @@ -28,7 +30,9 @@
"register_addon_as_extension_group",
"App",
"Extension",
"AsyncExtension",
"TenEnv",
"AsyncTenEnv",
"Cmd",
"StatusCode",
"VideoFrame",
Expand Down
3 changes: 3 additions & 0 deletions core/src/ten_runtime/binding/python/interface/ten/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ def on_configure(self, ten_env: TenEnv) -> None:

def on_init(self, ten_env: TenEnv) -> None:
ten_env.on_init_done()

def on_deinit(self, ten_env: TenEnv) -> None:
ten_env.on_deinit_done()
140 changes: 140 additions & 0 deletions core/src/ten_runtime/binding/python/interface/ten/async_extension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#
# Copyright © 2024 Agora
# This file is part of TEN Framework, an open source project.
# Licensed under the Apache License, Version 2.0, with certain conditions.
# Refer to the "LICENSE" file in the root directory for more information.
#
import asyncio
import threading
from typing import final
from libten_runtime_python import _Extension
from .video_frame import VideoFrame
from .audio_frame import AudioFrame
from .ten_env import TenEnv
from .cmd import Cmd
from .data import Data
from .async_ten_env import AsyncTenEnv


class AsyncExtension(_Extension):
def __init__(self, name: str) -> None:
self._ten_stop_event = asyncio.Event()

def __del__(self) -> None:
self._ten_stop_event.set()
if hasattr(self, "_ten_thread"):
self._ten_thread.join()

async def _thread_routine(self, ten_env: TenEnv):
self._ten_loop = asyncio.get_running_loop()
self._async_ten_env = AsyncTenEnv(
ten_env, self._ten_loop, self._ten_thread
)

await self.on_configure(self._async_ten_env)

# Suspend the thread until stopEvent is set.
await self._ten_stop_event.wait()

await self.on_deinit(self._async_ten_env)

async def _stop_thread(self):
self._ten_stop_event.set()

@final
def _proxy_on_configure(self, ten_env: TenEnv) -> None:
# We pass the TenEnv object to another Python thread without worrying
# about the thread safety issue of the TenEnv API, because the actual
# execution logic of all TenEnv APIs occurs in the extension thread.
# We only need to ensure that the TenEnv object should remain valid
# while it is being used. The way to achieve this is to ensure that the
# Python thread remains alive until TenEnv.on_deinit_done is called.
self._ten_thread = threading.Thread(
target=asyncio.run, args=(self._thread_routine(ten_env),)
)
self._ten_thread.start()

@final
def _proxy_on_init(self, ten_env: TenEnv) -> None:
asyncio.run_coroutine_threadsafe(
self.on_init(self._async_ten_env), self._ten_loop
)

@final
def _proxy_on_start(self, ten_env: TenEnv) -> None:
asyncio.run_coroutine_threadsafe(
self.on_start(self._async_ten_env), self._ten_loop
)

@final
def _proxy_on_stop(self, ten_env: TenEnv) -> None:
asyncio.run_coroutine_threadsafe(
self.on_stop(self._async_ten_env), self._ten_loop
)

@final
def _proxy_on_deinit(self, ten_env: TenEnv) -> None:
asyncio.run_coroutine_threadsafe(self._stop_thread(), self._ten_loop)

@final
def _proxy_on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None:
asyncio.run_coroutine_threadsafe(
self.on_cmd(self._async_ten_env, cmd), self._ten_loop
)

@final
def _proxy_on_data(self, ten_env: TenEnv, data: Data) -> None:
asyncio.run_coroutine_threadsafe(
self.on_data(self._async_ten_env, data), self._ten_loop
)

@final
def _proxy_on_video_frame(
self, ten_env: TenEnv, video_frame: VideoFrame
) -> None:
asyncio.run_coroutine_threadsafe(
self.on_video_frame(self._async_ten_env, video_frame),
self._ten_loop,
)

@final
def _proxy_on_audio_frame(
self, ten_env: TenEnv, audio_frame: AudioFrame
) -> None:
asyncio.run_coroutine_threadsafe(
self.on_audio_frame(self._async_ten_env, audio_frame),
self._ten_loop,
)

# Override these methods in your extension

async def on_configure(self, async_ten_env: AsyncTenEnv) -> None:
async_ten_env.on_configure_done()

async def on_init(self, async_ten_env: AsyncTenEnv) -> None:
async_ten_env.on_init_done()

async def on_start(self, async_ten_env: AsyncTenEnv) -> None:
async_ten_env.on_start_done()

async def on_stop(self, async_ten_env: AsyncTenEnv) -> None:
async_ten_env.on_stop_done()

async def on_deinit(self, async_ten_env: AsyncTenEnv) -> None:
async_ten_env.on_deinit_done()

async def on_cmd(self, async_ten_env: AsyncTenEnv, cmd: Cmd) -> None:
pass

async def on_data(self, async_ten_env: AsyncTenEnv, data: Data) -> None:
pass

async def on_video_frame(
self, async_ten_env: AsyncTenEnv, video_frame: VideoFrame
) -> None:
pass

async def on_audio_frame(
self, async_ten_env: AsyncTenEnv, audio_frame: AudioFrame
) -> None:
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Copyright © 2024 Agora
# This file is part of TEN Framework, an open source project.
# Licensed under the Apache License, Version 2.0, with certain conditions.
# Refer to the "LICENSE" file in the root directory for more information.
#
from asyncio import AbstractEventLoop
import asyncio
import threading
from .cmd import Cmd
from .cmd_result import CmdResult
from .ten_env import TenEnv


class AsyncTenEnv(TenEnv):

def __init__(
self, ten_env: TenEnv, loop: AbstractEventLoop, thread: threading.Thread
) -> None:
self._internal = ten_env._internal
self._ten_loop = loop
self._ten_thread = thread
ten_env._set_release_handler(lambda: self._on_release())

def __del__(self) -> None:
pass

async def send_cmd(self, cmd: Cmd) -> CmdResult:
q = asyncio.Queue(1)
self._internal.send_cmd(
cmd,
lambda ten_env, result: asyncio.run_coroutine_threadsafe(
q.put(result), self._ten_loop
), # type: ignore
)
return await q.get()

async def send_json(self, json_str: str) -> CmdResult:
q = asyncio.Queue(1)
self._internal.send_json(
json_str,
lambda ten_env, result: asyncio.run_coroutine_threadsafe(
q.put(result), self._ten_loop
), # type: ignore
)
return await q.get()

def _deinit_routine(self) -> None:
# Wait for the internal thread to finish.
self._ten_thread.join()

self._internal.on_deinit_done()

def _on_release(self) -> None:
if hasattr(self, "_deinit_thread"):
self._deinit_thread.join()

def on_deinit_done(self) -> None:
# Start the deinit thread to avoid blocking the extension thread.
self._deinit_thread = threading.Thread(target=self._deinit_routine)
self._deinit_thread.start()
36 changes: 36 additions & 0 deletions core/src/ten_runtime/binding/python/interface/ten/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,62 @@ def _proxy_on_configure(self, ten_env: TenEnv) -> None:
def on_configure(self, ten_env: TenEnv) -> None:
ten_env.on_configure_done()

@final
def _proxy_on_init(self, ten_env: TenEnv) -> None:
self.on_init(ten_env)

def on_init(self, ten_env: TenEnv) -> None:
ten_env.on_init_done()

@final
def _proxy_on_start(self, ten_env: TenEnv) -> None:
self.on_start(ten_env)

def on_start(self, ten_env: TenEnv) -> None:
ten_env.on_start_done()

@final
def _proxy_on_stop(self, ten_env: TenEnv) -> None:
self.on_stop(ten_env)

def on_stop(self, ten_env: TenEnv) -> None:
ten_env.on_stop_done()

@final
def _proxy_on_deinit(self, ten_env: TenEnv) -> None:
self.on_deinit(ten_env)

def on_deinit(self, ten_env: TenEnv) -> None:
ten_env.on_deinit_done()

@final
def _proxy_on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None:
self.on_cmd(ten_env, cmd)

def on_cmd(self, ten_env: TenEnv, cmd: Cmd) -> None:
pass

@final
def _proxy_on_data(self, ten_env: TenEnv, data: Data) -> None:
self.on_data(ten_env, data)

def on_data(self, ten_env: TenEnv, data: Data) -> None:
pass

@final
def _proxy_on_video_frame(
self, ten_env: TenEnv, video_frame: VideoFrame
) -> None:
self.on_video_frame(ten_env, video_frame)

def on_video_frame(self, ten_env: TenEnv, video_frame: VideoFrame) -> None:
pass

@final
def _proxy_on_audio_frame(
self, ten_env: TenEnv, audio_frame: AudioFrame
) -> None:
self.on_audio_frame(ten_env, audio_frame)

def on_audio_frame(self, ten_env: TenEnv, audio_frame: AudioFrame) -> None:
pass
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ class _App:
self,
ten_env: _TenEnv,
) -> None: ...
def on_deinit(
self,
ten_env: _TenEnv,
) -> None: ...

class _Extension:
def __init__(self, name: str): ...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ def __init__(self, internal_obj: _TenEnv) -> None:
def __del__(self) -> None:
pass

def _set_release_handler(self, handler: Callable[[], None]) -> None:
self._release_handler = handler

def _on_release(self) -> None:
if hasattr(self, "_release_handler"):
self._release_handler()

def on_configure_done(self) -> None:
from .addon import Addon

Expand Down
2 changes: 1 addition & 1 deletion core/src/ten_runtime/binding/python/native/app/app.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ static void proxy_on_configure(ten_app_t *app, ten_env_t *ten_env) {
// achieving numerical consistency between PyGILState_Ensure and
// PyGILState_Release, and only then will the Python thread state be
// released.
ten_py_eval_save_thread();
py_ten_env->py_thread_state = ten_py_eval_save_thread();
} else {
// No need to release the GIL.
}
Expand Down
Loading
Loading