Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add(callbacks): queued_count and stashed_count callback vars #583

Merged
merged 1 commit into from
Dec 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ TLDR: The new task state representation is more verbose but significantly cleane
- Add `command` filter to `pueue status`. [#524](https://github.com/Nukesor/pueue/issues/524) [#560](https://github.com/Nukesor/pueue/pull/560)
- Allow `pueue status` to order tasks by `enqueue_at`. [#554](https://github.com/Nukesor/pueue/issues/554)
- Added Windows service on Windows to allow a true daemon experience. [#344](https://github.com/Nukesor/pueue/issues/344) [#567](https://github.com/Nukesor/pueue/pull/567)
- Add `queued_count` and `stashed_count` to callback template variables. This allows users to fire callbacks when whole groups are finished. [#578](https://github.com/Nukesor/pueue/issues/578)

### Fixed

Expand Down
16 changes: 15 additions & 1 deletion pueue/src/daemon/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub fn spawn_callback(settings: &Settings, state: &mut LockedState, task: &Task)
};

// Build the command to be called from the template string in the configuration file.
let callback_command = match build_callback_command(settings, task, template_string) {
let callback_command = match build_callback_command(settings, state, task, template_string) {
Ok(callback_command) => callback_command,
Err(err) => {
error!("Failed to create callback command from template with error: {err}");
Expand Down Expand Up @@ -49,6 +49,7 @@ pub fn spawn_callback(settings: &Settings, state: &mut LockedState, task: &Task)
/// finished task.
pub fn build_callback_command(
settings: &Settings,
state: &mut LockedState,
task: &Task,
template_string: &str,
) -> Result<String, RenderError> {
Expand All @@ -62,7 +63,20 @@ pub fn build_callback_command(
parameters.insert("id", task.id.to_string());
parameters.insert("command", task.command.clone());
parameters.insert("path", (*task.path.to_string_lossy()).to_owned());

// Add group information to template
// This includes how many stashed and queued tasks are left in the group.
parameters.insert("group", task.group.clone());
let queued_tasks = state
.filter_tasks_of_group(|task| task.is_queued(), &task.group)
.matching_ids
.len();
parameters.insert("queued_count", queued_tasks.to_string());
let stashed_tasks = state
.filter_tasks_of_group(|task| task.is_stashed(), &task.group)
.matching_ids
.len();
parameters.insert("stashed_count", stashed_tasks.to_string());

// Result takes the TaskResult Enum strings, unless it didn't finish yet.
if let TaskStatus::Done { result, .. } = &task.status {
Expand Down
3 changes: 2 additions & 1 deletion pueue/src/daemon/process_handler/finish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ pub fn handle_finished_tasks(settings: &Settings, state: &mut LockedState) {
None => TaskResult::Killed,
};

info!("Task {task_id} finished with result: {result:?}");

// Update the tasks's state and return a clone for callback handling.
let task = {
let task = state
Expand Down Expand Up @@ -147,7 +149,6 @@ fn get_finished(state: &mut LockedState) -> Vec<((usize, String, usize), Option<
// Child process did not exit yet
Ok(None) => continue,
Ok(_exit_status) => {
info!("Task {task_id} just finished");
finished.push(((*task_id, group.clone(), *worker_id), None));
}
}
Expand Down
43 changes: 43 additions & 0 deletions pueue/tests/daemon/integration/callback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::fs::read_to_string;

use anyhow::{Context, Result};

use crate::helper::*;

/// Make sure that callback commands are executed while variables are
/// templated into the command as expected.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_callback_variables() -> Result<()> {
let (mut settings, tempdir) = daemon_base_setup()?;

// Configure the daemon to use a callback command that echos some variables into a file
// that's located in the temporary runtime directory of the daemon.
let tempdir_path = tempdir.path().to_path_buf();
let echo_command =
"echo '{{queued_count}}\n{{stashed_count}}\n{{command}}\n{{id}}\n{{result}}'";
settings.daemon.callback = Some(format!(
"{echo_command} > {}/testfile",
tempdir_path.to_string_lossy()
));
settings
.save(&Some(tempdir_path.join("pueue.yml")))
.context("Couldn't write pueue config to temporary directory")?;

// Create the daemon with the changed settings.
let daemon = daemon_with_settings(settings, tempdir).await?;
let shared = &daemon.settings.shared;

// Create one stashed task.
assert_success(create_stashed_task(shared, "stashed", None).await?);
// Create a task that'll then trigger the callback
assert_success(add_task(shared, "ls").await?);

// Give the callback command some time to be executed.
sleep_ms(3000).await;

let callback_output = read_to_string(tempdir_path.join("testfile"))?;

assert_eq!(callback_output, "0\n1\nls\n1\nSuccess\n");

Ok(())
}
1 change: 1 addition & 0 deletions pueue/tests/daemon/integration/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod add;
mod aliases;
mod callback;
mod clean;
mod edit;
mod environment_variables;
Expand Down
32 changes: 5 additions & 27 deletions pueue/tests/daemon/integration/stashed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,22 @@ use pueue_lib::state::GroupStatus;
use rstest::rstest;

use pueue_lib::network::message::*;
use pueue_lib::settings::Shared;
use pueue_lib::task::*;

use crate::helper::*;

/// Helper to pause the whole daemon
pub async fn add_stashed_task(
shared: &Shared,
command: &str,
stashed: bool,
enqueue_at: Option<DateTime<Local>>,
) -> Result<Message> {
let mut message = create_add_message(shared, command);
message.stashed = stashed;
message.enqueue_at = enqueue_at;

send_message(shared, message)
.await
.context("Failed to to add task message")
}

/// Tasks can be stashed and scheduled for being enqueued at a specific point in time.
///
/// Furthermore these stashed tasks can then be manually enqueued again.
#[rstest]
#[case(true, None)]
#[case(true, Some(Local::now() + TimeDelta::try_minutes(2).unwrap()))]
#[case(false, Some(Local::now() + TimeDelta::try_minutes(2).unwrap()))]
#[case(None)]
#[case(Some(Local::now() + TimeDelta::try_minutes(2).unwrap()))]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_enqueued_tasks(
#[case] stashed: bool,
#[case] enqueue_at: Option<DateTime<Local>>,
) -> Result<()> {
async fn test_enqueued_tasks(#[case] enqueue_at: Option<DateTime<Local>>) -> Result<()> {
let daemon = daemon().await?;
let shared = &daemon.settings.shared;

assert_success(add_stashed_task(shared, "sleep 10", stashed, enqueue_at).await?);
assert_success(create_stashed_task(shared, "sleep 10", enqueue_at).await?);

// The task should be added in stashed state.
let task = wait_for_task_condition(shared, 0, |task| task.is_stashed()).await?;
Expand Down Expand Up @@ -77,10 +56,9 @@ async fn test_delayed_tasks() -> Result<()> {
let shared = &daemon.settings.shared;

// The task will be stashed and automatically enqueued after about 1 second.
let response = add_stashed_task(
let response = create_stashed_task(
shared,
"sleep 10",
true,
Some(Local::now() + TimeDelta::try_seconds(1).unwrap()),
)
.await?;
Expand Down
27 changes: 27 additions & 0 deletions pueue/tests/helper/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! This module contains helper functions, which are used by both, the client and daemon tests.
use ::log::{warn, LevelFilter};
use anyhow::Result;
use simplelog::{Config, ConfigBuilder, TermLogger, TerminalMode};
use tokio::io::{self, AsyncWriteExt};

pub use pueue_lib::state::PUEUE_DEFAULT_GROUP;
Expand Down Expand Up @@ -27,6 +29,31 @@ pub use wait::*;
// Global acceptable test timeout
const TIMEOUT: u64 = 5000;

/// Use this function to enable log output for in-runtime daemon output.
#[allow(dead_code)]
pub fn enable_logger() {
let level = LevelFilter::Debug;

// Try to initialize the logger with the timezone set to the Local time of the machine.
let mut builder = ConfigBuilder::new();
let logger_config = match builder.set_time_offset_to_local() {
Err(_) => {
warn!("Failed to determine the local time of this machine. Fallback to UTC.");
Config::default()
}
Ok(builder) => builder.build(),
};

// Init a terminal logger
TermLogger::init(
level,
logger_config.clone(),
TerminalMode::Stderr,
simplelog::ColorChoice::Auto,
)
.unwrap()
}

/// A helper function to sleep for ms time.
/// Only used to avoid the biolerplate of importing the same stuff all over the place.
pub async fn sleep_ms(ms: u64) {
Expand Down
16 changes: 16 additions & 0 deletions pueue/tests/helper/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::env::vars;

use anyhow::{anyhow, Context, Result};

use chrono::{DateTime, Local};
use pueue_lib::network::message::*;
use pueue_lib::settings::*;
use pueue_lib::task::{Task, TaskStatus};
Expand All @@ -27,6 +28,21 @@ pub fn create_add_message(shared: &Shared, command: &str) -> AddMessage {
}
}

/// Helper to create a stashed task
pub async fn create_stashed_task(
shared: &Shared,
command: &str,
enqueue_at: Option<DateTime<Local>>,
) -> Result<Message> {
let mut message = create_add_message(shared, command);
message.stashed = true;
message.enqueue_at = enqueue_at;

send_message(shared, message)
.await
.context("Failed to to add task message")
}

/// Helper to either continue the daemon or start specific tasks
pub async fn start_tasks(shared: &Shared, tasks: TaskSelection) -> Result<Message> {
let message = StartMessage { tasks };
Expand Down
Loading