Skip to content

Commit

Permalink
Merge pull request #501 from Nukesor/unlimited-tasks
Browse files Browse the repository at this point in the history
add: Allow 0 in group limits for unlimited parallel tasks
  • Loading branch information
Nukesor authored Feb 15, 2024
2 parents edf8a74 + 6b845cb commit be40e46
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 17 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Added

- Support modification of task priorities via `pueue edit --priority/-o` and `pueue restart --edit-priority/-o` [#449](https://github.com/Nukesor/pueue/issues/449)
- Support modification of task priorities via `pueue edit --priority/-o` and `pueue restart --edit-priority/-o` [#449](https://github.com/Nukesor/pueue/issues/449).
- If no output directory is provided in `completions`, the generated file is printed to `stdout` [#489](https://github.com/Nukesor/pueue/issues/489).
- Allow setting the `parallel_tasks` value of groups to `0`. Setting this value allows unlimited tasks for that group [#500](https://github.com/Nukesor/pueue/issues/500).

### Fixed

Expand Down
18 changes: 3 additions & 15 deletions pueue/src/client/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ https://github.com/Nukesor/pueue/issues/350#issue-1359083118"
This limit is only considered when tasks are scheduled.")]
Parallel {
/// The amount of allowed parallel tasks.
#[arg(value_parser = min_one)]
/// Setting this to 0 means an unlimited amount of parallel tasks.
parallel_tasks: Option<usize>,

/// Set the amount for a specific group.
Expand All @@ -504,7 +504,8 @@ pub enum GroupCommand {
name: String,

/// Set the amount of parallel tasks this group can have.
#[arg(short, long, value_parser = min_one)]
/// Setting this to 0 means an unlimited amount of parallel tasks.
#[arg(short, long)]
parallel: Option<usize>,
},

Expand Down Expand Up @@ -573,16 +574,3 @@ fn parse_delay_until(src: &str) -> Result<DateTime<Local>, String> {
"could not parse as seconds or date expression",
))
}

/// Validator function. The input string has to be parsable as int and bigger than 0
fn min_one(value: &str) -> Result<usize, String> {
match value.parse::<usize>() {
Ok(value) => {
if value < 1 {
return Err("You must provide a value that's bigger than 0".into());
}
Ok(value)
}
Err(_) => Err("Failed to parse integer".into()),
}
}
6 changes: 6 additions & 0 deletions pueue/src/daemon/task_handler/spawn_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ impl TaskHandler {
return false;
}

// If parallel tasks are set to `0`, this means an unlimited amount of tasks may
// run at any given time.
if group.parallel_tasks == 0 {
return true;
}

// Get the currently running tasks by looking at the actually running processes.
// They're sorted by group, which makes this quite convenient.
let running_tasks = match self.children.0.get(&task.group) {
Expand Down
33 changes: 32 additions & 1 deletion pueue/tests/daemon/integration/parallel_tasks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use pretty_assertions::assert_eq;

use pueue_lib::task::*;
use pueue_lib::{network::message::ParallelMessage, task::*};

use crate::helper::*;

Expand Down Expand Up @@ -56,3 +56,34 @@ async fn test_parallel_tasks() -> Result<()> {
}
Ok(())
}

/// Test that a group with a parallel limit of `0` has an unlimited amount of tasks.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_unlimited_parallel_tasks() -> Result<()> {
let daemon = daemon().await?;
let shared = &daemon.settings.shared;

// Add a new group with 1 slot
add_group_with_slots(shared, "testgroup", 1).await?;

// Add 10 long running tasks to this group, only 1 should be immediately started.
for _ in 0..10 {
assert_success(add_task_to_group(shared, "sleep 600", "testgroup").await?);
}
// Ensure the first tasks is started.
wait_for_task_condition(shared, 0, |task| task.is_running()).await?;

// Update the parallel limit of the group to 0
let message = ParallelMessage {
group: "testgroup".to_string(),
parallel_tasks: 0,
};
assert_success(send_message(shared, message).await?);

// Make sure all other tasks are started as well in quick succession.
for task_id in 1..10 {
wait_for_task_condition(shared, task_id, |task| task.is_running()).await?;
}

Ok(())
}

0 comments on commit be40e46

Please sign in to comment.