WIP: starting adding tick() for cleanup storage #1

Draft
qpismont wants to merge 2 commits from tick into main
6 changed files with 98 additions and 23 deletions
Showing only changes of commit 8bba82f26c - Show all commits

View file

@ -52,6 +52,7 @@ pub enum StorageStrategyConfig {
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct StorageConfig { pub struct StorageConfig {
pub strategy: StorageStrategyConfig, pub strategy: StorageStrategyConfig,
pub name: Option<String>,
#[serde(with = "serde_regex")] #[serde(with = "serde_regex")]
pub regex: regex::Regex, pub regex: regex::Regex,

View file

@ -23,19 +23,35 @@ async fn main() -> anyhow::Result<()> {
storage_pool.init().await?; storage_pool.init().await?;
let app_state = AppState::new(storage_pool, secret_key); let app_state = AppState::new(storage_pool, secret_key);
let routes = match app_state.secret_key {
Some(_) => get(routes::handle_secure),
None => get(routes::handle_unsecure),
};
let app_state_th = Arc::new(Mutex::new(app_state));
tokio::spawn(launch_storages_tick(app_state_th.clone()));
let app = Router::new() let app = Router::new()
.route( .route("/*src", routes)
"/*src", .with_state(app_state_th);
match app_state.secret_key {
Some(_) => get(routes::handle_secure),
None => get(routes::handle_unsecure),
},
)
.with_state(Arc::new(Mutex::new(app_state)));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?; let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
axum::serve(listener, app).await?; axum::serve(listener, app).await?;
Ok(()) Ok(())
} }
async fn launch_storages_tick(app_state: Arc<Mutex<AppState>>) {
loop {
let mut app_state_locked = app_state.lock().await;
match app_state_locked.storage_pool.tick().await {
Ok(_) => todo!(),
Err(err) => println!("{}", err),
}
std::mem::drop(app_state_locked);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}

View file

@ -1,4 +1,4 @@
use std::{path::PathBuf, pin::Pin}; use std::{collections::HashMap, path::PathBuf, pin::Pin, time::SystemTime};
use crate::config::DiskStorageConfig; use crate::config::DiskStorageConfig;
use async_trait::async_trait; use async_trait::async_trait;
@ -9,13 +9,31 @@ use tokio_util::{bytes::Bytes, io::ReaderStream};
use super::Storage; use super::Storage;
struct DiskStorageItem {
path: PathBuf,
created_at: SystemTime,
}
impl DiskStorageItem {
fn new(path: PathBuf, created_at: SystemTime) -> Self {
Self {
path: path,
created_at: created_at,
}
}
}
pub struct DiskStorage { pub struct DiskStorage {
config: DiskStorageConfig, config: DiskStorageConfig,
items: HashMap<String, DiskStorageItem>,
} }
impl DiskStorage { impl DiskStorage {
pub fn new(config: DiskStorageConfig) -> Self { pub fn new(config: DiskStorageConfig) -> Self {
Self { config } Self {
config,
items: HashMap::new(),
}
} }
pub async fn retrieve_all( pub async fn retrieve_all(
@ -26,16 +44,10 @@ impl DiskStorage {
Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>, Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>,
)>, )>,
> { > {
let mut saved_files = tokio::fs::read_dir(self.config.path.clone()).await?;
let mut files = Vec::new(); let mut files = Vec::new();
while let Some(file) = saved_files.next_entry().await? { for key in self.items.keys() {
let key = file.file_name().to_string_lossy().to_string(); files.push((key.clone(), self.retrieve(key.clone()).await.unwrap()));
println!("{}", key);
let stream = self.retrieve(key.clone()).await.unwrap();
files.push((key, stream));
} }
Ok(files) Ok(files)
@ -47,6 +59,20 @@ impl Storage for DiskStorage {
async fn init(&mut self) -> anyhow::Result<()> { async fn init(&mut self) -> anyhow::Result<()> {
tokio::fs::create_dir_all(self.config.path.clone()).await?; tokio::fs::create_dir_all(self.config.path.clone()).await?;
let mut saved_files = tokio::fs::read_dir(self.config.path.clone()).await?;
while let Some(file) = saved_files.next_entry().await? {
let key = file.file_name().to_string_lossy().to_string();
let path = file.path();
let metadata = file.metadata().await?;
self.items
.insert(key, DiskStorageItem::new(path, metadata.created()?));
}
Ok(())
}
async fn tick(&self) -> anyhow::Result<()> {
Ok(()) Ok(())
} }

View file

@ -1,4 +1,4 @@
use std::{collections::HashMap, pin::Pin}; use std::{collections::HashMap, pin::Pin, time::SystemTime};
use anyhow::Ok; use anyhow::Ok;
use async_trait::async_trait; use async_trait::async_trait;
@ -9,9 +9,23 @@ use crate::config::MemoryStorageConfig;
use super::Storage; use super::Storage;
struct MemoryStorageItem {
data: Vec<u8>,
created_at: SystemTime,
}
impl MemoryStorageItem {
fn new() -> Self {
Self {
data: Vec::new(),
created_at: SystemTime::now(),
}
}
}
pub struct MemoryStorage { pub struct MemoryStorage {
config: MemoryStorageConfig, config: MemoryStorageConfig,
items: HashMap<String, Vec<u8>>, items: HashMap<String, MemoryStorageItem>,
} }
impl MemoryStorage { impl MemoryStorage {
@ -29,6 +43,10 @@ impl Storage for MemoryStorage {
Ok(()) Ok(())
} }
async fn tick(&self) -> anyhow::Result<()> {
Ok(())
}
async fn eligible(&self, _src: String) -> bool { async fn eligible(&self, _src: String) -> bool {
true true
} }
@ -46,7 +64,7 @@ impl Storage for MemoryStorage {
) -> Option<Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>> { ) -> Option<Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>> {
match self.items.get(&key) { match self.items.get(&key) {
Some(item) => { Some(item) => {
let bytes = tokio_util::bytes::Bytes::from(item.clone()); let bytes = tokio_util::bytes::Bytes::from(item.data.clone());
let stream = futures::stream::iter([Ok(bytes)]); let stream = futures::stream::iter([Ok(bytes)]);
Some(Box::pin(stream)) Some(Box::pin(stream))
@ -61,13 +79,14 @@ impl Storage for MemoryStorage {
mut stream: Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>, mut stream: Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if !self.items.contains_key(&key) { if !self.items.contains_key(&key) {
self.items.insert(key.clone(), Vec::new()); self.items.insert(key.clone(), MemoryStorageItem::new());
} }
while let Some(chunk) = stream.next().await { while let Some(chunk) = stream.next().await {
self.items self.items
.get_mut(&key) .get_mut(&key)
.unwrap() .unwrap()
.data
.append(&mut chunk?.to_vec()); .append(&mut chunk?.to_vec());
} }

View file

@ -16,7 +16,7 @@ pub struct MixedStorage {
impl MixedStorage { impl MixedStorage {
pub fn new(config: MixedStorageConfig) -> Self { pub fn new(config: MixedStorageConfig) -> Self {
Self { Self {
disk: DiskStorage::new(config.clone().into()), disk: DiskStorage::new(config.to_owned().into()),
memory: MemoryStorage::new(config.into()), memory: MemoryStorage::new(config.into()),
} }
} }
@ -36,6 +36,10 @@ impl Storage for MixedStorage {
Ok(()) Ok(())
} }
async fn tick(&self) -> anyhow::Result<()> {
Ok(())
}
async fn eligible(&self, _src: String) -> bool { async fn eligible(&self, _src: String) -> bool {
true true
} }

View file

@ -19,6 +19,7 @@ use self::{disk::DiskStorage, memory::MemoryStorage, mixed::MixedStorage};
pub trait Storage { pub trait Storage {
async fn init(&mut self) -> anyhow::Result<()>; async fn init(&mut self) -> anyhow::Result<()>;
async fn eligible(&self, src: String) -> bool; async fn eligible(&self, src: String) -> bool;
async fn tick(&self) -> anyhow::Result<()>;
async fn delete(&mut self, key: String) -> anyhow::Result<()>; async fn delete(&mut self, key: String) -> anyhow::Result<()>;
async fn retrieve( async fn retrieve(
&self, &self,
@ -50,6 +51,14 @@ impl StoragePool {
Ok(()) Ok(())
} }
pub async fn tick(&mut self) -> anyhow::Result<()> {
for item in &mut self.storages {
item.tick().await?;
}
Ok(())
}
pub async fn retrieve( pub async fn retrieve(
&self, &self,
src: String, src: String,