diff --git a/CHANGELOG.md b/CHANGELOG.md index db99d83c..b59d84c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,8 @@ Even though this refactoring significantly simplified the code, it also introduc ### Add - Add `--all` and `--group` to `pueue log`. [#509](https://github.com/Nukesor/pueue/issues/509) +- Add `pueue reset [group_names]` to allow resetting individual groups. [#482](https://github.com/Nukesor/pueue/issues/482) + This also refactors the way resets are done internally, resulting in a cleaner code architecture. ## \[3.4.1\] - 2024-06-04 diff --git a/pueue/src/client/cli.rs b/pueue/src/client/cli.rs index 7bbbe719..b52550ef 100644 --- a/pueue/src/client/cli.rs +++ b/pueue/src/client/cli.rs @@ -455,6 +455,10 @@ https://github.com/Nukesor/pueue/issues/350#issue-1359083118" /// Kill all tasks, clean up afterwards and reset EVERYTHING! Reset { + /// If groups are specified, only those specific groups will be reset. + #[arg(short, long)] + groups: Vec, + /// Don't ask for any confirmation. #[arg(short, long)] force: bool, diff --git a/pueue/src/client/client.rs b/pueue/src/client/client.rs index b62886b6..d20bce36 100644 --- a/pueue/src/client/client.rs +++ b/pueue/src/client/client.rs @@ -173,13 +173,23 @@ impl Client { async fn handle_complex_command(&mut self) -> Result { // This match handles all "complex" commands. match &self.subcommand { - SubCommand::Reset { force, .. } => { + SubCommand::Reset { force, groups } => { // Get the current state and check if there're any running tasks. // If there are, ask the user if they really want to reset the state. let state = get_state(&mut self.stream).await?; + + // Get the groups that should be reset. + let groups: Vec = if groups.is_empty() { + state.groups.keys().cloned().collect() + } else { + groups.clone() + }; + + // Check if there're any running tasks for that group let running_tasks = state .tasks .iter() + .filter(|(_id, task)| groups.contains(&task.group)) .filter_map(|(id, task)| if task.is_running() { Some(*id) } else { None }) .collect::>(); @@ -545,12 +555,18 @@ impl Client { group: group.clone(), } .into(), - SubCommand::Reset { force, .. } => { + SubCommand::Reset { force, groups, .. } => { if self.settings.client.show_confirmation_questions && !force { self.handle_user_confirmation("reset", &Vec::new())?; } - ResetMessage {}.into() + let target = if groups.is_empty() { + ResetTarget::All + } else { + ResetTarget::Groups(groups.clone()) + }; + + ResetMessage { target }.into() } SubCommand::Shutdown => Shutdown::Graceful.into(), SubCommand::Parallel { diff --git a/pueue/src/client/display/group.rs b/pueue/src/client/display/group.rs index 0da2b6d3..3d8ed9eb 100644 --- a/pueue/src/client/display/group.rs +++ b/pueue/src/client/display/group.rs @@ -57,6 +57,7 @@ pub fn get_group_headline(name: &str, group: &Group, style: &OutputStyle) -> Str let status = match group.status { GroupStatus::Running => style.style_text("running", Some(Color::Green), None), GroupStatus::Paused => style.style_text("paused", Some(Color::Yellow), None), + GroupStatus::Reset => style.style_text("resetting", Some(Color::Red), None), }; format!("{} ({} parallel): {}", name, group.parallel_tasks, status) diff --git a/pueue/src/daemon/network/message_handler/mod.rs b/pueue/src/daemon/network/message_handler/mod.rs index 1b1511ed..dac89d27 100644 --- a/pueue/src/daemon/network/message_handler/mod.rs +++ b/pueue/src/daemon/network/message_handler/mod.rs @@ -38,7 +38,7 @@ pub fn handle_message(message: Message, state: &SharedState, settings: &Settings Message::Parallel(message) => parallel::set_parallel_tasks(message, state), Message::Pause(message) => pause::pause(settings, state, message), Message::Remove(task_ids) => remove::remove(settings, state, task_ids), - Message::Reset(_) => reset::reset(settings, state), + Message::Reset(message) => reset::reset(settings, state, message), Message::Restart(message) => restart::restart_multiple(settings, state, message), Message::Send(message) => send::send(state, message), Message::Start(message) => start::start(settings, state, message), diff --git a/pueue/src/daemon/network/message_handler/reset.rs b/pueue/src/daemon/network/message_handler/reset.rs index 7adfcf99..e2e9e78b 100644 --- a/pueue/src/daemon/network/message_handler/reset.rs +++ b/pueue/src/daemon/network/message_handler/reset.rs @@ -1,4 +1,5 @@ -use pueue_lib::state::SharedState; +use pueue_lib::failure_msg; +use pueue_lib::state::{GroupStatus, SharedState}; use pueue_lib::{network::message::*, settings::Settings}; use crate::daemon::process_handler; @@ -6,9 +7,40 @@ use crate::daemon::process_handler; /// Invoked when calling `pueue reset`. /// Kill all children by using the `kill` function. /// Set the full_reset flag, which will prevent new tasks from being spawned. -pub fn reset(settings: &Settings, state: &SharedState) -> Message { +pub fn reset(settings: &Settings, state: &SharedState, message: ResetMessage) -> Message { let mut state = state.lock().unwrap(); - state.full_reset = true; - process_handler::kill::kill(settings, &mut state, TaskSelection::All, false, None); + + match message.target { + ResetTarget::All => { + // Mark all groups to be reset and kill all tasks + for (_name, group) in state.groups.iter_mut() { + group.status = GroupStatus::Reset; + } + process_handler::kill::kill(settings, &mut state, TaskSelection::All, false, None); + } + ResetTarget::Groups(groups) => { + // First up, check whether we actually have all requested groups. + for name in groups.iter() { + let group = state.groups.get(name); + if group.is_none() { + return failure_msg!("Group '{name}' doesn't exist."); + } + } + + // Mark all groups to be reset and kill its tasks + for name in groups.iter() { + let group = state.groups.get_mut(name).unwrap(); + group.status = GroupStatus::Reset; + + process_handler::kill::kill( + settings, + &mut state, + TaskSelection::Group(name.to_string()), + false, + None, + ); + } + } + } create_success_message("Everything is being reset right now.") } diff --git a/pueue/src/daemon/process_handler/finish.rs b/pueue/src/daemon/process_handler/finish.rs index fd08e0e1..e0ccee56 100644 --- a/pueue/src/daemon/process_handler/finish.rs +++ b/pueue/src/daemon/process_handler/finish.rs @@ -2,6 +2,7 @@ use anyhow::Context; use chrono::Local; use log::info; use pueue_lib::log::clean_log_handles; +use pueue_lib::state::GroupStatus; use pueue_lib::task::{TaskResult, TaskStatus}; use super::*; @@ -95,8 +96,13 @@ pub fn handle_finished_tasks(settings: &Settings, state: &mut LockedState) { pause_on_failure(state, settings, &task.group); } - // Already remove the output files, if the daemon is being reset anyway - if state.full_reset { + // Already remove the output files, if this group is being reset. + if state + .groups + .get(&task.group) + .map(|group| group.status == GroupStatus::Reset) + .unwrap_or(true) + { clean_log_handles(*task_id, &settings.shared.pueue_directory()); } } diff --git a/pueue/src/daemon/task_handler.rs b/pueue/src/daemon/task_handler.rs index 90d21095..0ce7119d 100644 --- a/pueue/src/daemon/task_handler.rs +++ b/pueue/src/daemon/task_handler.rs @@ -6,7 +6,6 @@ use chrono::prelude::*; use log::{error, info}; use pueue_lib::children::Children; -use pueue_lib::log::*; use pueue_lib::network::message::*; use pueue_lib::network::protocol::socket_cleanup; use pueue_lib::settings::Settings; @@ -14,7 +13,7 @@ use pueue_lib::state::{Group, GroupStatus, SharedState}; use pueue_lib::task::{TaskResult, TaskStatus}; use crate::daemon::pid::cleanup_pid_file; -use crate::daemon::state_helper::{reset_state, save_state}; +use crate::daemon::state_helper::save_state; use crate::ok_or_shutdown; use super::callbacks::{check_callbacks, spawn_callback}; @@ -46,26 +45,24 @@ pub async fn run(state: SharedState, settings: Settings) -> Result<()> { } loop { - { + 'mutex_block: { let mut state = state.lock().unwrap(); check_callbacks(&mut state); handle_finished_tasks(&settings, &mut state); - enqueue_delayed_tasks(&settings, &mut state); - check_failed_dependencies(&settings, &mut state); + // Check if we're in shutdown. + // If all tasks are killed, we do some cleanup and exit. if state.shutdown.is_some() { - // Check if we're in shutdown. - // If all tasks are killed, we do some cleanup and exit. handle_shutdown(&settings, &mut state); - } else if state.full_reset { - // Wait until all tasks are killed. - // Once they are, reset everything and go back to normal - handle_reset(&settings, &mut state); - } else { - // Only start new tasks, if we aren't in the middle of a reset or shutdown. - spawn_new(&settings, &mut state); + break 'mutex_block; } + + // If we aren't in shutdown mode, do the usual stuff + handle_group_resets(&settings, &mut state); + enqueue_delayed_tasks(&settings, &mut state); + check_failed_dependencies(&settings, &mut state); + spawn_new(&settings, &mut state); } tokio::time::sleep(Duration::from_millis(300)).await; @@ -106,20 +103,28 @@ fn handle_shutdown(settings: &Settings, state: &mut LockedState) { /// and no new tasks will be spawned. /// This function checks, if all killed children have been handled. /// If that's the case, completely reset the state -fn handle_reset(settings: &Settings, state: &mut LockedState) { - // Don't do any reset logic, if we aren't in reset mode or if some children are still up. - if state.children.has_active_tasks() { - return; - } +fn handle_group_resets(_settings: &Settings, state: &mut LockedState) { + let groups_to_reset: Vec = state + .groups + .iter() + .filter(|(_name, group)| group.status == GroupStatus::Reset) + .map(|(name, _)| name.to_string()) + .collect(); + + for name in groups_to_reset.iter() { + // Don't do any reset logic, if there're still some children are still up. + if state.children.has_group_active_tasks(name) { + continue; + } - if let Err(error) = reset_state(state, settings) { - error!("Failed to reset state with error: {error:?}"); - }; + // Remove all tasks that belong to the group to reset + state.tasks.retain(|_id, task| &task.group != name); - if let Err(error) = reset_task_log_directory(&settings.shared.pueue_directory()) { - panic!("Error while resetting task log directory: {error}"); - }; - state.full_reset = false; + // Restart the group, now that it's devoid of tasks. + if let Some(group) = state.groups.get_mut(name) { + group.status = GroupStatus::Running; + } + } } /// As time passes, some delayed tasks may need to be enqueued. diff --git a/pueue/tests/daemon/integration/reset.rs b/pueue/tests/daemon/integration/reset.rs index ac4fd843..9e5bfa90 100644 --- a/pueue/tests/daemon/integration/reset.rs +++ b/pueue/tests/daemon/integration/reset.rs @@ -1,9 +1,9 @@ use anyhow::{Context, Result}; -use pueue_lib::network::message::*; +use pueue_lib::{network::message::*, state::GroupStatus}; use crate::helper::*; -/// A reset command kills all tasks and forces a clean state. +/// A reset command kills all tasks and forces a clean state accross groups. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_reset() -> Result<()> { let daemon = daemon().await?; @@ -12,21 +12,128 @@ async fn test_reset() -> Result<()> { // Start a long running task and make sure it's started add_task(shared, "ls").await?; add_task(shared, "failed").await?; - add_task(shared, "sleep 60").await?; + add_task_to_group(shared, "sleep 60", "test_2").await?; add_task(shared, "ls").await?; wait_for_task_condition(shared, 2, |task| task.is_running()).await?; - // Reset the daemon - send_message(shared, ResetMessage {}) - .await - .context("Failed to send Start tasks message")?; + // Reset all groups of the daemon + send_message( + shared, + ResetMessage { + target: ResetTarget::All, + }, + ) + .await + .context("Failed to send Start tasks message")?; - // Resetting is asynchronous, wait for the first task to disappear. + // Resetting is asynchronous, wait for all task to disappear. wait_for_task_absence(shared, 0).await?; + wait_for_task_absence(shared, 1).await?; + wait_for_task_absence(shared, 2).await?; + wait_for_task_absence(shared, 3).await?; // All tasks should have been removed. let state = get_state(shared).await?; assert!(state.tasks.is_empty(),); + // Both groups should be running. + assert_eq!( + state.groups.get("default").unwrap().status, + GroupStatus::Running + ); + assert_eq!( + state.groups.get("test_2").unwrap().status, + GroupStatus::Running + ); + + Ok(()) +} + +/// A reset command kills all tasks and forces a clean state. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_reset_single_group() -> Result<()> { + let daemon = daemon().await?; + let shared = &daemon.settings.shared; + + // Start a long running task and make sure it's started + add_task(shared, "ls").await?; + add_task(shared, "failed").await?; + add_task_to_group(shared, "sleep 60", "test_2").await?; + add_task_to_group(shared, "sleep 60", "test_3").await?; + wait_for_task_condition(shared, 2, |task| task.is_running()).await?; + + // Reset only the test_2 of the daemon. + send_message( + shared, + ResetMessage { + target: ResetTarget::Groups(vec!["test_2".to_string()]), + }, + ) + .await + .context("Failed to send Start tasks message")?; + + // Resetting is asynchronous, wait for the third task to disappear. + wait_for_task_absence(shared, 2).await?; + + // All tasks should have been removed. + let state = get_state(shared).await?; + assert_eq!( + state.tasks.len(), + 3, + "Only a single task should have been removed" + ); + + assert_eq!( + state.groups.get("test_2").unwrap().status, + GroupStatus::Running + ); + + Ok(()) +} + +/// A reset command kills all tasks and forces a clean state. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_reset_multiple_groups() -> Result<()> { + let daemon = daemon().await?; + let shared = &daemon.settings.shared; + + // Start a long running task and make sure it's started + add_task(shared, "ls").await?; + add_task(shared, "failed").await?; + add_task_to_group(shared, "sleep 60", "test_2").await?; + add_task_to_group(shared, "sleep 60", "test_3").await?; + wait_for_task_condition(shared, 2, |task| task.is_running()).await?; + + // Reset only the test_2 of the daemon. + send_message( + shared, + ResetMessage { + target: ResetTarget::Groups(vec!["test_2".to_string(), "test_3".to_string()]), + }, + ) + .await + .context("Failed to send Start tasks message")?; + + // Resetting is asynchronous, wait for the third task to disappear. + wait_for_task_absence(shared, 2).await?; + wait_for_task_absence(shared, 3).await?; + + // All tasks should have been removed. + let state = get_state(shared).await?; + assert_eq!( + state.tasks.len(), + 2, + "Only a two task should have been removed" + ); + + assert_eq!( + state.groups.get("test_2").unwrap().status, + GroupStatus::Running + ); + assert_eq!( + state.groups.get("test_3").unwrap().status, + GroupStatus::Running + ); + Ok(()) } diff --git a/pueue_lib/src/children.rs b/pueue_lib/src/children.rs index f16f18dc..5055c2db 100644 --- a/pueue_lib/src/children.rs +++ b/pueue_lib/src/children.rs @@ -15,6 +15,16 @@ impl Children { self.0.iter().any(|(_, pool)| !pool.is_empty()) } + /// Returns whether there are any active tasks for the given group. + /// + /// Returns `false` if the group cannot be found. + pub fn has_group_active_tasks(&self, group: &str) -> bool { + self.0 + .get(group) + .map(|pool| !pool.is_empty()) + .unwrap_or(false) + } + /// A convenience function to check whether there's child with a given task_id. /// We have to do a nested linear search, as these datastructure aren't indexed via task_ids. pub fn has_child(&self, task_id: usize) -> bool { diff --git a/pueue_lib/src/log.rs b/pueue_lib/src/log.rs index b119be7f..aed9cf6d 100644 --- a/pueue_lib/src/log.rs +++ b/pueue_lib/src/log.rs @@ -1,4 +1,4 @@ -use std::fs::{read_dir, remove_file, File}; +use std::fs::{remove_file, File}; use std::io::{self, prelude::*, Read, SeekFrom}; use std::path::{Path, PathBuf}; @@ -104,22 +104,6 @@ pub fn read_last_log_file_lines( Ok(read_last_lines(&mut file, lines)) } -/// Remove all files in the log directory. -pub fn reset_task_log_directory(pueue_dir: &Path) -> Result<(), Error> { - let task_log_dir = pueue_dir.join("task_logs"); - - let files = read_dir(&task_log_dir) - .map_err(|err| Error::IoPathError(task_log_dir, "reading task log files", err))?; - - for file in files.flatten() { - if let Err(err) = remove_file(file.path()) { - error!("Failed to delete log file: {err}"); - } - } - - Ok(()) -} - /// Read the last `amount` lines of a file to a string. /// /// Only use this for logic that doesn't stream from daemon to client! diff --git a/pueue_lib/src/network/message.rs b/pueue_lib/src/network/message.rs index 507b490c..9e497799 100644 --- a/pueue_lib/src/network/message.rs +++ b/pueue_lib/src/network/message.rs @@ -280,7 +280,17 @@ pub struct GroupResponseMessage { impl_into_message!(GroupResponseMessage, Message::GroupResponse); #[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)] -pub struct ResetMessage {} +pub enum ResetTarget { + // Reset all groups + All, + // Reset a list of specific groups + Groups(Vec), +} + +#[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)] +pub struct ResetMessage { + pub target: ResetTarget, +} impl_into_message!(ResetMessage, Message::Reset); diff --git a/pueue_lib/src/state.rs b/pueue_lib/src/state.rs index a5eb65c9..1be1b036 100644 --- a/pueue_lib/src/state.rs +++ b/pueue_lib/src/state.rs @@ -19,6 +19,9 @@ pub type SharedState = Arc>; pub enum GroupStatus { Running, Paused, + // This state is set, if this group is being reset. + // This means that all tasks are being killed and removed. + Reset, } /// The representation of a group. @@ -62,11 +65,7 @@ pub struct State { /// This is runtime state and won't be serialised to disk. #[serde(default, skip)] pub shutdown: Option, - /// A simple flag which is used to signal that we're currently doing a full reset of the daemon. - /// This flag prevents new tasks from being spawned. - /// This is runtime state and won't be serialised to disk. - #[serde(default, skip)] - pub full_reset: bool, + /// Pueue's subprocess and worker pool representation. /// Take a look at [Children] for more info. /// This is runtime state and won't be serialised to disk. @@ -84,7 +83,6 @@ impl Clone for State { tasks: self.tasks.clone(), groups: self.groups.clone(), shutdown: self.shutdown.clone(), - full_reset: self.full_reset, ..Default::default() } } @@ -94,10 +92,7 @@ impl Clone for State { impl Eq for State {} impl PartialEq for State { fn eq(&self, other: &Self) -> bool { - self.tasks == other.tasks - && self.groups == other.groups - && self.shutdown == other.shutdown - && self.full_reset == other.full_reset + self.tasks == other.tasks && self.groups == other.groups && self.shutdown == other.shutdown } }