Skip to content

Commit

Permalink
feat(notification): slack notification system implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
Joxit committed Jul 16, 2020
1 parent 10c96ef commit 52a5795
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 42 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ structopt = "^0.3"
yaml-rust = "^0.4"
linked-hash-map = "^0.5"
libc = "^0.2"
attohttpc = "^0.15"
openssl = { version = "^0.10", features = ["vendored"] }
json = "^0.12"
3 changes: 2 additions & 1 deletion examples/resources/notification.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ notification:
slack:
url: https://hooks.slack.com/services/XXXXX/XXXXX/XXXXX
channel: '#channel'
emoji: ':rocket:'
emoji: ':rocket:'
when: always
35 changes: 33 additions & 2 deletions src/commands/run.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::config::Config;
use crate::config::{Config, WhenNotify};
use crate::fst::*;
use libc::{fork, signal};
use libc::{SIGHUP, SIG_IGN};
Expand Down Expand Up @@ -99,7 +99,7 @@ impl Run {
.map_err(|msg| format!("Can't run command `{}`: {}", cmd_line, msg))?;
processes[task.id()] = Some(child);
} else if graph_iter.is_done() {
return Ok(());
break;
} else {
let mut done = 0;
for id in 0..processes.len() {
Expand All @@ -108,6 +108,11 @@ impl Run {
done = done + 1;
graph_iter.mark_done(id);
processes[id] = None;
self.notify(
&config,
format!("Task {} ended", graph.get_state_from_id(id).label()).as_str(),
WhenNotify::TaskEnd,
);
}
}
}
Expand All @@ -116,6 +121,32 @@ impl Run {
}
}
}

self.notify(&config, "All tasks ended", WhenNotify::End);

Ok(())
}

fn notify(&self, config: &Config, msg: &str, when: WhenNotify) {
if let Some(notification) = config.notification() {
if *notification.when() == WhenNotify::Never
|| (*notification.when() != WhenNotify::Always && *notification.when() != when)
{
return;
}
if let Some(slack) = notification.slack() {
if let Some(when_slack) = slack.when() {
if *when_slack == WhenNotify::Never
|| (*when_slack != WhenNotify::Always && *when_slack != when)
{
return;
}
}
if let Err(e) = crate::notification::post_slack(&slack, msg) {
eprintln!("Can't use slac notification: {}", e);
}
}
}
}

fn stdout(&self) -> Result<Stdio, String> {
Expand Down
26 changes: 24 additions & 2 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,23 @@ pub struct Config {
#[derive(Debug, PartialEq, Clone)]
pub struct Notification {
slack: Option<Slack>,
when: WhenNotify,
}

#[derive(Debug, PartialEq, Clone)]
pub struct Slack {
url: String,
channel: String,
emoji: Option<String>,
when: Option<WhenNotify>,
}

#[derive(Debug, PartialEq, Clone)]
pub enum WhenNotify {
Always,
TaskEnd,
End,
Never,
}

impl Config {
Expand Down Expand Up @@ -50,11 +60,19 @@ impl Config {
pub fn concurrency(&self) -> i64 {
self.concurrency
}

pub fn notification(&self) -> &Option<Notification> {
&self.notification
}
}

impl Notification {
pub fn slack(&self) -> &Slack {
&self.slack()
pub fn slack(&self) -> &Option<Slack> {
&self.slack
}

pub fn when(&self) -> &WhenNotify {
&self.when
}
}

Expand All @@ -70,4 +88,8 @@ impl Slack {
pub fn emoji(&self) -> &Option<String> {
&self.emoji
}

pub fn when(&self) -> &Option<WhenNotify> {
&self.when
}
}
27 changes: 27 additions & 0 deletions src/config/yaml_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const SLACK_KEY: &str = "slack";
const URL_KEY: &str = "url";
const CHANNEL_KEY: &str = "channel";
const EMOJI_KEY: &str = "emoji";
const WHEN_KEY: &str = "when";

pub trait YamlTasksScheduler {
fn get_tasks(&self) -> Result<HashMap<String, Task>, String>;
Expand All @@ -21,6 +22,7 @@ pub trait YamlTasksScheduler {
fn get_notification(&self) -> Result<Option<Notification>, String>;
fn get_slack(&self) -> Result<Option<Slack>, String>;
fn get_string(&self, key: &str) -> Result<Option<String>, String>;
fn get_when_notify(&self) -> Result<Option<WhenNotify>, String>;
}

impl YamlTasksScheduler for LinkedHashMap<Yaml, Yaml> {
Expand Down Expand Up @@ -91,6 +93,7 @@ impl YamlTasksScheduler for LinkedHashMap<Yaml, Yaml> {
if let Some(notification) = self.get(&Yaml::String(String::from(NOTIFICATION_KEY))) {
return Ok(Some(Notification {
slack: notification.get_slack()?,
when: notification.get_when_notify()?.unwrap_or(WhenNotify::End),
}));
}
Ok(None)
Expand All @@ -106,6 +109,7 @@ impl YamlTasksScheduler for LinkedHashMap<Yaml, Yaml> {
.get_string(CHANNEL_KEY)?
.ok_or(String::from("Slack channel is required!"))?,
emoji: slack.get_string(EMOJI_KEY)?,
when: slack.get_when_notify()?,
}));
}
Ok(None)
Expand All @@ -122,6 +126,21 @@ impl YamlTasksScheduler for LinkedHashMap<Yaml, Yaml> {
Ok(None)
}
}

fn get_when_notify(&self) -> Result<Option<WhenNotify>, String> {
if let Some(when) = self.get_string(WHEN_KEY)? {
match when.as_str() {
"always" => Ok(Some(WhenNotify::Always)),
"task-end" => Ok(Some(WhenNotify::TaskEnd)),
"end" => Ok(Some(WhenNotify::End)),
"never" => Ok(Some(WhenNotify::Never)),
"" => Ok(None),
_ => Err(format!("{} is an incorrect value for when", when)),
}
} else {
Ok(None)
}
}
}

impl YamlTasksScheduler for Yaml {
Expand Down Expand Up @@ -179,6 +198,14 @@ impl YamlTasksScheduler for Yaml {
Ok(None)
}
}

fn get_when_notify(&self) -> Result<Option<WhenNotify>, String> {
if let Some(when_notify) = self.as_hash() {
when_notify.get_when_notify()
} else {
Ok(None)
}
}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion src/fst/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub enum TaskStatus {
}

impl TaskIter {
pub fn new(fst: TaskFst) -> TaskIter {
pub fn new(fst: &TaskFst) -> TaskIter {
let copy = fst.clone();
TaskIter {
fst: copy,
Expand Down
40 changes: 4 additions & 36 deletions src/fst/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,8 @@ impl TaskFst {
self.states[to].prev.push(from);
}

pub fn get_state_from_label(&self, label: String) -> &TaskFstState {
self.states.iter().find(|s| s.label == label).unwrap()
}

pub fn get_state_id_from_label(&self, label: String) -> usize {
self.states.iter().position(|s| s.label == label).unwrap()
pub fn get_state_from_id(&self, id: usize) -> &TaskFstState {
&self.states[id]
}

pub fn is_cyclic(&self) -> bool {
Expand Down Expand Up @@ -81,8 +77,8 @@ impl TaskFst {
false
}

pub fn iter(self) -> TaskIter {
TaskIter::new(self)
pub fn iter(&self) -> TaskIter {
TaskIter::new(&self)
}
}

Expand Down Expand Up @@ -140,34 +136,6 @@ mod test {
)
}

#[test]
pub fn get_state_from_label() {
let mut fst = TaskFst::new();
fst.add_state("a".to_string());
fst.add_state("b".to_string());
fst.add_arc(0, 1);

assert_eq!(
fst.get_state_from_label("b".to_string()),
&TaskFstState {
label: "b".to_string(),
id: 1,
next: vec![],
prev: vec![0]
}
)
}

#[test]
pub fn get_state_id_from_label() {
let mut fst = TaskFst::new();
fst.add_state("a".to_string());
fst.add_state("b".to_string());
fst.add_arc(0, 1);

assert_eq!(fst.get_state_id_from_label("b".to_string()), 1)
}

#[test]
pub fn is_cyclic() {
let mut fst = TaskFst::new();
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use structopt::StructOpt;
mod commands;
mod config;
mod fst;
mod notification;

#[derive(Debug, StructOpt)]
#[structopt(name = "task-scheduler", author, about)]
Expand Down
2 changes: 2 additions & 0 deletions src/notification/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub use crate::notification::slack::*;
mod slack;
30 changes: 30 additions & 0 deletions src/notification/slack.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use crate::config::Slack;

pub fn post_slack(slack: &Slack, message: &str) -> Result<(), String> {
let mut content = json::object! {
"channel" => slack.channel().as_str(),
"username" => "task-scheduler",
"text" => message
};

if let Some(emoji) = slack.emoji() {
content
.insert("icon_emoji", emoji.as_str())
.map_err(|msg| format!("{}", msg))?;
}

let resp = attohttpc::post(slack.url())
.text(content.dump())
.send()
.unwrap();

if resp.status() != 200 {
Err(format!(
"Notification failed: status code {} and body: {}",
resp.status(),
resp.text().unwrap_or("<Empty Body>".to_string())
))
} else {
Ok(())
}
}

0 comments on commit 52a5795

Please sign in to comment.