Merge pull request #550 from Nukesor/reset-by-group
add: Allow resetting individual groups
Nukesor authored Jun 24, 2024
2 parents 4e800a8 + 41a0dae commit 0e22596
Showing 13 changed files with 244 additions and 72 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -30,6 +30,8 @@ Even though this refactoring significantly simplified the code, it also introduc
### Add

- Add `--all` and `--group` to `pueue log`. [#509](https://github.com/Nukesor/pueue/issues/509)
- Add `pueue reset [group_names]` to allow resetting individual groups. [#482](https://github.com/Nukesor/pueue/issues/482)
This also refactors the way resets are done internally, resulting in a cleaner code architecture.
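(Illustrative usage, inferred from the cli.rs diff below rather than stated in the changelog itself: a plain `pueue reset` keeps resetting every group, while individual groups are selected with the new `-g`/`--groups` argument, e.g. `pueue reset -g my_group -g other_group`. The exact invocation is an assumption based on the `groups: Vec<String>` argument added to the `Reset` subcommand.)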

## \[3.4.1\] - 2024-06-04

4 changes: 4 additions & 0 deletions pueue/src/client/cli.rs
@@ -455,6 +455,10 @@ https://github.com/Nukesor/pueue/issues/350#issue-1359083118"

/// Kill all tasks, clean up afterwards and reset EVERYTHING!
Reset {
/// If groups are specified, only those specific groups will be reset.
#[arg(short, long)]
groups: Vec<String>,

/// Don't ask for any confirmation.
#[arg(short, long)]
force: bool,
22 changes: 19 additions & 3 deletions pueue/src/client/client.rs
@@ -173,13 +173,23 @@ impl Client {
async fn handle_complex_command(&mut self) -> Result<bool> {
// This match handles all "complex" commands.
match &self.subcommand {
SubCommand::Reset { force, .. } => {
SubCommand::Reset { force, groups } => {
// Get the current state and check if there're any running tasks.
// If there are, ask the user if they really want to reset the state.
let state = get_state(&mut self.stream).await?;

// Get the groups that should be reset.
let groups: Vec<String> = if groups.is_empty() {
state.groups.keys().cloned().collect()
} else {
groups.clone()
};

// Check if there are any running tasks for those groups
let running_tasks = state
.tasks
.iter()
.filter(|(_id, task)| groups.contains(&task.group))
.filter_map(|(id, task)| if task.is_running() { Some(*id) } else { None })
.collect::<Vec<_>>();

@@ -545,12 +555,18 @@ impl Client {
group: group.clone(),
}
.into(),
SubCommand::Reset { force, .. } => {
SubCommand::Reset { force, groups, .. } => {
if self.settings.client.show_confirmation_questions && !force {
self.handle_user_confirmation("reset", &Vec::new())?;
}

ResetMessage {}.into()
let target = if groups.is_empty() {
ResetTarget::All
} else {
ResetTarget::Groups(groups.clone())
};

ResetMessage { target }.into()
}
SubCommand::Shutdown => Shutdown::Graceful.into(),
SubCommand::Parallel {
1 change: 1 addition & 0 deletions pueue/src/client/display/group.rs
@@ -57,6 +57,7 @@ pub fn get_group_headline(name: &str, group: &Group, style: &OutputStyle) -> Str
let status = match group.status {
GroupStatus::Running => style.style_text("running", Some(Color::Green), None),
GroupStatus::Paused => style.style_text("paused", Some(Color::Yellow), None),
GroupStatus::Reset => style.style_text("resetting", Some(Color::Red), None),
};

format!("{} ({} parallel): {}", name, group.parallel_tasks, status)
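For context: the GroupStatus::Reset variant matched above is introduced in pueue_lib, whose diff is not part of this excerpt. A rough sketch inferred purely from its usage in this commit (derives and serde attributes omitted, so treat the details as assumptions):

    // Sketch only: the real definition lives in pueue_lib::state.
    pub enum GroupStatus {
        Running,
        Paused,
        /// The group is currently being reset: its tasks are being killed and
        /// removed, and no new tasks are spawned until the reset has finished.
        Reset,
    }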
2 changes: 1 addition & 1 deletion pueue/src/daemon/network/message_handler/mod.rs
@@ -38,7 +38,7 @@ pub fn handle_message(message: Message, state: &SharedState, settings: &Settings
Message::Parallel(message) => parallel::set_parallel_tasks(message, state),
Message::Pause(message) => pause::pause(settings, state, message),
Message::Remove(task_ids) => remove::remove(settings, state, task_ids),
Message::Reset(_) => reset::reset(settings, state),
Message::Reset(message) => reset::reset(settings, state, message),
Message::Restart(message) => restart::restart_multiple(settings, state, message),
Message::Send(message) => send::send(state, message),
Message::Start(message) => start::start(settings, state, message),
40 changes: 36 additions & 4 deletions pueue/src/daemon/network/message_handler/reset.rs
@@ -1,14 +1,46 @@
use pueue_lib::state::SharedState;
use pueue_lib::failure_msg;
use pueue_lib::state::{GroupStatus, SharedState};
use pueue_lib::{network::message::*, settings::Settings};

use crate::daemon::process_handler;

/// Invoked when calling `pueue reset`.
/// Kill all children by using the `kill` function.
/// Set the full_reset flag, which will prevent new tasks from being spawned.
pub fn reset(settings: &Settings, state: &SharedState) -> Message {
pub fn reset(settings: &Settings, state: &SharedState, message: ResetMessage) -> Message {
let mut state = state.lock().unwrap();
state.full_reset = true;
process_handler::kill::kill(settings, &mut state, TaskSelection::All, false, None);

match message.target {
ResetTarget::All => {
// Mark all groups to be reset and kill all tasks
for (_name, group) in state.groups.iter_mut() {
group.status = GroupStatus::Reset;
}
process_handler::kill::kill(settings, &mut state, TaskSelection::All, false, None);
}
ResetTarget::Groups(groups) => {
// First up, check whether we actually have all requested groups.
for name in groups.iter() {
let group = state.groups.get(name);
if group.is_none() {
return failure_msg!("Group '{name}' doesn't exist.");
}
}

// Mark all requested groups to be reset and kill their tasks
for name in groups.iter() {
let group = state.groups.get_mut(name).unwrap();
group.status = GroupStatus::Reset;

process_handler::kill::kill(
settings,
&mut state,
TaskSelection::Group(name.to_string()),
false,
None,
);
}
}
}
create_success_message("Everything is being reset right now.")
}
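The ResetMessage/ResetTarget types used by this handler are defined in pueue_lib and sit among the changed files not shown in this excerpt. Inferred from their usage here and in the client, they presumably look roughly like the following sketch; derives, serde attributes, and field visibility are assumptions:

    // Sketch only: the actual definitions live in pueue_lib's network message module.
    pub enum ResetTarget {
        /// Reset every group (the previous `pueue reset` behavior).
        All,
        /// Reset only the named groups.
        Groups(Vec<String>),
    }

    pub struct ResetMessage {
        pub target: ResetTarget,
    }

Note that the daemon replies to both variants with the same generic success message ("Everything is being reset right now."), so the client output does not distinguish a full reset from a group reset.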
10 changes: 8 additions & 2 deletions pueue/src/daemon/process_handler/finish.rs
@@ -2,6 +2,7 @@ use anyhow::Context;
use chrono::Local;
use log::info;
use pueue_lib::log::clean_log_handles;
use pueue_lib::state::GroupStatus;
use pueue_lib::task::{TaskResult, TaskStatus};

use super::*;
@@ -95,8 +96,13 @@ pub fn handle_finished_tasks(settings: &Settings, state: &mut LockedState) {
pause_on_failure(state, settings, &task.group);
}

// Already remove the output files, if the daemon is being reset anyway
if state.full_reset {
// Already remove the output files, if this group is being reset.
if state
.groups
.get(&task.group)
.map(|group| group.status == GroupStatus::Reset)
.unwrap_or(true)
{
clean_log_handles(*task_id, &settings.shared.pueue_directory());
}
}
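One design choice worth noting in the hunk above: the group lookup falls back to `true` via `unwrap_or(true)`, so a finished task whose group can no longer be found is treated like a task in a resetting group and has its log files cleaned up rather than leaving orphaned logs behind.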
57 changes: 31 additions & 26 deletions pueue/src/daemon/task_handler.rs
@@ -6,15 +6,14 @@ use chrono::prelude::*;
use log::{error, info};

use pueue_lib::children::Children;
use pueue_lib::log::*;
use pueue_lib::network::message::*;
use pueue_lib::network::protocol::socket_cleanup;
use pueue_lib::settings::Settings;
use pueue_lib::state::{Group, GroupStatus, SharedState};
use pueue_lib::task::{TaskResult, TaskStatus};

use crate::daemon::pid::cleanup_pid_file;
use crate::daemon::state_helper::{reset_state, save_state};
use crate::daemon::state_helper::save_state;
use crate::ok_or_shutdown;

use super::callbacks::{check_callbacks, spawn_callback};
@@ -46,26 +45,24 @@ pub async fn run(state: SharedState, settings: Settings) -> Result<()> {
}

loop {
{
'mutex_block: {
let mut state = state.lock().unwrap();

check_callbacks(&mut state);
handle_finished_tasks(&settings, &mut state);
enqueue_delayed_tasks(&settings, &mut state);
check_failed_dependencies(&settings, &mut state);

// Check if we're in shutdown.
// If all tasks are killed, we do some cleanup and exit.
if state.shutdown.is_some() {
// Check if we're in shutdown.
// If all tasks are killed, we do some cleanup and exit.
handle_shutdown(&settings, &mut state);
} else if state.full_reset {
// Wait until all tasks are killed.
// Once they are, reset everything and go back to normal
handle_reset(&settings, &mut state);
} else {
// Only start new tasks, if we aren't in the middle of a reset or shutdown.
spawn_new(&settings, &mut state);
break 'mutex_block;
}

// If we aren't in shutdown mode, do the usual stuff
handle_group_resets(&settings, &mut state);
enqueue_delayed_tasks(&settings, &mut state);
check_failed_dependencies(&settings, &mut state);
spawn_new(&settings, &mut state);
}

tokio::time::sleep(Duration::from_millis(300)).await;
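Because this rendering interleaves removed and added lines, the shape of the new main loop is easier to see when read as a whole. A rough reconstruction from the added lines (a sketch, not the verbatim result of the commit):

    loop {
        'mutex_block: {
            let mut state = state.lock().unwrap();

            check_callbacks(&mut state);
            handle_finished_tasks(&settings, &mut state);

            if state.shutdown.is_some() {
                // If all tasks have been killed, clean up and exit.
                handle_shutdown(&settings, &mut state);
                break 'mutex_block;
            }

            // If we aren't in shutdown mode, do the usual work:
            // handle per-group resets, enqueue delayed tasks, check failed
            // dependencies, and spawn new tasks.
            handle_group_resets(&settings, &mut state);
            enqueue_delayed_tasks(&settings, &mut state);
            check_failed_dependencies(&settings, &mut state);
            spawn_new(&settings, &mut state);
        }

        tokio::time::sleep(Duration::from_millis(300)).await;
    }

The daemon-wide `full_reset` branch is gone: resets are now handled per group by `handle_group_resets`, so resetting one group should no longer block task spawning in the others.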
@@ -106,20 +103,28 @@ fn handle_shutdown(settings: &Settings, state: &mut LockedState) {
/// and no new tasks will be spawned.
/// This function checks whether all killed children have been handled.
/// If that's the case, completely reset the state.
fn handle_reset(settings: &Settings, state: &mut LockedState) {
// Don't do any reset logic, if we aren't in reset mode or if some children are still up.
if state.children.has_active_tasks() {
return;
}
fn handle_group_resets(_settings: &Settings, state: &mut LockedState) {
let groups_to_reset: Vec<String> = state
.groups
.iter()
.filter(|(_name, group)| group.status == GroupStatus::Reset)
.map(|(name, _)| name.to_string())
.collect();

for name in groups_to_reset.iter() {
// Don't do any reset logic if some of the group's children are still up.
if state.children.has_group_active_tasks(name) {
continue;
}

if let Err(error) = reset_state(state, settings) {
error!("Failed to reset state with error: {error:?}");
};
// Remove all tasks that belong to the group to reset
state.tasks.retain(|_id, task| &task.group != name);

if let Err(error) = reset_task_log_directory(&settings.shared.pueue_directory()) {
panic!("Error while resetting task log directory: {error}");
};
state.full_reset = false;
// Restart the group, now that it's devoid of tasks.
if let Some(group) = state.groups.get_mut(name) {
group.status = GroupStatus::Running;
}
}
}

/// As time passes, some delayed tasks may need to be enqueued.
123 changes: 115 additions & 8 deletions pueue/tests/daemon/integration/reset.rs
@@ -1,9 +1,9 @@
use anyhow::{Context, Result};
use pueue_lib::network::message::*;
use pueue_lib::{network::message::*, state::GroupStatus};

use crate::helper::*;

/// A reset command kills all tasks and forces a clean state.
/// A reset command kills all tasks and forces a clean state across groups.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_reset() -> Result<()> {
let daemon = daemon().await?;
@@ -12,21 +12,128 @@ async fn test_reset() -> Result<()> {
// Start a long running task and make sure it's started
add_task(shared, "ls").await?;
add_task(shared, "failed").await?;
add_task(shared, "sleep 60").await?;
add_task_to_group(shared, "sleep 60", "test_2").await?;
add_task(shared, "ls").await?;
wait_for_task_condition(shared, 2, |task| task.is_running()).await?;

// Reset the daemon
send_message(shared, ResetMessage {})
.await
.context("Failed to send Start tasks message")?;
// Reset all groups of the daemon
send_message(
shared,
ResetMessage {
target: ResetTarget::All,
},
)
.await
.context("Failed to send Start tasks message")?;

// Resetting is asynchronous, wait for the first task to disappear.
// Resetting is asynchronous, wait for all tasks to disappear.
wait_for_task_absence(shared, 0).await?;
wait_for_task_absence(shared, 1).await?;
wait_for_task_absence(shared, 2).await?;
wait_for_task_absence(shared, 3).await?;

// All tasks should have been removed.
let state = get_state(shared).await?;
assert!(state.tasks.is_empty());

// Both groups should be running.
assert_eq!(
state.groups.get("default").unwrap().status,
GroupStatus::Running
);
assert_eq!(
state.groups.get("test_2").unwrap().status,
GroupStatus::Running
);

Ok(())
}

/// Resetting a single group kills only that group's tasks and forces a clean state for that group.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_reset_single_group() -> Result<()> {
let daemon = daemon().await?;
let shared = &daemon.settings.shared;

// Add a few tasks and make sure the long-running task in test_2 is started
add_task(shared, "ls").await?;
add_task(shared, "failed").await?;
add_task_to_group(shared, "sleep 60", "test_2").await?;
add_task_to_group(shared, "sleep 60", "test_3").await?;
wait_for_task_condition(shared, 2, |task| task.is_running()).await?;

// Reset only the test_2 group of the daemon.
send_message(
shared,
ResetMessage {
target: ResetTarget::Groups(vec!["test_2".to_string()]),
},
)
.await
.context("Failed to send Start tasks message")?;

// Resetting is asynchronous, wait for the third task to disappear.
wait_for_task_absence(shared, 2).await?;

// Only the task in the test_2 group should have been removed.
let state = get_state(shared).await?;
assert_eq!(
state.tasks.len(),
3,
"Only a single task should have been removed"
);

assert_eq!(
state.groups.get("test_2").unwrap().status,
GroupStatus::Running
);

Ok(())
}

/// Resetting multiple groups kills those groups' tasks and forces a clean state for them, leaving other groups untouched.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_reset_multiple_groups() -> Result<()> {
let daemon = daemon().await?;
let shared = &daemon.settings.shared;

// Add a few tasks and make sure the long-running task in test_2 is started
add_task(shared, "ls").await?;
add_task(shared, "failed").await?;
add_task_to_group(shared, "sleep 60", "test_2").await?;
add_task_to_group(shared, "sleep 60", "test_3").await?;
wait_for_task_condition(shared, 2, |task| task.is_running()).await?;

// Reset the test_2 and test_3 groups of the daemon.
send_message(
shared,
ResetMessage {
target: ResetTarget::Groups(vec!["test_2".to_string(), "test_3".to_string()]),
},
)
.await
.context("Failed to send Start tasks message")?;

// Resetting is asynchronous, wait for the third and fourth tasks to disappear.
wait_for_task_absence(shared, 2).await?;
wait_for_task_absence(shared, 3).await?;

// Only the tasks in the test_2 and test_3 groups should have been removed.
let state = get_state(shared).await?;
assert_eq!(
state.tasks.len(),
2,
"Only a two task should have been removed"
);

assert_eq!(
state.groups.get("test_2").unwrap().status,
GroupStatus::Running
);
assert_eq!(
state.groups.get("test_3").unwrap().status,
GroupStatus::Running
);

Ok(())
}
(remaining changed files not shown)
