import { AdaptorSubscribeTypeFn } from "../types.ts"; import Adaptor from "./adaptor.ts"; import { Codec, connect, ConnectionOptions, NatsConnection, StringCodec, Subscription, } from "nats"; export default class NatsAdaptor implements Adaptor { private options: ConnectionOptions; private nc?: NatsConnection; private callbacks: Record; private sc: Codec; private sub?: Subscription; constructor(options: ConnectionOptions) { this.options = options; this.sc = StringCodec(); this.callbacks = {}; } async listen(serviceName: string) { this.nc = await connect(this.options); this.sub = this.nc.subscribe(`${serviceName}.*`); (async (sub: Subscription) => { for await (const msg of sub) { const res = await this.callbacks[msg.subject](this.sc.decode(msg.data)); msg.respond(this.sc.encode(res)); } })(this.sub); } subscribe(subject: string, fn: AdaptorSubscribeTypeFn) { this.callbacks[subject] = fn; } async request(subject: string, req: string): Promise { if (this.nc) { const msg = await this.nc.request(subject, this.sc.encode(req)); return this.sc.decode(msg.data); } throw new Error("nats connection is not initialized"); } async stop(): Promise { await this.nc?.drain(); } }