Skip to content

Commit

Permalink
fix: report cache and alert manager should install chrome (#3348)
Browse files Browse the repository at this point in the history
Co-authored-by: Ankur Srivastava <[email protected]>
Co-authored-by: Hengfei Yang <[email protected]>
  • Loading branch information
3 people committed Apr 26, 2024
1 parent 4296f61 commit 138ac5d
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 5 deletions.
2 changes: 1 addition & 1 deletion deploy/build/Dockerfile.pr.amd64
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

FROM public.ecr.aws/debian/debian:bookworm-slim as runtime
RUN apt-get update && \
apt-get install -y --no-install-recommends ca-certificates curl htop iftop sysstat procps lsof net-tools sqlite3 && \
apt-get install -y --no-install-recommends libatk1.0-0 libnss3 libglib2.0-0/stable ca-certificates curl htop iftop sysstat procps lsof net-tools sqlite3 && \
update-ca-certificates
COPY ./bin/openobserve /
RUN ["/openobserve", "init-dir", "-p", "/data/"]
Expand Down
4 changes: 3 additions & 1 deletion src/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub async fn init() -> Result<(), anyhow::Error> {

// initialize chrome launch options, so that if chrome download is
// needed, it will happen now and not during serving report API
let _ = get_chrome_launch_options().await;
if cluster::is_alert_manager(&cluster::LOCAL_NODE_ROLE) {
let _ = get_chrome_launch_options().await;
}
Ok(())
}
8 changes: 8 additions & 0 deletions src/infra/src/scheduler/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ WHERE org = ? AND module_key = ? AND module = ?;"#,
) -> Result<Vec<Trigger>> {
let pool = CLIENT.clone();

log::debug!("Start pulling scheduled_job");
let now = chrono::Utc::now().timestamp_micros();
let report_max_time = now
+ Duration::try_seconds(report_timeout)
Expand Down Expand Up @@ -257,6 +258,11 @@ FOR UPDATE SKIP LOCKED;
}
};

log::debug!(
"scheduler pull: selected scheduled jobs for update: {}",
job_ids.len()
);

let job_ids: Vec<String> = job_ids.into_iter().map(|id| id.id.to_string()).collect();

if let Err(e) = sqlx::query(
Expand Down Expand Up @@ -284,6 +290,7 @@ WHERE FIND_IN_SET(id, ?);
return Err(e.into());
}

log::debug!("Update scheduled jobs for selected pull job ids");
if let Err(e) = tx.commit().await {
log::error!("[MYSQL] commit scheduler pull update error: {}", e);
return Err(e.into());
Expand All @@ -295,6 +302,7 @@ WHERE FIND_IN_SET(id, ?);
.bind(job_ids.join(","))
.fetch_all(&pool)
.await?;
log::debug!("Returning the pulled triggers: {}", jobs.len());
Ok(jobs)
}

Expand Down
30 changes: 30 additions & 0 deletions src/service/alerts/alert_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
};

pub async fn run() -> Result<(), anyhow::Error> {
log::debug!("Pulling jobs from scheduler");
// Scheduler pulls only those triggers that match the conditions-
// - trigger.next_run_at <= now
// - !(trigger.is_realtime && !trigger.is_silenced)
Expand All @@ -42,6 +43,8 @@ pub async fn run() -> Result<(), anyhow::Error> {
)
.await?;

log::debug!("Pulled {} jobs from scheduler", triggers.len());

for trigger in triggers {
tokio::task::spawn(async move {
if let Err(e) = handle_triggers(trigger).await {
Expand All @@ -60,6 +63,10 @@ pub async fn handle_triggers(trigger: db::scheduler::Trigger) -> Result<(), anyh
}

async fn handle_alert_triggers(trigger: db::scheduler::Trigger) -> Result<(), anyhow::Error> {
log::debug!(
"Inside handle_alert_triggers: processing trigger: {}",
&trigger.module_key
);
let columns = trigger.module_key.split('/').collect::<Vec<&str>>();
assert_eq!(columns.len(), 3);
let org_id = &trigger.org;
Expand Down Expand Up @@ -157,6 +164,11 @@ async fn handle_alert_triggers(trigger: db::scheduler::Trigger) -> Result<(), an
db::scheduler::update_trigger(new_trigger).await?;
}
Err(e) => {
log::error!(
"Error sending alert notification: org: {}, module_key: {}",
&new_trigger.org,
&new_trigger.module_key
);
db::scheduler::update_status(
&new_trigger.org,
new_trigger.module,
Expand All @@ -171,6 +183,11 @@ async fn handle_alert_triggers(trigger: db::scheduler::Trigger) -> Result<(), an
}
}
} else {
log::debug!(
"Alert conditions not satisfied, org: {}, module_key: {}",
&new_trigger.org,
&new_trigger.module_key
);
db::scheduler::update_trigger(new_trigger).await?;
trigger_data_stream.status = TriggerDataStatus::ConditionNotSatisfied;
}
Expand All @@ -183,6 +200,11 @@ async fn handle_alert_triggers(trigger: db::scheduler::Trigger) -> Result<(), an
}

async fn handle_report_triggers(trigger: db::scheduler::Trigger) -> Result<(), anyhow::Error> {
log::debug!(
"Inside handle_report_trigger,org: {}, module_key: {}",
&trigger.org,
&trigger.module_key
);
let org_id = &trigger.org;
// For report, trigger.module_key is the report name
let report_name = &trigger.module_key;
Expand All @@ -198,6 +220,11 @@ async fn handle_report_triggers(trigger: db::scheduler::Trigger) -> Result<(), a
};

if !report.enabled {
log::debug!(
"Report not enabled: org: {}, report: {}",
org_id,
report_name
);
// update trigger, check on next week
new_trigger.next_run_at += Duration::try_days(7).unwrap().num_microseconds().unwrap();
db::scheduler::update_trigger(new_trigger).await?;
Expand Down Expand Up @@ -269,14 +296,17 @@ async fn handle_report_triggers(trigger: db::scheduler::Trigger) -> Result<(), a
let now = Utc::now().timestamp_micros();
match report.send_subscribers().await {
Ok(_) => {
log::debug!("Report send_subscribers done, report: {}", report_name);
// Report generation successful, update the trigger
if run_once {
new_trigger.status = db::scheduler::TriggerStatus::Completed;
}
db::scheduler::update_trigger(new_trigger).await?;
log::debug!("Update trigger for report: {}", report_name);
trigger_data_stream.end_time = Utc::now().timestamp_micros();
}
Err(e) => {
log::error!("Error sending report to subscribers: {e}");
db::scheduler::update_status(
&new_trigger.org,
new_trigger.module,
Expand Down
12 changes: 12 additions & 0 deletions src/service/dashboards/reports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ async fn generate_report(
log::info!("launching browser for dashboard {dashboard_id}");
let (mut browser, mut handler) =
Browser::launch(get_chrome_launch_options().await.as_ref().unwrap().clone()).await?;
log::info!("browser launched");

let handle = tokio::task::spawn(async move {
while let Some(h) = handler.next().await {
Expand All @@ -332,14 +333,18 @@ async fn generate_report(
});

let web_url = format!("{}{}/web", CONFIG.common.web_url, CONFIG.common.base_uri);
log::info!("Navigating to web url: {}", &web_url);
let page = browser.new_page(&format!("{web_url}/login")).await?;
page.disable_log().await?;
log::info!("headless: new page created");

page.find_element("input[type='email']")
.await?
.click()
.await?
.type_str(user_id)
.await?;
log::info!("headless: email input filled");

page.find_element("input[type='password']")
.await?
Expand All @@ -349,6 +354,7 @@ async fn generate_report(
.await?
.press_key("Enter")
.await?;
log::info!("headless: password input filled");

// Does not seem to work for single page client application
page.wait_for_navigation().await?;
Expand Down Expand Up @@ -419,7 +425,11 @@ async fn generate_report(
}
};

log::info!("headless: going to dash url");

page.goto(&dashb_url).await?;
log::info!("headless: going to dash url");

// Wait for navigation does not really wait until it is fully loaded
page.wait_for_navigation().await?;

Expand Down Expand Up @@ -460,12 +470,14 @@ async fn generate_report(

browser.close().await?;
handle.await?;
log::info!("done with headless browser");
Ok((pdf_data, email_dashb_url))
}

async fn wait_for_panel_data_load(page: &Page) -> Result<(), anyhow::Error> {
let start = std::time::Instant::now();
let timeout = Duration::from_secs(CONFIG.chrome.chrome_sleep_secs.into());
log::info!("waiting for headless data to load");
loop {
if page
.find_element("span#dashboardVariablesAndPanelsDataLoaded")
Expand Down
18 changes: 17 additions & 1 deletion src/service/db/dashboards/reports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,23 @@ pub async fn watch() -> Result<(), anyhow::Error> {
match ev {
db::Event::Put(ev) => {
let item_key = ev.key.strip_prefix(key).unwrap();
let item_value: Report = json::from_slice(&ev.value.unwrap()).unwrap();
let item_value: Report = if config::CONFIG.common.meta_store_external {
match db::get(&ev.key).await {
Ok(val) => match json::from_slice(&val) {
Ok(val) => val,
Err(e) => {
log::error!("Error getting value: {}", e);
continue;
}
},
Err(e) => {
log::error!("Error getting value: {}", e);
continue;
}
}
} else {
json::from_slice(&ev.value.unwrap()).unwrap()
};
DASHBOARD_REPORTS.insert(item_key.to_owned(), item_value);
}
db::Event::Delete(ev) => {
Expand Down
4 changes: 2 additions & 2 deletions web/src/components/reports/ReportList.vue
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ const perPageOptions: any = [
];
const resultTotal = ref<number>(0);
const maxRecordToReturn = ref<number>(100);
const selectedPerPage = ref<number>(2);
const selectedPerPage = ref<number>(20);
const pagination: any = ref({
rowsPerPage: 2,
rowsPerPage: 20,
});
const reportsStateLoadingMap: Ref<{ [key: string]: boolean }> = ref({});
Expand Down

0 comments on commit 138ac5d

Please sign in to comment.