use crate::{ env::EnvConfig, gitea::{GiteaAPI, WebhookType}, open_router::OpenRouterClient, }; use serde::Deserialize; use std::{collections::HashSet, sync::Arc, time::Duration}; use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; use tracing::{error, info, instrument}; #[derive(Deserialize, Debug)] pub struct ReviewResult { pub reviews: Vec, pub comment: String, pub cost: Option, } #[derive(Deserialize, Debug)] pub struct ReviewItem { pub filename: String, pub line: Option, pub code: String, pub message: String, } #[derive(Clone)] pub struct Bot { config: EnvConfig, gitea_api: GiteaAPI, open_router_client: OpenRouterClient, http_client: reqwest::Client, max_concurrent: usize, actions_handled: Arc>>, } impl Bot { pub fn new(config: EnvConfig) -> anyhow::Result { let gitea_timeout = config.gitea_timeout; let open_router_timeout = config.open_router_timeout; Ok(Self { gitea_api: GiteaAPI::new(&config.gitea_url, &config.gitea_token, gitea_timeout)?, open_router_client: OpenRouterClient::new( &config.open_router_api_key, &config.open_router_model, open_router_timeout, )?, max_concurrent: config.bot_max_concurrent, config, actions_handled: Arc::new(Mutex::new(HashSet::new())), http_client: reqwest::Client::builder() .timeout(Duration::from_secs(gitea_timeout)) .build()?, }) } pub async fn start( &self, mut rx: tokio::sync::mpsc::Receiver, shutdown: CancellationToken, ) -> anyhow::Result<()> { info!("Bot started"); let sem = Arc::new(tokio::sync::Semaphore::new(self.max_concurrent)); let mut tasks = tokio::task::JoinSet::new(); loop { let wb = tokio::select! { biased; _ = shutdown.cancelled() => break, msg = rx.recv() => match msg { Some(wb) => wb, None => break, }, }; while let Some(res) = tasks.try_join_next() { if let Err(e) = res { error!("Task panicked: {e}"); } } info!(queued = rx.len(), active = tasks.len(), "Webhook received"); let permit = sem.clone().acquire_owned().await?; let self_clone = self.clone(); tasks.spawn(async move { self_clone.exec(wb).await; drop(permit); }); } tasks.join_all().await; info!("Bot shutting down complete"); Ok(()) } #[instrument(skip(self))] pub async fn exec(&self, webhook: WebhookType) { tracing::Span::current().record("webhook_type", tracing::field::debug(&webhook)); let exec_result = match webhook { WebhookType::Review(review_payload) => crate::bot_actions::review::exec_review( &self.gitea_api, &self.open_router_client, &self.http_client, &self.config.open_router_model, review_payload, ), } .await; match exec_result { Ok(_) => info!("Task completed"), Err(err) => { error!(%err, "Task error"); sentry_anyhow::capture_anyhow(&err); } } } pub async fn check_and_mark(&self, event_id: u64) -> bool { let mut action_handled_lock = self.actions_handled.lock().await; !action_handled_lock.insert(event_id) } pub async fn unmark(&self, event_id: u64) { let mut action_handled_lock = self.actions_handled.lock().await; action_handled_lock.remove(&event_id); } } #[cfg(test)] mod tests { use super::*; fn make_actions_handled() -> Arc>> { Arc::new(Mutex::new(HashSet::new())) } async fn check_and_mark(actions_handled: &Arc>>, event_id: u64) -> bool { let mut lock = actions_handled.lock().await; !lock.insert(event_id) } async fn unmark(actions_handled: &Arc>>, event_id: u64) { let mut lock = actions_handled.lock().await; lock.remove(&event_id); } #[tokio::test] async fn test_check_and_mark_first_call_returns_false() { let actions_handled = make_actions_handled(); assert!(!check_and_mark(&actions_handled, 1).await); } #[tokio::test] async fn test_check_and_mark_second_call_returns_true() { let actions_handled = make_actions_handled(); check_and_mark(&actions_handled, 1).await; assert!(check_and_mark(&actions_handled, 1).await); } #[tokio::test] async fn test_check_and_mark_different_ids_are_independent() { let actions_handled = make_actions_handled(); check_and_mark(&actions_handled, 1).await; assert!(!check_and_mark(&actions_handled, 2).await); } #[tokio::test] async fn test_unmark_allows_reprocessing() { let actions_handled = make_actions_handled(); check_and_mark(&actions_handled, 1).await; unmark(&actions_handled, 1).await; assert!(!check_and_mark(&actions_handled, 1).await); } #[tokio::test] async fn test_unmark_nonexistent_id_is_noop() { let actions_handled = make_actions_handled(); unmark(&actions_handled, 99).await; assert!(!check_and_mark(&actions_handled, 99).await); } #[tokio::test] async fn test_check_and_mark_concurrent_same_id() { let actions_handled = make_actions_handled(); let actions_handled2 = Arc::clone(&actions_handled); let t1 = tokio::spawn(async move { check_and_mark(&actions_handled, 42).await }); let t2 = tokio::spawn(async move { check_and_mark(&actions_handled2, 42).await }); let (r1, r2) = tokio::join!(t1, t2); let results = [r1.unwrap(), r2.unwrap()]; // exactement un seul des deux doit retourner false (non traité) assert_eq!(results.iter().filter(|&&r| !r).count(), 1); // l'autre doit retourner true (déjà traité) assert_eq!(results.iter().filter(|&&r| r).count(), 1); } }