From d1bee1421364f3b1d0b02dac2e5eebd7305a8cd9 Mon Sep 17 00:00:00 2001 From: qpismont Date: Sun, 14 Jul 2024 22:22:10 +0200 Subject: [PATCH] add tick function for memory storage --- src/storages/memory.rs | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/src/storages/memory.rs b/src/storages/memory.rs index 338d230..376c97e 100644 --- a/src/storages/memory.rs +++ b/src/storages/memory.rs @@ -1,7 +1,8 @@ -use std::{collections::HashMap, pin::Pin}; - use anyhow::Ok; use async_trait::async_trait; +use std::time::{Duration, SystemTime}; +use std::{collections::HashMap, pin::Pin}; +use tokio::time::Instant; use tokio_stream::{Stream, StreamExt}; use tokio_util::bytes::Bytes; @@ -9,9 +10,14 @@ use crate::config::MemoryStorageConfig; use super::Storage; +struct MemoryStorageItem { + data: Vec, + added_at: Instant, +} + pub struct MemoryStorage { config: MemoryStorageConfig, - items: HashMap>, + items: HashMap, } impl MemoryStorage { @@ -34,6 +40,10 @@ impl Storage for MemoryStorage { } async fn tick(&mut self) -> anyhow::Result<()> { + let ttl = self.config.ttl; + self.items + .retain(|_, elt| elt.added_at.elapsed().as_secs() < ttl as u64); + Ok(()) } @@ -50,7 +60,7 @@ impl Storage for MemoryStorage { ) -> Option> + Send>>> { match self.items.get(&key) { Some(item) => { - let bytes = tokio_util::bytes::Bytes::from(item.clone()); + let bytes = tokio_util::bytes::Bytes::from(item.data.clone()); let stream = futures::stream::iter([Ok(bytes)]); Some(Box::pin(stream)) @@ -65,7 +75,13 @@ impl Storage for MemoryStorage { mut stream: Pin> + Send>>, ) -> anyhow::Result<()> { if !self.items.contains_key(&key) { - self.items.insert(key.clone(), Vec::new()); + self.items.insert( + key.clone(), + MemoryStorageItem { + data: Vec::new(), + added_at: Instant::now(), + }, + ); } println!("save {}", key); @@ -74,6 +90,7 @@ impl Storage for MemoryStorage { self.items .get_mut(&key) .unwrap() + .data .append(&mut chunk?.to_vec()); }