Explorar el Código

Refactor reconnects (WIP)

Painor hace 2 años
padre
commit
72c3778171

+ 7 - 23
gramjs/client/TelegramClient.ts

@@ -1380,11 +1380,6 @@ export class TelegramClient extends TelegramBaseClient {
                 autoReconnectCallback: this._handleReconnect.bind(this),
             });
         }
-        // set defaults vars
-        this._sender.userDisconnected = false;
-        this._sender._userConnected = false;
-        this._sender._reconnecting = false;
-        this._sender._disconnected = true;
 
         const connection = new this._connection({
             ip: this.session.serverAddress,
@@ -1395,17 +1390,12 @@ export class TelegramClient extends TelegramBaseClient {
             socket: this.networkSocket,
             testServers: this.testServers,
         });
-        const newConnection = await this._sender.connect(connection);
-        if (!newConnection) {
-            // we're already connected so no need to reset auth key.
-            if (!this._loopStarted) {
-                _updateLoop(this);
-                this._loopStarted = true;
-            }
+        if (!(await this._sender.connect(connection))) {
             return;
         }
-
         this.session.setAuthKey(this._sender.authKey);
+        this.session.save();
+
         this._initRequest.query = new Api.help.GetConfig();
         this._log.info(`Using LAYER ${LAYER} for initial connect`);
         await this._sender.send(
@@ -1414,14 +1404,8 @@ export class TelegramClient extends TelegramBaseClient {
                 query: this._initRequest,
             })
         );
-
-        if (!this._loopStarted) {
-            _updateLoop(this);
-            this._loopStarted = true;
-        }
-        this._reconnecting = false;
+        _updateLoop(this);
     }
-
     //endregion
     // region Working with different connections/Data Centers
     /** @hidden */
@@ -1433,9 +1417,9 @@ export class TelegramClient extends TelegramBaseClient {
         // so it's not valid anymore. Set to undefined to force recreating it.
         await this._sender!.authKey.setKey(undefined);
         this.session.setAuthKey(undefined);
-        this._reconnecting = true;
-        await this.disconnect();
-        return this.connect();
+        this.session.save();
+        await this._disconnect();
+        return await this.connect();
     }
 
     /**

+ 7 - 3
gramjs/client/telegramBaseClient.ts

@@ -352,9 +352,7 @@ export abstract class TelegramBaseClient {
     }
 
     async disconnect() {
-        if (this._sender) {
-            await this._sender.disconnect();
-        }
+        await this._disconnect();
         await Promise.all(
             Object.values(this._exportedSenderPromises).map(
                 (promise: Promise<MTProtoSender>) => {
@@ -375,12 +373,18 @@ export abstract class TelegramBaseClient {
             number,
             Promise<MTProtoSender>
         >();
+
+        // TODO cancel hanging promises
     }
 
     get disconnected() {
         return !this._sender || this._sender._disconnected;
     }
 
+    async _disconnect() {
+        await this._sender?.disconnect();
+    }
+
     /**
      * Disconnects all senders and removes all handlers
      * Disconnect is safer as it will not remove your event handlers

+ 17 - 43
gramjs/client/updates.ts

@@ -19,6 +19,7 @@ const PING_DISCONNECT_DELAY = 60000; // 1 min
  It can be seen as the ``StopIteration`` in a for loop but for events.
  */
 export class StopPropagation extends Error {}
+
 /** @hidden */
 export function on(client: TelegramClient, event?: EventBuilder) {
     return (f: { (event: any): void }) => {
@@ -175,52 +176,26 @@ export async function _dispatchUpdate(
 
 /** @hidden */
 export async function _updateLoop(client: TelegramClient): Promise<void> {
-    while (!client._destroyed) {
-        await sleep(PING_INTERVAL);
-        if (client._reconnecting || client._sender!.userDisconnected) {
-            continue;
-        }
-        if (client._destroyed) {
-            return;
-        }
-
+    while (client.connected) {
         try {
-            await attempts(
-                () => {
-                    return timeout(
-                        client._sender!.send(
-                            new Api.PingDelayDisconnect({
-                                pingId: bigInt(
-                                    getRandomInt(
-                                        Number.MIN_SAFE_INTEGER,
-                                        Number.MAX_SAFE_INTEGER
-                                    )
-                                ),
-                                disconnectDelay: PING_DISCONNECT_DELAY,
-                            })
-                        ),
-                        PING_TIMEOUT
-                    );
-                },
-                PING_FAIL_ATTEMPTS,
-                PING_FAIL_INTERVAL
-            );
-        } catch (err: any) {
-            // eslint-disable-next-line no-console
-            if (client._reconnecting || client._sender!.userDisconnected) {
+            await sleep(60 * 1000);
+            if (!client._sender?._transportConnected()) {
                 continue;
             }
-
-            await client.disconnect();
-
-            await client.connect();
+            await client.invoke(
+                new Api.Ping({
+                    pingId: bigInt(
+                        getRandomInt(
+                            Number.MIN_SAFE_INTEGER,
+                            Number.MAX_SAFE_INTEGER
+                        )
+                    ),
+                })
+            );
+        } catch (e) {
+            return;
         }
-
-        // We need to send some content-related request at least hourly
-        // for Telegram to keep delivering updates, otherwise they will
-        // just stop even if we're connected. Do so every 30 minutes.
-
-        // TODO Call getDifference instead since it's more relevant
+        client.session.save();
         if (
             new Date().getTime() - (client._lastRequest || 0) >
             30 * 60 * 1000
@@ -232,7 +207,6 @@ export async function _updateLoop(client: TelegramClient): Promise<void> {
             }
         }
     }
-    await client.disconnect();
 }
 
 /** @hidden */

+ 3 - 0
gramjs/extensions/CancelHelper.ts

@@ -0,0 +1,3 @@
+export class CancelHelper {
+    length = 1;
+}

+ 0 - 26
gramjs/extensions/MessagePacker.ts

@@ -4,15 +4,6 @@ import { BinaryWriter } from "./BinaryWriter";
 import type { MTProtoState } from "../network/MTProtoState";
 import type { RequestState } from "../network/RequestState";
 
-const USE_INVOKE_AFTER_WITH = [
-    "messages.SendMessage",
-    "messages.SendMedia",
-    "messages.SendMultiMedia",
-    "messages.ForwardMessages",
-    "messages.SendInlineBotResult",
-    "users.GetUsers",
-];
-
 export class MessagePacker {
     private _state: MTProtoState;
     private _pendingStates: RequestState[];
@@ -42,18 +33,6 @@ export class MessagePacker {
         if (this.setReady) {
             this.setReady(true);
         }
-        // we don't want msg acks here
-        if (state && state.request.CONSTRUCTOR_ID != 1658238041) {
-            this._pendingStates.push(state);
-            state.promise
-                // Using finally causes triggering `unhandledrejection` event
-                .catch(() => {})
-                .finally(() => {
-                    this._pendingStates = this._pendingStates.filter(
-                        (s) => s !== state
-                    );
-                });
-        }
     }
 
     extend(states: RequestState[]) {
@@ -69,16 +48,11 @@ export class MessagePacker {
             });
             await this._ready;
         }
-        if (!this._queue[this._queue.length - 1]) {
-            this._queue = [];
-            return;
-        }
         let data;
         let buffer = new BinaryWriter(Buffer.alloc(0));
 
         const batch = [];
         let size = 0;
-
         while (
             this._queue.length &&
             batch.length <= MessageContainer.MAXIMUM_LENGTH

+ 1 - 0
gramjs/extensions/index.ts

@@ -5,3 +5,4 @@ export { PromisedWebSockets } from "./PromisedWebSockets";
 export { PromisedNetSockets } from "./PromisedNetSockets";
 export { MessagePacker } from "./MessagePacker";
 export { AsyncQueue } from "./AsyncQueue";
+export { CancelHelper } from "./CancelHelper";

+ 245 - 154
gramjs/network/MTProtoSender.ts

@@ -13,12 +13,8 @@
  */
 import { AuthKey } from "../crypto/AuthKey";
 import { MTProtoState } from "./MTProtoState";
-import {
-    BinaryReader,
-    Logger,
-    PromisedNetSockets,
-    PromisedWebSockets,
-} from "../extensions";
+
+import { BinaryReader, CancelHelper, Logger } from "../extensions";
 import { MessagePacker } from "../extensions";
 import { GZIPPacked, MessageContainer, RPCResult, TLMessage } from "../tl/core";
 import { Api } from "../tl";
@@ -37,6 +33,7 @@ import {
 import { Connection, UpdateConnectionState } from "./";
 import type { TelegramClient } from "..";
 import { LogLevel } from "../extensions/Logger";
+import { Mutex } from "async-mutex";
 
 interface DEFAULT_OPTIONS {
     logger: any;
@@ -103,6 +100,12 @@ export class MTProtoSender {
     isConnecting: boolean;
     _authenticated: boolean;
     private _securityChecks: boolean;
+    private _connectMutex: Mutex;
+    private _recvCancelPromise: Promise<CancelHelper>;
+    private _recvCancelResolve?: (value: CancelHelper) => void;
+    private _sendCancelPromise: Promise<CancelHelper>;
+    private _sendCancelResolve?: (value: CancelHelper) => void;
+    private _cancelSend: boolean;
 
     /**
      * @param authKey
@@ -113,6 +116,7 @@ export class MTProtoSender {
             ...MTProtoSender.DEFAULT_OPTIONS,
             ...opts,
         };
+        this._cancelSend = false;
         this._connection = undefined;
         this._log = args.logger;
         this._dcId = args.dcId;
@@ -129,6 +133,14 @@ export class MTProtoSender {
         this._onConnectionBreak = args.onConnectionBreak;
         this._securityChecks = args.securityChecks;
 
+        this._connectMutex = new Mutex();
+
+        this._recvCancelPromise = new Promise((resolve) => {
+            this._recvCancelResolve = resolve;
+        });
+        this._sendCancelPromise = new Promise((resolve) => {
+            this._sendCancelResolve = resolve;
+        });
         /**
          * whether we disconnected ourself or telegram did it.
          */
@@ -232,59 +244,39 @@ export class MTProtoSender {
     /**
      * Connects to the specified given connection using the given auth key.
      */
-    async connect(connection: Connection, force?: boolean) {
-        if (this._userConnected && !force) {
-            this._log.info("User is already connected!");
-            return false;
-        }
-        this.isConnecting = true;
-        this._connection = connection;
-
-        for (let attempt = 0; attempt < this._retries; attempt++) {
-            try {
-                await this._connect();
-                if (this._updateCallback) {
-                    this._updateCallback(
-                        this._client,
-                        new UpdateConnectionState(
-                            UpdateConnectionState.connected
-                        )
-                    );
-                }
-                break;
-            } catch (err) {
-                if (this._updateCallback && attempt === 0) {
-                    this._updateCallback(
-                        this._client,
-                        new UpdateConnectionState(
-                            UpdateConnectionState.disconnected
-                        )
-                    );
-                }
-                this._log.error(
-                    `WebSocket connection failed attempt: ${attempt + 1}`
-                );
-                if (this._log.canSend(LogLevel.ERROR)) {
-                    console.error(err);
-                }
-                await sleep(this._delay);
+    async connect(connection: Connection): Promise<boolean> {
+        const release = await this._connectMutex.acquire();
+        try {
+            if (this._userConnected) {
+                this._log.info("User is already connected!");
+                return false;
             }
+            this._connection = connection;
+            await this._connect();
+            this._userConnected = true;
+            return true;
+        } finally {
+            release();
         }
-        this.userDisconnected = false;
-        this.isConnecting = false;
-        return true;
     }
 
     isConnected() {
         return this._userConnected;
     }
 
+    _transportConnected() {
+        return (
+            !this._reconnecting &&
+            this._connection &&
+            this._connection._connected
+        );
+    }
+
     /**
      * Cleanly disconnects the instance from the network, cancels
      * all pending requests, and closes the send and receive loops.
      */
     async disconnect() {
-        this.userDisconnected = true;
         await this._disconnect();
     }
 
@@ -337,43 +329,58 @@ export class MTProtoSender {
                 .replace("{0}", this._connection!.toString())
                 .replace("{1}", this._connection!.socket.toString())
         );
-        await this._connection!.connect();
-        this._log.debug("Connection success!");
-
-        if (!this.authKey.getKey()) {
-            const plain = new MTProtoPlainSender(this._connection, this._log);
-            this._log.debug("New auth_key attempt ...");
-            const res = await doAuthentication(plain, this._log);
-            this._log.debug("Generated new auth_key successfully");
-            await this.authKey.setKey(res.authKey);
-
-            this._state.timeOffset = res.timeOffset;
-
-            /**
-             * This is *EXTREMELY* important since we don't control
-             * external references to the authorization key, we must
-             * notify whenever we change it. This is crucial when we
-             * switch to different data centers.
-             */
-            if (this._authKeyCallback) {
-                await this._authKeyCallback(this.authKey, this._dcId);
+        let connected = false;
+        for (let attempt = 0; attempt < this._retries; attempt++) {
+            if (!connected) {
+                connected = await this._tryConnect(attempt);
+                if (!connected) {
+                    continue;
+                }
             }
-        } else {
-            this._authenticated = true;
-            this._log.debug("Already have an auth key ...");
+            if (!this.authKey.getKey()) {
+                try {
+                    if (!(await this._tryGenAuthKey(attempt))) {
+                        continue;
+                    }
+                } catch (err) {
+                    this._log.warn(
+                        `Connection error ${attempt} during auth_key gen`
+                    );
+                    if (this._log.canSend(LogLevel.ERROR)) {
+                        console.error(err);
+                    }
+                    await this._connection!.disconnect();
+                    connected = false;
+                    await sleep(this._delay);
+                    continue;
+                }
+            } else {
+                this._authenticated = true;
+                this._log.debug("Already have an auth key ...");
+            }
+            break;
+        }
+        if (!connected) {
+            throw new Error(
+                `Connection to telegram failed after ${this._retries} time(s)`
+            );
+        }
+        if (!this.authKey.getKey()) {
+            const error = new Error(
+                `auth key generation failed after ${this._retries} time(s)`
+            );
+            await this._disconnect(error);
+            throw error;
         }
+
         this._userConnected = true;
-        this._reconnecting = false;
+
         this._log.debug("Starting receive loop");
         this._recvLoopHandle = this._recvLoop();
 
         this._log.debug("Starting send loop");
         this._sendLoopHandle = this._sendLoop();
 
-        // _disconnected only completes after manual disconnection
-        // or errors after which the sender cannot continue such
-        // as failing to reconnect or any unexpected error.
-
         this._log.info(
             "Connection to %s complete!".replace(
                 "%s",
@@ -382,19 +389,11 @@ export class MTProtoSender {
         );
     }
 
-    async _disconnect(error = null) {
-        this._sendQueue.rejectAll();
-
-        if (this._connection === null) {
+    async _disconnect(error?: Error) {
+        if (!this._connection) {
             this._log.info("Not disconnecting (already have no connection)");
-            return;
-        }
-        if (this._updateCallback) {
-            this._updateCallback(
-                this._client,
-                new UpdateConnectionState(UpdateConnectionState.disconnected)
-            );
         }
+
         this._log.info(
             "Disconnecting from %s...".replace(
                 "%s",
@@ -402,8 +401,43 @@ export class MTProtoSender {
             )
         );
         this._userConnected = false;
-        this._log.debug("Closing current connection...");
-        await this._connection!.disconnect();
+        try {
+            this._log.debug("Closing current connection...");
+            await this._connection!.disconnect();
+        } finally {
+            this._log.debug(
+                `Cancelling ${this._pendingState.size} pending message(s)...`
+            );
+            for (const state of this._pendingState.values()) {
+                if (error && !state.result) {
+                    state.reject(error);
+                } else {
+                    state.reject("disconnected");
+                }
+            }
+
+            this._pendingState.clear();
+            this._cancelLoops();
+            this._log.info(
+                "Disconnecting from %s...".replace(
+                    "%s",
+                    this._connection!.toString()
+                )
+            );
+            this._connection = undefined;
+        }
+    }
+
+    _cancelLoops() {
+        this._cancelSend = true;
+        this._recvCancelResolve!(new CancelHelper());
+        this._sendCancelResolve!(new CancelHelper());
+        this._recvCancelPromise = new Promise((resolve) => {
+            this._recvCancelResolve = resolve;
+        });
+        this._sendCancelPromise = new Promise((resolve) => {
+            this._sendCancelResolve = resolve;
+        });
     }
 
     /**
@@ -414,9 +448,12 @@ export class MTProtoSender {
      * @private
      */
     async _sendLoop() {
-        this._sendQueue = new MessagePacker(this._state, this._log);
-
-        while (this._userConnected && !this._reconnecting) {
+        this._cancelSend = false;
+        while (
+            this._userConnected &&
+            !this._reconnecting &&
+            !this._cancelSend
+        ) {
             if (this._pendingAck.size) {
                 const ack = new RequestState(
                     new Api.MsgsAck({ msgIds: Array(...this._pendingAck) })
@@ -434,17 +471,13 @@ export class MTProtoSender {
             // TODO Wait for the connection send queue to be empty?
             // This means that while it's not empty we can wait for
             // more messages to be added to the send queue.
+
             const res = await this._sendQueue.get();
-            if (this._reconnecting) {
-                this._log.debug("Reconnecting. will stop loop");
-                return;
-            }
 
-            if (!res) {
-                this._log.debug("Empty result. will not stop loop");
-                continue;
-            }
+            // TODO fix later?
+            // @ts-ignore
             let { data } = res;
+            // @ts-ignore
             const { batch } = res;
             this._log.debug(
                 `Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`
@@ -452,13 +485,6 @@ export class MTProtoSender {
 
             data = await this._state.encryptMessageData(data);
 
-            try {
-                await this._connection!.send(data);
-            } catch (e: any) {
-                this._log.error(e);
-                this._log.info("Connection closed while sending data");
-                return;
-            }
             for (const state of batch) {
                 if (!Array.isArray(state)) {
                     if (state.request.classType === "request") {
@@ -472,6 +498,15 @@ export class MTProtoSender {
                     }
                 }
             }
+            try {
+                await this._connection!.send(data);
+            } catch (e: any) {
+                this._log.error(e);
+                this._log.info("Connection closed while sending data");
+                this._startReconnecting(e);
+                return;
+            }
+
             this._log.debug("Encrypted messages put in a queue to be sent");
         }
     }
@@ -483,14 +518,17 @@ export class MTProtoSender {
         while (this._userConnected && !this._reconnecting) {
             this._log.debug("Receiving items from the network...");
             try {
-                body = await this._connection!.recv();
-            } catch (e: any) {
-                /** when the server disconnects us we want to reconnect */
-                if (!this.userDisconnected) {
-                    this._log.error(e);
-                    this._log.warn("Connection closed while receiving data");
-                    this.reconnect();
+                body = await Promise.race([
+                    this._connection!.recv(),
+                    this._recvCancelPromise,
+                ]);
+                if (body instanceof CancelHelper) {
+                    return;
                 }
+            } catch (e: any) {
+                this._log.error(e);
+                this._log.warn("Connection closed while receiving data...");
+                this._startReconnecting(e);
                 return;
             }
             try {
@@ -529,6 +567,7 @@ export class MTProtoSender {
                             // Deletes the current sender from the object
                             this._onConnectionBreak(this._dcId);
                         }
+                        await this._disconnect(e);
                     } else {
                         // this happens sometimes when telegram is having some internal issues.
                         // reconnecting should be enough usually
@@ -536,13 +575,13 @@ export class MTProtoSender {
                         this._log.warn(
                             `Invalid buffer ${e.code} for dc ${this._dcId}`
                         );
-                        this.reconnect();
+                        this._startReconnecting(e);
                     }
                     return;
                 } else {
                     this._log.error("Unhandled error while receiving data");
                     this._log.error(e);
-                    this.reconnect();
+                    this._startReconnecting(e);
                     return;
                 }
             }
@@ -913,53 +952,105 @@ export class MTProtoSender {
      */
     async _handleMsgAll(message: TLMessage) {}
 
-    reconnect() {
-        if (this._userConnected && !this._reconnecting) {
-            this._reconnecting = true;
-            // TODO Should we set this?
-            // this._user_connected = false
-            // we want to wait a second between each reconnect try to not flood the server with reconnects
-            // in case of internal server issues.
-            sleep(1000).then(() => {
-                this._log.info("Started reconnecting");
-                this._reconnect();
-            });
+    async _reconnect(lastError?: any) {
+        this._log.debug("Closing current connection...");
+        await this._connection!.disconnect();
+        this._cancelLoops();
+
+        this._reconnecting = false;
+        this._state.reset();
+        let attempt;
+        let ok = true;
+        for (attempt = 0; attempt < this._retries; attempt++) {
+            try {
+                await this._connect();
+                await sleep(1000);
+                this._sendQueue.extend([...this._pendingState.values()]);
+                this._pendingState.clear();
+                if (this._autoReconnectCallback) {
+                    this._autoReconnectCallback();
+                }
+                break;
+            } catch (err: any) {
+                if (attempt == this._retries - 1) {
+                    ok = false;
+                }
+                if (err instanceof InvalidBufferError) {
+                    if (err.code === 404) {
+                        this._log.warn(
+                            `Broken authorization key for dc ${this._dcId}; resetting`
+                        );
+                        await this.authKey.setKey(undefined);
+                        if (this._authKeyCallback) {
+                            await this._authKeyCallback(undefined);
+                        }
+                        ok = false;
+                        break;
+                    } else {
+                        // this happens sometimes when telegram is having some internal issues.
+                        // since the data we sent and received is probably wrong now.
+                        this._log.warn(
+                            `Invalid buffer ${err.code} for dc ${this._dcId}`
+                        );
+                    }
+                }
+                this._log.error(
+                    `Unexpected exception reconnecting on attempt ${attempt}`
+                );
+                await sleep(this._delay);
+                lastError = err;
+            }
+        }
+        if (!ok) {
+            this._log.error(`Automatic reconnection failed ${attempt} time(s)`);
+            await this._disconnect(lastError ? lastError : undefined);
         }
     }
 
-    async _reconnect() {
-        this._log.debug("Closing current connection...");
+    async _tryConnect(attempt: number) {
         try {
-            await this.disconnect();
-        } catch (err: any) {
-            this._log.warn(err);
+            this._log.debug(`Connection attempt ${attempt}...`);
+            await this._connection!.connect();
+            this._log.debug("Connection success!");
+            return true;
+        } catch (err) {
+            this._log.warn(`Attempt ${attempt} at connecting failed`);
+            if (this._log.canSend(LogLevel.ERROR)) {
+                console.error(err);
+            }
+            await sleep(this._delay);
+            return false;
         }
-        // @ts-ignore
-        this._sendQueue.append(undefined);
-        this._state.reset();
+    }
 
-        // For some reason reusing existing connection caused stuck requests
-        const constructor = this._connection!
-            .constructor as unknown as typeof Connection;
-        const socket = this._connection!.socket.constructor as
-            | typeof PromisedNetSockets
-            | typeof PromisedWebSockets;
-        const newConnection = new constructor({
-            ip: this._connection!._ip,
-            port: this._connection!._port,
-            dcId: this._connection!._dcId,
-            loggers: this._connection!._log,
-            proxy: this._connection!._proxy,
-            testServers: this._connection!._testServers,
-            socket: socket,
-        });
-        await this.connect(newConnection, true);
+    async _tryGenAuthKey(attempt: number) {
+        const plain = new MTProtoPlainSender(this._connection, this._log);
+        try {
+            this._log.debug(`New auth_key attempt ${attempt}...`);
+            this._log.debug("New auth_key attempt ...");
+            const res = await doAuthentication(plain, this._log);
+            this._log.debug("Generated new auth_key successfully");
+            await this.authKey.setKey(res.authKey);
+            this._state.timeOffset = res.timeOffset;
+            if (this._authKeyCallback) {
+                await this._authKeyCallback(this.authKey, this._dcId);
+            }
+            this._log.debug("auth_key generation success!");
+            return true;
+        } catch (err) {
+            this._log.warn(`Attempt ${attempt} at generating auth key failed`);
+            if (this._log.canSend(LogLevel.ERROR)) {
+                console.error(err);
+            }
+            return false;
+        }
+    }
 
-        this._reconnecting = false;
-        this._sendQueue.extend(Array.from(this._pendingState.values()));
-        this._pendingState = new Map<string, RequestState>();
-        if (this._autoReconnectCallback) {
-            this._autoReconnectCallback();
+    private _startReconnecting(error: Error) {
+        this._log.info(`Starting reconnect...`);
+        if (this._userConnected && !this._reconnecting) {
+            this._reconnecting = true;
+            this._reconnect(error);
         }
     }
 }

+ 60 - 21
gramjs/network/connection/Connection.ts

@@ -1,4 +1,5 @@
 import {
+    CancelHelper,
     Logger,
     PromisedNetSockets,
     PromisedWebSockets,
@@ -36,13 +37,18 @@ class Connection {
     _dcId: number;
     _log: Logger;
     _proxy?: ProxyInterface;
-    private _connected: boolean;
+    _connected: boolean;
     private _sendTask?: Promise<void>;
     private _recvTask?: Promise<void>;
     protected _codec: any;
     protected _obfuscation: any;
-    private readonly _sendArray: AsyncQueue;
-    private _recvArray: AsyncQueue;
+    _sendArray: AsyncQueue;
+    _recvArray: AsyncQueue;
+    private _recvCancelPromise: Promise<CancelHelper>;
+    private _recvCancelResolve?: (value: CancelHelper) => void;
+    private _sendCancelPromise: Promise<CancelHelper>;
+    private _sendCancelResolve?: (value: CancelHelper) => void;
+
     socket: PromisedNetSockets | PromisedWebSockets;
     public _testServers: boolean;
 
@@ -69,6 +75,13 @@ class Connection {
         this._recvArray = new AsyncQueue();
         this.socket = new socket(proxy);
         this._testServers = testServers;
+
+        this._recvCancelPromise = new Promise((resolve) => {
+            this._recvCancelResolve = resolve;
+        });
+        this._sendCancelPromise = new Promise((resolve) => {
+            this._sendCancelResolve = resolve;
+        });
     }
 
     async _connect() {
@@ -88,16 +101,30 @@ class Connection {
         this._recvTask = this._recvLoop();
     }
 
+    _cancelLoops() {
+        this._recvCancelResolve!(new CancelHelper());
+        this._sendCancelResolve!(new CancelHelper());
+        this._recvCancelPromise = new Promise((resolve) => {
+            this._recvCancelResolve = resolve;
+        });
+        this._sendCancelPromise = new Promise((resolve) => {
+            this._sendCancelResolve = resolve;
+        });
+    }
+
     async disconnect() {
         this._connected = false;
-        await this._recvArray.push(undefined);
-        await this.socket.close();
+        this._cancelLoops();
+
+        try {
+            await this.socket.close();
+        } catch (e) {
+            this._log.error("error while closing socket connection");
+        }
     }
 
     async send(data: Buffer) {
         if (!this._connected) {
-            // this will stop the current loop
-            await this._sendArray.push(undefined);
             throw new Error("Not connected");
         }
         await this._sendArray.push(data);
@@ -106,7 +133,6 @@ class Connection {
     async recv() {
         while (this._connected) {
             const result = await this._recvArray.pop();
-            // undefined = sentinel value = keep trying
             if (result && result.length) {
                 return result;
             }
@@ -115,18 +141,23 @@ class Connection {
     }
 
     async _sendLoop() {
-        // TODO handle errors
         try {
             while (this._connected) {
-                const data = await this._sendArray.pop();
+                const data = await Promise.race([
+                    this._sendCancelPromise,
+                    this._sendArray.pop(),
+                ]);
+                if (data instanceof CancelHelper) {
+                    break;
+                }
                 if (!data) {
-                    this._sendTask = undefined;
-                    return;
+                    continue;
                 }
                 await this._send(data);
             }
         } catch (e: any) {
             this._log.info("The server closed the connection while sending");
+            await this.disconnect();
         }
     }
 
@@ -134,18 +165,26 @@ class Connection {
         let data;
         while (this._connected) {
             try {
-                data = await this._recv();
-                if (!data) {
-                    throw new Error("no data received");
+                data = await Promise.race([
+                    this._recvCancelPromise,
+                    await this._recv(),
+                ]);
+                if (data instanceof CancelHelper) {
+                    return;
                 }
             } catch (e: any) {
-                this._log.info("connection closed");
-                //await this._recvArray.push()
-
-                this.disconnect();
-                return;
+                this._log.info("The server closed the connection");
+                await this.disconnect();
+                if (!this._recvArray._queue.length) {
+                    await this._recvArray.push(undefined);
+                }
+                break;
+            }
+            try {
+                await this._recvArray.push(data);
+            } catch (e) {
+                break;
             }
-            await this._recvArray.push(data);
         }
     }
 

+ 1 - 1
gramjs/sessions/StringSession.ts

@@ -5,7 +5,7 @@ import { AuthKey } from "../crypto/AuthKey";
 const CURRENT_VERSION = "1";
 
 export class StringSession extends MemorySession {
-    private readonly _key?: Buffer;
+    _key?: Buffer;
 
     /**
      * This session file can be easily saved and loaded as a string. According

+ 2 - 2
package-lock.json

@@ -1,12 +1,12 @@
 {
   "name": "telegram",
-  "version": "2.11.3",
+  "version": "2.12.0",
   "lockfileVersion": 2,
   "requires": true,
   "packages": {
     "": {
       "name": "telegram",
-      "version": "2.11.3",
+      "version": "2.12.0",
       "license": "MIT",
       "dependencies": {
         "@cryptography/aes": "^0.1.1",

+ 4 - 4
package.json

@@ -1,6 +1,6 @@
 {
   "name": "telegram",
-  "version": "2.11.5",
+  "version": "2.12.0",
   "description": "NodeJS/Browser MTProto API Telegram client library,",
   "main": "index.js",
   "types": "index.d.ts",
@@ -59,12 +59,12 @@
     "buffer": "^6.0.3",
     "htmlparser2": "^6.1.0",
     "mime": "^3.0.0",
+    "node-localstorage": "^2.2.1",
     "pako": "^2.0.3",
     "path-browserify": "^1.0.1",
+    "socks": "^2.6.2",
     "store2": "^2.13.0",
     "ts-custom-error": "^3.2.0",
-    "websocket": "^1.0.34",
-    "node-localstorage": "^2.2.1",
-    "socks": "^2.6.2"
+    "websocket": "^1.0.34"
   }
 }