From a2d898c07d8129f9f2200482b2912c8778a925ff Mon Sep 17 00:00:00 2001 From: qpismont Date: Wed, 10 Jun 2026 17:50:24 +0000 Subject: [PATCH] Fix sentry http request info Add async bot running with semaphore --- src/api.rs | 27 +++++++++++++++++++++------ src/bot.rs | 24 ++++++++++++++++++++++-- src/env.rs | 3 +++ src/errors.rs | 14 ++++++++++++-- src/gitea.rs | 1 + src/main.rs | 13 ++++++------- src/open_router.rs | 1 + 7 files changed, 66 insertions(+), 17 deletions(-) diff --git a/src/api.rs b/src/api.rs index 3a2eb42..5179232 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,3 +1,4 @@ +use anyhow::anyhow; use axum::body::{Body, Bytes, to_bytes}; use axum::extract::{FromRef, FromRequest, State}; use axum::http::Request; @@ -6,10 +7,11 @@ use axum::routing::{get, post}; use axum::{Json, Router}; use hmac::{Hmac, KeyInit, Mac}; use reqwest::StatusCode; -use sentry::integrations::tower::NewSentryLayer; +use sentry::integrations::tower::{NewSentryLayer, SentryHttpLayer}; use serde_json::Value; use sha2::Sha256; use subtle::ConstantTimeEq; +use tower::ServiceBuilder; use tower_http::trace::TraceLayer; use tracing::{info, instrument}; @@ -22,10 +24,14 @@ pub async fn start(app_state: AppState) -> anyhow::Result<()> { let http_port = app_state.config.http_port; let app = Router::new() - .layer(NewSentryLayer::>::new_from_top()) - .layer(TraceLayer::new_for_http()) .route("/", get(root)) .route("/webhook", post(webhook)) + .layer( + ServiceBuilder::new() + .layer(NewSentryLayer::>::new_from_top()) + .layer(SentryHttpLayer::new()) + .layer(TraceLayer::new_for_http()), + ) .with_state(app_state); let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", http_port)).await?; @@ -49,9 +55,8 @@ async fn webhook( app_state .bot_tx - .send(wb) - .await - .map_err(anyhow::Error::from)?; + .try_send(wb) + .map_err(|_| AppError::ChannelFullErr)?; Ok((StatusCode::CREATED, "Task started")) } @@ -74,6 +79,16 @@ where let type_header = extract_header(GITEA_EVENT_TYPE_HEADER_NAME, headers)?; let body_bytes = read_body(req.into_body()).await?; + let body_str = String::from_utf8_lossy(&body_bytes).into_owned(); + sentry::configure_scope(|scope| { + scope.add_event_processor(move |mut event| { + let mut request = event.request.take().unwrap_or_default(); + request.data = Some(body_str.clone()); + event.request = Some(request); + Some(event) + }); + }); + verify_signature( app_state.config.webhook_secret.as_bytes(), &sig_header, diff --git a/src/bot.rs b/src/bot.rs index c5d4a7e..b3cf700 100644 --- a/src/bot.rs +++ b/src/bot.rs @@ -4,7 +4,7 @@ use crate::{ open_router::OpenRouterClient, }; use serde::Deserialize; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use tracing::{error, info, instrument}; #[derive(Deserialize, Debug)] @@ -22,11 +22,13 @@ pub struct ReviewItem { pub message: String, } +#[derive(Clone)] pub struct Bot { config: EnvConfig, gitea_api: GiteaAPI, open_router_client: OpenRouterClient, http_client: reqwest::Client, + max_concurrent: usize, } impl Bot { @@ -41,6 +43,7 @@ impl Bot { &config.open_router_model, open_router_timeout, )?, + max_concurrent: config.bot_max_concurrent, config, http_client: reqwest::Client::builder() .timeout(Duration::from_secs(gitea_timeout)) @@ -54,8 +57,25 @@ impl Bot { ) -> anyhow::Result<()> { info!("Bot started"); + let sem = Arc::new(tokio::sync::Semaphore::new(self.max_concurrent)); + let mut tasks = tokio::task::JoinSet::new(); + while let Some(wb) = rx.recv().await { - self.exec(wb).await; + let permit = sem.clone().acquire_owned().await?; + let self_clone = self.clone(); + + tasks.spawn(async move { + self_clone.exec(wb).await; + drop(permit); + }); + } + + // Le channel est fermé : on attend que les tâches en cours se terminent + // proprement avant de rendre la main + while let Some(res) = tasks.join_next().await { + if let Err(e) = res { + error!("Task panicked: {e}"); + } } info!("Bot shutting down, channel closed"); diff --git a/src/env.rs b/src/env.rs index 79ce619..514adfd 100644 --- a/src/env.rs +++ b/src/env.rs @@ -8,6 +8,7 @@ pub struct EnvConfig { pub open_router_model: String, pub open_router_timeout: u64, pub bot_name: String, + pub bot_max_concurrent: usize, pub gitea_url: String, pub gitea_token: String, pub gitea_timeout: u64, @@ -20,6 +21,7 @@ pub fn load_config() -> anyhow::Result { let open_router_api_key = try_get_env("OPEN_ROUTER_API_KEY")?; let open_router_model = try_get_env("OPEN_ROUTER_MODEL")?; let open_router_timeout = try_get_env("OPEN_ROUTER_TIMEOUT")?.parse()?; + let bot_max_concurrent = try_get_env("BOT_MAX_CONCURRENT")?.parse()?; let gitea_url = try_get_env("GITEA_URL")?; let gitea_token = try_get_env("GITEA_TOKEN")?; let gitea_timeout = try_get_env("GITEA_TIMEOUT")?.parse()?; @@ -31,6 +33,7 @@ pub fn load_config() -> anyhow::Result { open_router_api_key, open_router_model, open_router_timeout, + bot_max_concurrent, gitea_url, gitea_token, gitea_timeout, diff --git a/src/errors.rs b/src/errors.rs index d4425b5..65c9830 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,6 +1,6 @@ +use anyhow::anyhow; use axum::response::IntoResponse; use reqwest::StatusCode; -use tracing::error; #[derive(thiserror::Error, Debug)] pub enum AppError { @@ -22,6 +22,9 @@ pub enum AppError { #[error("WebHook have bad action")] InvalidActionErr, + #[error("Channel full")] + ChannelFullErr, + #[error(transparent)] BadJsonStructErr(#[from] serde_json::Error), @@ -55,8 +58,15 @@ impl IntoResponse for AppError { StatusCode::UNAUTHORIZED, "WebHook sig header is invalid".to_string(), ), + AppError::ChannelFullErr => { + sentry_anyhow::capture_anyhow(&anyhow!("Max concurrent tasks reached")); + ( + StatusCode::SERVICE_UNAVAILABLE, + "Max concurrent tasks reached".to_string(), + ) + } AppError::Other(err) => { - error!(%err, "Internal server error"); + sentry_anyhow::capture_anyhow(&err); ( StatusCode::INTERNAL_SERVER_ERROR, "Internal server error".to_string(), diff --git a/src/gitea.rs b/src/gitea.rs index e137e0a..49e4ced 100644 --- a/src/gitea.rs +++ b/src/gitea.rs @@ -6,6 +6,7 @@ use tracing::instrument; use crate::{bot::ReviewResult, errors::AppError}; +#[derive(Clone)] pub struct GiteaAPI { base_url: String, client: reqwest::Client, diff --git a/src/main.rs b/src/main.rs index 50b1f07..68687ca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,20 +24,21 @@ fn main() -> anyhow::Result<()> { ) .init(); - if let Ok(sentry_dsn) = env::try_get_env("SENTRY_DSN") { + let _sentry_guard = if let Ok(sentry_dsn) = env::try_get_env("SENTRY_DSN") { info!("Initialize sentry"); - let _guard = sentry::init(( + Some(sentry::init(( sentry_dsn, sentry::ClientOptions { release: sentry::release_name!(), send_default_pii: true, ..Default::default() }, - )); + ))) } else { warn!("SENTRY_DSN not set, sentry will not be initialized"); - } + None + }; tokio::runtime::Runtime::new()?.block_on(run()) } @@ -54,9 +55,7 @@ async fn run() -> anyhow::Result<()> { ); let bot = Bot::new(config.clone())?; - - let (tx, rx) = tokio::sync::mpsc::channel::(1); - + let (tx, rx) = tokio::sync::mpsc::channel::(config.bot_max_concurrent * 2); let app_state = AppState { bot_tx: tx, config }; tokio::try_join!(bot.start(rx), api::start(app_state))?; diff --git a/src/open_router.rs b/src/open_router.rs index 1a3585a..94bfe84 100644 --- a/src/open_router.rs +++ b/src/open_router.rs @@ -8,6 +8,7 @@ pub struct ChatResult { pub cost: Option, } +#[derive(Clone)] pub struct OpenRouterClient { client: openrouter_rs::OpenRouterClient, model: String,