diff --git a/src/proxy.ts b/src/proxy.ts index cbfa8eb..e0ed08d 100644 --- a/src/proxy.ts +++ b/src/proxy.ts @@ -1,4 +1,5 @@ import * as net from "net"; +import EventEmitter from "node:events"; import { Transform } from "node:stream"; import { DynamicModule, Injectable, Logger, Module } from "@nestjs/common"; @@ -15,6 +16,9 @@ interface Proxy { class TcpProxy implements Proxy { private readonly logger: Logger; private readonly server: net.Server; + private readonly eventEmitter = new EventEmitter(); + + private destroyed = false; public constructor( private readonly userKey: string, @@ -32,28 +36,41 @@ class TcpProxy implements Proxy { } public listen() { + if (this.destroyed) return Promise.resolve(); + return new Promise((res, rej) => { const server = this.server.listen(this.listeningPort, () => { this.logger.log("Started tcp proxy"); res(); }); + server.on("error", rej); }); } public destroy() { + if (this.destroyed) return Promise.resolve(); + + this.eventEmitter.emit("destroy"); + return new Promise((res, rej) => { this.server.close((err) => { if (err) rej(err); this.logger.log("Destroyed tcp proxy"); + this.destroyed = true; res(); }); }); } private getServer() { + const eventEmitter = this.eventEmitter; + function handleClient(this: TcpProxy, clientSocket: net.Socket) { - // TODO: add recovery mechanism + eventEmitter.once("destroy", () => { + clientSocket.destroy(); + }); + const retryConnection = () => { this.logger.log(`Retrying connection in ${500 / 1000} seconds...`); setTimeout(() => handleClient.bind(this)(clientSocket), 500); @@ -144,9 +161,7 @@ export class ProxyStorage { ); this.eventEmitter.on("disable-user", (userKey) => withErrorLogging(async () => { - await userStatsService.assert(userKey); - const user = await userFactory.get(userKey); - await this.delete(user.config.key); + await this.delete(userKey); }, this.logger) ); this.eventEmitter.on("update-user", (prev, curr) => @@ -186,6 +201,7 @@ export class ProxyStorage { } this.proxies[userKey] = proxy; + await proxy.listen(); return proxy;