add first iter of mem storage + add crypt mod

This commit is contained in:
qpismont 2023-12-28 23:25:51 +01:00
parent 02bbc48b15
commit 2f3c998894
11 changed files with 294 additions and 242 deletions

3
.gitignore vendored
View file

@ -1,3 +1,4 @@
/target /target
config.yml config.yml
.env .env
.vscode

152
Cargo.lock generated
View file

@ -38,54 +38,6 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "anstream"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87"
[[package]]
name = "anstyle-parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "317b9a89c1868f5ea6ff1d9539a69f45dffc21ce321ac1fd1160dfa48c8e2140"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b"
dependencies = [
"windows-sys",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628"
dependencies = [
"anstyle",
"windows-sys",
]
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.75" version = "1.0.75"
@ -275,52 +227,6 @@ dependencies = [
"generic-array", "generic-array",
] ]
[[package]]
name = "clap"
version = "4.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41fffed7514f420abec6d183b1d3acfd9099c79c3a10a06ade4f8203f1411272"
dependencies = [
"clap_builder",
"clap_derive",
]
[[package]]
name = "clap_builder"
version = "4.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63361bae7eef3771745f02d8d892bec2fee5f6e34af316ba556e7f97a7069ff1"
dependencies = [
"anstream",
"anstyle",
"clap_lex",
"strsim",
]
[[package]]
name = "clap_derive"
version = "4.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1"
[[package]]
name = "colorchoice"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]] [[package]]
name = "core-foundation" name = "core-foundation"
version = "0.9.3" version = "0.9.3"
@ -479,6 +385,21 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "futures"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.29" version = "0.3.29"
@ -486,6 +407,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink",
] ]
[[package]] [[package]]
@ -494,6 +416,17 @@ version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c"
[[package]]
name = "futures-executor"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.29" version = "0.3.29"
@ -529,6 +462,7 @@ version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104"
dependencies = [ dependencies = [
"futures-channel",
"futures-core", "futures-core",
"futures-io", "futures-io",
"futures-macro", "futures-macro",
@ -808,17 +742,16 @@ dependencies = [
"async-trait", "async-trait",
"axum", "axum",
"axum-macros", "axum-macros",
"clap", "base64",
"dotenvy", "dotenvy",
"futures",
"magic-crypt", "magic-crypt",
"queryst",
"regex", "regex",
"reqwest", "reqwest",
"sentry", "sentry",
"serde", "serde",
"serde_regex", "serde_regex",
"serde_yaml", "serde_yaml",
"thiserror",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tokio-util", "tokio-util",
@ -1181,19 +1114,6 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "queryst"
version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1cbeb75ac695daf201ca2d66d9c684f873b135f28af4f2c79952478cab3b9d9"
dependencies = [
"lazy_static",
"percent-encoding",
"regex",
"serde",
"serde_json",
]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.33" version = "1.0.33"
@ -1645,12 +1565,6 @@ dependencies = [
"windows-sys", "windows-sys",
] ]
[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.38" version = "2.0.38"
@ -2012,12 +1926,6 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "utf8parse"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.5.0" version = "1.5.0"

View file

@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
futures = "0.3"
tokio = { version = "1.35", features = ["full", "tracing"] } tokio = { version = "1.35", features = ["full", "tracing"] }
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
@ -16,13 +17,11 @@ reqwest = { version = "0.11", features = ["stream"] }
anyhow = "1.0" anyhow = "1.0"
sentry = "0.32" sentry = "0.32"
dotenvy = "0.15" dotenvy = "0.15"
thiserror = "1.0"
axum-macros = "0.4" axum-macros = "0.4"
magic-crypt = "3.1" magic-crypt = "3.1"
async-trait = "0.1" async-trait = "0.1"
queryst = "3"
clap = { version = "4.4.10", features = ["derive"] }
serde = "1.0" serde = "1.0"
serde_yaml = "0.9" serde_yaml = "0.9"
base64 = "0.21"
regex = "1.10" regex = "1.10"
serde_regex = "1.1" serde_regex = "1.1"

View file

@ -1,25 +1,22 @@
use std::str::FromStr;
use anyhow::anyhow; use anyhow::anyhow;
use serde::{Deserialize, de::DeserializeOwned}; use serde::Deserialize;
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct MemoryStorageConfig { pub struct MemoryStorageConfig {
pub max_size: usize, pub max_size: usize,
pub ttl: usize pub ttl: usize,
} }
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct DiskStorageConfig { pub struct DiskStorageConfig {
pub path: String, pub path: String,
pub max_size: usize pub max_size: usize,
} }
#[derive(Deserialize)] #[derive(Deserialize)]
pub enum StorageStrategyConfig { pub enum StorageStrategyConfig {
Memory(MemoryStorageConfig), Memory(MemoryStorageConfig),
Disk(DiskStorageConfig) Disk(DiskStorageConfig),
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -27,19 +24,21 @@ pub struct StorageConfig {
pub strategy: StorageStrategyConfig, pub strategy: StorageStrategyConfig,
#[serde(with = "serde_regex")] #[serde(with = "serde_regex")]
pub regex: regex::Regex pub regex: regex::Regex,
} }
#[derive(Deserialize, Default)] #[derive(Deserialize, Default)]
pub struct YAMLConfig { pub struct YAMLConfig {
pub storages: Vec<StorageConfig>, pub storages: Vec<StorageConfig>,
pub secret_key: Option<String> pub secret_key: Option<String>,
} }
pub async fn load() -> YAMLConfig { pub async fn load() -> YAMLConfig {
tokio::fs::read("config.yml") tokio::fs::read("config.yml")
.await .await
.map_err(|err| anyhow!(err)) .map_err(|err| anyhow!(err))
.and_then(|content| serde_yaml::from_slice::<YAMLConfig>(&content).map_err(|err| anyhow!(err))) .and_then(|content| {
serde_yaml::from_slice::<YAMLConfig>(&content).map_err(|err| anyhow!(err))
})
.unwrap_or_default() .unwrap_or_default()
} }

19
src/crypt.rs Normal file
View file

@ -0,0 +1,19 @@
use std::io::Write;
use base64::engine::general_purpose;
use magic_crypt::{new_magic_crypt, MagicCryptTrait};
pub fn decryt<I>(value: I, secret_key: String) -> anyhow::Result<String>
where
I: AsRef<str>,
{
let mc = new_magic_crypt!(&secret_key, 256);
Ok(mc.decrypt_base64_to_string(value)?)
}
pub fn compute_key(from: String) -> String {
let mut enc = base64::write::EncoderStringWriter::new(&general_purpose::URL_SAFE_NO_PAD);
enc.write_all(from.as_bytes()).unwrap();
enc.into_inner()
}

View file

@ -1,28 +1,15 @@
use anyhow::anyhow; use axum::{routing::get, Router};
use axum::{ use tokio::sync::Mutex;
http::StatusCode,
routing::get, Router, extract::{Path, State}, response::IntoResponse,
};
use magic_crypt::{new_magic_crypt, MagicCryptTrait}; use std::sync::Arc;
use tokio_stream::{self as stream, StreamExt};
use tokio_util::{bytes::Bytes};
use tracing::{warn};
use clap::Parser;
use std::{sync::Arc, path::PathBuf}; use crate::{state::AppState, storages::StoragePool};
use crate::storages::StoragePool;
mod config; mod config;
mod storages; mod crypt;
mod routes; mod routes;
mod state; mod state;
mod storages;
struct AppState {
secret_key: String,
authorized_commands: Vec<String>
}
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
@ -32,59 +19,20 @@ async fn main() -> anyhow::Result<()> {
let secret_key = config.secret_key.clone(); let secret_key = config.secret_key.clone();
let storage_pool = StoragePool::from_config(config.storages); let storage_pool = StoragePool::from_config(config.storages);
let app_state = Arc::new(state::AppState::new(storage_pool, secret_key)); let app_state = AppState::new(storage_pool, secret_key);
println!("{}", config.secret_key.clone().unwrap_or_default());
let app = Router::new() let app = Router::new()
.route("/unsecure/*src", get(routes::handle_unsecure)) .route(
.route("/secure/*data", get(routes::handle_secure)) "/*src",
.with_state(app_state); match app_state.secret_key {
Some(_) => get(routes::handle_secure),
None => get(routes::handle_unsecure),
},
)
.with_state(Arc::new(Mutex::new(app_state)));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?; let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
axum::serve(listener, app).await?; axum::serve(listener, app).await?;
Ok(()) Ok(())
} }
/*async fn handle_secure(Path(data): Path<String>, State(state): State<Arc<AppState>>) -> Result<impl IntoResponse, MyAnyhow> {
let mc = new_magic_crypt!(&state.secret_key, 256);
let decrypt = mc.decrypt_base64_to_string(data)?;
let split = decrypt.splitn(2, "/").map(String::from).collect::<Vec<String>>();
let opts = split.get(0).ok_or_else(|| anyhow::anyhow!("opts not found"))?;
let src = split.get(1).ok_or_else(|| anyhow::anyhow!("src not found"))?;
let bytes = handle(opts.clone(), src.clone(), state).await?;
Ok(axum::body::Body::from(bytes))
}
async fn handle_unsecure(Path((opts, src)): Path<(String, String)>, State(state): State<Arc<AppState>>) -> Result<impl IntoResponse, MyAnyhow> {
let bytes = handle(opts, src, state).await?;
Ok(axum::body::Body::from(bytes))
}*/
/*async fn handle(_opts: String, src: String) -> anyhow::Result<impl Stream<Item = Result<Bytes, reqwest::Error>>> {
let client = reqwest::Client::new();
let res = client.get(src).send().await?;
let stream = res.bytes_stream();
Ok(stream)
}*/
/*async fn handle(opts: String, src: String, app_state: Arc<AppState>) -> anyhow::Result<Bytes, MyAnyhow> {
let qs_opts = queryst::parse(&opts).map_err(|err| anyhow::anyhow!("{}", err.message))?;
let client = reqwest::Client::new();
let res = client.get(src.clone()).send().await?;
let bytes = res.bytes().await?;
if let Some(storage) = &app_state.storage {
}
Ok(bytes)
}*/

View file

@ -1,37 +1,66 @@
use std::{sync::Arc}; use std::{pin::Pin, sync::Arc};
use axum::{extract::{Path, State}, response::IntoResponse, http::StatusCode}; use axum::{
use magic_crypt::{new_magic_crypt, MagicCryptTrait}; extract::{Path, State},
http::StatusCode,
response::IntoResponse,
};
use futures::TryStreamExt;
use tokio::sync::Mutex;
use tokio_stream::Stream; use tokio_stream::Stream;
use tokio_util::bytes::Bytes; use tokio_util::bytes::Bytes;
use crate::state::AppState; use crate::{crypt, state::AppState};
pub async fn handle_secure(
Path(data): Path<String>,
state: State<Arc<Mutex<AppState>>>,
) -> Result<impl IntoResponse, MyAnyhow> {
let app_state = state.lock().await;
let decrypt = crypt::decryt(data, app_state.secret_key.clone().unwrap())?;
std::mem::drop(app_state);
pub async fn handle_secure(Path(data): Path<String>, State(state): State<Arc<AppState>>) -> Result<impl IntoResponse, MyAnyhow> { let bytes = handle(decrypt, state).await?;
let mc = new_magic_crypt!(&state.secret_key.clone().unwrap(), 256);
let decrypt = mc.decrypt_base64_to_string(data)?;
let split = decrypt.splitn(2, "/").map(String::from).collect::<Vec<String>>();
let opts = split.get(0).ok_or_else(|| anyhow::anyhow!("opts not found"))?;
let src = split.get(1).ok_or_else(|| anyhow::anyhow!("src not found"))?;
let bytes = handle(opts.clone(), src.clone()).await?;
Ok(axum::body::Body::from_stream(bytes)) Ok(axum::body::Body::from_stream(bytes))
} }
pub async fn handle_unsecure(Path((opts, src)): Path<(String, String)>, State(state): State<Arc<AppState>>) -> Result<impl IntoResponse, MyAnyhow> { pub async fn handle_unsecure(
let bytes = handle(opts, src).await?; Path(src): Path<String>,
state: State<Arc<Mutex<AppState>>>,
) -> Result<impl IntoResponse, MyAnyhow> {
let bytes = handle(src, state).await?;
Ok(axum::body::Body::from_stream(bytes)) Ok(axum::body::Body::from_stream(bytes))
} }
async fn handle(_opts: String, src: String) -> anyhow::Result<impl Stream<Item = Result<Bytes, reqwest::Error>>> { async fn handle(
src: String,
state: State<Arc<Mutex<AppState>>>,
) -> anyhow::Result<impl Stream<Item = Result<Bytes, anyhow::Error>> + Send> {
let mut app_state = state.lock().await;
match app_state.storage_pool.retrieve(src.clone()).await {
Some(stream) => Ok(stream),
None => {
let save_stream = fetch(src.clone()).await?;
app_state
.storage_pool
.save(src.clone(), save_stream)
.await?;
Ok(app_state.storage_pool.retrieve(src).await.unwrap())
}
}
}
async fn fetch(
src: String,
) -> anyhow::Result<Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>> {
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let res = client.get(src).send().await?; let res = client.get(src).send().await?;
let stream = res.bytes_stream(); let stream = res.bytes_stream().map_err(|err| anyhow::anyhow!(err));
Ok(stream) Ok(Box::pin(stream))
} }
pub struct MyAnyhow(anyhow::Error); pub struct MyAnyhow(anyhow::Error);
@ -42,8 +71,11 @@ impl IntoResponse for MyAnyhow {
} }
} }
impl <T>From<T> for MyAnyhow where T: Into<anyhow::Error> { impl<T> From<T> for MyAnyhow
where
T: Into<anyhow::Error>,
{
fn from(value: T) -> Self { fn from(value: T) -> Self {
Self(value.into()) Self(value.into())
} }
} }

View file

@ -2,11 +2,14 @@ use crate::storages::StoragePool;
pub struct AppState { pub struct AppState {
pub secret_key: Option<String>, pub secret_key: Option<String>,
pub storage_pool: StoragePool pub storage_pool: StoragePool,
} }
impl AppState { impl AppState {
pub fn new(storage_pool: StoragePool, secret_key: Option<String>) -> Self { pub fn new(storage_pool: StoragePool, secret_key: Option<String>) -> Self {
Self { secret_key, storage_pool } Self {
secret_key,
storage_pool,
}
} }
} }

View file

@ -1,9 +1,14 @@
use std::pin::Pin;
use crate::config::DiskStorageConfig; use crate::config::DiskStorageConfig;
use async_trait::async_trait;
use tokio_stream::Stream;
use tokio_util::bytes::Bytes;
use super::Storage; use super::Storage;
pub struct DiskStorage { pub struct DiskStorage {
config: DiskStorageConfig config: DiskStorageConfig,
} }
impl DiskStorage { impl DiskStorage {
@ -12,4 +17,28 @@ impl DiskStorage {
} }
} }
impl Storage for DiskStorage {} #[async_trait]
impl Storage for DiskStorage {
async fn eligible(&self, src: String) -> bool {
false
}
async fn delete(&mut self, key: String) -> anyhow::Result<()> {
Ok(())
}
async fn retrieve(
&self,
key: String,
) -> Option<Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>> {
None
}
async fn save(
&mut self,
key: String,
stream: Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>,
) -> anyhow::Result<()> {
todo!()
}
}

View file

@ -1,15 +1,74 @@
use std::{collections::HashMap, pin::Pin};
use anyhow::Ok;
use async_trait::async_trait;
use tokio_stream::{Stream, StreamExt};
use tokio_util::bytes::Bytes;
use crate::config::MemoryStorageConfig; use crate::config::MemoryStorageConfig;
use super::Storage; use super::Storage;
pub struct MemoryStorage { pub struct MemoryStorage {
config: MemoryStorageConfig config: MemoryStorageConfig,
items: HashMap<String, Vec<u8>>,
} }
impl MemoryStorage { impl MemoryStorage {
pub fn new(config: MemoryStorageConfig) -> Self { pub fn new(config: MemoryStorageConfig) -> Self {
Self { config } Self {
config,
items: HashMap::new(),
}
} }
} }
impl Storage for MemoryStorage {} #[async_trait]
impl Storage for MemoryStorage {
async fn eligible(&self, src: String) -> bool {
true
}
async fn delete(&mut self, key: String) -> anyhow::Result<()> {
self.items
.remove(&key)
.ok_or_else(|| anyhow::anyhow!(""))
.map(|_| ())
}
async fn retrieve(
&self,
key: String,
) -> Option<Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>> {
match self.items.get(&key) {
Some(item) => {
let bytes = tokio_util::bytes::Bytes::from(item.clone());
let stream = futures::stream::iter([Ok(bytes)]);
Some(Box::pin(stream))
}
None => None,
}
}
async fn save(
&mut self,
key: String,
mut stream: Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>,
) -> anyhow::Result<()> {
if !self.items.contains_key(&key) {
self.items.insert(key.clone(), Vec::new());
}
while let Some(chunk) = stream.next().await {
self.items
.get_mut(&key)
.unwrap()
.append(&mut chunk?.to_vec());
}
println!("{} saved", key);
Ok(())
}
}

View file

@ -1,28 +1,83 @@
pub mod memory;
pub mod disk; pub mod disk;
pub mod memory;
use std::pin::Pin;
use async_trait::async_trait; use async_trait::async_trait;
use tokio_stream::Stream;
use tokio_util::bytes::Bytes;
use crate::config::StorageConfig; use crate::{
config::{StorageConfig, StorageStrategyConfig},
crypt,
};
use self::{memory::MemoryStorage, disk::DiskStorage}; use self::{disk::DiskStorage, memory::MemoryStorage};
#[async_trait] #[async_trait]
pub trait Storage {} pub trait Storage {
async fn eligible(&self, src: String) -> bool;
async fn delete(&mut self, key: String) -> anyhow::Result<()>;
async fn retrieve(
&self,
key: String,
) -> Option<Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>>;
async fn save(
&mut self,
key: String,
stream: Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>,
) -> anyhow::Result<()>;
}
pub struct StoragePool { pub struct StoragePool {
storages: Vec<Box<dyn Storage + Sync + Send>> storages: Vec<Box<dyn Storage + Sync + Send>>,
} }
impl StoragePool { impl StoragePool {
pub fn from_config(items: Vec<StorageConfig>) -> Self { pub fn from_config(items: Vec<StorageConfig>) -> Self {
Self { storages: items.into_iter().map(create_storage).collect() } Self {
storages: items.into_iter().map(create_storage).collect(),
}
}
pub async fn retrieve(
&self,
src: String,
) -> Option<Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>> {
let mut stream = None;
let key = crypt::compute_key(src);
for item in &self.storages {
if let Some(storage_stream) = item.retrieve(key.clone()).await {
stream = Some(storage_stream);
break;
}
}
stream
}
pub async fn save(
&mut self,
src: String,
stream: Pin<Box<dyn Stream<Item = Result<Bytes, anyhow::Error>> + Send>>,
) -> anyhow::Result<()> {
let key = crypt::compute_key(src);
for item in self.storages.iter_mut() {
if item.eligible(key.clone()).await {
item.save(key, stream).await?;
break;
}
}
Ok(())
} }
} }
fn create_storage(item: StorageConfig) -> Box<dyn Storage + Sync + Send> { fn create_storage(item: StorageConfig) -> Box<dyn Storage + Sync + Send> {
match item.strategy { match item.strategy {
crate::config::StorageMethodConfig::Memory(config) => Box::new(MemoryStorage::new(config)), StorageStrategyConfig::Memory(config) => Box::new(MemoryStorage::new(config)),
crate::config::StorageMethodConfig::Disk(config) => Box::new(DiskStorage::new(config)), StorageStrategyConfig::Disk(config) => Box::new(DiskStorage::new(config)),
} }
} }