Starting impl Sentry and tracing #3
+21
-6
@@ -1,3 +1,4 @@
|
|||||||
|
use anyhow::anyhow;
|
||||||
use axum::body::{Body, Bytes, to_bytes};
|
use axum::body::{Body, Bytes, to_bytes};
|
||||||
|
|
|||||||
use axum::extract::{FromRef, FromRequest, State};
|
use axum::extract::{FromRef, FromRequest, State};
|
||||||
use axum::http::Request;
|
use axum::http::Request;
|
||||||
@@ -6,10 +7,11 @@ use axum::routing::{get, post};
|
|||||||
use axum::{Json, Router};
|
use axum::{Json, Router};
|
||||||
use hmac::{Hmac, KeyInit, Mac};
|
use hmac::{Hmac, KeyInit, Mac};
|
||||||
use reqwest::StatusCode;
|
use reqwest::StatusCode;
|
||||||
use sentry::integrations::tower::NewSentryLayer;
|
use sentry::integrations::tower::{NewSentryLayer, SentryHttpLayer};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use sha2::Sha256;
|
use sha2::Sha256;
|
||||||
use subtle::ConstantTimeEq;
|
use subtle::ConstantTimeEq;
|
||||||
|
use tower::ServiceBuilder;
|
||||||
use tower_http::trace::TraceLayer;
|
use tower_http::trace::TraceLayer;
|
||||||
use tracing::{info, instrument};
|
use tracing::{info, instrument};
|
||||||
|
|
||||||
@@ -22,10 +24,14 @@ pub async fn start(app_state: AppState) -> anyhow::Result<()> {
|
|||||||
let http_port = app_state.config.http_port;
|
let http_port = app_state.config.http_port;
|
||||||
|
|
||||||
let app = Router::new()
|
let app = Router::new()
|
||||||
.layer(NewSentryLayer::<Request<Body>>::new_from_top())
|
|
||||||
.layer(TraceLayer::new_for_http())
|
|
||||||
.route("/", get(root))
|
.route("/", get(root))
|
||||||
.route("/webhook", post(webhook))
|
.route("/webhook", post(webhook))
|
||||||
|
.layer(
|
||||||
|
ServiceBuilder::new()
|
||||||
|
.layer(NewSentryLayer::<Request<_>>::new_from_top())
|
||||||
|
.layer(SentryHttpLayer::new())
|
||||||
|
.layer(TraceLayer::new_for_http()),
|
||||||
|
)
|
||||||
.with_state(app_state);
|
.with_state(app_state);
|
||||||
|
|
||||||
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", http_port)).await?;
|
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", http_port)).await?;
|
||||||
@@ -49,9 +55,8 @@ async fn webhook(
|
|||||||
|
|
||||||
app_state
|
app_state
|
||||||
.bot_tx
|
.bot_tx
|
||||||
.send(wb)
|
.try_send(wb)
|
||||||
.await
|
.map_err(|_| AppError::ChannelFullErr)?;
|
||||||
.map_err(anyhow::Error::from)?;
|
|
||||||
|
|
||||||
Ok((StatusCode::CREATED, "Task started"))
|
Ok((StatusCode::CREATED, "Task started"))
|
||||||
}
|
}
|
||||||
@@ -74,6 +79,16 @@ where
|
|||||||
let type_header = extract_header(GITEA_EVENT_TYPE_HEADER_NAME, headers)?;
|
let type_header = extract_header(GITEA_EVENT_TYPE_HEADER_NAME, headers)?;
|
||||||
let body_bytes = read_body(req.into_body()).await?;
|
let body_bytes = read_body(req.into_body()).await?;
|
||||||
|
|
||||||
|
let body_str = String::from_utf8_lossy(&body_bytes).into_owned();
|
||||||
|
sentry::configure_scope(|scope| {
|
||||||
|
Herald
commented
Ajouter un event processor à chaque requête est problématique : les processors s'accumulent et ne sont jamais retirés, ce qui peut entraîner une fuite mémoire et une dégradation des performances. De plus, Sentry peut déjà capturer le corps de la requête si l'option Ajouter un event processor à chaque requête est problématique : les processors s'accumulent et ne sont jamais retirés, ce qui peut entraîner une fuite mémoire et une dégradation des performances. De plus, Sentry peut déjà capturer le corps de la requête si l'option `attach_request_body` est activée dans `ClientOptions` (ou via l'intégration tower). Il est préférable de configurer cela globalement plutôt que manuellement à chaque appel.
|
|||||||
|
scope.add_event_processor(move |mut event| {
|
||||||
|
let mut request = event.request.take().unwrap_or_default();
|
||||||
|
request.data = Some(body_str.clone());
|
||||||
|
event.request = Some(request);
|
||||||
|
Some(event)
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
verify_signature(
|
verify_signature(
|
||||||
app_state.config.webhook_secret.as_bytes(),
|
app_state.config.webhook_secret.as_bytes(),
|
||||||
&sig_header,
|
&sig_header,
|
||||||
|
|||||||
+22
-2
@@ -4,7 +4,7 @@ use crate::{
|
|||||||
open_router::OpenRouterClient,
|
open_router::OpenRouterClient,
|
||||||
};
|
};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::time::Duration;
|
use std::{sync::Arc, time::Duration};
|
||||||
use tracing::{error, info, instrument};
|
use tracing::{error, info, instrument};
|
||||||
|
|
||||||
#[derive(Deserialize, Debug)]
|
#[derive(Deserialize, Debug)]
|
||||||
@@ -22,11 +22,13 @@ pub struct ReviewItem {
|
|||||||
pub message: String,
|
pub message: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct Bot {
|
pub struct Bot {
|
||||||
config: EnvConfig,
|
config: EnvConfig,
|
||||||
gitea_api: GiteaAPI,
|
gitea_api: GiteaAPI,
|
||||||
open_router_client: OpenRouterClient,
|
open_router_client: OpenRouterClient,
|
||||||
http_client: reqwest::Client,
|
http_client: reqwest::Client,
|
||||||
|
max_concurrent: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Bot {
|
impl Bot {
|
||||||
@@ -41,6 +43,7 @@ impl Bot {
|
|||||||
&config.open_router_model,
|
&config.open_router_model,
|
||||||
open_router_timeout,
|
open_router_timeout,
|
||||||
)?,
|
)?,
|
||||||
|
max_concurrent: config.bot_max_concurrent,
|
||||||
config,
|
config,
|
||||||
http_client: reqwest::Client::builder()
|
http_client: reqwest::Client::builder()
|
||||||
.timeout(Duration::from_secs(gitea_timeout))
|
.timeout(Duration::from_secs(gitea_timeout))
|
||||||
@@ -54,8 +57,25 @@ impl Bot {
|
|||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
info!("Bot started");
|
info!("Bot started");
|
||||||
|
|
||||||
|
let sem = Arc::new(tokio::sync::Semaphore::new(self.max_concurrent));
|
||||||
|
let mut tasks = tokio::task::JoinSet::new();
|
||||||
|
|
||||||
while let Some(wb) = rx.recv().await {
|
while let Some(wb) = rx.recv().await {
|
||||||
self.exec(wb).await;
|
let permit = sem.clone().acquire_owned().await?;
|
||||||
|
let self_clone = self.clone();
|
||||||
|
|
||||||
|
tasks.spawn(async move {
|
||||||
|
self_clone.exec(wb).await;
|
||||||
|
drop(permit);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Le channel est fermé : on attend que les tâches en cours se terminent
|
||||||
|
qpismont marked this conversation as resolved
Herald
commented
Les commentaires sont en français, alors que le reste du code est en anglais. Pour la cohérence du projet, il est conseillé de rédiger tous les commentaires en anglais (ou dans une seule langue). Les commentaires sont en français, alors que le reste du code est en anglais. Pour la cohérence du projet, il est conseillé de rédiger tous les commentaires en anglais (ou dans une seule langue).
|
|||||||
|
// proprement avant de rendre la main
|
||||||
|
while let Some(res) = tasks.join_next().await {
|
||||||
|
if let Err(e) = res {
|
||||||
|
error!("Task panicked: {e}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Bot shutting down, channel closed");
|
info!("Bot shutting down, channel closed");
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ pub struct EnvConfig {
|
|||||||
pub open_router_model: String,
|
pub open_router_model: String,
|
||||||
pub open_router_timeout: u64,
|
pub open_router_timeout: u64,
|
||||||
pub bot_name: String,
|
pub bot_name: String,
|
||||||
|
pub bot_max_concurrent: usize,
|
||||||
pub gitea_url: String,
|
pub gitea_url: String,
|
||||||
pub gitea_token: String,
|
pub gitea_token: String,
|
||||||
pub gitea_timeout: u64,
|
pub gitea_timeout: u64,
|
||||||
@@ -20,6 +21,7 @@ pub fn load_config() -> anyhow::Result<EnvConfig> {
|
|||||||
let open_router_api_key = try_get_env("OPEN_ROUTER_API_KEY")?;
|
let open_router_api_key = try_get_env("OPEN_ROUTER_API_KEY")?;
|
||||||
let open_router_model = try_get_env("OPEN_ROUTER_MODEL")?;
|
let open_router_model = try_get_env("OPEN_ROUTER_MODEL")?;
|
||||||
let open_router_timeout = try_get_env("OPEN_ROUTER_TIMEOUT")?.parse()?;
|
let open_router_timeout = try_get_env("OPEN_ROUTER_TIMEOUT")?.parse()?;
|
||||||
|
let bot_max_concurrent = try_get_env("BOT_MAX_CONCURRENT")?.parse()?;
|
||||||
let gitea_url = try_get_env("GITEA_URL")?;
|
let gitea_url = try_get_env("GITEA_URL")?;
|
||||||
let gitea_token = try_get_env("GITEA_TOKEN")?;
|
let gitea_token = try_get_env("GITEA_TOKEN")?;
|
||||||
let gitea_timeout = try_get_env("GITEA_TIMEOUT")?.parse()?;
|
let gitea_timeout = try_get_env("GITEA_TIMEOUT")?.parse()?;
|
||||||
@@ -31,6 +33,7 @@ pub fn load_config() -> anyhow::Result<EnvConfig> {
|
|||||||
open_router_api_key,
|
open_router_api_key,
|
||||||
open_router_model,
|
open_router_model,
|
||||||
open_router_timeout,
|
open_router_timeout,
|
||||||
|
bot_max_concurrent,
|
||||||
gitea_url,
|
gitea_url,
|
||||||
gitea_token,
|
gitea_token,
|
||||||
gitea_timeout,
|
gitea_timeout,
|
||||||
|
|||||||
+12
-2
@@ -1,6 +1,6 @@
|
|||||||
|
use anyhow::anyhow;
|
||||||
use axum::response::IntoResponse;
|
use axum::response::IntoResponse;
|
||||||
use reqwest::StatusCode;
|
use reqwest::StatusCode;
|
||||||
use tracing::error;
|
|
||||||
|
|
||||||
#[derive(thiserror::Error, Debug)]
|
#[derive(thiserror::Error, Debug)]
|
||||||
pub enum AppError {
|
pub enum AppError {
|
||||||
@@ -22,6 +22,9 @@ pub enum AppError {
|
|||||||
#[error("WebHook have bad action")]
|
#[error("WebHook have bad action")]
|
||||||
InvalidActionErr,
|
InvalidActionErr,
|
||||||
|
|
||||||
|
#[error("Channel full")]
|
||||||
|
ChannelFullErr,
|
||||||
|
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
BadJsonStructErr(#[from] serde_json::Error),
|
BadJsonStructErr(#[from] serde_json::Error),
|
||||||
|
|
||||||
@@ -55,8 +58,15 @@ impl IntoResponse for AppError {
|
|||||||
StatusCode::UNAUTHORIZED,
|
StatusCode::UNAUTHORIZED,
|
||||||
"WebHook sig header is invalid".to_string(),
|
"WebHook sig header is invalid".to_string(),
|
||||||
),
|
),
|
||||||
|
AppError::ChannelFullErr => {
|
||||||
|
sentry_anyhow::capture_anyhow(&anyhow!("Max concurrent tasks reached"));
|
||||||
|
(
|
||||||
|
StatusCode::SERVICE_UNAVAILABLE,
|
||||||
|
"Max concurrent tasks reached".to_string(),
|
||||||
|
)
|
||||||
|
}
|
||||||
AppError::Other(err) => {
|
AppError::Other(err) => {
|
||||||
error!(%err, "Internal server error");
|
sentry_anyhow::capture_anyhow(&err);
|
||||||
(
|
(
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
"Internal server error".to_string(),
|
"Internal server error".to_string(),
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ use tracing::instrument;
|
|||||||
|
|
||||||
use crate::{bot::ReviewResult, errors::AppError};
|
use crate::{bot::ReviewResult, errors::AppError};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct GiteaAPI {
|
pub struct GiteaAPI {
|
||||||
base_url: String,
|
base_url: String,
|
||||||
client: reqwest::Client,
|
client: reqwest::Client,
|
||||||
|
|||||||
+6
-7
@@ -24,20 +24,21 @@ fn main() -> anyhow::Result<()> {
|
|||||||
)
|
)
|
||||||
.init();
|
.init();
|
||||||
|
|
||||||
if let Ok(sentry_dsn) = env::try_get_env("SENTRY_DSN") {
|
let _sentry_guard = if let Ok(sentry_dsn) = env::try_get_env("SENTRY_DSN") {
|
||||||
info!("Initialize sentry");
|
info!("Initialize sentry");
|
||||||
|
|
||||||
let _guard = sentry::init((
|
Some(sentry::init((
|
||||||
sentry_dsn,
|
sentry_dsn,
|
||||||
sentry::ClientOptions {
|
sentry::ClientOptions {
|
||||||
release: sentry::release_name!(),
|
release: sentry::release_name!(),
|
||||||
send_default_pii: true,
|
send_default_pii: true,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
},
|
||||||
));
|
)))
|
||||||
} else {
|
} else {
|
||||||
warn!("SENTRY_DSN not set, sentry will not be initialized");
|
warn!("SENTRY_DSN not set, sentry will not be initialized");
|
||||||
}
|
None
|
||||||
|
};
|
||||||
|
|
||||||
tokio::runtime::Runtime::new()?.block_on(run())
|
tokio::runtime::Runtime::new()?.block_on(run())
|
||||||
}
|
}
|
||||||
@@ -54,9 +55,7 @@ async fn run() -> anyhow::Result<()> {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let bot = Bot::new(config.clone())?;
|
let bot = Bot::new(config.clone())?;
|
||||||
|
let (tx, rx) = tokio::sync::mpsc::channel::<WebhookType>(config.bot_max_concurrent * 2);
|
||||||
let (tx, rx) = tokio::sync::mpsc::channel::<WebhookType>(1);
|
|
||||||
|
|
||||||
let app_state = AppState { bot_tx: tx, config };
|
let app_state = AppState { bot_tx: tx, config };
|
||||||
|
|
||||||
tokio::try_join!(bot.start(rx), api::start(app_state))?;
|
tokio::try_join!(bot.start(rx), api::start(app_state))?;
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ pub struct ChatResult {
|
|||||||
pub cost: Option<f64>,
|
pub cost: Option<f64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct OpenRouterClient {
|
pub struct OpenRouterClient {
|
||||||
client: openrouter_rs::OpenRouterClient,
|
client: openrouter_rs::OpenRouterClient,
|
||||||
model: String,
|
model: String,
|
||||||
|
|||||||
Reference in New Issue
Block a user
Le fichier importe des éléments d'Axum, alors que le projet semble utiliser Actix-Web (présent dans Cargo.lock). Cette incohérence peut causer des erreurs de compilation ou des comportements inattendus. Vérifiez si le projet est en cours de migration vers Axum ou s'il s'agit d'une erreur. Dans tous les cas, un seul framework HTTP doit être utilisé.