nano-service/src/service.ts
2023-10-19 22:53:31 +02:00

66 lines
No EOL
1.9 KiB
TypeScript

import Adaptor from "./adaptors/adaptor";
import { RequestError } from "./error";
import { Request, Response } from "./messages";
import { RouteSubscribeTypeFn } from "./types";
export default class Service {
private name: string;
private adaptors: Record<string, Adaptor>;
constructor(name: string) {
this.name = name;
this.adaptors = {};
}
public addAdaptor(name: string, adaptor: Adaptor) {
this.adaptors[name] = adaptor;
}
public async subscribe<T, U>(adaptor: string, subject: string, fn: RouteSubscribeTypeFn<T, U>) {
this.adaptors[adaptor].subscribe(`${this.name}.${subject}`, async (rawReq) => {
const msg: Request<T> = JSON.parse(rawReq);
const res = await fn({ data: msg.data, from: this.name });
return JSON.stringify(res);
});
}
public async request<T, U>(adaptor: string, req: Request<T>): Promise<Response<U>> {
if(!this.adaptors[adaptor]) {
throw new Error(`${adaptor} adaptor not exist`);
}
const rawReq = JSON.stringify(req);
try {
const rawRes = await this.adaptors[adaptor].request(`${req.service}.${req.subject}`, rawReq);
const res: Response<U> = JSON.parse(rawRes);
if(res.statusCode < 200 || res.statusCode >= 599) {
throw new RequestError("error while request", res.statusCode);
}
return res;
} catch(err) {
if(err instanceof RequestError) {
throw err;
} else {
throw new RequestError("unexpected error", 500);
}
}
}
public async listen() {
for(const index in this.adaptors) {
await this.adaptors[index].listen(this.name);
}
}
public async stop() {
for(const index in this.adaptors) {
await this.adaptors[index].stop();
}
}
}