diff --git a/rust-components/pm-bot-forwarding-handler/src/handlers.rs b/rust-components/pm-bot-forwarding-handler/src/handlers.rs index d3dbb2e..dfd04c0 100644 --- a/rust-components/pm-bot-forwarding-handler/src/handlers.rs +++ b/rust-components/pm-bot-forwarding-handler/src/handlers.rs @@ -402,9 +402,9 @@ pub async fn bot_reinitialize_handler( forwarding_bot_service: ForwardingBotService, ) -> anyhow::Result<()> { let parent_cx = update.cx.unwrap_or_default(); - let app_root = span!(tracing::Level::INFO, "bot_reinitialize_handler"); - app_root.set_parent(parent_cx); - let _guard = app_root.enter(); + let span = span!(tracing::Level::INFO, "bot_reinitialize_handler"); + span.set_parent(parent_cx); + let _guard = span.enter(); let parent_msg = callback_query .message diff --git a/rust-components/pm-bot-forwarding-handler/src/main.rs b/rust-components/pm-bot-forwarding-handler/src/main.rs index 1efdda3..df2c7bd 100644 --- a/rust-components/pm-bot-forwarding-handler/src/main.rs +++ b/rust-components/pm-bot-forwarding-handler/src/main.rs @@ -26,6 +26,10 @@ async fn main() -> anyhow::Result<()> { settings::Settings::read_from_default_file().expect("Failed to read settings"); observability::tracing::init_tracer(service_name, settings); + std::panic::set_hook(Box::new(|panic_info| { + log::error!("Panic occurred: {:?}", panic_info); + })); + let amqp_conn = new_amqp_connection(settings).await; let db = database::init_conn(settings.database.as_ref().unwrap()).await?; let redis_client = redis::client::new_client(settings); @@ -51,8 +55,8 @@ async fn main() -> anyhow::Result<()> { let run_bot = run(bot, listener, redis_storage, db, settings.clone()); let run_web_server = HttpServer::new(move || { App::new() - .wrap(actix_web::middleware::Logger::default()) .wrap(RequestTracing::default()) + .wrap(actix_web::middleware::Logger::default()) .app_data(actix_web::web::Data::new(forwarding_bot_service.clone())) .app_data(actix_web::web::Data::new( forwarding_message_service.clone(), diff --git a/rust-components/pm-bot-forwarding-handler/src/services/forwarding_bot.rs b/rust-components/pm-bot-forwarding-handler/src/services/forwarding_bot.rs index 5db4bb7..8df68a9 100644 --- a/rust-components/pm-bot-forwarding-handler/src/services/forwarding_bot.rs +++ b/rust-components/pm-bot-forwarding-handler/src/services/forwarding_bot.rs @@ -4,6 +4,7 @@ use reqwest::Url; use sea_orm::prelude::*; use sea_orm::ActiveValue; use teloxide::prelude::*; +use tracing::span; use pegasus_common::database::entities; use pegasus_common::settings::Settings; @@ -130,7 +131,16 @@ impl IForwardingBotService for ForwardingBotService { ..Default::default() } .insert(&self.db) - .await?; + .await + .map_err(|err| { + log::error!("Error creating bot record: {}", err); + err + })?; + + self.initialize_bot(bot.id).await.map_err(|err| { + log::error!("Error initializing bot: {}", err); + err + })?; Ok(bot) } @@ -146,8 +156,6 @@ impl IForwardingBotService for ForwardingBotService { .await? .ok_or_else(|| anyhow::anyhow!("Bot not found"))?; - self.initialize_bot(bot.id).await?; - Ok(bot) } @@ -168,6 +176,15 @@ impl IForwardingBotService for ForwardingBotService { .await? .ok_or_else(|| anyhow::anyhow!("Bot record not found"))?; + let client = Bot::new(&bot.bot_token); + client + .log_out() + .await + .map_err(|err| { + log::error!("Error logging out bot: {}", err); + }) + .ok(); + let client = self.new_bot_client(&bot.bot_token)?; client .set_webhook(Url::parse(&format!( @@ -175,7 +192,11 @@ impl IForwardingBotService for ForwardingBotService { bot.bot_token ))?) .secret_token(&bot.bot_webhook_secret) - .await?; + .await + .map_err(|err| { + log::error!("Error setting webhook: {}", err); + err + })?; Ok(()) } diff --git a/rust-components/pm-bot-forwarding-handler/src/services/forwarding_message.rs b/rust-components/pm-bot-forwarding-handler/src/services/forwarding_message.rs index 1e757c9..5e76e31 100644 --- a/rust-components/pm-bot-forwarding-handler/src/services/forwarding_message.rs +++ b/rust-components/pm-bot-forwarding-handler/src/services/forwarding_message.rs @@ -1,12 +1,8 @@ -use std::borrow::Cow; - -use opentelemetry::trace::{Status, TraceContextExt, Tracer}; -use opentelemetry::Context; use reqwest::Url; use sea_orm::prelude::*; use sea_orm::ActiveValue; use teloxide::prelude::*; -use teloxide::types::{MessageId, UpdateKind}; +use teloxide::types::{InputFile, MediaKind, MessageId, MessageKind, UpdateKind}; use pegasus_common::database::entities; use pegasus_common::settings::Settings; @@ -62,35 +58,40 @@ impl IForwardingMessageService for ForwardingMessageService { let chat = update .chat() - .ok_or_else(|| anyhow::anyhow!("Missing chat"))? - .to_owned(); + .ok_or_else(|| anyhow::anyhow!("Missing chat"))?; let message_id = match &update.kind { UpdateKind::Message(m) => m.id, - _ => return Ok(()), // ignore non-message updates + _ => { + // ignore non-message updates + log::debug!("Ignoring non-message update from chat {}", &chat.id.0); + return Ok(()); + } }; let client = self.new_bot_client(&bot.bot_token)?; let r = if chat.id.0 == bot.target_chat_id { - self.handle_target_chat_message(bot, update).await + log::debug!("Handling message reply from target chat {}", &chat.id.0); + self.handle_target_chat_message(bot, update.clone()).await } else if chat.is_private() { - self.handle_forward_message(bot, update).await + log::debug!("Handling message from chat {}", &chat.id.0); + self.handle_forward_message(bot, update.clone()).await } else { - log::debug!("Ignoring message from chat {}", chat.id.0); + log::debug!("Ignoring message from chat {}", &chat.id.0); return Ok(()); }; match r { Err(err) => { - log::error!("Error handling message: {}, chat_id: {}", err, chat.id.0); + log::error!("Error handling message: {}, chat_id: {}", err, &chat.id.0); client .send_message(chat.id, format!("Error handling message: {}", err)) .reply_to_message_id(message_id) .await?; } Ok(_) => { - log::debug!("Message handled successfully, chat_id: {}", chat.id.0); + log::debug!("Message handled successfully, chat_id: {}", &chat.id.0); client .send_message(chat.id, "Message sent") .reply_to_message_id(message_id) @@ -114,7 +115,13 @@ impl ForwardingMessageService { .ok_or_else(|| anyhow::anyhow!("Missing chat"))? .clone(); - let bot = self.new_bot_client(&bot_info.bot_token)?; + tracing::debug!("Creating bot client"); + + let bot = self + .new_bot_client(&bot_info.bot_token) + .map_err(|err| anyhow::anyhow!("Error creating bot client: {}", err))?; + + tracing::debug!("Matching update kind & media kind"); match update.kind { UpdateKind::Message(ref message) => { @@ -131,29 +138,82 @@ impl ForwardingMessageService { return Ok(()); } - // forward message to target chat - let msg_forward = bot - .send_message( - ChatId(bot_info.target_chat_id), - combine_forwarding_content( - &message - .from() - .map(|from| from.first_name.as_str()) - .unwrap_or("Unknown"), - chat.id.0, - message.id.0, - message.text().unwrap_or("Empty Message"), - ), - ) - .parse_mode(teloxide::types::ParseMode::Html) - .await?; + let meta = forwarding_meta( + &message + .from() + .map(|from| { + format!( + "{}{}", + from.first_name, + if from.last_name.is_some() { + format!(" {}", from.last_name.as_ref().unwrap()) + } else { + "".into() + } + ) + }) + .unwrap_or("Unknown".into()), + chat.id.0, + message.id.0, + ); + + let msg_forward_id = match &message.kind { + MessageKind::Common(common_message) => match &common_message.media_kind { + MediaKind::Text(media) => { + bot.send_message( + ChatId(bot_info.target_chat_id), + format!("{}\n\n{}", meta, media.text), + ) + .parse_mode(teloxide::types::ParseMode::Html) + .await? + .id + } + _ => { + let file = match &common_message.media_kind { + MediaKind::Animation(m) => Some(&m.animation.file), + MediaKind::Audio(m) => Some(&m.audio.file), + MediaKind::Contact(_) => None, + MediaKind::Document(m) => Some(&m.document.file), + MediaKind::Game(_) => None, + MediaKind::Venue(_) => None, + MediaKind::Location(_) => None, + MediaKind::Photo(m) => Some(&m.photo.last().unwrap().file), + MediaKind::Poll(_) => None, + MediaKind::Sticker(m) => Some(&m.sticker.file), + MediaKind::Text(_) => { + unreachable!("Text message already handled") + } + MediaKind::Video(m) => Some(&m.video.file), + MediaKind::VideoNote(_) => None, + MediaKind::Voice(m) => Some(&m.voice.file), + MediaKind::Migration(_) => None, + } + .ok_or_else(|| anyhow::anyhow!("Unsupported media kind"))?; + + bot.send_document( + ChatId(bot_info.target_chat_id), + InputFile::file_id(&file.id), + ) + .await? + .id + } + }, + _ => { + return Err(anyhow::anyhow!("Unsupported message kind")); + } + }; + + tracing::debug!( + name = "Storing message to database", + message_id = msg_forward_id.0, + ); // store message to database entities::pm_forwarding_message::ActiveModel { bot_id: ActiveValue::Set(bot_info.id), telegram_chat_id: ActiveValue::Set(chat.id.0), telegram_message_id: ActiveValue::Set(message.id.0), - forward_telegram_message_id: ActiveValue::Set(msg_forward.id.0), + forward_telegram_message_id: ActiveValue::Set(msg_forward_id.0), ..Default::default() } .save(&self.db) @@ -173,11 +233,12 @@ impl ForwardingMessageService { bot_info: entities::pm_forwarding_bot::Model, update: Update, ) -> anyhow::Result<()> { - let cx = Context::current(); - match update.kind { UpdateKind::Message(ref message) => { - let bot = Bot::new(bot_info.bot_token); + let bot = self + .new_bot_client(&bot_info.bot_token) + .map_err(|err| anyhow::anyhow!("Error creating bot client: {}", err))?; + let reply_id = message .reply_to_message() .map(|msg| msg.id.0) @@ -196,12 +257,9 @@ impl ForwardingMessageService { // reply to original message bot.send_message( ChatId(message_entity.telegram_chat_id), - message.text().ok_or_else(|| { - cx.span().set_status(Status::Error { - description: Cow::from("Missing message text"), - }); - anyhow::anyhow!("Missing message text") - })?, + message + .text() + .ok_or_else(|| anyhow::anyhow!("Missing message text"))?, ) .reply_to_message_id(MessageId(message_entity.telegram_message_id)) .await?; @@ -215,16 +273,14 @@ impl ForwardingMessageService { } } -fn combine_forwarding_content(from: &str, chat_id: i64, message_id: i32, text: &str) -> String { +fn forwarding_meta(from: &str, chat_id: i64, message_id: i32) -> String { format!( r#" From: {} Chat ID: {} Message ID: {} - - {} "#, - from, chat_id, chat_id, message_id, text + from, chat_id, chat_id, message_id ) .lines() .map(|line| line.trim_start()) @@ -233,22 +289,3 @@ fn combine_forwarding_content(from: &str, chat_id: i64, message_id: i32, text: & .trim() .to_string() } - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_combine_forwarding_content() { - let from = "test"; - let chat_id = 123; - let message_id = 456; - let text = "test message"; - - let content = combine_forwarding_content(from, chat_id, message_id, text); - assert_eq!( - content, - "From: test\nChat ID: 123\nMessage ID: 456\n\ntest message" - ); - } -}