Compare commits

..

4 commits
tick ... main

Author SHA1 Message Date
qpismont
3d00a5c888 adding cache header 2024-07-16 19:14:14 +02:00
qpismont
2f5d8764b6 adding dockerfile 2024-07-14 22:36:40 +02:00
qpismont
d1bee14213 add tick function for memory storage 2024-07-14 22:22:10 +02:00
qpismont
319a407ae3 revive :) 2024-07-14 21:45:58 +02:00
12 changed files with 548 additions and 561 deletions

875
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -7,21 +7,21 @@ edition = "2021"
[dependencies] [dependencies]
futures = "0.3" futures = "0.3"
tokio = { version = "1.35", features = ["full", "tracing"] } tokio = { version = "1.38", features = ["full", "tracing"] }
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
tokio-stream = "0.1" tokio-stream = "0.1"
tokio-util = "0.7" tokio-util = "0.7"
axum = { version = "0.7" } axum = { version = "0.7" }
reqwest = { version = "0.11", features = ["stream"] } reqwest = { version = "0.12", features = ["stream"] }
anyhow = "1.0" anyhow = "1.0"
sentry = "0.32" sentry = "0.34"
dotenvy = "0.15" dotenvy = "0.15"
axum-macros = "0.4" axum-macros = "0.4"
magic-crypt = "3.1" magic-crypt = "3.1"
async-trait = "0.1" async-trait = "0.1"
serde = "1.0" serde = "1.0"
serde_yaml = "0.9" serde_yaml = "0.9"
base64 = "0.21" base64 = "0.22"
regex = "1.10" regex = "1.10"
serde_regex = "1.1" serde_regex = "1.1"

19
Dockerfile Normal file
View file

@ -0,0 +1,19 @@
FROM rust:1.79 AS builder
WORKDIR /app
COPY src/ src/
COPY Cargo.lock .
COPY Cargo.toml .
RUN cargo build --release
FROM rust:1.79 AS runner
WORKDIR /app
COPY --from=builder /app/target/release/imgproxy-rs .
RUN chmod +x imgproxy-rs
CMD [ "./imgproxy-rs" ]

View file

@ -6,20 +6,20 @@ use serde::Deserialize;
#[derive(Deserialize, Clone)] #[derive(Deserialize, Clone)]
pub struct MemoryStorageConfig { pub struct MemoryStorageConfig {
pub max_size: usize, pub max_size: usize,
pub ttl: u64, pub ttl: usize,
} }
#[derive(Deserialize, Clone)] #[derive(Deserialize, Clone)]
pub struct DiskStorageConfig { pub struct DiskStorageConfig {
pub path: PathBuf, pub path: PathBuf,
pub ttl: u64, pub ttl: usize,
pub max_size: usize, pub max_size: usize,
} }
#[derive(Deserialize, Clone)] #[derive(Deserialize, Clone)]
pub struct MixedStorageConfig { pub struct MixedStorageConfig {
pub path: PathBuf, pub path: PathBuf,
pub ttl: u64, pub ttl: usize,
pub max_size: usize, pub max_size: usize,
} }
@ -52,7 +52,6 @@ pub enum StorageStrategyConfig {
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct StorageConfig { pub struct StorageConfig {
pub strategy: StorageStrategyConfig, pub strategy: StorageStrategyConfig,
pub name: Option<String>,
#[serde(with = "serde_regex")] #[serde(with = "serde_regex")]
pub regex: regex::Regex, pub regex: regex::Regex,

View file

@ -1,13 +1,13 @@
use std::io::Write; use std::io::Write;
use base64::engine::general_purpose; use base64::engine::general_purpose;
use magic_crypt::{new_magic_crypt, MagicCryptTrait}; use magic_crypt::{MagicCrypt256, MagicCryptTrait};
pub fn decryt<I>(value: I, secret_key: String) -> anyhow::Result<String> pub fn decryt<I>(value: I, secret_key: String) -> anyhow::Result<String>
where where
I: AsRef<str>, I: AsRef<str>,
{ {
let mc = new_magic_crypt!(&secret_key, 256); let mc = MagicCrypt256::new(&secret_key, None::<&str>);
Ok(mc.decrypt_base64_to_string(value)?) Ok(mc.decrypt_base64_to_string(value)?)
} }

View file

@ -1,5 +1,4 @@
use axum::{routing::get, Router}; use axum::{routing::get, Router};
use tokio::sync::Mutex;
use std::sync::Arc; use std::sync::Arc;
@ -23,38 +22,19 @@ async fn main() -> anyhow::Result<()> {
storage_pool.init().await?; storage_pool.init().await?;
let app_state = AppState::new(storage_pool, secret_key); let app_state = AppState::new(storage_pool, secret_key);
let routes = match app_state.secret_key {
Some(_) => get(routes::handle_secure),
None => get(routes::handle_unsecure),
};
let app_state_th = Arc::new(Mutex::new(app_state));
tokio::spawn(launch_storages_tick(app_state_th.clone()));
let app = Router::new() let app = Router::new()
.route("/*src", routes) .route(
.with_state(app_state_th); "/*src",
match app_state.secret_key {
Some(_) => get(routes::handle_secure),
None => get(routes::handle_unsecure),
},
)
.with_state(Arc::new(app_state));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?; let listener = tokio::net::TcpListener::bind("0.0.0.0:3100").await?;
axum::serve(listener, app).await?; axum::serve(listener, app).await?;
Ok(()) Ok(())
} }
async fn launch_storages_tick(app_state: Arc<Mutex<AppState>>) {
loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let mut app_state_locked = app_state.lock().await;
let results = app_state_locked.storage_pool.tick().await;
let errs = results
.into_iter()
.filter(anyhow::Result::is_err)
.map(anyhow::Result::unwrap_err);
for err in errs {
println!("error while tick {}", err);
}
}
}

View file

@ -6,6 +6,7 @@ use axum::{
response::IntoResponse, response::IntoResponse,
}; };
use futures::TryStreamExt; use futures::TryStreamExt;
use reqwest::header;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio_stream::Stream; use tokio_stream::Stream;
use tokio_util::bytes::Bytes; use tokio_util::bytes::Bytes;
@ -14,41 +15,36 @@ use crate::{crypt, state::AppState};
pub async fn handle_secure( pub async fn handle_secure(
Path(data): Path<String>, Path(data): Path<String>,
state: State<Arc<Mutex<AppState>>>, state: State<Arc<AppState>>,
) -> Result<impl IntoResponse, MyAnyhow> { ) -> Result<impl IntoResponse, MyAnyhow> {
let app_state = state.lock().await; let decrypt = crypt::decryt(data, state.secret_key.clone().unwrap())?;
let decrypt = crypt::decryt(data, app_state.secret_key.clone().unwrap())?;
std::mem::drop(app_state);
let bytes = handle(decrypt, state).await?; let bytes = handle(decrypt, state).await?;
Ok(axum::body::Body::from_stream(bytes)) Ok(([(header::CACHE_CONTROL, "public, max-age=31919000")], axum::body::Body::from_stream(bytes)))
} }
pub async fn handle_unsecure( pub async fn handle_unsecure(
Path(src): Path<String>, Path(src): Path<String>,
state: State<Arc<Mutex<AppState>>>, state: State<Arc<AppState>>,
) -> Result<impl IntoResponse, MyAnyhow> { ) -> Result<impl IntoResponse, MyAnyhow> {
let bytes = handle(src, state).await?; let bytes = handle(src, state).await?;
Ok(axum::body::Body::from_stream(bytes)) Ok(([(header::CACHE_CONTROL, "public, max-age=31919000")], axum::body::Body::from_stream(bytes)))
} }
async fn handle( async fn handle(
src: String, src: String,
state: State<Arc<Mutex<AppState>>>, state: State<Arc<AppState>>,
) -> anyhow::Result<impl Stream<Item = Result<Bytes, anyhow::Error>> + Send> { ) -> anyhow::Result<impl Stream<Item = Result<Bytes, anyhow::Error>> + Send> {
let mut app_state = state.lock().await; let mut storage_pool = state.storage_pool.lock().await;
match app_state.storage_pool.retrieve(src.clone()).await { match storage_pool.retrieve(src.clone()).await {
Some(stream) => Ok(stream), Some(stream) => Ok(stream),
None => { None => {
let save_stream = fetch(src.clone()).await?; let save_stream = fetch(src.clone()).await?;
app_state storage_pool.save(src.clone(), save_stream).await?;
.storage_pool
.save(src.clone(), save_stream)
.await?;
Ok(app_state.storage_pool.retrieve(src).await.unwrap()) Ok(storage_pool.retrieve(src).await.unwrap())
} }
} }
} }

View file

@ -1,15 +1,17 @@
use tokio::sync::Mutex;
use crate::storages::StoragePool; use crate::storages::StoragePool;
pub struct AppState { pub struct AppState {
pub secret_key: Option<String>, pub secret_key: Option<String>,
pub storage_pool: StoragePool, pub storage_pool: Mutex<StoragePool>,
} }
impl AppState { impl AppState {
pub fn new(storage_pool: StoragePool, secret_key: Option<String>) -> Self { pub fn new(storage_pool: StoragePool, secret_key: Option<String>) -> Self {
Self { Self {
secret_key, secret_key,
storage_pool, storage_pool: Mutex::new(storage_pool),
} }
} }
} }

View file

@ -1,4 +1,4 @@
use std::{collections::HashMap, path::PathBuf, pin::Pin, time::SystemTime}; use std::{path::PathBuf, pin::Pin};
use crate::config::DiskStorageConfig; use crate::config::DiskStorageConfig;
use async_trait::async_trait; use async_trait::async_trait;
@ -9,31 +9,13 @@ use tokio_util::{bytes::Bytes, io::ReaderStream};
use super::Storage; use super::Storage;
struct DiskStorageItem {
path: PathBuf,
created_at: SystemTime,
}
impl DiskStorageItem {
fn new(path: PathBuf, created_at: SystemTime) -> Self {
Self {
path: path,
created_at: created_at,
}
}
}
pub struct DiskStorage { pub struct DiskStorage {
config: DiskStorageConfig, config: DiskStorageConfig,
items: HashMap<String, DiskStorageItem>,
} }
impl DiskStorage { impl DiskStorage {
pub fn new(config: DiskStorageConfig) -> Self { pub fn new(config: DiskStorageConfig) -> Self {
Self { Self { config }
config,
items: HashMap::new(),
}
} }
pub async fn retrieve_all( pub async fn retrieve_all(
@ -44,10 +26,16 @@ impl DiskStorage {
Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>, Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>,
)>, )>,
> { > {
let mut saved_files = tokio::fs::read_dir(self.config.path.clone()).await?;
let mut files = Vec::new(); let mut files = Vec::new();
for key in self.items.keys() { while let Some(file) = saved_files.next_entry().await? {
files.push((key.clone(), self.retrieve(key.clone()).await.unwrap())); let key = file.file_name().to_string_lossy().to_string();
println!("{}", key);
let stream = self.retrieve(key.clone()).await.unwrap();
files.push((key, stream));
} }
Ok(files) Ok(files)
@ -59,34 +47,10 @@ impl Storage for DiskStorage {
async fn init(&mut self) -> anyhow::Result<()> { async fn init(&mut self) -> anyhow::Result<()> {
tokio::fs::create_dir_all(self.config.path.clone()).await?; tokio::fs::create_dir_all(self.config.path.clone()).await?;
let mut saved_files = tokio::fs::read_dir(self.config.path.clone()).await?;
while let Some(file) = saved_files.next_entry().await? {
let key = file.file_name().to_string_lossy().to_string();
let path = file.path();
let metadata = file.metadata().await?;
self.items
.insert(key, DiskStorageItem::new(path, metadata.created()?));
}
Ok(()) Ok(())
} }
async fn tick(&mut self) -> anyhow::Result<()> { async fn tick(&mut self) -> anyhow::Result<()> {
let paths_deleted = self
.items
.iter()
.filter(|(_, item)| item.created_at.elapsed().unwrap().as_secs() >= self.config.ttl)
.map(|(_, item)| item.path.clone())
.collect::<Vec<PathBuf>>();
self.items
.retain(|_, item| item.created_at.elapsed().unwrap().as_secs() < self.config.ttl);
for path in paths_deleted {
tokio::fs::remove_file(path).await?;
}
Ok(()) Ok(())
} }

View file

@ -1,11 +1,8 @@
use std::{
collections::HashMap,
pin::Pin,
time::{Duration, SystemTime},
};
use anyhow::Ok; use anyhow::Ok;
use async_trait::async_trait; use async_trait::async_trait;
use std::time::{Duration, SystemTime};
use std::{collections::HashMap, pin::Pin};
use tokio::time::Instant;
use tokio_stream::{Stream, StreamExt}; use tokio_stream::{Stream, StreamExt};
use tokio_util::bytes::Bytes; use tokio_util::bytes::Bytes;
@ -15,16 +12,7 @@ use super::Storage;
struct MemoryStorageItem { struct MemoryStorageItem {
data: Vec<u8>, data: Vec<u8>,
created_at: SystemTime, added_at: Instant,
}
impl MemoryStorageItem {
fn new() -> Self {
Self {
data: Vec::new(),
created_at: SystemTime::now(),
}
}
} }
pub struct MemoryStorage { pub struct MemoryStorage {
@ -47,25 +35,25 @@ impl Storage for MemoryStorage {
Ok(()) Ok(())
} }
async fn tick(&mut self) -> anyhow::Result<()> {
self.items
.retain(|_, item| item.created_at.elapsed().unwrap().as_secs() < self.config.ttl);
Ok(())
}
async fn eligible(&self, _src: String) -> bool { async fn eligible(&self, _src: String) -> bool {
true true
} }
async fn delete(&mut self, key: String) -> anyhow::Result<()> { async fn tick(&mut self) -> anyhow::Result<()> {
if self.items.get(&key).is_some() { let ttl = self.config.ttl;
self.items.remove(&key); self.items
} .retain(|_, elt| elt.added_at.elapsed().as_secs() < ttl as u64);
Ok(()) Ok(())
} }
async fn delete(&mut self, key: String) -> anyhow::Result<()> {
self.items
.remove(&key)
.ok_or_else(|| anyhow::anyhow!(""))
.map(|_| ())
}
async fn retrieve( async fn retrieve(
&self, &self,
key: String, key: String,
@ -87,9 +75,17 @@ impl Storage for MemoryStorage {
mut stream: Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>, mut stream: Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if !self.items.contains_key(&key) { if !self.items.contains_key(&key) {
self.items.insert(key.clone(), MemoryStorageItem::new()); self.items.insert(
key.clone(),
MemoryStorageItem {
data: Vec::new(),
added_at: Instant::now(),
},
);
} }
println!("save {}", key);
while let Some(chunk) = stream.next().await { while let Some(chunk) = stream.next().await {
self.items self.items
.get_mut(&key) .get_mut(&key)

View file

@ -16,7 +16,7 @@ pub struct MixedStorage {
impl MixedStorage { impl MixedStorage {
pub fn new(config: MixedStorageConfig) -> Self { pub fn new(config: MixedStorageConfig) -> Self {
Self { Self {
disk: DiskStorage::new(config.to_owned().into()), disk: DiskStorage::new(config.clone().into()),
memory: MemoryStorage::new(config.into()), memory: MemoryStorage::new(config.into()),
} }
} }
@ -37,9 +37,6 @@ impl Storage for MixedStorage {
} }
async fn tick(&mut self) -> anyhow::Result<()> { async fn tick(&mut self) -> anyhow::Result<()> {
self.disk.tick().await?;
self.memory.tick().await?;
Ok(()) Ok(())
} }

View file

@ -19,8 +19,8 @@ use self::{disk::DiskStorage, memory::MemoryStorage, mixed::MixedStorage};
pub trait Storage { pub trait Storage {
async fn init(&mut self) -> anyhow::Result<()>; async fn init(&mut self) -> anyhow::Result<()>;
async fn eligible(&self, src: String) -> bool; async fn eligible(&self, src: String) -> bool;
async fn tick(&mut self) -> anyhow::Result<()>;
async fn delete(&mut self, key: String) -> anyhow::Result<()>; async fn delete(&mut self, key: String) -> anyhow::Result<()>;
async fn tick(&mut self) -> anyhow::Result<()>;
async fn retrieve( async fn retrieve(
&self, &self,
key: String, key: String,
@ -45,22 +45,13 @@ impl StoragePool {
pub async fn init(&mut self) -> anyhow::Result<()> { pub async fn init(&mut self) -> anyhow::Result<()> {
for item in &mut self.storages { for item in &mut self.storages {
println!("init storage");
item.init().await?; item.init().await?;
} }
Ok(()) Ok(())
} }
pub async fn tick(&mut self) -> Vec<anyhow::Result<()>> {
let mut results = vec![];
for item in &mut self.storages {
results.push(item.tick().await);
}
results
}
pub async fn retrieve( pub async fn retrieve(
&self, &self,
src: String, src: String,