nano-service/src/adaptors/nats.ts
2024-03-20 20:21:35 +01:00

58 lines
1.3 KiB
TypeScript

import {
type Codec,
type ConnectionOptions,
type NatsConnection,
StringCodec,
type Subscription,
connect,
} from "nats";
import type { AdaptorSubscribeTypeFn } from "../types.ts";
import type Adaptor from "./adaptor.ts";
export default class NatsAdaptor implements Adaptor {
private options: ConnectionOptions;
private nc?: NatsConnection;
private callbacks: Record<string, AdaptorSubscribeTypeFn>;
private sc: Codec<string>;
private sub?: Subscription;
constructor(options: ConnectionOptions) {
this.options = options;
this.sc = StringCodec();
this.callbacks = {};
}
async listen(serviceName: string) {
this.nc = await connect(this.options);
this.sub = this.nc.subscribe(`${serviceName}.*`);
(async (sub: Subscription) => {
for await (const msg of sub) {
const res = await this.callbacks[msg.subject](this.sc.decode(msg.data));
msg.respond(this.sc.encode(res));
}
})(this.sub);
}
subscribe(subject: string, fn: AdaptorSubscribeTypeFn) {
this.callbacks[subject] = fn;
}
async request(subject: string, req: string): Promise<string> {
if (this.nc) {
const msg = await this.nc.request(subject, this.sc.encode(req));
return this.sc.decode(msg.data);
}
throw new Error("nats connection is not initialized");
}
async stop(): Promise<void> {
this.sub?.unsubscribe();
await this.nc?.drain();
}
}