Skip to content

Commit

Permalink
feat: job notification should emit job status ... (#1165)
Browse files Browse the repository at this point in the history
... for successful and failed jobs.
  • Loading branch information
milenkovicm authored Jan 20, 2025
1 parent fc6cd54 commit f1529cf
Showing 1 changed file with 54 additions and 9 deletions.
63 changes: 54 additions & 9 deletions ballista/scheduler/src/cluster/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,24 +380,27 @@ impl JobState for InMemoryJobState {
Some(Status::Successful(_)) | Some(Status::Failed(_))
) {
self.completed_jobs
.insert(job_id.to_string(), (status, Some(graph.clone())));
.insert(job_id.to_string(), (status.clone(), Some(graph.clone())));
self.running_jobs.remove(job_id);
} else if let Some(old_status) =
self.running_jobs.insert(job_id.to_string(), status)
{
self.job_event_sender.send(&JobStateEvent::JobUpdated {
job_id: job_id.to_string(),
status: old_status,
})
} else {
// otherwise update running job
self.running_jobs.insert(job_id.to_string(), status.clone());
}

// job change event emitted
// it is emitting current job status
self.job_event_sender.send(&JobStateEvent::JobUpdated {
job_id: job_id.to_string(),
status,
});

Ok(())
}

async fn get_session(&self, session_id: &str) -> Result<Arc<SessionContext>> {
self.sessions
.get(session_id)
.map(|sess| sess.clone())
.map(|session_ctx| session_ctx.clone())
.ok_or_else(|| {
BallistaError::General(format!("No session for {session_id} found"))
})
Expand Down Expand Up @@ -500,11 +503,15 @@ mod test {

use crate::cluster::memory::InMemoryJobState;
use crate::cluster::test_util::{test_job_lifecycle, test_job_planning_failure};
use crate::cluster::{JobState, JobStateEvent};
use crate::test_utils::{
test_aggregation_plan, test_join_plan, test_two_aggregations_plan,
};
use ballista_core::error::Result;
use ballista_core::serde::protobuf::JobStatus;
use ballista_core::utils::{default_config_producer, default_session_builder};
use futures::StreamExt;
use tokio::sync::Barrier;

#[tokio::test]
async fn test_in_memory_job_lifecycle() -> Result<()> {
Expand Down Expand Up @@ -571,4 +578,42 @@ mod test {

Ok(())
}

#[tokio::test]
async fn test_in_memory_job_notification() -> Result<()> {
let state = InMemoryJobState::new(
"",
Arc::new(default_session_builder),
Arc::new(default_config_producer),
);

let event_stream = state.job_state_events().await?;
let barrier = Arc::new(Barrier::new(2));

let events = tokio::spawn({
let barrier = barrier.clone();
async move {
barrier.wait().await;
event_stream.collect::<Vec<JobStateEvent>>().await
}
});

barrier.wait().await;
test_job_lifecycle(state, test_aggregation_plan(4).await).await?;
let result = events.await?;
assert_eq!(2, result.len());
match result.last().unwrap() {
JobStateEvent::JobUpdated {
status:
JobStatus {
status: Some(ballista_core::serde::protobuf::job_status::Status::Successful(_)),
..
},
..
} => (), // assert!(true, "Last status should be successful job notification"),
_ => panic!("JobUpdated status expected"),
}

Ok(())
}
}

0 comments on commit f1529cf

Please sign in to comment.