first commit
This commit is contained in:
commit
97fb66c4f0
8 changed files with 1828 additions and 0 deletions
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
|
@ -0,0 +1 @@
|
|||
/target
|
8
.idea/.gitignore
vendored
Normal file
8
.idea/.gitignore
vendored
Normal file
|
@ -0,0 +1,8 @@
|
|||
# Default ignored files
|
||||
/shelf/
|
||||
/workspace.xml
|
||||
# Editor-based HTTP Client requests
|
||||
/httpRequests/
|
||||
# Datasource local storage ignored files
|
||||
/dataSources/
|
||||
/dataSources.local.xml
|
8
.idea/modules.xml
Normal file
8
.idea/modules.xml
Normal file
|
@ -0,0 +1,8 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="ProjectModuleManager">
|
||||
<modules>
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/nano-service-rs.iml" filepath="$PROJECT_DIR$/.idea/nano-service-rs.iml" />
|
||||
</modules>
|
||||
</component>
|
||||
</project>
|
11
.idea/nano-service-rs.iml
Normal file
11
.idea/nano-service-rs.iml
Normal file
|
@ -0,0 +1,11 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module type="EMPTY_MODULE" version="4">
|
||||
<component name="NewModuleRootManager">
|
||||
<content url="file://$MODULE_DIR$">
|
||||
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
|
||||
<excludeFolder url="file://$MODULE_DIR$/target" />
|
||||
</content>
|
||||
<orderEntry type="inheritedJdk" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
</component>
|
||||
</module>
|
6
.idea/vcs.xml
Normal file
6
.idea/vcs.xml
Normal file
|
@ -0,0 +1,6 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="VcsDirectoryMappings">
|
||||
<mapping directory="$PROJECT_DIR$" vcs="Git" />
|
||||
</component>
|
||||
</project>
|
1491
Cargo.lock
generated
Normal file
1491
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
16
Cargo.toml
Normal file
16
Cargo.toml
Normal file
|
@ -0,0 +1,16 @@
|
|||
[package]
|
||||
name = "nano-service-rs"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1.36", features = ["full"] }
|
||||
async-nats = "0.34"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
anyhow = "1.0"
|
||||
validator = { version = "0.17", features = ["derive"] }
|
||||
futures = "0.3"
|
||||
async-trait = "0.1"
|
||||
tokio-util = "0.7"
|
287
src/lib.rs
Normal file
287
src/lib.rs
Normal file
|
@ -0,0 +1,287 @@
|
|||
use std::{collections::HashMap, pin::Pin, sync::Arc};
|
||||
|
||||
use async_nats::{Message, ToServerAddrs};
|
||||
use futures::{Future, StreamExt};
|
||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_util::{bytes::Bytes, sync::CancellationToken};
|
||||
use validator::Validate;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct Request {
|
||||
from: String,
|
||||
body: Bytes
|
||||
}
|
||||
|
||||
impl Request {
|
||||
pub fn parse_body<T>(&self) -> Result<T, Response> where T: DeserializeOwned + Validate {
|
||||
let value: T = serde_json::from_slice(&self.body)?;
|
||||
value.validate().map_err(|err| Response::with_body(err.to_string(), 400).unwrap())?;
|
||||
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
pub fn with_body<T>(body: T, from: &str) -> Result<Self, Response> where T: Serialize + Validate {
|
||||
let serialized_body = serde_json::to_vec(&body)?;
|
||||
|
||||
Ok(Self { from: String::from(from), body: serialized_body.into() })
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct Response {
|
||||
code: u16,
|
||||
body: Bytes
|
||||
}
|
||||
|
||||
impl Response {
|
||||
pub fn with_body<T>(body: T, code: u16) -> Result<Self, Response> where T: Serialize {
|
||||
let serialized_body = serde_json::to_vec(&body)?;
|
||||
Ok(Self { body: serialized_body.into(), code })
|
||||
}
|
||||
|
||||
pub fn parse_body<T>(&self) -> Result<T, Response> where T: DeserializeOwned {
|
||||
Ok(serde_json::from_slice(&self.body)?)
|
||||
}
|
||||
}
|
||||
|
||||
impl <T>From<T> for Response where T: Into<anyhow::Error> {
|
||||
fn from(value: T) -> Self {
|
||||
let anyhow_err: anyhow::Error = value.into();
|
||||
|
||||
Response::with_body(anyhow_err.to_string(), 500).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Response {
|
||||
fn default() -> Self {
|
||||
Self { code: 200, body: Default::default() }
|
||||
}
|
||||
}
|
||||
|
||||
type HandlerFn = Box<dyn Fn(Request) -> Pin<Box<dyn Future<Output = Result<Option<Response>, Response>> + Send>> + Send + Sync>;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Service {
|
||||
name: String,
|
||||
client: Arc<async_nats::Client>,
|
||||
subscription: Arc<Mutex<async_nats::Subscriber>>,
|
||||
handlers: HashMap<String, Arc<HandlerFn>>,
|
||||
cancel_token: CancellationToken
|
||||
}
|
||||
|
||||
impl Service {
|
||||
pub async fn connect<T>(name: &str, addrs: T, cancel_token: Option<CancellationToken>) -> anyhow::Result<Self> where T: ToServerAddrs {
|
||||
let client = async_nats::connect(addrs).await?;
|
||||
let subscription = client.subscribe(format!("{}.*", name)).await?;
|
||||
|
||||
Ok(Self {
|
||||
name: String::from(name),
|
||||
client: Arc::new(client),
|
||||
subscription: Arc::new(Mutex::new(subscription)),
|
||||
handlers: HashMap::new(),
|
||||
cancel_token: cancel_token.unwrap_or_default()
|
||||
})
|
||||
}
|
||||
|
||||
pub fn handle(&mut self, subject: &str, handle: HandlerFn) {
|
||||
self.handlers.insert(String::from(subject), Arc::new(handle));
|
||||
}
|
||||
|
||||
pub async fn request<T>(&self, subject: &str, body: T) -> Result<Response, Response> where T: Serialize + Validate {
|
||||
let request = Request::with_body(body, &self.name)?;
|
||||
let serialized_request = serde_json::to_vec(&request)?;
|
||||
|
||||
let nats_response = self.client.request(String::from(subject), serialized_request.into()).await?;
|
||||
let service_response: Response = serde_json::from_slice(&nats_response.payload)?;
|
||||
|
||||
match service_response.code >= 200 && service_response.code <= 299 {
|
||||
true => Ok(service_response),
|
||||
false => Err(service_response),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn listen(&self) -> anyhow::Result<()> {
|
||||
let mut subscription = self.subscription.try_lock().map_err(|_| anyhow::anyhow!("service already listening"))?;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
msg = subscription.next() => {
|
||||
if let Some(msg) = msg {
|
||||
match self.handle_raw_msg(&msg).await {
|
||||
Ok(_) => {},
|
||||
Err(err) => println!("{}", err)
|
||||
}
|
||||
}
|
||||
},
|
||||
_ = self.cancel_token.cancelled() => {
|
||||
subscription.unsubscribe().await?;
|
||||
self.client.flush().await?;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn close(&self) {
|
||||
self.cancel_token.cancel();
|
||||
}
|
||||
|
||||
async fn handle_raw_msg(&self, msg: &Message) -> anyhow::Result<()> {
|
||||
let reply = msg.reply.clone();
|
||||
|
||||
match reply {
|
||||
Some(reply) => {
|
||||
if let Err(err) = self.apply_handler(msg).await {
|
||||
let serialized_request = serde_json::to_vec(&err)?;
|
||||
let _ = self.client.publish(reply, serialized_request.into()).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
},
|
||||
None => Err(anyhow::anyhow!("recieve msg with empty reply subject"))
|
||||
}
|
||||
}
|
||||
|
||||
async fn apply_handler(&self, msg: &Message) -> Result<(), Response> {
|
||||
let handler = self.find_handler(&msg.subject)?;
|
||||
let reply = msg.reply.clone().unwrap();
|
||||
|
||||
let request: Request = serde_json::from_slice(&msg.payload)?;
|
||||
let client = self.client.clone();
|
||||
tokio::spawn(async move {
|
||||
let res = (*handler)(request).await;
|
||||
|
||||
let msg = match res {
|
||||
Ok(res) => res.unwrap_or(Response::default()),
|
||||
Err(err) => err,
|
||||
};
|
||||
|
||||
let serialized_response = serde_json::to_vec(&msg).unwrap(); // todo: remove this unwrap
|
||||
client.publish(reply, serialized_response.into()).await
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn find_handler(&self, subject: &str) -> Result<Arc<HandlerFn>, Response> {
|
||||
let subject = subject.split('.')
|
||||
.last()
|
||||
.ok_or(Response::with_body(String::from("subject not exist"), 400)?)?;
|
||||
|
||||
self.handlers.get(&String::from(subject)).cloned()
|
||||
.ok_or(Response::with_body(String::from("handler not found"), 404)?)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use validator::Validate;
|
||||
use crate::{Response, Service};
|
||||
|
||||
#[derive(Serialize, Deserialize, Validate, Clone)]
|
||||
struct AccountLogin {
|
||||
#[validate(length(min = 4))]
|
||||
username: String,
|
||||
password: String
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct AccountLoginResponse {
|
||||
username: String,
|
||||
inserted_at: String
|
||||
}
|
||||
|
||||
async fn setup() -> anyhow::Result<Service> {
|
||||
let mut service = Service::connect("accounts", "nats://127.0.0.1:4222", None).await?;
|
||||
|
||||
service.handle("login", Box::new(|req| Box::pin(async move {
|
||||
let req_body: AccountLogin = req.parse_body()?;
|
||||
|
||||
Ok(Some(Response::with_body(AccountLoginResponse {username: req_body.username, inserted_at: String::from("2024-03-18")}, 201)?))
|
||||
})));
|
||||
|
||||
service.handle("return_error", Box::new(|_| Box::pin(async move {
|
||||
Err(Response::with_body("aie aie aie :/", 500)?)
|
||||
})));
|
||||
|
||||
let service_clone = service.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = service_clone.listen().await;
|
||||
});
|
||||
|
||||
Ok(service)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn request_test() {
|
||||
let service = setup().await.unwrap();
|
||||
|
||||
let req_body = AccountLogin { username: String::from("Paul"), password: String::from("Azerty") };
|
||||
let res = service.request("accounts.login", req_body.clone()).await.unwrap();
|
||||
let res_body: AccountLoginResponse = res.parse_body().unwrap();
|
||||
|
||||
assert_eq!("Paul", res_body.username);
|
||||
assert_eq!("2024-03-18", res_body.inserted_at);
|
||||
assert_eq!(201, res.code);
|
||||
|
||||
service.close();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[should_panic]
|
||||
async fn bad_subject_request_test() {
|
||||
let service = setup().await.unwrap();
|
||||
|
||||
let req_body = AccountLogin { username: String::from("Paul"), password: String::from("Azerty") };
|
||||
let res = service.request("accounts#login", req_body.clone()).await;
|
||||
|
||||
service.close();
|
||||
|
||||
res.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[should_panic]
|
||||
async fn unknow_handler_request_test() {
|
||||
let service = setup().await.unwrap();
|
||||
|
||||
let req_body = AccountLogin { username: String::from("Paul"), password: String::from("Azerty") };
|
||||
let res = service.request("accounts.return_error", req_body.clone()).await;
|
||||
|
||||
service.close();
|
||||
|
||||
res.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[should_panic]
|
||||
async fn return_error_request_test() {
|
||||
let service = setup().await.unwrap();
|
||||
|
||||
let req_body = AccountLogin { username: String::from("Paul"), password: String::from("Azerty") };
|
||||
let res = service.request("accounts.login_not_exist", req_body.clone()).await;
|
||||
|
||||
service.close();
|
||||
|
||||
res.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[should_panic]
|
||||
async fn invalid_body_request_test() {
|
||||
let service = setup().await.unwrap();
|
||||
|
||||
let req_body = AccountLogin { username: String::from("Eva"), password: String::from("Azerty") };
|
||||
let res = service.request("accounts.login", req_body.clone()).await;
|
||||
|
||||
service.close();
|
||||
|
||||
res.unwrap();
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue