140 lines
3.6 KiB
Rust
140 lines
3.6 KiB
Rust
use std::{collections::HashMap, path::PathBuf, pin::Pin, time::SystemTime};
|
|
|
|
use crate::config::DiskStorageConfig;
|
|
use async_trait::async_trait;
|
|
use futures::TryStreamExt;
|
|
use tokio::io::AsyncWriteExt;
|
|
use tokio_stream::{Stream, StreamExt};
|
|
use tokio_util::{bytes::Bytes, io::ReaderStream};
|
|
|
|
use super::Storage;
|
|
|
|
struct DiskStorageItem {
|
|
path: PathBuf,
|
|
created_at: SystemTime,
|
|
}
|
|
|
|
impl DiskStorageItem {
|
|
fn new(path: PathBuf, created_at: SystemTime) -> Self {
|
|
Self {
|
|
path: path,
|
|
created_at: created_at,
|
|
}
|
|
}
|
|
}
|
|
|
|
pub struct DiskStorage {
|
|
config: DiskStorageConfig,
|
|
items: HashMap<String, DiskStorageItem>,
|
|
}
|
|
|
|
impl DiskStorage {
|
|
pub fn new(config: DiskStorageConfig) -> Self {
|
|
Self {
|
|
config,
|
|
items: HashMap::new(),
|
|
}
|
|
}
|
|
|
|
pub async fn retrieve_all(
|
|
&self,
|
|
) -> anyhow::Result<
|
|
Vec<(
|
|
String,
|
|
Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>,
|
|
)>,
|
|
> {
|
|
let mut files = Vec::new();
|
|
|
|
for key in self.items.keys() {
|
|
files.push((key.clone(), self.retrieve(key.clone()).await.unwrap()));
|
|
}
|
|
|
|
Ok(files)
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Storage for DiskStorage {
|
|
async fn init(&mut self) -> anyhow::Result<()> {
|
|
tokio::fs::create_dir_all(self.config.path.clone()).await?;
|
|
|
|
let mut saved_files = tokio::fs::read_dir(self.config.path.clone()).await?;
|
|
while let Some(file) = saved_files.next_entry().await? {
|
|
let key = file.file_name().to_string_lossy().to_string();
|
|
let path = file.path();
|
|
let metadata = file.metadata().await?;
|
|
|
|
self.items
|
|
.insert(key, DiskStorageItem::new(path, metadata.created()?));
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn tick(&mut self) -> anyhow::Result<()> {
|
|
let paths_deleted = self
|
|
.items
|
|
.iter()
|
|
.filter(|(_, item)| item.created_at.elapsed().unwrap().as_secs() >= self.config.ttl)
|
|
.map(|(_, item)| item.path.clone())
|
|
.collect::<Vec<PathBuf>>();
|
|
|
|
self.items
|
|
.retain(|_, item| item.created_at.elapsed().unwrap().as_secs() < self.config.ttl);
|
|
|
|
for path in paths_deleted {
|
|
tokio::fs::remove_file(path).await?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn eligible(&self, _src: String) -> bool {
|
|
true
|
|
}
|
|
|
|
async fn delete(&mut self, _key: String) -> anyhow::Result<()> {
|
|
Ok(())
|
|
}
|
|
|
|
async fn retrieve(
|
|
&self,
|
|
key: String,
|
|
) -> Option<Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>> {
|
|
let file_path = compute_file_path(self.config.path.clone(), key);
|
|
|
|
match tokio::fs::File::open(file_path).await {
|
|
Ok(file) => {
|
|
let stream = ReaderStream::new(file).map_err(|_| anyhow::anyhow!(""));
|
|
Some(Box::pin(stream))
|
|
}
|
|
Err(_) => None,
|
|
}
|
|
}
|
|
|
|
async fn save(
|
|
&mut self,
|
|
key: String,
|
|
mut stream: Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>,
|
|
) -> anyhow::Result<()> {
|
|
let file_path = compute_file_path(self.config.path.clone(), key);
|
|
let mut file = tokio::fs::OpenOptions::new()
|
|
.write(true)
|
|
.create(true)
|
|
.open(file_path)
|
|
.await?;
|
|
|
|
while let Some(chunk) = stream.next().await {
|
|
file.write_all(&chunk?).await?;
|
|
}
|
|
|
|
file.flush().await?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
fn compute_file_path(base: PathBuf, key: String) -> PathBuf {
|
|
PathBuf::from_iter([base, key.into()])
|
|
}
|