Skip to content

Commit

Permalink
feat(WIP): support multi types of message
Browse files Browse the repository at this point in the history
  • Loading branch information
AH-dark committed Apr 20, 2024
1 parent 3b0cc0e commit 27db76d
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 71 deletions.
6 changes: 3 additions & 3 deletions rust-components/pm-bot-forwarding-handler/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,9 @@ pub async fn bot_reinitialize_handler(
forwarding_bot_service: ForwardingBotService,
) -> anyhow::Result<()> {
let parent_cx = update.cx.unwrap_or_default();
let app_root = span!(tracing::Level::INFO, "bot_reinitialize_handler");
app_root.set_parent(parent_cx);
let _guard = app_root.enter();
let span = span!(tracing::Level::INFO, "bot_reinitialize_handler");
span.set_parent(parent_cx);
let _guard = span.enter();

let parent_msg = callback_query
.message
Expand Down
6 changes: 5 additions & 1 deletion rust-components/pm-bot-forwarding-handler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ async fn main() -> anyhow::Result<()> {
settings::Settings::read_from_default_file().expect("Failed to read settings");
observability::tracing::init_tracer(service_name, settings);

std::panic::set_hook(Box::new(|panic_info| {
log::error!("Panic occurred: {:?}", panic_info);
}));

let amqp_conn = new_amqp_connection(settings).await;
let db = database::init_conn(settings.database.as_ref().unwrap()).await?;
let redis_client = redis::client::new_client(settings);
Expand All @@ -51,8 +55,8 @@ async fn main() -> anyhow::Result<()> {
let run_bot = run(bot, listener, redis_storage, db, settings.clone());
let run_web_server = HttpServer::new(move || {
App::new()
.wrap(actix_web::middleware::Logger::default())
.wrap(RequestTracing::default())
.wrap(actix_web::middleware::Logger::default())
.app_data(actix_web::web::Data::new(forwarding_bot_service.clone()))
.app_data(actix_web::web::Data::new(
forwarding_message_service.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use reqwest::Url;
use sea_orm::prelude::*;
use sea_orm::ActiveValue;
use teloxide::prelude::*;
use tracing::span;

use pegasus_common::database::entities;
use pegasus_common::settings::Settings;
Expand Down Expand Up @@ -130,7 +131,16 @@ impl IForwardingBotService for ForwardingBotService {
..Default::default()
}
.insert(&self.db)
.await?;
.await
.map_err(|err| {
log::error!("Error creating bot record: {}", err);
err
})?;

self.initialize_bot(bot.id).await.map_err(|err| {
log::error!("Error initializing bot: {}", err);
err
})?;

Ok(bot)
}
Expand All @@ -146,8 +156,6 @@ impl IForwardingBotService for ForwardingBotService {
.await?
.ok_or_else(|| anyhow::anyhow!("Bot not found"))?;

self.initialize_bot(bot.id).await?;

Ok(bot)
}

Expand All @@ -168,14 +176,27 @@ impl IForwardingBotService for ForwardingBotService {
.await?
.ok_or_else(|| anyhow::anyhow!("Bot record not found"))?;

let client = Bot::new(&bot.bot_token);
client
.log_out()
.await
.map_err(|err| {
log::error!("Error logging out bot: {}", err);
})
.ok();

let client = self.new_bot_client(&bot.bot_token)?;
client
.set_webhook(Url::parse(&format!(
"http://pm-bot-forwarding-handler:8080/webhook/{}",
bot.bot_token
))?)
.secret_token(&bot.bot_webhook_secret)
.await?;
.await
.map_err(|err| {
log::error!("Error setting webhook: {}", err);
err
})?;

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
use std::borrow::Cow;

use opentelemetry::trace::{Status, TraceContextExt, Tracer};
use opentelemetry::Context;
use reqwest::Url;
use sea_orm::prelude::*;
use sea_orm::ActiveValue;
use teloxide::prelude::*;
use teloxide::types::{MessageId, UpdateKind};
use teloxide::types::{InputFile, MediaKind, MessageId, MessageKind, UpdateKind};

use pegasus_common::database::entities;
use pegasus_common::settings::Settings;
Expand Down Expand Up @@ -62,35 +58,40 @@ impl IForwardingMessageService for ForwardingMessageService {

let chat = update
.chat()
.ok_or_else(|| anyhow::anyhow!("Missing chat"))?
.to_owned();
.ok_or_else(|| anyhow::anyhow!("Missing chat"))?;

let message_id = match &update.kind {
UpdateKind::Message(m) => m.id,
_ => return Ok(()), // ignore non-message updates
_ => {
// ignore non-message updates
log::debug!("Ignoring non-message update from chat {}", &chat.id.0);
return Ok(());
}
};

let client = self.new_bot_client(&bot.bot_token)?;

let r = if chat.id.0 == bot.target_chat_id {
self.handle_target_chat_message(bot, update).await
log::debug!("Handling message reply from target chat {}", &chat.id.0);
self.handle_target_chat_message(bot, update.clone()).await
} else if chat.is_private() {
self.handle_forward_message(bot, update).await
log::debug!("Handling message from chat {}", &chat.id.0);
self.handle_forward_message(bot, update.clone()).await
} else {
log::debug!("Ignoring message from chat {}", chat.id.0);
log::debug!("Ignoring message from chat {}", &chat.id.0);
return Ok(());
};

match r {
Err(err) => {
log::error!("Error handling message: {}, chat_id: {}", err, chat.id.0);
log::error!("Error handling message: {}, chat_id: {}", err, &chat.id.0);
client
.send_message(chat.id, format!("Error handling message: {}", err))
.reply_to_message_id(message_id)
.await?;
}
Ok(_) => {
log::debug!("Message handled successfully, chat_id: {}", chat.id.0);
log::debug!("Message handled successfully, chat_id: {}", &chat.id.0);
client
.send_message(chat.id, "Message sent")
.reply_to_message_id(message_id)
Expand All @@ -114,7 +115,13 @@ impl ForwardingMessageService {
.ok_or_else(|| anyhow::anyhow!("Missing chat"))?
.clone();

let bot = self.new_bot_client(&bot_info.bot_token)?;
tracing::debug!("Creating bot client");

let bot = self
.new_bot_client(&bot_info.bot_token)
.map_err(|err| anyhow::anyhow!("Error creating bot client: {}", err))?;

tracing::debug!("Matching update kind & media kind");

match update.kind {
UpdateKind::Message(ref message) => {
Expand All @@ -131,29 +138,82 @@ impl ForwardingMessageService {
return Ok(());
}

// forward message to target chat
let msg_forward = bot
.send_message(
ChatId(bot_info.target_chat_id),
combine_forwarding_content(
&message
.from()
.map(|from| from.first_name.as_str())
.unwrap_or("Unknown"),
chat.id.0,
message.id.0,
message.text().unwrap_or("Empty Message"),
),
)
.parse_mode(teloxide::types::ParseMode::Html)
.await?;
let meta = forwarding_meta(
&message
.from()
.map(|from| {
format!(
"{}{}",
from.first_name,
if from.last_name.is_some() {
format!(" {}", from.last_name.as_ref().unwrap())
} else {
"".into()
}
)
})
.unwrap_or("Unknown".into()),
chat.id.0,
message.id.0,
);

let msg_forward_id = match &message.kind {
MessageKind::Common(common_message) => match &common_message.media_kind {
MediaKind::Text(media) => {
bot.send_message(
ChatId(bot_info.target_chat_id),
format!("{}\n\n{}", meta, media.text),
)
.parse_mode(teloxide::types::ParseMode::Html)
.await?
.id
}
_ => {
let file = match &common_message.media_kind {
MediaKind::Animation(m) => Some(&m.animation.file),
MediaKind::Audio(m) => Some(&m.audio.file),
MediaKind::Contact(_) => None,
MediaKind::Document(m) => Some(&m.document.file),
MediaKind::Game(_) => None,
MediaKind::Venue(_) => None,
MediaKind::Location(_) => None,
MediaKind::Photo(m) => Some(&m.photo.last().unwrap().file),
MediaKind::Poll(_) => None,
MediaKind::Sticker(m) => Some(&m.sticker.file),
MediaKind::Text(_) => {
unreachable!("Text message already handled")
}
MediaKind::Video(m) => Some(&m.video.file),
MediaKind::VideoNote(_) => None,
MediaKind::Voice(m) => Some(&m.voice.file),
MediaKind::Migration(_) => None,
}
.ok_or_else(|| anyhow::anyhow!("Unsupported media kind"))?;

bot.send_document(
ChatId(bot_info.target_chat_id),
InputFile::file_id(&file.id),
)
.await?
.id
}
},
_ => {
return Err(anyhow::anyhow!("Unsupported message kind"));
}
};

tracing::debug!(
name = "Storing message to database",
message_id = msg_forward_id.0,
);

// store message to database
entities::pm_forwarding_message::ActiveModel {
bot_id: ActiveValue::Set(bot_info.id),
telegram_chat_id: ActiveValue::Set(chat.id.0),
telegram_message_id: ActiveValue::Set(message.id.0),
forward_telegram_message_id: ActiveValue::Set(msg_forward.id.0),
forward_telegram_message_id: ActiveValue::Set(msg_forward_id.0),
..Default::default()
}
.save(&self.db)
Expand All @@ -173,11 +233,12 @@ impl ForwardingMessageService {
bot_info: entities::pm_forwarding_bot::Model,
update: Update,
) -> anyhow::Result<()> {
let cx = Context::current();

match update.kind {
UpdateKind::Message(ref message) => {
let bot = Bot::new(bot_info.bot_token);
let bot = self
.new_bot_client(&bot_info.bot_token)
.map_err(|err| anyhow::anyhow!("Error creating bot client: {}", err))?;

let reply_id = message
.reply_to_message()
.map(|msg| msg.id.0)
Expand All @@ -196,12 +257,9 @@ impl ForwardingMessageService {
// reply to original message
bot.send_message(
ChatId(message_entity.telegram_chat_id),
message.text().ok_or_else(|| {
cx.span().set_status(Status::Error {
description: Cow::from("Missing message text"),
});
anyhow::anyhow!("Missing message text")
})?,
message
.text()
.ok_or_else(|| anyhow::anyhow!("Missing message text"))?,
)
.reply_to_message_id(MessageId(message_entity.telegram_message_id))
.await?;
Expand All @@ -215,16 +273,14 @@ impl ForwardingMessageService {
}
}

fn combine_forwarding_content(from: &str, chat_id: i64, message_id: i32, text: &str) -> String {
fn forwarding_meta(from: &str, chat_id: i64, message_id: i32) -> String {
format!(
r#"
From: {}
Chat ID: <a href="tg://user?id={}">{}</a>
Message ID: <code>{}</code>
{}
"#,
from, chat_id, chat_id, message_id, text
from, chat_id, chat_id, message_id
)
.lines()
.map(|line| line.trim_start())
Expand All @@ -233,22 +289,3 @@ fn combine_forwarding_content(from: &str, chat_id: i64, message_id: i32, text: &
.trim()
.to_string()
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_combine_forwarding_content() {
let from = "test";
let chat_id = 123;
let message_id = 456;
let text = "test message";

let content = combine_forwarding_content(from, chat_id, message_id, text);
assert_eq!(
content,
"From: test\nChat ID: <a href=\"tg://user?id=123\">123</a>\nMessage ID: <code>456</code>\n\ntest message"
);
}
}

0 comments on commit 27db76d

Please sign in to comment.