diff --git a/src/config.rs b/src/config.rs index 4440108..52745ca 100644 --- a/src/config.rs +++ b/src/config.rs @@ -6,20 +6,20 @@ use serde::Deserialize; #[derive(Deserialize, Clone)] pub struct MemoryStorageConfig { pub max_size: usize, - pub ttl: usize, + pub ttl: u64, } #[derive(Deserialize, Clone)] pub struct DiskStorageConfig { pub path: PathBuf, - pub ttl: usize, + pub ttl: u64, pub max_size: usize, } #[derive(Deserialize, Clone)] pub struct MixedStorageConfig { pub path: PathBuf, - pub ttl: usize, + pub ttl: u64, pub max_size: usize, } @@ -52,6 +52,7 @@ pub enum StorageStrategyConfig { #[derive(Deserialize)] pub struct StorageConfig { pub strategy: StorageStrategyConfig, + pub name: Option, #[serde(with = "serde_regex")] pub regex: regex::Regex, diff --git a/src/main.rs b/src/main.rs index e18df9c..1dbdc80 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,19 +23,38 @@ async fn main() -> anyhow::Result<()> { storage_pool.init().await?; let app_state = AppState::new(storage_pool, secret_key); + let routes = match app_state.secret_key { + Some(_) => get(routes::handle_secure), + None => get(routes::handle_unsecure), + }; + + let app_state_th = Arc::new(Mutex::new(app_state)); + + tokio::spawn(launch_storages_tick(app_state_th.clone())); let app = Router::new() - .route( - "/*src", - match app_state.secret_key { - Some(_) => get(routes::handle_secure), - None => get(routes::handle_unsecure), - }, - ) - .with_state(Arc::new(Mutex::new(app_state))); + .route("/*src", routes) + .with_state(app_state_th); let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?; axum::serve(listener, app).await?; Ok(()) } + +async fn launch_storages_tick(app_state: Arc>) { + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + let mut app_state_locked = app_state.lock().await; + let results = app_state_locked.storage_pool.tick().await; + let errs = results + .into_iter() + .filter(anyhow::Result::is_err) + .map(anyhow::Result::unwrap_err); + + for err in errs { + println!("error while tick {}", err); + } + } +} diff --git a/src/storages/disk.rs b/src/storages/disk.rs index ffa0cfd..4994ccb 100644 --- a/src/storages/disk.rs +++ b/src/storages/disk.rs @@ -1,4 +1,4 @@ -use std::{path::PathBuf, pin::Pin}; +use std::{collections::HashMap, path::PathBuf, pin::Pin, time::SystemTime}; use crate::config::DiskStorageConfig; use async_trait::async_trait; @@ -9,13 +9,31 @@ use tokio_util::{bytes::Bytes, io::ReaderStream}; use super::Storage; +struct DiskStorageItem { + path: PathBuf, + created_at: SystemTime, +} + +impl DiskStorageItem { + fn new(path: PathBuf, created_at: SystemTime) -> Self { + Self { + path: path, + created_at: created_at, + } + } +} + pub struct DiskStorage { config: DiskStorageConfig, + items: HashMap, } impl DiskStorage { pub fn new(config: DiskStorageConfig) -> Self { - Self { config } + Self { + config, + items: HashMap::new(), + } } pub async fn retrieve_all( @@ -26,16 +44,10 @@ impl DiskStorage { Pin> + Send>>, )>, > { - let mut saved_files = tokio::fs::read_dir(self.config.path.clone()).await?; let mut files = Vec::new(); - while let Some(file) = saved_files.next_entry().await? { - let key = file.file_name().to_string_lossy().to_string(); - println!("{}", key); - - let stream = self.retrieve(key.clone()).await.unwrap(); - - files.push((key, stream)); + for key in self.items.keys() { + files.push((key.clone(), self.retrieve(key.clone()).await.unwrap())); } Ok(files) @@ -47,6 +59,34 @@ impl Storage for DiskStorage { async fn init(&mut self) -> anyhow::Result<()> { tokio::fs::create_dir_all(self.config.path.clone()).await?; + let mut saved_files = tokio::fs::read_dir(self.config.path.clone()).await?; + while let Some(file) = saved_files.next_entry().await? { + let key = file.file_name().to_string_lossy().to_string(); + let path = file.path(); + let metadata = file.metadata().await?; + + self.items + .insert(key, DiskStorageItem::new(path, metadata.created()?)); + } + + Ok(()) + } + + async fn tick(&mut self) -> anyhow::Result<()> { + let paths_deleted = self + .items + .iter() + .filter(|(_, item)| item.created_at.elapsed().unwrap().as_secs() >= self.config.ttl) + .map(|(_, item)| item.path.clone()) + .collect::>(); + + self.items + .retain(|_, item| item.created_at.elapsed().unwrap().as_secs() < self.config.ttl); + + for path in paths_deleted { + tokio::fs::remove_file(path).await?; + } + Ok(()) } diff --git a/src/storages/memory.rs b/src/storages/memory.rs index 02d00d0..493867e 100644 --- a/src/storages/memory.rs +++ b/src/storages/memory.rs @@ -1,4 +1,8 @@ -use std::{collections::HashMap, pin::Pin}; +use std::{ + collections::HashMap, + pin::Pin, + time::{Duration, SystemTime}, +}; use anyhow::Ok; use async_trait::async_trait; @@ -9,9 +13,23 @@ use crate::config::MemoryStorageConfig; use super::Storage; +struct MemoryStorageItem { + data: Vec, + created_at: SystemTime, +} + +impl MemoryStorageItem { + fn new() -> Self { + Self { + data: Vec::new(), + created_at: SystemTime::now(), + } + } +} + pub struct MemoryStorage { config: MemoryStorageConfig, - items: HashMap>, + items: HashMap, } impl MemoryStorage { @@ -29,15 +47,23 @@ impl Storage for MemoryStorage { Ok(()) } + async fn tick(&mut self) -> anyhow::Result<()> { + self.items + .retain(|_, item| item.created_at.elapsed().unwrap().as_secs() < self.config.ttl); + + Ok(()) + } + async fn eligible(&self, _src: String) -> bool { true } async fn delete(&mut self, key: String) -> anyhow::Result<()> { - self.items - .remove(&key) - .ok_or_else(|| anyhow::anyhow!("")) - .map(|_| ()) + if self.items.get(&key).is_some() { + self.items.remove(&key); + } + + Ok(()) } async fn retrieve( @@ -46,7 +72,7 @@ impl Storage for MemoryStorage { ) -> Option> + Send>>> { match self.items.get(&key) { Some(item) => { - let bytes = tokio_util::bytes::Bytes::from(item.clone()); + let bytes = tokio_util::bytes::Bytes::from(item.data.clone()); let stream = futures::stream::iter([Ok(bytes)]); Some(Box::pin(stream)) @@ -61,13 +87,14 @@ impl Storage for MemoryStorage { mut stream: Pin> + Send>>, ) -> anyhow::Result<()> { if !self.items.contains_key(&key) { - self.items.insert(key.clone(), Vec::new()); + self.items.insert(key.clone(), MemoryStorageItem::new()); } while let Some(chunk) = stream.next().await { self.items .get_mut(&key) .unwrap() + .data .append(&mut chunk?.to_vec()); } diff --git a/src/storages/mixed.rs b/src/storages/mixed.rs index f0c55d4..70454aa 100644 --- a/src/storages/mixed.rs +++ b/src/storages/mixed.rs @@ -16,7 +16,7 @@ pub struct MixedStorage { impl MixedStorage { pub fn new(config: MixedStorageConfig) -> Self { Self { - disk: DiskStorage::new(config.clone().into()), + disk: DiskStorage::new(config.to_owned().into()), memory: MemoryStorage::new(config.into()), } } @@ -36,6 +36,13 @@ impl Storage for MixedStorage { Ok(()) } + async fn tick(&mut self) -> anyhow::Result<()> { + self.disk.tick().await?; + self.memory.tick().await?; + + Ok(()) + } + async fn eligible(&self, _src: String) -> bool { true } diff --git a/src/storages/mod.rs b/src/storages/mod.rs index 9a0e7c2..1e8f008 100644 --- a/src/storages/mod.rs +++ b/src/storages/mod.rs @@ -19,6 +19,7 @@ use self::{disk::DiskStorage, memory::MemoryStorage, mixed::MixedStorage}; pub trait Storage { async fn init(&mut self) -> anyhow::Result<()>; async fn eligible(&self, src: String) -> bool; + async fn tick(&mut self) -> anyhow::Result<()>; async fn delete(&mut self, key: String) -> anyhow::Result<()>; async fn retrieve( &self, @@ -50,6 +51,16 @@ impl StoragePool { Ok(()) } + pub async fn tick(&mut self) -> Vec> { + let mut results = vec![]; + + for item in &mut self.storages { + results.push(item.tick().await); + } + + results + } + pub async fn retrieve( &self, src: String,