diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..89343aa --- /dev/null +++ b/.dockerignore @@ -0,0 +1,4 @@ +target/ +.env +.devcontainer/ +docs/ diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..97d2632 --- /dev/null +++ b/.env.example @@ -0,0 +1,18 @@ +HTTP_PORT=3000 +BOT_NAME=Herald + +WEBHOOK_SIG_HEADER_SECRET= + +OPEN_ROUTER_API_KEY= +OPEN_ROUTER_MODEL=deepseek/deepseek-v4-flash +OPEN_ROUTER_TIMEOUT=120 + +BOT_MAX_CONCURRENT=5 + +GITEA_URL=https://gitea.example.com +GITEA_TOKEN= +GITEA_TIMEOUT=30 + +# Optional +SENTRY_DSN= +RUST_LOG=info diff --git a/.woodpecker/release.yml b/.woodpecker/release.yml new file mode 100644 index 0000000..812b4d8 --- /dev/null +++ b/.woodpecker/release.yml @@ -0,0 +1,19 @@ +when: + event: + - tag + +steps: + - name: release-docker + image: quay.io/buildah/stable + privileged: true + volumes: + - /data/woodpecker-builds:/data + commands: + - echo $DOCKER_PASSWORD | buildah login docker.io -u $DOCKER_USERNAME --password-stdin + - chmod +x scripts/build.sh + - bash scripts/build.sh + environment: + DOCKER_USERNAME: + from_secret: docker_username + DOCKER_PASSWORD: + from_secret: docker_password diff --git a/.woodpecker/tests.yml b/.woodpecker/tests.yml new file mode 100644 index 0000000..b151ff1 --- /dev/null +++ b/.woodpecker/tests.yml @@ -0,0 +1,15 @@ +when: + event: + - push + +steps: + - name: clippy + image: rust:1.96 + commands: + - rustup component add clippy + - cargo clippy + + - name: test + image: rust:1.96 + commands: + - cargo test diff --git a/.zed/settings.json b/.zed/settings.json new file mode 100644 index 0000000..d569920 --- /dev/null +++ b/.zed/settings.json @@ -0,0 +1,11 @@ +{ + "lsp": { + "rust-analyzer": { + "initialization_options": { + "check": { + "command": "clippy" + } + } + } + } +} diff --git a/Cargo.lock b/Cargo.lock index 402dbf2..2647c1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -786,7 +786,7 @@ checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" [[package]] name = "herald" -version = "0.1.0" +version = "1.0.0" dependencies = [ "anyhow", "axum", diff --git a/Cargo.toml b/Cargo.toml index f5025f1..29abeab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "herald" -version = "0.1.0" +version = "1.0.0" edition = "2024" [dependencies] diff --git a/Containerfile b/Containerfile new file mode 100644 index 0000000..6508b0c --- /dev/null +++ b/Containerfile @@ -0,0 +1,12 @@ +FROM rust:1.96 as builder + +WORKDIR /app +COPY . . +RUN cargo build --release + + +FROM debian:trixie-slim + +WORKDIR /app +COPY --from=builder /app/target/release/herald . +CMD [ "./herald" ] diff --git a/README.md b/README.md new file mode 100644 index 0000000..93d73d8 --- /dev/null +++ b/README.md @@ -0,0 +1,53 @@ +# Herald + +Herald is a Gitea bot that performs automated AI-powered code reviews on pull requests using [OpenRouter](https://openrouter.ai/). + +## Features + +- Listens for Gitea webhook events and triggers code reviews on pull request comments +- Streams reviews back to Gitea as PR comments +- Concurrent review processing with configurable parallelism +- Graceful shutdown — in-progress reviews finish before the process exits +- Error tracking via Sentry +- Tiny memory footprint (~4MB) thanks to Rust + +## Installation + +**Requirements:** Rust toolchain ([rustup.rs](https://rustup.rs)) + +```sh +cargo build --release +./target/release/herald +``` + +Herald reads its configuration from environment variables (a `.env` file is supported): + +| Variable | Description | +|---|---| +| `HTTP_PORT` | Port to listen on | +| `BOT_NAME` | The bot's Gitea username (used to detect mentions) | +| `WEBHOOK_SIG_HEADER_SECRET` | Gitea webhook secret for signature verification | +| `OPEN_ROUTER_API_KEY` | OpenRouter API key | +| `OPEN_ROUTER_MODEL` | Model to use (e.g. `deepseek/deepseek-v4-flash`) | +| `OPEN_ROUTER_TIMEOUT` | OpenRouter request timeout in seconds | +| `BOT_MAX_CONCURRENT` | Maximum number of concurrent reviews | +| `GITEA_URL` | Base URL of your Gitea instance | +| `GITEA_TOKEN` | Gitea API token | +| `GITEA_TIMEOUT` | Gitea API request timeout in seconds | +| `SENTRY_DSN` | *(optional)* Sentry DSN for error tracking | +| `RUST_LOG` | *(optional)* Log level, defaults to `info` | + +## Development + +The easiest way to get started is with the provided [Dev Container](https://containers.dev/) (VS Code or Zed with the dev container extension). + +Open the project and reopen it in the container — the Rust toolchain is pre-installed. + +**Without Dev Container**, you just need a Rust toolchain: + +```sh +rustup toolchain install stable +cargo run +``` + +Copy `.env.example` to `.env` and fill in your values before running. diff --git a/ROADMAP.md b/ROADMAP.md new file mode 100644 index 0000000..e69de29 diff --git a/scripts/build.sh b/scripts/build.sh new file mode 100755 index 0000000..a34d59f --- /dev/null +++ b/scripts/build.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +set -euo pipefail + +IMAGE="tintounn/herald" + +if [ -z "${CI_COMMIT_TAG:-}" ]; then + echo "Error: CI_COMMIT_TAG is not set" >&2 + exit 1 +fi + +TAG="${CI_COMMIT_TAG}" + +echo "Building ${IMAGE}:${TAG}..." +buildah build \ + --file Containerfile \ + --tag "docker.io/${IMAGE}:${TAG}" \ + . + +echo "Pushing ${IMAGE}:${TAG}..." +buildah push "${IMAGE}:${TAG}" + +echo "Done: ${IMAGE}:${TAG}" diff --git a/src/api.rs b/src/api.rs index 2958b1f..36b13d0 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,5 +1,4 @@ -use anyhow::anyhow; -use axum::body::{Body, Bytes, to_bytes}; +use axum::body::{Bytes, to_bytes}; use axum::extract::{FromRef, FromRequest, State}; use axum::http::Request; use axum::response::IntoResponse; @@ -15,12 +14,14 @@ use tower::ServiceBuilder; use tower_http::trace::TraceLayer; use tracing::{info, instrument}; +use tokio_util::sync::CancellationToken; + use crate::consts::{GITEA_EVENT_TYPE_HEADER_NAME, GITEA_SIG_HEADER_NAME, MAX_WEBHOOK_BODY_SIZE}; use crate::errors::AppError; use crate::gitea::WebhookType; use crate::state::AppState; -pub async fn start(app_state: AppState) -> anyhow::Result<()> { +pub async fn start(app_state: AppState, shutdown: CancellationToken) -> anyhow::Result<()> { let http_port = app_state.config.http_port; let app = Router::new() @@ -38,8 +39,12 @@ pub async fn start(app_state: AppState) -> anyhow::Result<()> { info!("Listening API on port {}", http_port); axum::serve(listener, app) + .with_graceful_shutdown(async move { shutdown.cancelled().await }) .await .map_err(anyhow::Error::from) + .inspect(|_| info!("API shutting down complete"))?; + + Ok(()) } async fn root() -> &'static str { @@ -53,10 +58,15 @@ async fn webhook( ) -> Result { tracing::Span::current().record("webhook_type", tracing::field::debug(&wb)); - app_state - .bot_tx - .try_send(wb) - .map_err(|_| AppError::ChannelFullErr)?; + let event_id = wb.event_id(); + if app_state.bot.check_and_mark(event_id).await { + return Err(AppError::AlreadyProcessedErr); + } + + if app_state.bot_tx.try_send(wb).is_err() { + app_state.bot.unmark(event_id).await; + return Err(AppError::ChannelFullErr); + } Ok((StatusCode::CREATED, "Task started")) } diff --git a/src/bot.rs b/src/bot.rs index c237c17..929bb16 100644 --- a/src/bot.rs +++ b/src/bot.rs @@ -4,7 +4,9 @@ use crate::{ open_router::OpenRouterClient, }; use serde::Deserialize; -use std::{sync::Arc, time::Duration}; +use std::{collections::HashSet, sync::Arc, time::Duration}; +use tokio::sync::Mutex; +use tokio_util::sync::CancellationToken; use tracing::{error, info, instrument}; #[derive(Deserialize, Debug)] @@ -29,6 +31,7 @@ pub struct Bot { open_router_client: OpenRouterClient, http_client: reqwest::Client, max_concurrent: usize, + actions_handled: Arc>>, } impl Bot { @@ -45,6 +48,7 @@ impl Bot { )?, max_concurrent: config.bot_max_concurrent, config, + actions_handled: Arc::new(Mutex::new(HashSet::new())), http_client: reqwest::Client::builder() .timeout(Duration::from_secs(gitea_timeout)) .build()?, @@ -54,14 +58,23 @@ impl Bot { pub async fn start( &self, mut rx: tokio::sync::mpsc::Receiver, + shutdown: CancellationToken, ) -> anyhow::Result<()> { info!("Bot started"); let sem = Arc::new(tokio::sync::Semaphore::new(self.max_concurrent)); let mut tasks = tokio::task::JoinSet::new(); - while let Some(wb) = rx.recv().await { - // Drain completed tasks to avoid the JoinSet growing unbounded + loop { + let wb = tokio::select! { + biased; + _ = shutdown.cancelled() => break, + msg = rx.recv() => match msg { + Some(wb) => wb, + None => break, + }, + }; + while let Some(res) = tasks.try_join_next() { if let Err(e) = res { error!("Task panicked: {e}"); @@ -78,11 +91,9 @@ impl Bot { }); } - // When all webhook tasks have completed, we can safely exit - // properly before returning tasks.join_all().await; - info!("Bot shutting down, channel closed"); + info!("Bot shutting down complete"); Ok(()) } @@ -108,4 +119,87 @@ impl Bot { } } } + + pub async fn check_and_mark(&self, event_id: u64) -> bool { + let mut action_handled_lock = self.actions_handled.lock().await; + + !action_handled_lock.insert(event_id) + } + + pub async fn unmark(&self, event_id: u64) { + let mut action_handled_lock = self.actions_handled.lock().await; + + action_handled_lock.remove(&event_id); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_actions_handled() -> Arc>> { + Arc::new(Mutex::new(HashSet::new())) + } + + async fn check_and_mark(actions_handled: &Arc>>, event_id: u64) -> bool { + let mut lock = actions_handled.lock().await; + !lock.insert(event_id) + } + + async fn unmark(actions_handled: &Arc>>, event_id: u64) { + let mut lock = actions_handled.lock().await; + lock.remove(&event_id); + } + + #[tokio::test] + async fn test_check_and_mark_first_call_returns_false() { + let actions_handled = make_actions_handled(); + assert!(!check_and_mark(&actions_handled, 1).await); + } + + #[tokio::test] + async fn test_check_and_mark_second_call_returns_true() { + let actions_handled = make_actions_handled(); + check_and_mark(&actions_handled, 1).await; + assert!(check_and_mark(&actions_handled, 1).await); + } + + #[tokio::test] + async fn test_check_and_mark_different_ids_are_independent() { + let actions_handled = make_actions_handled(); + check_and_mark(&actions_handled, 1).await; + assert!(!check_and_mark(&actions_handled, 2).await); + } + + #[tokio::test] + async fn test_unmark_allows_reprocessing() { + let actions_handled = make_actions_handled(); + check_and_mark(&actions_handled, 1).await; + unmark(&actions_handled, 1).await; + assert!(!check_and_mark(&actions_handled, 1).await); + } + + #[tokio::test] + async fn test_unmark_nonexistent_id_is_noop() { + let actions_handled = make_actions_handled(); + unmark(&actions_handled, 99).await; + assert!(!check_and_mark(&actions_handled, 99).await); + } + + #[tokio::test] + async fn test_check_and_mark_concurrent_same_id() { + let actions_handled = make_actions_handled(); + let actions_handled2 = Arc::clone(&actions_handled); + + let t1 = tokio::spawn(async move { check_and_mark(&actions_handled, 42).await }); + let t2 = tokio::spawn(async move { check_and_mark(&actions_handled2, 42).await }); + + let (r1, r2) = tokio::join!(t1, t2); + let results = [r1.unwrap(), r2.unwrap()]; + + // exactement un seul des deux doit retourner false (non traité) + assert_eq!(results.iter().filter(|&&r| !r).count(), 1); + // l'autre doit retourner true (déjà traité) + assert_eq!(results.iter().filter(|&&r| r).count(), 1); + } } diff --git a/src/bot_actions/review.rs b/src/bot_actions/review.rs index 83df3fa..88a1d9f 100644 --- a/src/bot_actions/review.rs +++ b/src/bot_actions/review.rs @@ -35,7 +35,7 @@ pub async fn exec_review( let bot_result: Result = async { let git_diff = - download_git_diff(&http_client, &review_payload.pull_request.diff_url).await?; + download_git_diff(http_client, &review_payload.pull_request.diff_url).await?; let diff_for_llm = format_diff_for_review(&git_diff); diff --git a/src/env.rs b/src/env.rs index 514adfd..1d3e862 100644 --- a/src/env.rs +++ b/src/env.rs @@ -49,3 +49,34 @@ pub fn try_get_env(key: &str) -> anyhow::Result { Ok(env) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_try_get_env_returns_value() { + unsafe { std::env::set_var("TEST_ENV_PRESENT", "hello") }; + assert_eq!(try_get_env("TEST_ENV_PRESENT").unwrap(), "hello"); + } + + #[test] + fn test_try_get_env_missing_var_returns_error() { + unsafe { std::env::remove_var("TEST_ENV_MISSING") }; + assert!(try_get_env("TEST_ENV_MISSING").is_err()); + } + + #[test] + fn test_try_get_env_empty_value_returns_error() { + unsafe { std::env::set_var("TEST_ENV_EMPTY", "") }; + let err = try_get_env("TEST_ENV_EMPTY").unwrap_err(); + assert!(err.to_string().contains("TEST_ENV_EMPTY")); + } + + #[test] + fn test_try_get_env_whitespace_only_returns_error() { + unsafe { std::env::set_var("TEST_ENV_WHITESPACE", " ") }; + let err = try_get_env("TEST_ENV_WHITESPACE").unwrap_err(); + assert!(err.to_string().contains("TEST_ENV_WHITESPACE")); + } +} diff --git a/src/errors.rs b/src/errors.rs index 65c9830..589854b 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -25,6 +25,9 @@ pub enum AppError { #[error("Channel full")] ChannelFullErr, + #[error("Already processed")] + AlreadyProcessedErr, + #[error(transparent)] BadJsonStructErr(#[from] serde_json::Error), @@ -58,6 +61,7 @@ impl IntoResponse for AppError { StatusCode::UNAUTHORIZED, "WebHook sig header is invalid".to_string(), ), + AppError::AlreadyProcessedErr => (StatusCode::OK, "Already processed".to_string()), AppError::ChannelFullErr => { sentry_anyhow::capture_anyhow(&anyhow!("Max concurrent tasks reached")); ( diff --git a/src/gitea.rs b/src/gitea.rs index 49e4ced..34fb852 100644 --- a/src/gitea.rs +++ b/src/gitea.rs @@ -158,6 +158,14 @@ pub enum WebhookType { Review(ReviewPayload), } +impl WebhookType { + pub fn event_id(&self) -> u64 { + match self { + WebhookType::Review(payload) => payload.comment.id, + } + } +} + #[derive(Deserialize, Debug)] pub struct ReviewPayload { pub action: String, @@ -229,7 +237,12 @@ mod tests { "action": "created", "pull_request": { "id": 42, - "diff_url": "https://mydiff.fr" + "diff_url": "https://mydiff.fr", + "number": 1, + "title": "My PR" + }, + "repository": { + "full_name": "owner/repo" }, "comment": { "id": 7, @@ -288,7 +301,12 @@ mod tests { "action": "edited", "pull_request": { "id": 1, - "diff_url": "https://mydiff.fr" + "diff_url": "https://mydiff.fr", + "number": 1, + "title": "My PR" + }, + "repository": { + "full_name": "owner/repo" }, "comment": { "id": 1, @@ -314,7 +332,12 @@ mod tests { "action": "created", "pull_request": { "id": 99, - "diff_url": "https://mydiff.fr" + "diff_url": "https://mydiff.fr", + "number": 5, + "title": "My PR" + }, + "repository": { + "full_name": "owner/repo" }, "comment": { "id": 12, @@ -346,7 +369,12 @@ mod tests { "action": "created", "pull_request": { "id": 1, - "diff_url": "https://mydiff.fr" + "diff_url": "https://mydiff.fr", + "number": 1, + "title": "My PR" + }, + "repository": { + "full_name": "owner/repo" }, "comment": { "id": 1, @@ -367,7 +395,12 @@ mod tests { "action": "created", "pull_request": { "id": 1, - "diff_url": "https://mydiff.fr" + "diff_url": "https://mydiff.fr", + "number": 1, + "title": "My PR" + }, + "repository": { + "full_name": "owner/repo" }, "comment": { "id": 1, diff --git a/src/main.rs b/src/main.rs index 68687ca..52d1ed6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,10 @@ +use std::sync::Arc; + use crate::{bot::Bot, gitea::WebhookType, state::AppState}; + use dotenvy::dotenv; +use tokio::signal::unix::{SignalKind, signal}; +use tokio_util::sync::CancellationToken; use tracing::{info, warn}; use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt}; @@ -54,11 +59,36 @@ async fn run() -> anyhow::Result<()> { "Starting Herald" ); + let shutdown = CancellationToken::new(); + let bot = Bot::new(config.clone())?; let (tx, rx) = tokio::sync::mpsc::channel::(config.bot_max_concurrent * 2); - let app_state = AppState { bot_tx: tx, config }; + let app_state = AppState { + bot_tx: tx, + bot: bot.clone(), + config, + }; - tokio::try_join!(bot.start(rx), api::start(app_state))?; + let signal = async { + let mut sigterm = signal(SignalKind::terminate())?; + let mut sigint = signal(SignalKind::interrupt())?; + tokio::select! { + _ = sigterm.recv() => info!("Received SIGTERM"), + _ = sigint.recv() => info!("Received SIGINT"), + } + + info!("Shutting down..."); + shutdown.cancel(); + anyhow::Ok(()) + }; + + tokio::try_join!( + bot.start(rx, shutdown.clone()), + api::start(app_state, shutdown.clone()), + signal + )?; + + info!("Shutdown complete"); Ok(()) } diff --git a/src/open_router.rs b/src/open_router.rs index 94bfe84..4320511 100644 --- a/src/open_router.rs +++ b/src/open_router.rs @@ -42,7 +42,7 @@ impl OpenRouterClient { Ok(ChatResult { message: response.choices[0] .content() - .map(|msg| String::from(msg)) + .map(String::from) .ok_or(anyhow::anyhow!("No content"))?, cost: response.usage.and_then(|u| u.cost), }) diff --git a/src/state.rs b/src/state.rs index 4b9cc02..7ef7b47 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,7 +1,8 @@ -use crate::{env::EnvConfig, gitea::WebhookType}; +use crate::{bot::Bot, env::EnvConfig, gitea::WebhookType}; #[derive(Clone)] pub struct AppState { pub bot_tx: tokio::sync::mpsc::Sender, + pub bot: Bot, pub config: EnvConfig, }