From e73dcfb62d33aaf00b5f916c571e9340f05a2dc1 Mon Sep 17 00:00:00 2001 From: Hu Yueh-Wei Date: Fri, 10 Jan 2025 11:51:36 +0800 Subject: [PATCH] feat: add get base_dir support in tman designer (#540) --- .../src/api/endpoints/fileSystem.ts | 10 + .../src/api/services/fileSystem.ts | 12 +- .../designer_frontend/src/types/fileSystem.ts | 4 + .../ten_manager/src/designer/base_dir/mod.rs | 82 ++++ core/src/ten_manager/src/designer/mod.rs | 9 +- .../ten_manager/src/designer/run_app/mod.rs | 373 ++++++++++++++++++ .../src/ten_rust/src/pkg_info/manifest/mod.rs | 4 + 7 files changed, 489 insertions(+), 5 deletions(-) create mode 100644 core/src/ten_manager/src/designer/run_app/mod.rs diff --git a/core/src/ten_manager/designer_frontend/src/api/endpoints/fileSystem.ts b/core/src/ten_manager/designer_frontend/src/api/endpoints/fileSystem.ts index f424e48600..638d28d5d3 100644 --- a/core/src/ten_manager/designer_frontend/src/api/endpoints/fileSystem.ts +++ b/core/src/ten_manager/designer_frontend/src/api/endpoints/fileSystem.ts @@ -12,6 +12,7 @@ import type { IFileContentResponse, ISetBaseDirResponse, IBaseDirResponse, + IGetBaseDirResponse, } from "@/types/fileSystem"; export const ENDPOINT_FILE_SYSTEM = { @@ -49,6 +50,15 @@ export const ENDPOINT_FILE_SYSTEM = { }) ), }, + [ENDPOINT_METHOD.GET]: { + url: `${API_DESIGNER_V1}/base-dir`, + method: ENDPOINT_METHOD.GET, + responseSchema: genResSchema( + z.object({ + base_dir: z.string().nullable(), + }) + ), + }, }, dirList: { [ENDPOINT_METHOD.GET]: { diff --git a/core/src/ten_manager/designer_frontend/src/api/services/fileSystem.ts b/core/src/ten_manager/designer_frontend/src/api/services/fileSystem.ts index 6ed7b241bc..4b5097c47d 100644 --- a/core/src/ten_manager/designer_frontend/src/api/services/fileSystem.ts +++ b/core/src/ten_manager/designer_frontend/src/api/services/fileSystem.ts @@ -13,6 +13,7 @@ import { } from "@/api/services/utils"; import { ENDPOINT_FILE_SYSTEM } from "@/api/endpoints"; import { ENDPOINT_METHOD } from "@/api/endpoints/constant"; +import { IGetBaseDirResponse, ISetBaseDirResponse } from "@/types/fileSystem"; // request functions ------------------------------- @@ -44,7 +45,9 @@ export const putFileContent = async ( return res; }; -export const putBaseDir = async (baseDir: string) => { +export const putBaseDir = async ( + baseDir: string +): Promise => { const template = ENDPOINT_FILE_SYSTEM.baseDir[ENDPOINT_METHOD.PUT]; const req = makeAPIRequest(template, { body: { base_dir: baseDir }, @@ -53,6 +56,13 @@ export const putBaseDir = async (baseDir: string) => { return template.responseSchema.parse(res).data; }; +export const getBaseDir = async (): Promise => { + const template = ENDPOINT_FILE_SYSTEM.baseDir[ENDPOINT_METHOD.GET]; + const req = makeAPIRequest(template, {}); + const res = await req; + return template.responseSchema.parse(res).data; +}; + export const getDirList = async (path: string) => { const encodedPath = encodeURIComponent(path); const template = ENDPOINT_FILE_SYSTEM.dirList[ENDPOINT_METHOD.GET]; diff --git a/core/src/ten_manager/designer_frontend/src/types/fileSystem.ts b/core/src/ten_manager/designer_frontend/src/types/fileSystem.ts index 4873046275..82d22ce752 100644 --- a/core/src/ten_manager/designer_frontend/src/types/fileSystem.ts +++ b/core/src/ten_manager/designer_frontend/src/types/fileSystem.ts @@ -21,3 +21,7 @@ export type TBaseDirEntry = { export interface IBaseDirResponse { entries: TBaseDirEntry[]; } + +export interface IGetBaseDirResponse { + base_dir: string | null; +} diff --git a/core/src/ten_manager/src/designer/base_dir/mod.rs b/core/src/ten_manager/src/designer/base_dir/mod.rs index fb5c998ab7..8f97dc7acf 100644 --- a/core/src/ten_manager/src/designer/base_dir/mod.rs +++ b/core/src/ten_manager/src/designer/base_dir/mod.rs @@ -28,6 +28,11 @@ pub struct SetBaseDirResponse { pub success: bool, } +#[derive(Serialize, Deserialize, Debug, PartialEq)] +pub struct GetBaseDirResponse { + pub base_dir: Option, +} + pub async fn set_base_dir( req: web::Json, state: web::Data>>, @@ -60,6 +65,20 @@ pub async fn set_base_dir( } } +pub async fn get_base_dir( + state: web::Data>>, +) -> impl Responder { + let state = state.read().unwrap(); + let response = ApiResponse { + status: Status::Ok, + data: GetBaseDirResponse { + base_dir: state.base_dir.clone(), + }, + meta: None, + }; + HttpResponse::Ok().json(response) +} + #[cfg(test)] mod tests { use actix_web::{test, App}; @@ -101,4 +120,67 @@ mod tests { assert!(resp.is_err()); } + + #[actix_web::test] + async fn test_get_base_dir_some() { + let designer_state = DesignerState { + base_dir: Some("/initial/path".to_string()), + all_pkgs: Some(vec![]), + tman_config: TmanConfig::default(), + }; + let designer_state = Arc::new(RwLock::new(designer_state)); + + let app = test::init_service( + App::new() + .app_data(web::Data::new(designer_state.clone())) + .route( + "/api/designer/v1/base-dir", + web::get().to(get_base_dir), + ), + ) + .await; + + let req = test::TestRequest::get() + .uri("/api/designer/v1/base-dir") + .to_request(); + let resp: ApiResponse = + test::call_and_read_body_json(&app, req).await; + + assert_eq!(resp.status, Status::Ok); + assert_eq!( + resp.data, + GetBaseDirResponse { + base_dir: Some("/initial/path".to_string()) + } + ); + } + + #[actix_web::test] + async fn test_get_base_dir_none() { + let designer_state = DesignerState { + base_dir: None, + all_pkgs: Some(vec![]), + tman_config: TmanConfig::default(), + }; + let designer_state = Arc::new(RwLock::new(designer_state)); + + let app = test::init_service( + App::new() + .app_data(web::Data::new(designer_state.clone())) + .route( + "/api/designer/v1/base-dir", + web::get().to(get_base_dir), + ), + ) + .await; + + let req = test::TestRequest::get() + .uri("/api/designer/v1/base-dir") + .to_request(); + let resp: ApiResponse = + test::call_and_read_body_json(&app, req).await; + + assert_eq!(resp.status, Status::Ok); + assert_eq!(resp.data, GetBaseDirResponse { base_dir: None }); + } } diff --git a/core/src/ten_manager/src/designer/mod.rs b/core/src/ten_manager/src/designer/mod.rs index a3ac63f795..45d5e35751 100644 --- a/core/src/ten_manager/src/designer/mod.rs +++ b/core/src/ten_manager/src/designer/mod.rs @@ -18,6 +18,7 @@ mod mock; mod packages; mod property; pub mod response; +mod run_app; mod terminal; mod version; @@ -28,8 +29,6 @@ use actix_web::web; use ten_rust::pkg_info::PkgInfo; use super::config::TmanConfig; -use terminal::ws_terminal; -use version::get_version; pub struct DesignerState { pub base_dir: Option, @@ -44,7 +43,7 @@ pub fn configure_routes( cfg.service( web::scope("/api/designer/v1") .app_data(state.clone()) - .route("/version", web::get().to(get_version)) + .route("/version", web::get().to(version::get_version)) .route( "/addons/extensions", web::get().to(addons::extensions::get_extension_addons), @@ -93,7 +92,9 @@ pub fn configure_routes( web::put().to(file_content::save_file_content), ) .route("/base-dir", web::put().to(base_dir::set_base_dir)) + .route("/base-dir", web::get().to(base_dir::get_base_dir)) .route("/dir-list/{path}", web::get().to(dir_list::list_dir)) - .route("/ws/terminal", web::get().to(ws_terminal)), + .route("/ws/run-app", web::get().to(run_app::run_app)) + .route("/ws/terminal", web::get().to(terminal::ws_terminal)), ); } diff --git a/core/src/ten_manager/src/designer/run_app/mod.rs b/core/src/ten_manager/src/designer/run_app/mod.rs new file mode 100644 index 0000000000..d8ba89a6c4 --- /dev/null +++ b/core/src/ten_manager/src/designer/run_app/mod.rs @@ -0,0 +1,373 @@ +// +// Copyright © 2025 Agora +// This file is part of TEN Framework, an open source project. +// Licensed under the Apache License, Version 2.0, with certain conditions. +// Refer to the "LICENSE" file in the root directory for more information. +// +use std::{ + collections::VecDeque, + process::{Child, Command, Stdio}, + sync::{Arc, Mutex, RwLock}, + thread, +}; + +use actix::{Actor, AsyncContext, Handler, Message, StreamHandler}; +use actix_web::{web, Error, HttpRequest, HttpResponse}; +use actix_web_actors::ws; +use serde::Deserialize; +use serde_json::Value; + +use ten_rust::pkg_info::{pkg_type::PkgType, PkgInfo}; + +use crate::designer::response::{ErrorResponse, Status}; +use crate::designer::{get_all_pkgs::get_all_pkgs, DesignerState}; + +#[derive(Deserialize)] +pub struct RunAppParams { + pub base_dir: String, + pub name: String, +} + +// For partial or line-based read from child. +#[derive(Message)] +#[rtype(result = "()")] +pub enum RunAppOutput { + StdOut(String), + StdErr(String), + Exit(i32), +} + +pub struct WsRunApp { + state: Arc>, + child: Option, + buffer_stdout: Arc>>, + buffer_stderr: Arc>>, +} + +impl WsRunApp { + pub fn new(state: Arc>) -> Self { + Self { + state, + child: None, + buffer_stdout: Arc::new(Mutex::new(VecDeque::new())), + buffer_stderr: Arc::new(Mutex::new(VecDeque::new())), + } + } +} + +impl Actor for WsRunApp { + type Context = ws::WebsocketContext; + + fn started(&mut self, ctx: &mut Self::Context) { + // We don't yet spawn the child command. We'll wait for the first + // message from client that includes `base_dir` and `name`. + } + + fn stopped(&mut self, _ctx: &mut Self::Context) { + // If the process is still running, try to kill it. + if let Some(mut child) = self.child.take() { + let _ = child.kill(); + } + } +} + +impl Handler for WsRunApp { + type Result = (); + + fn handle( + &mut self, + msg: RunAppOutput, + ctx: &mut Self::Context, + ) -> Self::Result { + match msg { + RunAppOutput::StdOut(line) => { + // We can push line into buffer. + { + let mut buf = self.buffer_stdout.lock().unwrap(); + buf.push_back(line.clone()); + // For example, keep only last 200 lines. + while buf.len() > 200 { + buf.pop_front(); + } + } + // Then send it to client. + let json_msg = serde_json::json!({ + "type": "stdout", + "data": line + }); + ctx.text(json_msg.to_string()); + } + RunAppOutput::StdErr(line) => { + { + let mut buf = self.buffer_stderr.lock().unwrap(); + buf.push_back(line.clone()); + // Keep only last 200 lines. + while buf.len() > 200 { + buf.pop_front(); + } + } + let json_msg = serde_json::json!({ + "type": "stderr", + "data": line + }); + ctx.text(json_msg.to_string()); + } + RunAppOutput::Exit(code) => { + let json_msg = serde_json::json!({ + "type": "exit", + "code": code + }); + ctx.text(json_msg.to_string()); + // close the WebSocket. + ctx.close(None); + } + } + } +} + +impl StreamHandler> for WsRunApp { + fn handle( + &mut self, + item: Result, + ctx: &mut Self::Context, + ) { + match item { + Ok(ws::Message::Text(text)) => { + // Attempt to parse the JSON text from client. + if let Ok(json) = serde_json::from_str::(&text) { + // If this is the initial 'run' command we expect. + if let Some(cmd_type) = + json.get("type").and_then(|v| v.as_str()) + { + if cmd_type == "run" { + // parse base_dir and name. + if let (Some(base_dir), Some(name)) = ( + json.get("base_dir").and_then(|v| v.as_str()), + json.get("name").and_then(|v| v.as_str()), + ) { + // 1) Check if base_dir matches the _state. + let mut guard = self.state.write().unwrap(); + if guard.base_dir.as_deref() != Some(base_dir) { + let err_msg = ErrorResponse { + status: Status::Fail, + message: format!("Base directory [{}] is not opened in DesignerState", base_dir), + error: None + }; + ctx.text( + serde_json::to_string(&err_msg) + .unwrap(), + ); + ctx.close(None); + return; + } + + // 2) get all packages. + if let Err(err) = get_all_pkgs(&mut guard) { + let error_response = + ErrorResponse::from_error( + &err, + "Error fetching packages:", + ); + ctx.text( + serde_json::to_string(&error_response) + .unwrap(), + ); + ctx.close(None); + return; + } + + // 3) find package of type == app. + let app_pkgs: Vec<&PkgInfo> = guard + .all_pkgs + .as_ref() + .map(|v| { + v.iter() + .filter(|p| { + p.basic_info + .type_and_name + .pkg_type + == PkgType::App + }) + .collect() + }) + .unwrap_or_default(); + + if app_pkgs.len() != 1 { + let err_msg = ErrorResponse { + status: Status::Fail, + message: "There should be exactly one app package, found 0 or more".to_string(), + error: None + }; + ctx.text( + serde_json::to_string(&err_msg) + .unwrap(), + ); + ctx.close(None); + return; + } + + let app_pkg = app_pkgs[0]; + + // 4) read the scripts from the manifest. + let scripts = match &app_pkg.manifest { + Some(m) => { + m.scripts.clone().unwrap_or_default() + } + None => { + let err_msg = ErrorResponse { + status: Status::Fail, + message: "No manifest found in the app package".to_string(), + error: None + }; + ctx.text( + serde_json::to_string(&err_msg) + .unwrap(), + ); + ctx.close(None); + return; + } + }; + + // 5) find script that matches 'name'. + let script_cmd = match scripts.get(name) { + Some(cmd) => cmd.clone(), + None => { + let err_msg = ErrorResponse { + status: Status::Fail, + message: format!("Script '{}' not found in app manifest", name), + error: None + }; + ctx.text( + serde_json::to_string(&err_msg) + .unwrap(), + ); + ctx.close(None); + return; + } + }; + + // 6) run the command line, capture + // stdout/stderr. + let child = match Command::new("sh") + .arg("-c") + .arg(&script_cmd) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + { + Ok(c) => c, + Err(e) => { + let err_msg = ErrorResponse { + status: Status::Fail, + message: format!( + "Failed to spawn command: {}", + e + ), + error: None, + }; + ctx.text( + serde_json::to_string(&err_msg) + .unwrap(), + ); + ctx.close(None); + return; + } + }; + + self.child = Some(child); + + // spawn threads to read stdout & stderr. + let stdout_child = + self.child.as_mut().unwrap().stdout.take(); + let stderr_child = + self.child.as_mut().unwrap().stderr.take(); + + let addr = ctx.address(); + + // read stdout. + if let Some(mut out) = stdout_child { + let addr_stdout = addr.clone(); + + thread::spawn(move || { + use std::io::{BufRead, BufReader}; + let reader = BufReader::new(&mut out); + for line_res in reader.lines() { + match line_res { + Ok(line) => { + addr_stdout.do_send( + RunAppOutput::StdOut( + line, + ), + ); + } + Err(_) => break, + } + } + // after reading is finished. + }); + } + + // read stderr. + if let Some(mut err) = stderr_child { + let addr_stderr = addr.clone(); + + thread::spawn(move || { + use std::io::{BufRead, BufReader}; + let reader = BufReader::new(&mut err); + + for line_res in reader.lines() { + match line_res { + Ok(line) => { + addr_stderr.do_send( + RunAppOutput::StdErr( + line, + ), + ); + } + Err(_) => break, + } + } + // after reading is finished. + }); + } + + // wait for child exit in another thread. + let addr2 = ctx.address(); + if let Some(mut child) = self.child.take() { + thread::spawn(move || { + if let Ok(status) = child.wait() { + addr2.do_send(RunAppOutput::Exit( + status.code().unwrap_or(-1), + )); + } else { + addr2.do_send(RunAppOutput::Exit( + -1, + )); + } + }); + } + } + } else if cmd_type == "ping" { + // Simple keep-alive handshake, for example. + ctx.text(r#"{"type":"pong"}"#); + } + } + } + } + Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), + Ok(ws::Message::Close(_)) => { + ctx.close(None); + } + // ignore other message types. + _ => {} + } + } +} + +pub async fn run_app( + req: HttpRequest, + stream: web::Payload, + state: web::Data>>, +) -> Result { + ws::start(WsRunApp::new(state.get_ref().clone()), &req, stream) +} diff --git a/core/src/ten_rust/src/pkg_info/manifest/mod.rs b/core/src/ten_rust/src/pkg_info/manifest/mod.rs index a7a0a516c9..ac7f04da87 100644 --- a/core/src/ten_rust/src/pkg_info/manifest/mod.rs +++ b/core/src/ten_rust/src/pkg_info/manifest/mod.rs @@ -9,6 +9,7 @@ pub mod dependency; pub mod publish; pub mod support; +use std::collections::HashMap; use std::{fmt, fs, path::Path, str::FromStr}; use anyhow::{anyhow, Context, Result}; @@ -44,6 +45,9 @@ pub struct Manifest { #[serde(skip_serializing_if = "Option::is_none")] pub package: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub scripts: Option>, } impl FromStr for Manifest {