use anyhow::Ok; use async_trait::async_trait; use std::time::{Duration, SystemTime}; use std::{collections::HashMap, pin::Pin}; use tokio::time::Instant; use tokio_stream::{Stream, StreamExt}; use tokio_util::bytes::Bytes; use crate::config::MemoryStorageConfig; use super::Storage; struct MemoryStorageItem { data: Vec, added_at: Instant, } pub struct MemoryStorage { config: MemoryStorageConfig, items: HashMap, } impl MemoryStorage { pub fn new(config: MemoryStorageConfig) -> Self { Self { config, items: HashMap::new(), } } } #[async_trait] impl Storage for MemoryStorage { async fn init(&mut self) -> anyhow::Result<()> { Ok(()) } async fn eligible(&self, _src: String) -> bool { true } async fn tick(&mut self) -> anyhow::Result<()> { let ttl = self.config.ttl; self.items .retain(|_, elt| elt.added_at.elapsed().as_secs() < ttl as u64); Ok(()) } async fn delete(&mut self, key: String) -> anyhow::Result<()> { self.items .remove(&key) .ok_or_else(|| anyhow::anyhow!("")) .map(|_| ()) } async fn retrieve( &self, key: String, ) -> Option> + Send>>> { match self.items.get(&key) { Some(item) => { let bytes = tokio_util::bytes::Bytes::from(item.data.clone()); let stream = futures::stream::iter([Ok(bytes)]); Some(Box::pin(stream)) } None => None, } } async fn save( &mut self, key: String, mut stream: Pin> + Send>>, ) -> anyhow::Result<()> { if !self.items.contains_key(&key) { self.items.insert( key.clone(), MemoryStorageItem { data: Vec::new(), added_at: Instant::now(), }, ); } println!("save {}", key); while let Some(chunk) = stream.next().await { self.items .get_mut(&key) .unwrap() .data .append(&mut chunk?.to_vec()); } println!("{} saved", key); Ok(()) } }