diff --git a/.woodpecker/release.yml b/.woodpecker/release.yml new file mode 100644 index 0000000..2fb674a --- /dev/null +++ b/.woodpecker/release.yml @@ -0,0 +1,15 @@ +when: + event: + - tag + +steps: + - name: release-docker + image: docker:dind + commands: + - docker login -u $DOCKER_USERNAME -p $DOCKER_PASSWORD + - bash scripts/build.sh + environment: + DOCKER_USERNAME: + from_secret: docker_username + DOCKER_PASSWORD: + from_secret: docker_password diff --git a/.woodpecker/tests.yml b/.woodpecker/tests.yml new file mode 100644 index 0000000..c004f82 --- /dev/null +++ b/.woodpecker/tests.yml @@ -0,0 +1,15 @@ +when: + event: + - push + - pull_request + +steps: + - name: clippy + image: rust:1.96 + commands: + - cargo clippy + + - name: test + image: rust:1.96 + commands: + - cargo test diff --git a/.zed/settings.json b/.zed/settings.json new file mode 100644 index 0000000..d569920 --- /dev/null +++ b/.zed/settings.json @@ -0,0 +1,11 @@ +{ + "lsp": { + "rust-analyzer": { + "initialization_options": { + "check": { + "command": "clippy" + } + } + } + } +} diff --git a/ROADMAP.md b/ROADMAP.md new file mode 100644 index 0000000..e69de29 diff --git a/src/api.rs b/src/api.rs index 7b432b8..36b13d0 100644 --- a/src/api.rs +++ b/src/api.rs @@ -58,10 +58,15 @@ async fn webhook( ) -> Result { tracing::Span::current().record("webhook_type", tracing::field::debug(&wb)); - app_state - .bot_tx - .try_send(wb) - .map_err(|_| AppError::ChannelFullErr)?; + let event_id = wb.event_id(); + if app_state.bot.check_and_mark(event_id).await { + return Err(AppError::AlreadyProcessedErr); + } + + if app_state.bot_tx.try_send(wb).is_err() { + app_state.bot.unmark(event_id).await; + return Err(AppError::ChannelFullErr); + } Ok((StatusCode::CREATED, "Task started")) } diff --git a/src/bot.rs b/src/bot.rs index fd593bb..929bb16 100644 --- a/src/bot.rs +++ b/src/bot.rs @@ -4,7 +4,8 @@ use crate::{ open_router::OpenRouterClient, }; use serde::Deserialize; -use std::{sync::Arc, time::Duration}; +use std::{collections::HashSet, sync::Arc, time::Duration}; +use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; use tracing::{error, info, instrument}; @@ -30,6 +31,7 @@ pub struct Bot { open_router_client: OpenRouterClient, http_client: reqwest::Client, max_concurrent: usize, + actions_handled: Arc>>, } impl Bot { @@ -46,6 +48,7 @@ impl Bot { )?, max_concurrent: config.bot_max_concurrent, config, + actions_handled: Arc::new(Mutex::new(HashSet::new())), http_client: reqwest::Client::builder() .timeout(Duration::from_secs(gitea_timeout)) .build()?, @@ -72,7 +75,6 @@ impl Bot { }, }; - // Drain completed tasks to avoid the JoinSet growing unbounded while let Some(res) = tasks.try_join_next() { if let Err(e) = res { error!("Task panicked: {e}"); @@ -89,8 +91,6 @@ impl Bot { }); } - // When all webhook tasks have completed, we can safely exit - // properly before returning tasks.join_all().await; info!("Bot shutting down complete"); @@ -119,4 +119,87 @@ impl Bot { } } } + + pub async fn check_and_mark(&self, event_id: u64) -> bool { + let mut action_handled_lock = self.actions_handled.lock().await; + + !action_handled_lock.insert(event_id) + } + + pub async fn unmark(&self, event_id: u64) { + let mut action_handled_lock = self.actions_handled.lock().await; + + action_handled_lock.remove(&event_id); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_actions_handled() -> Arc>> { + Arc::new(Mutex::new(HashSet::new())) + } + + async fn check_and_mark(actions_handled: &Arc>>, event_id: u64) -> bool { + let mut lock = actions_handled.lock().await; + !lock.insert(event_id) + } + + async fn unmark(actions_handled: &Arc>>, event_id: u64) { + let mut lock = actions_handled.lock().await; + lock.remove(&event_id); + } + + #[tokio::test] + async fn test_check_and_mark_first_call_returns_false() { + let actions_handled = make_actions_handled(); + assert!(!check_and_mark(&actions_handled, 1).await); + } + + #[tokio::test] + async fn test_check_and_mark_second_call_returns_true() { + let actions_handled = make_actions_handled(); + check_and_mark(&actions_handled, 1).await; + assert!(check_and_mark(&actions_handled, 1).await); + } + + #[tokio::test] + async fn test_check_and_mark_different_ids_are_independent() { + let actions_handled = make_actions_handled(); + check_and_mark(&actions_handled, 1).await; + assert!(!check_and_mark(&actions_handled, 2).await); + } + + #[tokio::test] + async fn test_unmark_allows_reprocessing() { + let actions_handled = make_actions_handled(); + check_and_mark(&actions_handled, 1).await; + unmark(&actions_handled, 1).await; + assert!(!check_and_mark(&actions_handled, 1).await); + } + + #[tokio::test] + async fn test_unmark_nonexistent_id_is_noop() { + let actions_handled = make_actions_handled(); + unmark(&actions_handled, 99).await; + assert!(!check_and_mark(&actions_handled, 99).await); + } + + #[tokio::test] + async fn test_check_and_mark_concurrent_same_id() { + let actions_handled = make_actions_handled(); + let actions_handled2 = Arc::clone(&actions_handled); + + let t1 = tokio::spawn(async move { check_and_mark(&actions_handled, 42).await }); + let t2 = tokio::spawn(async move { check_and_mark(&actions_handled2, 42).await }); + + let (r1, r2) = tokio::join!(t1, t2); + let results = [r1.unwrap(), r2.unwrap()]; + + // exactement un seul des deux doit retourner false (non traité) + assert_eq!(results.iter().filter(|&&r| !r).count(), 1); + // l'autre doit retourner true (déjà traité) + assert_eq!(results.iter().filter(|&&r| r).count(), 1); + } } diff --git a/src/bot_actions/review.rs b/src/bot_actions/review.rs index 83df3fa..88a1d9f 100644 --- a/src/bot_actions/review.rs +++ b/src/bot_actions/review.rs @@ -35,7 +35,7 @@ pub async fn exec_review( let bot_result: Result = async { let git_diff = - download_git_diff(&http_client, &review_payload.pull_request.diff_url).await?; + download_git_diff(http_client, &review_payload.pull_request.diff_url).await?; let diff_for_llm = format_diff_for_review(&git_diff); diff --git a/src/env.rs b/src/env.rs index 514adfd..1d3e862 100644 --- a/src/env.rs +++ b/src/env.rs @@ -49,3 +49,34 @@ pub fn try_get_env(key: &str) -> anyhow::Result { Ok(env) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_try_get_env_returns_value() { + unsafe { std::env::set_var("TEST_ENV_PRESENT", "hello") }; + assert_eq!(try_get_env("TEST_ENV_PRESENT").unwrap(), "hello"); + } + + #[test] + fn test_try_get_env_missing_var_returns_error() { + unsafe { std::env::remove_var("TEST_ENV_MISSING") }; + assert!(try_get_env("TEST_ENV_MISSING").is_err()); + } + + #[test] + fn test_try_get_env_empty_value_returns_error() { + unsafe { std::env::set_var("TEST_ENV_EMPTY", "") }; + let err = try_get_env("TEST_ENV_EMPTY").unwrap_err(); + assert!(err.to_string().contains("TEST_ENV_EMPTY")); + } + + #[test] + fn test_try_get_env_whitespace_only_returns_error() { + unsafe { std::env::set_var("TEST_ENV_WHITESPACE", " ") }; + let err = try_get_env("TEST_ENV_WHITESPACE").unwrap_err(); + assert!(err.to_string().contains("TEST_ENV_WHITESPACE")); + } +} diff --git a/src/errors.rs b/src/errors.rs index 65c9830..589854b 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -25,6 +25,9 @@ pub enum AppError { #[error("Channel full")] ChannelFullErr, + #[error("Already processed")] + AlreadyProcessedErr, + #[error(transparent)] BadJsonStructErr(#[from] serde_json::Error), @@ -58,6 +61,7 @@ impl IntoResponse for AppError { StatusCode::UNAUTHORIZED, "WebHook sig header is invalid".to_string(), ), + AppError::AlreadyProcessedErr => (StatusCode::OK, "Already processed".to_string()), AppError::ChannelFullErr => { sentry_anyhow::capture_anyhow(&anyhow!("Max concurrent tasks reached")); ( diff --git a/src/gitea.rs b/src/gitea.rs index 49e4ced..34fb852 100644 --- a/src/gitea.rs +++ b/src/gitea.rs @@ -158,6 +158,14 @@ pub enum WebhookType { Review(ReviewPayload), } +impl WebhookType { + pub fn event_id(&self) -> u64 { + match self { + WebhookType::Review(payload) => payload.comment.id, + } + } +} + #[derive(Deserialize, Debug)] pub struct ReviewPayload { pub action: String, @@ -229,7 +237,12 @@ mod tests { "action": "created", "pull_request": { "id": 42, - "diff_url": "https://mydiff.fr" + "diff_url": "https://mydiff.fr", + "number": 1, + "title": "My PR" + }, + "repository": { + "full_name": "owner/repo" }, "comment": { "id": 7, @@ -288,7 +301,12 @@ mod tests { "action": "edited", "pull_request": { "id": 1, - "diff_url": "https://mydiff.fr" + "diff_url": "https://mydiff.fr", + "number": 1, + "title": "My PR" + }, + "repository": { + "full_name": "owner/repo" }, "comment": { "id": 1, @@ -314,7 +332,12 @@ mod tests { "action": "created", "pull_request": { "id": 99, - "diff_url": "https://mydiff.fr" + "diff_url": "https://mydiff.fr", + "number": 5, + "title": "My PR" + }, + "repository": { + "full_name": "owner/repo" }, "comment": { "id": 12, @@ -346,7 +369,12 @@ mod tests { "action": "created", "pull_request": { "id": 1, - "diff_url": "https://mydiff.fr" + "diff_url": "https://mydiff.fr", + "number": 1, + "title": "My PR" + }, + "repository": { + "full_name": "owner/repo" }, "comment": { "id": 1, @@ -367,7 +395,12 @@ mod tests { "action": "created", "pull_request": { "id": 1, - "diff_url": "https://mydiff.fr" + "diff_url": "https://mydiff.fr", + "number": 1, + "title": "My PR" + }, + "repository": { + "full_name": "owner/repo" }, "comment": { "id": 1, diff --git a/src/main.rs b/src/main.rs index f5e0ca8..52d1ed6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{bot::Bot, gitea::WebhookType, state::AppState}; use dotenvy::dotenv; @@ -61,7 +63,11 @@ async fn run() -> anyhow::Result<()> { let bot = Bot::new(config.clone())?; let (tx, rx) = tokio::sync::mpsc::channel::(config.bot_max_concurrent * 2); - let app_state = AppState { bot_tx: tx, config }; + let app_state = AppState { + bot_tx: tx, + bot: bot.clone(), + config, + }; let signal = async { let mut sigterm = signal(SignalKind::terminate())?; diff --git a/src/open_router.rs b/src/open_router.rs index 94bfe84..4320511 100644 --- a/src/open_router.rs +++ b/src/open_router.rs @@ -42,7 +42,7 @@ impl OpenRouterClient { Ok(ChatResult { message: response.choices[0] .content() - .map(|msg| String::from(msg)) + .map(String::from) .ok_or(anyhow::anyhow!("No content"))?, cost: response.usage.and_then(|u| u.cost), }) diff --git a/src/state.rs b/src/state.rs index 4b9cc02..7ef7b47 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,7 +1,8 @@ -use crate::{env::EnvConfig, gitea::WebhookType}; +use crate::{bot::Bot, env::EnvConfig, gitea::WebhookType}; #[derive(Clone)] pub struct AppState { pub bot_tx: tokio::sync::mpsc::Sender, + pub bot: Bot, pub config: EnvConfig, }