diff --git a/pueue/src/daemon/callbacks.rs b/pueue/src/daemon/callbacks.rs index 5d620eab..baa08fc0 100644 --- a/pueue/src/daemon/callbacks.rs +++ b/pueue/src/daemon/callbacks.rs @@ -13,7 +13,7 @@ use pueue_lib::{ use super::state_helper::LockedState; /// Users can specify a callback that's fired whenever a task finishes. -/// Execute the callback by spawning a new subprocess. +/// The callback is performed by spawning a new subprocess. pub fn spawn_callback(settings: &Settings, state: &mut LockedState, task: &Task) { // Return early, if there's no callback specified let Some(template_string) = &settings.daemon.callback else { @@ -110,8 +110,8 @@ pub fn build_callback_command( handlebars.render_template(template_string, ¶meters) } -/// Look at all running callbacks and log any errors. -/// If everything went smoothly, simply remove them from the list. +/// Look at all running callbacks and check if they're still running. +/// Handle finished callbacks and log their outcome. pub fn check_callbacks(state: &mut LockedState) { let mut finished = Vec::new(); for (id, child) in state.callbacks.iter_mut().enumerate() { diff --git a/pueue/src/daemon/network/follow_log.rs b/pueue/src/daemon/network/follow_log.rs deleted file mode 100644 index 17182952..00000000 --- a/pueue/src/daemon/network/follow_log.rs +++ /dev/null @@ -1,143 +0,0 @@ -use std::io::Read; -use std::path::Path; -use std::time::Duration; - -use anyhow::Result; - -use pueue_lib::log::*; -use pueue_lib::network::message::*; -use pueue_lib::network::protocol::{send_message, GenericStream}; -use pueue_lib::state::SharedState; - -/// Handle the continuous stream of a some log output. -/// -/// It's not actually a stream in the sense of a low-level network stream, but rather a series of -/// `Message::Stream` messages, that each send a portion of new log output. -/// -/// It's basically our own chunked stream implementation on top of the protocol we established. -pub async fn handle_follow( - pueue_directory: &Path, - stream: &mut GenericStream, - state: &SharedState, - message: StreamRequestMessage, -) -> Result { - // The user can specify the id of the task they want to follow - // If the id isn't specified and there's only a single running task, this task will be used. - // However, if there are multiple running tasks, the user will have to specify an id. - let task_id = if let Some(task_id) = message.task_id { - task_id - } else { - // Get all ids of running tasks - let state = state.lock().unwrap(); - let running_ids: Vec<_> = state - .tasks - .iter() - .filter_map(|(&id, t)| if t.is_running() { Some(id) } else { None }) - .collect(); - - // Return a message on "no" or multiple running tasks. - match running_ids.len() { - 0 => { - return Ok(create_failure_message("There are no running tasks.")); - } - 1 => running_ids[0], - _ => { - let running_ids = running_ids - .iter() - .map(|id| id.to_string()) - .collect::>() - .join(", "); - return Ok(create_failure_message(format!( - "Multiple tasks are running, please select one of the following: {running_ids}" - ))); - } - } - }; - - // It might be that the task is not yet running. - // Ensure that it exists and is started. - loop { - { - let state = state.lock().unwrap(); - let Some(task) = state.tasks.get(&task_id) else { - return Ok(create_failure_message( - "Pueue: The task to be followed doesn't exist.", - )); - }; - // The task is running or finished, we can start to follow. - if task.is_running() || task.is_done() { - break; - } - } - tokio::time::sleep(Duration::from_millis(1000)).await; - } - - let mut handle = match get_log_file_handle(task_id, pueue_directory) { - Err(_) => { - return Ok(create_failure_message( - "Couldn't find output files for task. Maybe it finished? Try `log`", - )) - } - Ok(handle) => handle, - }; - - // Get the output path. - // We need to check continuously, whether the file still exists, - // since the file can go away (e.g. due to finishing a task). - let path = get_log_path(task_id, pueue_directory); - - // If `lines` is passed as an option, we only want to show the last `X` lines. - // To achieve this, we seek the file handle to the start of the `Xth` line - // from the end of the file. - // The loop following this section will then only copy those last lines to stdout. - if let Some(lines) = message.lines { - if let Err(err) = seek_to_last_lines(&mut handle, lines) { - println!("Error seeking to last lines from log: {err}"); - } - } - - loop { - // Check whether the file still exists. Exit if it doesn't. - if !path.exists() { - return Ok(create_success_message( - "Pueue: Log file has gone away. Has the task been removed?", - )); - } - // Read the next chunk of text from the last position. - let mut buffer = Vec::new(); - - if let Err(err) = handle.read_to_end(&mut buffer) { - return Ok(create_failure_message(format!("Pueue Error: {err}"))); - }; - let text = String::from_utf8_lossy(&buffer).to_string(); - - // Only send a message, if there's actual new content. - if !text.is_empty() { - // Send the next chunk. - let response = Message::Stream(text); - send_message(response, stream).await?; - } - - // Check if the task in question does: - // 1. Still exist - // 2. Is still running - // - // In case it's not, close the stream. - { - let state = state.lock().unwrap(); - let Some(task) = state.tasks.get(&task_id) else { - return Ok(create_failure_message( - "Pueue: The followed task has been removed.", - )); - }; - - // The task is done, just close the stream. - if !task.is_running() { - return Ok(Message::Close); - } - } - - // Wait for 1 second before sending the next chunk. - tokio::time::sleep(Duration::from_millis(1000)).await; - } -} diff --git a/pueue/src/daemon/network/message_handler/add.rs b/pueue/src/daemon/network/message_handler/add.rs index 8310b3bf..f7377cc5 100644 --- a/pueue/src/daemon/network/message_handler/add.rs +++ b/pueue/src/daemon/network/message_handler/add.rs @@ -54,7 +54,7 @@ pub fn add_task(settings: &Settings, state: &SharedState, message: AddMessage) - // If one is found, we expand the command, otherwise we just take the original command. // Anyhow, we save this separately and keep the original command in a separate field. // - // This allows us to have a debug experience and the user can opt to either show the + // This gives us better debugging capabilities and the user can opt to either show the // original command or the expanded command in their `status` view. task.command = insert_alias(settings, task.original_command.clone()); diff --git a/pueue/src/daemon/network/message_handler/log.rs b/pueue/src/daemon/network/message_handler/log.rs index 17dd3acf..07e1f6a0 100644 --- a/pueue/src/daemon/network/message_handler/log.rs +++ b/pueue/src/daemon/network/message_handler/log.rs @@ -1,8 +1,15 @@ use std::collections::BTreeMap; +use std::io::Read; +use std::path::Path; +use std::time::Duration; + +use anyhow::Result; use pueue_lib::failure_msg; use pueue_lib::log::read_and_compress_log_file; +use pueue_lib::log::*; use pueue_lib::network::message::*; +use pueue_lib::network::protocol::{send_message, GenericStream}; use pueue_lib::settings::Settings; use pueue_lib::state::SharedState; @@ -49,3 +56,136 @@ pub fn get_log(settings: &Settings, state: &SharedState, message: LogRequestMess } Message::LogResponse(tasks) } + +/// Handle the continuous stream of a some log output. +/// +/// It's not actually a stream in the sense of a low-level network stream, but rather a series of +/// `Message::Stream` messages, that each send a portion of new log output. +/// +/// It's basically our own chunked stream implementation on top of the protocol we established. +pub async fn follow_log( + pueue_directory: &Path, + stream: &mut GenericStream, + state: &SharedState, + message: StreamRequestMessage, +) -> Result { + // The user can specify the id of the task they want to follow + // If the id isn't specified and there's only a single running task, this task will be used. + // However, if there are multiple running tasks, the user will have to specify an id. + let task_id = if let Some(task_id) = message.task_id { + task_id + } else { + // Get all ids of running tasks + let state = state.lock().unwrap(); + let running_ids: Vec<_> = state + .tasks + .iter() + .filter_map(|(&id, t)| if t.is_running() { Some(id) } else { None }) + .collect(); + + // Return a message on "no" or multiple running tasks. + match running_ids.len() { + 0 => { + return Ok(create_failure_message("There are no running tasks.")); + } + 1 => running_ids[0], + _ => { + let running_ids = running_ids + .iter() + .map(|id| id.to_string()) + .collect::>() + .join(", "); + return Ok(create_failure_message(format!( + "Multiple tasks are running, please select one of the following: {running_ids}" + ))); + } + } + }; + + // It might be that the task is not yet running. + // Ensure that it exists and is started. + loop { + { + let state = state.lock().unwrap(); + let Some(task) = state.tasks.get(&task_id) else { + return Ok(create_failure_message( + "Pueue: The task to be followed doesn't exist.", + )); + }; + // The task is running or finished, we can start to follow. + if task.is_running() || task.is_done() { + break; + } + } + tokio::time::sleep(Duration::from_millis(1000)).await; + } + + let mut handle = match get_log_file_handle(task_id, pueue_directory) { + Err(_) => { + return Ok(create_failure_message( + "Couldn't find output files for task. Maybe it finished? Try `log`", + )) + } + Ok(handle) => handle, + }; + + // Get the output path. + // We need to check continuously, whether the file still exists, + // since the file can go away (e.g. due to finishing a task). + let path = get_log_path(task_id, pueue_directory); + + // If `lines` is passed as an option, we only want to show the last `X` lines. + // To achieve this, we seek the file handle to the start of the `Xth` line + // from the end of the file. + // The loop following this section will then only copy those last lines to stdout. + if let Some(lines) = message.lines { + if let Err(err) = seek_to_last_lines(&mut handle, lines) { + println!("Error seeking to last lines from log: {err}"); + } + } + + loop { + // Check whether the file still exists. Exit if it doesn't. + if !path.exists() { + return Ok(create_success_message( + "Pueue: Log file has gone away. Has the task been removed?", + )); + } + // Read the next chunk of text from the last position. + let mut buffer = Vec::new(); + + if let Err(err) = handle.read_to_end(&mut buffer) { + return Ok(create_failure_message(format!("Pueue Error: {err}"))); + }; + let text = String::from_utf8_lossy(&buffer).to_string(); + + // Only send a message, if there's actual new content. + if !text.is_empty() { + // Send the next chunk. + let response = Message::Stream(text); + send_message(response, stream).await?; + } + + // Check if the task in question does: + // 1. Still exist + // 2. Is still running + // + // In case it's not, close the stream. + { + let state = state.lock().unwrap(); + let Some(task) = state.tasks.get(&task_id) else { + return Ok(create_failure_message( + "Pueue: The followed task has been removed.", + )); + }; + + // The task is done, just close the stream. + if !task.is_running() { + return Ok(Message::Close); + } + } + + // Wait for 1 second before sending the next chunk. + tokio::time::sleep(Duration::from_millis(1000)).await; + } +} diff --git a/pueue/src/daemon/network/message_handler/mod.rs b/pueue/src/daemon/network/message_handler/mod.rs index 2dd2d781..adf4626b 100644 --- a/pueue/src/daemon/network/message_handler/mod.rs +++ b/pueue/src/daemon/network/message_handler/mod.rs @@ -25,6 +25,8 @@ mod start; mod stash; mod switch; +pub use log::follow_log; + pub fn handle_message(message: Message, state: &SharedState, settings: &Settings) -> Message { match message { Message::Add(message) => add::add_task(settings, state, message), diff --git a/pueue/src/daemon/network/mod.rs b/pueue/src/daemon/network/mod.rs index 7be7480a..45b5a072 100644 --- a/pueue/src/daemon/network/mod.rs +++ b/pueue/src/daemon/network/mod.rs @@ -1,4 +1,3 @@ -pub mod follow_log; pub mod message_handler; pub mod response_helper; pub mod socket; diff --git a/pueue/src/daemon/network/response_helper.rs b/pueue/src/daemon/network/response_helper.rs index 5e39e6ea..55fa38a2 100644 --- a/pueue/src/daemon/network/response_helper.rs +++ b/pueue/src/daemon/network/response_helper.rs @@ -21,11 +21,21 @@ pub fn ensure_group_exists<'state>( ))) } -/// Compile a response for actions that affect several given tasks. -/// These actions can sometimes only succeed for a part of the given tasks. +/// Compile a response for an action that affect several given tasks. +/// That action can sometimes only succeed for a portion of the given tasks. +/// E.g. only running tasks can be killed. /// -/// That's why this helper exists, which determines for which tasks the action succeeded -/// and which tasks failed, based on a given `filter` criterion. +/// That's why this helper exists, which determines for which tasks an action succeeds +/// and which tasks fail, based on a given `filter` criterion. +/// ```rs +/// task_ids = vec![1, 2, 4]; +/// task_action_response_helper( +/// "Tasks are being killed", +/// task_ids.clone(), +/// |task| task.is_running(), +/// &state, +/// ), +/// ``` pub fn task_action_response_helper( message: &str, task_ids: Vec, diff --git a/pueue/src/daemon/network/socket.rs b/pueue/src/daemon/network/socket.rs index 342fd1de..6688e1dd 100644 --- a/pueue/src/daemon/network/socket.rs +++ b/pueue/src/daemon/network/socket.rs @@ -12,12 +12,14 @@ use pueue_lib::network::secret::read_shared_secret; use pueue_lib::settings::Settings; use pueue_lib::state::SharedState; -use crate::daemon::network::follow_log::handle_follow; use crate::daemon::network::message_handler::handle_message; use crate::daemon::process_handler::initiate_shutdown; -/// Poll the listener and accept new incoming connections. -/// Create a new future to handle the message and spawn it. +use super::message_handler::follow_log; + +/// Listen for new connections on the socket. +/// On a new connection, the connected stream will be handled in a separate tokio task. +/// See [handle_incoming] for the actual connection handler function. pub async fn accept_incoming(settings: Settings, state: SharedState) -> Result<()> { let listener = get_listener(&settings.shared).await?; // Read secret once to prevent multiple disk reads. @@ -43,9 +45,19 @@ pub async fn accept_incoming(settings: Settings, state: SharedState) -> Result<( } } -/// Continuously poll the existing incoming futures. -/// In case we received an instruction, handle it and create a response future. -/// The response future is added to unix_responses and handled in a separate function. +/// Handle a new connection from a client. +/// +/// Pueue has a very simple protocol that needs to be followed. +/// 1. Client sends secret for authentication +/// 2. If secret is valid, the daemon sends its own version to the client. +/// 3. The Client sends the instruction message. +/// 4. The Daemon reads the instruction and acts upon it. +/// 5. The Daemon sends a response +/// +/// There're two edge-cases where this pattern is not valid: +/// 1. Shutdown. In that case the message is sent first and the daemon shuts down afterwards. +/// 2. Streaming of logs. The Daemon will continuously send messages with log chunks until +/// the watched task finished or the client disconnects. async fn handle_incoming( mut stream: GenericStream, state: SharedState, @@ -78,14 +90,11 @@ async fn handle_incoming( bail!("Received invalid secret"); } - // Send a short `ok` byte to the client, so it knows that the secret has been accepted. - // This is also the current version of the daemon, so the client can inform the user if the - // daemon needs a restart in case a version difference exists. + // Send confirmation to the client, that the secret was valid. + // This is also the current version of the daemon, so the client can inform user if the + // daemon needs a restart in case of a version mismatch. send_bytes(crate_version!().as_bytes(), &mut stream).await?; - // Get the directory for convenience purposes. - let pueue_directory = settings.shared.pueue_directory(); - loop { // Receive the actual instruction from the client let message_result = receive_message(&mut stream).await; @@ -111,13 +120,13 @@ async fn handle_incoming( // The client requested the output of a task. // Since this involves streaming content, we have to do some special handling. Message::StreamRequest(message) => { - handle_follow(&pueue_directory, &mut stream, &state, message).await? + let pueue_directory = settings.shared.pueue_directory(); + follow_log(&pueue_directory, &mut stream, &state, message).await? } - // Initialize the shutdown procedure. - // The message is forwarded to the TaskHandler, which is responsible for - // gracefully shutting down. + // To initiated a shutdown, a flag in Pueue's state is set that informs the TaskHandler + // to perform a graceful shutdown. // - // This is an edge-case as we have respond to the client first. + // However, this is an edge-case as we have respond to the client first. // Otherwise it might happen, that the daemon shuts down too fast and we aren't // capable of actually sending the message back to the client. Message::DaemonShutdown(shutdown_type) => {