import { Socket as Engine, installTimerFunctions, } from "engine.io-client";
import { Socket } from "./socket.js";
import * as parser from "socket.io-parser";
import { on } from "./on.js";
import { Backoff } from "./contrib/backo2.js";
import { Emitter, } from "@socket.io/component-emitter";
import debugModule from "debug"; // debug()
const debug = debugModule("socket.io-client:manager"); // debug()

/**
 * Owns a single engine connection, the packet encoder/decoder, and the
 * reconnection state; hands out one `Socket` per namespace on top of it.
 *
 * Events emitted through `emitReserved` by this class: "open", "error",
 * "ping", "packet", "close", "reconnect_attempt", "reconnect_error",
 * "reconnect_failed" and "reconnect".
 */
export class Manager extends Emitter {
  /**
   * @param {string|Object} [uri] - target URI, or the options object when
   *   called as `new Manager(opts)`
   * @param {Object} [opts] - options. Read here: `path`, `reconnection`,
   *   `reconnectionAttempts`, `reconnectionDelay`, `reconnectionDelayMax`,
   *   `randomizationFactor`, `timeout`, `parser`, `autoConnect`. The same
   *   object is stored as `this.opts` and handed to the engine in `open()`.
   *   NOTE: `opts.path` is defaulted in place on the caller's object.
   */
  constructor(uri, opts) {
    super();
    this.nsps = {};
    this.subs = [];
    if (uri && "object" === typeof uri) {
      // Called as `new Manager(opts)`.
      opts = uri;
      uri = undefined;
    }
    opts = opts || {};
    opts.path = opts.path || "/socket.io";
    this.opts = opts;
    // Installs the timer helpers used below (`setTimeoutFn` / `clearTimeoutFn`).
    installTimerFunctions(this, opts);
    this.reconnection(opts.reconnection !== false);
    // `||` is intentional here: any falsy value (including 0) selects the default.
    this.reconnectionAttempts(opts.reconnectionAttempts || Infinity);
    this.reconnectionDelay(opts.reconnectionDelay || 1000);
    this.reconnectionDelayMax(opts.reconnectionDelayMax || 5000);
    // Unlike the options above, 0 is honoured for the randomization factor.
    const randomizationFactor = opts.randomizationFactor;
    this.randomizationFactor(randomizationFactor == null ? 0.5 : randomizationFactor);
    this.backoff = new Backoff({
      min: this.reconnectionDelay(),
      max: this.reconnectionDelayMax(),
      jitter: this.randomizationFactor(),
    });
    this.timeout(null == opts.timeout ? 20000 : opts.timeout);
    this._readyState = "closed";
    this.uri = uri;
    const _parser = opts.parser || parser;
    this.encoder = new _parser.Encoder();
    this.decoder = new _parser.Decoder();
    this._autoConnect = opts.autoConnect !== false;
    if (this._autoConnect) {
      this.open();
    }
  }

  /**
   * Gets or sets whether reconnection is enabled.
   *
   * @param {boolean} [v] - new value (coerced to boolean); omit to read
   * @return the current value when called without arguments, otherwise self
   * @public
   */
  reconnection(v) {
    // `arguments.length` (not `v === undefined`) so that an explicit
    // `reconnection(undefined)` still acts as a setter, as it always has.
    if (!arguments.length) return this._reconnection;
    this._reconnection = !!v;
    return this;
  }

  /**
   * Gets or sets the maximum number of reconnection attempts.
   *
   * @param {number} [v] - new value; omit (or pass `undefined`) to read
   * @return the current value when `v` is undefined, otherwise self
   * @public
   */
  reconnectionAttempts(v) {
    if (v === undefined) return this._reconnectionAttempts;
    this._reconnectionAttempts = v;
    return this;
  }

  /**
   * Gets or sets the reconnection delay, forwarding it to the backoff as its
   * minimum once the backoff exists.
   *
   * @param {number} [v] - new value; omit (or pass `undefined`) to read
   * @return the current value when `v` is undefined, otherwise self
   * @public
   */
  reconnectionDelay(v) {
    if (v === undefined) return this._reconnectionDelay;
    this._reconnectionDelay = v;
    // The backoff does not exist yet while the constructor applies defaults.
    if (this.backoff != null) {
      this.backoff.setMin(v);
    }
    return this;
  }

  /**
   * Gets or sets the randomization factor, forwarding it to the backoff as
   * its jitter once the backoff exists.
   *
   * @param {number} [v] - new value; omit (or pass `undefined`) to read
   * @return the current value when `v` is undefined, otherwise self
   * @public
   */
  randomizationFactor(v) {
    if (v === undefined) return this._randomizationFactor;
    this._randomizationFactor = v;
    if (this.backoff != null) {
      this.backoff.setJitter(v);
    }
    return this;
  }

  /**
   * Gets or sets the maximum reconnection delay, forwarding it to the backoff
   * as its maximum once the backoff exists.
   *
   * @param {number} [v] - new value; omit (or pass `undefined`) to read
   * @return the current value when `v` is undefined, otherwise self
   * @public
   */
  reconnectionDelayMax(v) {
    if (v === undefined) return this._reconnectionDelayMax;
    this._reconnectionDelayMax = v;
    if (this.backoff != null) {
      this.backoff.setMax(v);
    }
    return this;
  }

  /**
   * Gets or sets the connection timeout. `false` disables the connect timer
   * in `open()`.
   *
   * @param {number|boolean} [v] - new value; omit to read
   * @return the current value when called without arguments, otherwise self
   * @public
   */
  timeout(v) {
    if (!arguments.length) return this._timeout;
    this._timeout = v;
    return this;
  }

  /**
   * Starts trying to reconnect if reconnection is enabled and we have not
   * started reconnecting yet
   *
   * @private
   */
  maybeReconnectOnOpen() {
    // Only try to reconnect if it's the first time we're connecting
    if (!this._reconnecting &&
      this._reconnection &&
      this.backoff.attempts === 0) {
      // keeps reconnection from firing twice for the same reconnection loop
      this.reconnect();
    }
  }

  /**
   * Creates a new engine for `this.uri` and starts connecting, unless the
   * manager is already open or opening.
   *
   * @param {Function} fn - optional, callback; called with no argument on
   *   open and with the error on failure
   * @return self
   * @public
   */
  open(fn) {
    debug("readyState %s", this._readyState);
    // Matches both "open" and "opening".
    if (this._readyState.includes("open")) return this;
    debug("opening %s", this.uri);
    this.engine = new Engine(this.uri, this.opts);
    const socket = this.engine;
    this._readyState = "opening";
    this.skipReconnect = false;

    // emit `open`
    const openSubDestroy = on(socket, "open", () => {
      this.onopen();
      if (fn) {
        fn();
      }
    });

    // emit `error`
    const errorSub = on(socket, "error", (err) => {
      debug("error");
      this.cleanup();
      this._readyState = "closed";
      this.emitReserved("error", err);
      if (fn) {
        fn(err);
      }
      else {
        // Only do this if there is no fn to handle the error
        this.maybeReconnectOnOpen();
      }
    });

    if (false !== this._timeout) {
      const timeout = this._timeout;
      debug("connect attempt will timeout after %d", timeout);
      if (timeout === 0) {
        openSubDestroy(); // prevents a race condition with the 'open' event
      }
      // set timer
      const timer = this.setTimeoutFn(() => {
        debug("connect attempt timed out after %d", timeout);
        openSubDestroy();
        socket.close();
        // Route the timeout through the engine's "error" event so that it is
        // handled by the error subscription registered above.
        socket.emit("error", new Error("timeout"));
      }, timeout);
      if (this.opts.autoUnref) {
        timer.unref();
      }
      // Cancel with the counterpart of `setTimeoutFn` rather than the global
      // `clearTimeout`, so the timer is really cleared even when the two
      // refer to different implementations.
      this.subs.push(() => {
        this.clearTimeoutFn(timer);
      });
    }

    this.subs.push(openSubDestroy);
    this.subs.push(errorSub);
    return this;
  }

  /**
   * Alias for open()
   *
   * @return self
   * @public
   */
  connect(fn) {
    return this.open(fn);
  }

  /**
   * Called upon transport open.
   *
   * @private
   */
  onopen() {
    debug("open");
    // clear old subs
    this.cleanup();
    // mark as open
    this._readyState = "open";
    this.emitReserved("open");
    // add new subs
    const socket = this.engine;
    this.subs.push(
      on(socket, "ping", this.onping.bind(this)),
      on(socket, "data", this.ondata.bind(this)),
      on(socket, "error", this.onerror.bind(this)),
      on(socket, "close", this.onclose.bind(this)),
      on(this.decoder, "decoded", this.ondecoded.bind(this))
    );
  }

  /**
   * Called upon a ping.
   *
   * @private
   */
  onping() {
    this.emitReserved("ping");
  }

  /**
   * Called with data; feeds it to the decoder.
   *
   * @private
   */
  ondata(data) {
    this.decoder.add(data);
  }

  /**
   * Called when parser fully decodes a packet.
   *
   * @private
   */
  ondecoded(packet) {
    this.emitReserved("packet", packet);
  }

  /**
   * Called upon socket error.
   *
   * @private
   */
  onerror(err) {
    debug("error", err);
    this.emitReserved("error", err);
  }

  /**
   * Returns the socket for the given `nsp`, creating and caching it on first
   * use. `opts` is only used when the socket is created.
   *
   * @return {Socket}
   * @public
   */
  socket(nsp, opts) {
    let socket = this.nsps[nsp];
    if (!socket) {
      socket = new Socket(this, nsp, opts);
      this.nsps[nsp] = socket;
    }
    return socket;
  }

  /**
   * Called upon a socket close. Closes the manager unless at least one
   * registered namespace socket is still active.
   *
   * @param socket - the socket being closed; not used by the check, which
   *   looks at every registered namespace socket
   * @private
   */
  _destroy(socket) {
    for (const nsp of Object.keys(this.nsps)) {
      const nspSocket = this.nsps[nsp];
      if (nspSocket.active) {
        debug("socket %s is still active, skipping close", nsp);
        return;
      }
    }
    this._close();
  }

  /**
   * Writes a packet.
   *
   * @param packet
   * @private
   */
  _packet(packet) {
    debug("writing packet %j", packet);
    const encodedPackets = this.encoder.encode(packet);
    for (let i = 0; i < encodedPackets.length; i++) {
      this.engine.write(encodedPackets[i], packet.options);
    }
  }

  /**
   * Clean up transport subscriptions and packet buffer.
   *
   * @private
   */
  cleanup() {
    debug("cleanup");
    this.subs.forEach((subDestroy) => subDestroy());
    this.subs.length = 0;
    this.decoder.destroy();
  }

  /**
   * Close the current socket and prevent any further reconnection.
   *
   * @private
   */
  _close() {
    debug("disconnect");
    this.skipReconnect = true;
    this._reconnecting = false;
    this.onclose("forced close");
    if (this.engine) {
      this.engine.close();
    }
  }

  /**
   * Alias for _close()
   *
   * @private
   */
  disconnect() {
    return this._close();
  }

  /**
   * Called upon engine close.
   *
   * @private
   */
  onclose(reason, description) {
    debug("closed due to %s", reason);
    this.cleanup();
    this.backoff.reset();
    this._readyState = "closed";
    this.emitReserved("close", reason, description);
    if (this._reconnection && !this.skipReconnect) {
      this.reconnect();
    }
  }

  /**
   * Attempt a reconnection.
   *
   * Returns self when a reconnection is already pending or reconnection is
   * being skipped; returns nothing otherwise.
   *
   * @private
   */
  reconnect() {
    if (this._reconnecting || this.skipReconnect) return this;

    if (this.backoff.attempts >= this._reconnectionAttempts) {
      debug("reconnect failed");
      this.backoff.reset();
      this.emitReserved("reconnect_failed");
      this._reconnecting = false;
      return;
    }

    const delay = this.backoff.duration();
    debug("will wait %dms before reconnect attempt", delay);
    this._reconnecting = true;
    const timer = this.setTimeoutFn(() => {
      if (this.skipReconnect) return;
      debug("attempting reconnect");
      this.emitReserved("reconnect_attempt", this.backoff.attempts);
      // check again for the case socket closed in above events
      if (this.skipReconnect) return;
      this.open((err) => {
        if (err) {
          debug("reconnect attempt error");
          this._reconnecting = false;
          this.reconnect();
          this.emitReserved("reconnect_error", err);
        }
        else {
          debug("reconnect success");
          this.onreconnect();
        }
      });
    }, delay);
    if (this.opts.autoUnref) {
      timer.unref();
    }
    // Cancel with the counterpart of `setTimeoutFn` (see open()).
    this.subs.push(() => {
      this.clearTimeoutFn(timer);
    });
  }

  /**
   * Called upon successful reconnect.
   *
   * @private
   */
  onreconnect() {
    const attempt = this.backoff.attempts;
    this._reconnecting = false;
    this.backoff.reset();
    this.emitReserved("reconnect", attempt);
  }
}