use axum::body::{Bytes, to_bytes}; use axum::extract::{FromRef, FromRequest, State}; use axum::http::Request; use axum::response::IntoResponse; use axum::routing::{get, post}; use axum::{Json, Router}; use hmac::{Hmac, KeyInit, Mac}; use reqwest::StatusCode; use sentry::integrations::tower::{NewSentryLayer, SentryHttpLayer}; use serde_json::Value; use sha2::Sha256; use subtle::ConstantTimeEq; use tower::ServiceBuilder; use tower_http::trace::TraceLayer; use tracing::{info, instrument}; use tokio_util::sync::CancellationToken; use crate::consts::{GITEA_EVENT_TYPE_HEADER_NAME, GITEA_SIG_HEADER_NAME, MAX_WEBHOOK_BODY_SIZE}; use crate::errors::AppError; use crate::gitea::WebhookType; use crate::state::AppState; pub async fn start(app_state: AppState, shutdown: CancellationToken) -> anyhow::Result<()> { let http_port = app_state.config.http_port; let app = Router::new() .route("/", get(root)) .route("/webhook", post(webhook)) .layer( ServiceBuilder::new() .layer(NewSentryLayer::>::new_from_top()) .layer(SentryHttpLayer::new()) .layer(TraceLayer::new_for_http()), ) .with_state(app_state); let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", http_port)).await?; info!("Listening API on port {}", http_port); axum::serve(listener, app) .with_graceful_shutdown(async move { shutdown.cancelled().await }) .await .map_err(anyhow::Error::from) .inspect(|_| info!("API shutting down complete"))?; Ok(()) } async fn root() -> &'static str { "Hi, i'm Herald :)" } #[instrument(skip(app_state), fields(webhook_type), err)] async fn webhook( State(app_state): State, WebhookExtract(wb): WebhookExtract, ) -> Result { tracing::Span::current().record("webhook_type", tracing::field::debug(&wb)); let event_id = wb.event_id(); if app_state.bot.check_and_mark(event_id).await { return Err(AppError::AlreadyProcessedErr); } if app_state.bot_tx.try_send(wb).is_err() { app_state.bot.unmark(event_id).await; return Err(AppError::ChannelFullErr); } Ok((StatusCode::CREATED, "Task started")) } pub struct WebhookExtract(pub WebhookType); impl FromRequest for WebhookExtract where AppState: FromRef, S: Send + Sync, { type Rejection = AppError; #[instrument(skip(req, state), err)] async fn from_request(req: axum::extract::Request, state: &S) -> Result { let app_state = AppState::from_ref(state); let headers = req.headers(); let sig_header = extract_header(GITEA_SIG_HEADER_NAME, headers)?; let type_header = extract_header(GITEA_EVENT_TYPE_HEADER_NAME, headers)?; let body_bytes = read_body(req.into_body()).await?; verify_signature( app_state.config.webhook_secret.as_bytes(), &sig_header, &body_bytes, )?; let body_str = String::from_utf8_lossy(&body_bytes).into_owned(); sentry::configure_scope(|scope| { scope.add_event_processor(move |mut event| { let mut request = event.request.take().unwrap_or_default(); request.data = Some(body_str.clone()); event.request = Some(request); Some(event) }); }); let webhook = parse_webhook(&type_header, &app_state.config.bot_name, &body_bytes)?; Ok(WebhookExtract(webhook)) } } fn extract_header(key: &str, headers: &axum::http::HeaderMap) -> Result { let value = headers .get(key) .ok_or(AppError::WebHookMissingHeaderErr(key.into()))? .to_str() .map_err(anyhow::Error::from)?; Ok(value.to_owned()) } async fn read_body(body: axum::body::Body) -> Result { to_bytes(body, MAX_WEBHOOK_BODY_SIZE) .await .map_err(anyhow::Error::from) .map_err(AppError::from) } fn parse_webhook(header: &str, bot_name: &str, body_bytes: &[u8]) -> Result { let Json(value) = Json::::from_bytes(body_bytes).map_err(|_| AppError::MalformedJsonErr)?; WebhookType::from_event(header, bot_name, value) } fn verify_signature(secret_key: &[u8], sig_header: &str, body: &[u8]) -> Result<(), AppError> { let sig_header_decoded = hex::decode(sig_header).map_err(|_| AppError::WebHookSigHeaderInvalidErr)?; let mut mac = Hmac::::new_from_slice(secret_key).map_err(anyhow::Error::from)?; mac.update(body); let generated_hmac = mac.finalize().into_bytes(); bool::from(generated_hmac.ct_eq(&sig_header_decoded)) .then_some(()) .ok_or(AppError::WebHookSigHeaderInvalidErr) }