Browse Source

Update reconnect logic

Painor 1 year ago
parent
commit
1afe603521

+ 1 - 1
gramjs/Helpers.ts

@@ -417,7 +417,7 @@ export function getMinBigInt(
  * @param max
  * @param max
  * @returns {number}
  * @returns {number}
  */
  */
-export function getRandomInt(min: number, max: number) {
+export function getRandomInt(min: number, max: number): number {
     min = Math.ceil(min);
     min = Math.ceil(min);
     max = Math.floor(max);
     max = Math.floor(max);
     return Math.floor(Math.random() * (max - min + 1)) + min;
     return Math.floor(Math.random() * (max - min + 1)) + min;

+ 1 - 1
gramjs/Version.ts

@@ -1 +1 @@
-export const version = "2.17.8";
+export const version = "2.18.4";

+ 15 - 3
gramjs/client/TelegramClient.ts

@@ -1350,8 +1350,9 @@ export class TelegramClient extends TelegramBaseClient {
     //endregion
     //endregion
     /** @hidden */
     /** @hidden */
     async _handleReconnect() {
     async _handleReconnect() {
+        this._log.info("Handling reconnect!");
         try {
         try {
-            await this.getMe();
+            const res = await this.getMe();
         } catch (e) {
         } catch (e) {
             this._log.error(`Error while trying to reconnect`);
             this._log.error(`Error while trying to reconnect`);
             if (this._log.canSend(LogLevel.ERROR)) {
             if (this._log.canSend(LogLevel.ERROR)) {
@@ -1390,7 +1391,11 @@ export class TelegramClient extends TelegramBaseClient {
             socket: this.networkSocket,
             socket: this.networkSocket,
             testServers: this.testServers,
             testServers: this.testServers,
         });
         });
-        if (!(await this._sender.connect(connection))) {
+        if (!(await this._sender.connect(connection, false))) {
+            if (!this._loopStarted) {
+                _updateLoop(this);
+                this._loopStarted = true;
+            }
             return;
             return;
         }
         }
         this.session.setAuthKey(this._sender.authKey);
         this.session.setAuthKey(this._sender.authKey);
@@ -1404,7 +1409,12 @@ export class TelegramClient extends TelegramBaseClient {
                 query: this._initRequest,
                 query: this._initRequest,
             })
             })
         );
         );
-        _updateLoop(this);
+        if (!this._loopStarted) {
+            _updateLoop(this);
+            this._loopStarted = true;
+        }
+        this._connectedDeferred.resolve();
+        this._isSwitchingDc = false;
     }
     }
     //endregion
     //endregion
     // region Working with different connections/Data Centers
     // region Working with different connections/Data Centers
@@ -1418,7 +1428,9 @@ export class TelegramClient extends TelegramBaseClient {
         await this._sender!.authKey.setKey(undefined);
         await this._sender!.authKey.setKey(undefined);
         this.session.setAuthKey(undefined);
         this.session.setAuthKey(undefined);
         this.session.save();
         this.session.save();
+        this._isSwitchingDc = true;
         await this._disconnect();
         await this._disconnect();
+        this._sender = undefined;
         return await this.connect();
         return await this.connect();
     }
     }
 
 

+ 6 - 2
gramjs/client/dialogs.ts

@@ -111,9 +111,13 @@ export class _DialogsIter extends RequestIter {
         for (const m of r.messages) {
         for (const m of r.messages) {
             let message = m as unknown as Api.Message;
             let message = m as unknown as Api.Message;
             try {
             try {
-                // todo make sure this never fails
-                message._finishInit(this.client, entities, undefined);
+                if (message && "_finishInit" in message) {
+                    // todo make sure this never fails
+                    message._finishInit(this.client, entities, undefined);
+                }
             } catch (e) {
             } catch (e) {
+                console.log("msg", message);
+
                 this.client._log.error(
                 this.client._log.error(
                     "Got error while trying to finish init message with id " +
                     "Got error while trying to finish init message with id " +
                         m.id
                         m.id

+ 1 - 0
gramjs/client/messages.ts

@@ -794,6 +794,7 @@ export async function sendMessage(
                 "The message cannot be empty unless a file is provided"
                 "The message cannot be empty unless a file is provided"
             );
             );
         }
         }
+
         request = new Api.messages.SendMessage({
         request = new Api.messages.SendMessage({
             peer: entity,
             peer: entity,
             message: message.toString(),
             message: message.toString(),

+ 32 - 20
gramjs/client/telegramBaseClient.ts

@@ -23,6 +23,8 @@ import {
 import { Semaphore } from "async-mutex";
 import { Semaphore } from "async-mutex";
 import { LogLevel } from "../extensions/Logger";
 import { LogLevel } from "../extensions/Logger";
 import { isBrowser, isNode } from "../platform";
 import { isBrowser, isNode } from "../platform";
+import Deferred from "../extensions/Deferred";
+import Timeout = NodeJS.Timeout;
 
 
 const EXPORTED_SENDER_RECONNECT_TIMEOUT = 1000; // 1 sec
 const EXPORTED_SENDER_RECONNECT_TIMEOUT = 1000; // 1 sec
 const EXPORTED_SENDER_RELEASE_TIMEOUT = 30000; // 30 sec
 const EXPORTED_SENDER_RELEASE_TIMEOUT = 30000; // 30 sec
@@ -221,6 +223,8 @@ export abstract class TelegramBaseClient {
     /** @hidden */
     /** @hidden */
     _destroyed: boolean;
     _destroyed: boolean;
     /** @hidden */
     /** @hidden */
+    _isSwitchingDc: boolean;
+    /** @hidden */
     protected _proxy?: ProxyInterface;
     protected _proxy?: ProxyInterface;
     /** @hidden */
     /** @hidden */
     _semaphore: Semaphore;
     _semaphore: Semaphore;
@@ -230,6 +234,7 @@ export abstract class TelegramBaseClient {
     public testServers: boolean;
     public testServers: boolean;
     /** @hidden */
     /** @hidden */
     public networkSocket: typeof PromisedNetSockets | typeof PromisedWebSockets;
     public networkSocket: typeof PromisedNetSockets | typeof PromisedWebSockets;
+    _connectedDeferred: Deferred<void>;
 
 
     constructor(
     constructor(
         session: string | Session,
         session: string | Session,
@@ -315,6 +320,8 @@ export abstract class TelegramBaseClient {
         this._loopStarted = false;
         this._loopStarted = false;
         this._reconnecting = false;
         this._reconnecting = false;
         this._destroyed = false;
         this._destroyed = false;
+        this._isSwitchingDc = false;
+        this._connectedDeferred = new Deferred();
 
 
         // parse mode
         // parse mode
         this._parseMode = MarkdownParser;
         this._parseMode = MarkdownParser;
@@ -354,27 +361,31 @@ export abstract class TelegramBaseClient {
     async disconnect() {
     async disconnect() {
         await this._disconnect();
         await this._disconnect();
         await Promise.all(
         await Promise.all(
-            Object.values(this._exportedSenderPromises).map(
-                (promise: Promise<MTProtoSender>) => {
-                    return (
-                        promise &&
-                        promise.then((sender: MTProtoSender) => {
-                            if (sender) {
-                                return sender.disconnect();
-                            }
-                            return undefined;
-                        })
-                    );
-                }
-            )
+            Object.values(this._exportedSenderPromises)
+                .map((promises) => {
+                    return Object.values(promises).map((promise: any) => {
+                        return (
+                            promise &&
+                            promise.then((sender: MTProtoSender) => {
+                                if (sender) {
+                                    return sender.disconnect();
+                                }
+                                return undefined;
+                            })
+                        );
+                    });
+                })
+                .flat()
         );
         );
 
 
-        this._exportedSenderPromises = new Map<
-            number,
-            Promise<MTProtoSender>
-        >();
-
-        // TODO cancel hanging promises
+        Object.values(this._exportedSenderReleaseTimeouts).forEach(
+            (timeouts) => {
+                Object.values(timeouts).forEach((releaseTimeout: any) => {
+                    clearTimeout(releaseTimeout);
+                });
+            }
+        );
+        this._exportedSenderPromises.clear();
     }
     }
 
 
     get disconnected() {
     get disconnected() {
@@ -435,7 +446,8 @@ export abstract class TelegramBaseClient {
                         proxy: this._proxy,
                         proxy: this._proxy,
                         testServers: this.testServers,
                         testServers: this.testServers,
                         socket: this.networkSocket,
                         socket: this.networkSocket,
-                    })
+                    }),
+                    false
                 );
                 );
 
 
                 if (this.session.dcId !== dcId && !sender._authenticated) {
                 if (this.session.dcId !== dcId && !sender._authenticated) {

+ 96 - 27
gramjs/client/updates.ts

@@ -1,18 +1,25 @@
 import type { EventBuilder } from "../events/common";
 import type { EventBuilder } from "../events/common";
 import { Api } from "../tl";
 import { Api } from "../tl";
 import type { TelegramClient } from "../";
 import type { TelegramClient } from "../";
-import bigInt from "big-integer";
 import { UpdateConnectionState } from "../network";
 import { UpdateConnectionState } from "../network";
 import type { Raw } from "../events";
 import type { Raw } from "../events";
 import { utils } from "../index";
 import { utils } from "../index";
 import { getRandomInt, returnBigInt, sleep } from "../Helpers";
 import { getRandomInt, returnBigInt, sleep } from "../Helpers";
 import { LogLevel } from "../extensions/Logger";
 import { LogLevel } from "../extensions/Logger";
+import Timeout = NodeJS.Timeout;
 
 
 const PING_INTERVAL = 9000; // 9 sec
 const PING_INTERVAL = 9000; // 9 sec
 const PING_TIMEOUT = 10000; // 10 sec
 const PING_TIMEOUT = 10000; // 10 sec
 const PING_FAIL_ATTEMPTS = 3;
 const PING_FAIL_ATTEMPTS = 3;
 const PING_FAIL_INTERVAL = 100; // ms
 const PING_FAIL_INTERVAL = 100; // ms
 const PING_DISCONNECT_DELAY = 60000; // 1 min
 const PING_DISCONNECT_DELAY = 60000; // 1 min
+// An unusually long interval is a sign of returning from background mode...
+const PING_INTERVAL_TO_WAKE_UP = 5000; // 5 sec
+// ... so we send a quick "wake-up" ping to confirm than connection was dropped ASAP
+const PING_WAKE_UP_TIMEOUT = 3000; // 3 sec
+// We also send a warning to the user even a bit more quickly
+const PING_WAKE_UP_WARNING_TIMEOUT = 1000; // 1 sec
+
 /**
 /**
  If this exception is raised in any of the handlers for a given event,
  If this exception is raised in any of the handlers for a given event,
  it will stop the execution of all other registered event handlers.
  it will stop the execution of all other registered event handlers.
@@ -167,7 +174,9 @@ export async function _dispatchUpdate(
                     if (e instanceof StopPropagation) {
                     if (e instanceof StopPropagation) {
                         break;
                         break;
                     }
                     }
-                    console.error(e);
+                    if (client._log.canSend(LogLevel.ERROR)) {
+                        console.error(e);
+                    }
                 }
                 }
             }
             }
         }
         }
@@ -175,38 +184,92 @@ export async function _dispatchUpdate(
 }
 }
 
 
 /** @hidden */
 /** @hidden */
-export async function _updateLoop(client: TelegramClient): Promise<void> {
-    while (client.connected) {
+export async function _updateLoop(client: TelegramClient) {
+    let lastPongAt;
+    while (!client._destroyed) {
+        await sleep(PING_INTERVAL);
+        if (client._sender!.isReconnecting || client._isSwitchingDc) {
+            lastPongAt = undefined;
+            continue;
+        }
+
         try {
         try {
-            await sleep(60 * 1000);
-            if (!client._sender?._transportConnected()) {
+            const ping = () => {
+                return client._sender!.send(
+                    new Api.PingDelayDisconnect({
+                        pingId: returnBigInt(
+                            getRandomInt(
+                                Number.MIN_SAFE_INTEGER,
+                                Number.MAX_SAFE_INTEGER
+                            )
+                        ),
+                        disconnectDelay: PING_DISCONNECT_DELAY,
+                    })
+                );
+            };
+
+            const pingAt = Date.now();
+            const lastInterval = lastPongAt ? pingAt - lastPongAt : undefined;
+
+            if (!lastInterval || lastInterval < PING_INTERVAL_TO_WAKE_UP) {
+                await attempts(
+                    () => timeout(ping, PING_TIMEOUT),
+                    PING_FAIL_ATTEMPTS,
+                    PING_FAIL_INTERVAL
+                );
+            } else {
+                let wakeUpWarningTimeout: Timeout | undefined = setTimeout(
+                    () => {
+                        _handleUpdate(
+                            client,
+                            UpdateConnectionState.disconnected
+                        );
+                        wakeUpWarningTimeout = undefined;
+                    },
+                    PING_WAKE_UP_WARNING_TIMEOUT
+                );
+
+                await timeout(ping, PING_WAKE_UP_TIMEOUT);
+
+                if (wakeUpWarningTimeout) {
+                    clearTimeout(wakeUpWarningTimeout);
+                    wakeUpWarningTimeout = undefined;
+                }
+
+                _handleUpdate(client, UpdateConnectionState.connected);
+            }
+
+            lastPongAt = Date.now();
+        } catch (err) {
+            // eslint-disable-next-line no-console
+            if (client._log.canSend(LogLevel.ERROR)) {
+                console.error(err);
+            }
+
+            lastPongAt = undefined;
+
+            if (client._sender!.isReconnecting || client._isSwitchingDc) {
                 continue;
                 continue;
             }
             }
-            await client.invoke(
-                new Api.Ping({
-                    pingId: bigInt(
-                        getRandomInt(
-                            Number.MIN_SAFE_INTEGER,
-                            Number.MAX_SAFE_INTEGER
-                        )
-                    ),
-                })
-            );
-        } catch (e) {
-            return;
+            console.log("recoonecting");
+            client._sender!.reconnect();
         }
         }
-        client.session.save();
-        if (
-            new Date().getTime() - (client._lastRequest || 0) >
-            30 * 60 * 1000
-        ) {
+
+        // We need to send some content-related request at least hourly
+        // for Telegram to keep delivering updates, otherwise they will
+        // just stop even if we're connected. Do so every 30 minutes.
+
+        if (Date.now() - (client._lastRequest || 0) > 30 * 60 * 1000) {
             try {
             try {
                 await client.invoke(new Api.updates.GetState());
                 await client.invoke(new Api.updates.GetState());
             } catch (e) {
             } catch (e) {
                 // we don't care about errors here
                 // we don't care about errors here
             }
             }
+
+            lastPongAt = undefined;
         }
         }
     }
     }
+    await client.disconnect();
 }
 }
 
 
 /** @hidden */
 /** @hidden */
@@ -227,9 +290,15 @@ async function attempts(cb: CallableFunction, times: number, pause: number) {
 }
 }
 
 
 /** @hidden */
 /** @hidden */
-function timeout(promise: Promise<any>, ms: number) {
+function timeout(cb: any, ms: any) {
+    let isResolved = false;
+
     return Promise.race([
     return Promise.race([
-        promise,
-        sleep(ms).then(() => Promise.reject(new Error("TIMEOUT"))),
-    ]);
+        cb(),
+        sleep(ms).then(() =>
+            isResolved ? undefined : Promise.reject(new Error("TIMEOUT"))
+        ),
+    ]).finally(() => {
+        isResolved = true;
+    });
 }
 }

+ 2 - 0
gramjs/client/users.ts

@@ -35,6 +35,8 @@ export async function invoke<R extends Api.AnyRequest>(
         );
         );
     }
     }
 
 
+    await client._connectedDeferred.promise;
+
     await request.resolve(client, utils);
     await request.resolve(client, utils);
     client._lastRequest = new Date().getTime();
     client._lastRequest = new Date().getTime();
     let attempt: number;
     let attempt: number;

+ 22 - 0
gramjs/extensions/Deferred.ts

@@ -0,0 +1,22 @@
+export default class Deferred<T = void> {
+    promise: Promise<T>;
+
+    reject!: (reason?: any) => void;
+
+    resolve!: (value: T | PromiseLike<T>) => void;
+
+    constructor() {
+        this.promise = new Promise((resolve, reject) => {
+            this.reject = reject;
+            this.resolve = resolve;
+        });
+    }
+
+    static resolved(): Deferred<void>;
+    static resolved<T>(value: T): Deferred<T>;
+    static resolved<T>(value?: T): Deferred<T | void> {
+        const deferred = new Deferred<T | void>();
+        deferred.resolve(value);
+        return deferred;
+    }
+}

+ 88 - 16
gramjs/extensions/MessagePacker.ts

@@ -3,6 +3,13 @@ import { TLMessage } from "../tl/core";
 import { BinaryWriter } from "./BinaryWriter";
 import { BinaryWriter } from "./BinaryWriter";
 import type { MTProtoState } from "../network/MTProtoState";
 import type { MTProtoState } from "../network/MTProtoState";
 import type { RequestState } from "../network/RequestState";
 import type { RequestState } from "../network/RequestState";
+const USE_INVOKE_AFTER_WITH = new Set([
+    "messages.SendMessage",
+    "messages.SendMedia",
+    "messages.SendMultiMedia",
+    "messages.ForwardMessages",
+    "messages.SendInlineBotResult",
+]);
 
 
 export class MessagePacker {
 export class MessagePacker {
     private _state: MTProtoState;
     private _state: MTProtoState;
@@ -27,27 +34,96 @@ export class MessagePacker {
         return this._queue;
         return this._queue;
     }
     }
 
 
-    append(state: RequestState) {
-        this._queue.push(state);
+    append(state?: RequestState, setReady = true, atStart = false) {
+        // We need to check if there is already a `USE_INVOKE_AFTER_WITH` request
+        if (state && USE_INVOKE_AFTER_WITH.has(state.request.className)) {
+            if (atStart) {
+                // Assign `after` for the previously first `USE_INVOKE_AFTER_WITH` request
+                for (let i = 0; i < this._queue.length; i++) {
+                    if (
+                        USE_INVOKE_AFTER_WITH.has(
+                            this._queue[i]?.request.className
+                        )
+                    ) {
+                        this._queue[i].after = state;
+                        break;
+                    }
+                }
+            } else {
+                // Assign after for the previous `USE_INVOKE_AFTER_WITH` request
+                for (let i = this._queue.length - 1; i >= 0; i--) {
+                    if (
+                        USE_INVOKE_AFTER_WITH.has(
+                            this._queue[i]?.request.className
+                        )
+                    ) {
+                        state.after = this._queue[i];
+                        break;
+                    }
+                }
+            }
+        }
+
+        if (atStart) {
+            this._queue.unshift(state);
+        } else {
+            this._queue.push(state);
+        }
+
+        if (setReady && this.setReady) {
+            this.setReady(true);
+        }
+
+        // 1658238041=MsgsAck, we don't care about MsgsAck here because they never resolve anyway.
+        if (state && state.request.CONSTRUCTOR_ID !== 1658238041) {
+            this._pendingStates.push(state);
+            state
+                .promise! // Using finally causes triggering `unhandledrejection` event
+                .catch(() => {})
+                .finally(() => {
+                    this._pendingStates = this._pendingStates.filter(
+                        (s) => s !== state
+                    );
+                });
+        }
+    }
 
 
+    prepend(states: RequestState[]) {
+        states.reverse().forEach((state) => {
+            this.append(state, false, true);
+        });
         if (this.setReady) {
         if (this.setReady) {
             this.setReady(true);
             this.setReady(true);
         }
         }
     }
     }
 
 
     extend(states: RequestState[]) {
     extend(states: RequestState[]) {
-        for (const state of states) {
-            this.append(state);
+        states.forEach((state) => {
+            this.append(state, false);
+        });
+        if (this.setReady) {
+            this.setReady(true);
         }
         }
     }
     }
+    clear() {
+        this._queue = [];
+        this.append(undefined);
+    }
 
 
-    async get() {
+    async wait() {
         if (!this._queue.length) {
         if (!this._queue.length) {
             this._ready = new Promise((resolve) => {
             this._ready = new Promise((resolve) => {
                 this.setReady = resolve;
                 this.setReady = resolve;
             });
             });
             await this._ready;
             await this._ready;
         }
         }
+    }
+
+    async get() {
+        if (!this._queue[this._queue.length - 1]) {
+            this._queue = this._queue.filter(Boolean);
+            return undefined;
+        }
 
 
         let data;
         let data;
         let buffer = new BinaryWriter(Buffer.alloc(0));
         let buffer = new BinaryWriter(Buffer.alloc(0));
@@ -59,12 +135,19 @@ export class MessagePacker {
             batch.length <= MessageContainer.MAXIMUM_LENGTH
             batch.length <= MessageContainer.MAXIMUM_LENGTH
         ) {
         ) {
             const state = this._queue.shift();
             const state = this._queue.shift();
+            if (!state) {
+                continue;
+            }
+
             size += state.data.length + TLMessage.SIZE_OVERHEAD;
             size += state.data.length + TLMessage.SIZE_OVERHEAD;
             if (size <= MessageContainer.MAXIMUM_SIZE) {
             if (size <= MessageContainer.MAXIMUM_SIZE) {
                 let afterId;
                 let afterId;
                 if (state.after) {
                 if (state.after) {
                     afterId = state.after.msgId;
                     afterId = state.after.msgId;
                 }
                 }
+                if (state.after) {
+                    afterId = state.after.msgId;
+                }
                 state.msgId = await this._state.writeDataAsMessage(
                 state.msgId = await this._state.writeDataAsMessage(
                     buffer,
                     buffer,
                     state.data,
                     state.data,
@@ -114,15 +197,4 @@ export class MessagePacker {
         data = buffer.getValue();
         data = buffer.getValue();
         return { batch, data };
         return { batch, data };
     }
     }
-    rejectAll() {
-        this._pendingStates.forEach((requestState) => {
-            requestState.reject(
-                new Error(
-                    "Disconnect (caused from " +
-                        requestState?.request?.className +
-                        ")"
-                )
-            );
-        });
-    }
 }
 }

+ 35 - 0
gramjs/extensions/PendingState.ts

@@ -0,0 +1,35 @@
+import { RequestState } from "../network/RequestState";
+import bigInt from "big-integer";
+
+export class PendingState {
+    _pending: Map<string, RequestState>;
+    constructor() {
+        this._pending = new Map();
+    }
+
+    set(msgId: bigInt.BigInteger, state: RequestState) {
+        this._pending.set(msgId.toString(), state);
+    }
+
+    get(msgId: bigInt.BigInteger) {
+        return this._pending.get(msgId.toString());
+    }
+
+    getAndDelete(msgId: bigInt.BigInteger) {
+        const state = this.get(msgId);
+        this.delete(msgId);
+        return state;
+    }
+
+    values() {
+        return Array.from(this._pending.values());
+    }
+
+    delete(msgId: bigInt.BigInteger) {
+        this._pending.delete(msgId.toString());
+    }
+
+    clear() {
+        this._pending.clear();
+    }
+}

+ 288 - 337
gramjs/network/MTProtoSender.ts

@@ -14,8 +14,7 @@
 import { AuthKey } from "../crypto/AuthKey";
 import { AuthKey } from "../crypto/AuthKey";
 import { MTProtoState } from "./MTProtoState";
 import { MTProtoState } from "./MTProtoState";
 
 
-import { BinaryReader, Logger } from "../extensions";
-import { MessagePacker } from "../extensions";
+import { BinaryReader, Logger, MessagePacker } from "../extensions";
 import { GZIPPacked, MessageContainer, RPCResult, TLMessage } from "../tl/core";
 import { GZIPPacked, MessageContainer, RPCResult, TLMessage } from "../tl/core";
 import { Api } from "../tl";
 import { Api } from "../tl";
 import bigInt from "big-integer";
 import bigInt from "big-integer";
@@ -25,20 +24,19 @@ import { doAuthentication } from "./Authenticator";
 import { MTProtoPlainSender } from "./MTProtoPlainSender";
 import { MTProtoPlainSender } from "./MTProtoPlainSender";
 import {
 import {
     BadMessageError,
     BadMessageError,
-    TypeNotFoundError,
     InvalidBufferError,
     InvalidBufferError,
-    SecurityError,
+    RPCError,
     RPCMessageToError,
     RPCMessageToError,
+    SecurityError,
+    TypeNotFoundError,
 } from "../errors";
 } from "../errors";
 import { Connection, UpdateConnectionState } from "./";
 import { Connection, UpdateConnectionState } from "./";
 import type { TelegramClient } from "..";
 import type { TelegramClient } from "..";
 import { LogLevel } from "../extensions/Logger";
 import { LogLevel } from "../extensions/Logger";
 import { Mutex } from "async-mutex";
 import { Mutex } from "async-mutex";
-import {
-    pseudoCancellable,
-    CancellablePromise,
-    Cancellation,
-} from "real-cancellable-promise";
+import { CancellablePromise } from "real-cancellable-promise";
+import { PendingState } from "../extensions/PendingState";
+import MsgsAck = Api.MsgsAck;
 
 
 interface DEFAULT_OPTIONS {
 interface DEFAULT_OPTIONS {
     logger: any;
     logger: any;
@@ -80,7 +78,7 @@ export class MTProtoSender {
     private _connectTimeout: null;
     private _connectTimeout: null;
     private _autoReconnect: boolean;
     private _autoReconnect: boolean;
     private readonly _authKeyCallback: any;
     private readonly _authKeyCallback: any;
-    private readonly _updateCallback: (
+    public _updateCallback: (
         client: TelegramClient,
         client: TelegramClient,
         update: UpdateConnectionState
         update: UpdateConnectionState
     ) => void;
     ) => void;
@@ -88,6 +86,7 @@ export class MTProtoSender {
     private readonly _senderCallback: any;
     private readonly _senderCallback: any;
     private readonly _isMainSender: boolean;
     private readonly _isMainSender: boolean;
     _userConnected: boolean;
     _userConnected: boolean;
+    isReconnecting: boolean;
     _reconnecting: boolean;
     _reconnecting: boolean;
     _disconnected: boolean;
     _disconnected: boolean;
     private _sendLoopHandle: any;
     private _sendLoopHandle: any;
@@ -95,7 +94,7 @@ export class MTProtoSender {
     readonly authKey: AuthKey;
     readonly authKey: AuthKey;
     private readonly _state: MTProtoState;
     private readonly _state: MTProtoState;
     private _sendQueue: MessagePacker;
     private _sendQueue: MessagePacker;
-    private _pendingState: Map<string, RequestState>;
+    private _pendingState: PendingState;
     private readonly _pendingAck: Set<any>;
     private readonly _pendingAck: Set<any>;
     private readonly _lastAcks: any[];
     private readonly _lastAcks: any[];
     private readonly _handlers: any;
     private readonly _handlers: any;
@@ -151,6 +150,8 @@ export class MTProtoSender {
         this.isConnecting = false;
         this.isConnecting = false;
         this._authenticated = false;
         this._authenticated = false;
         this._userConnected = false;
         this._userConnected = false;
+        this.isReconnecting = false;
+
         this._reconnecting = false;
         this._reconnecting = false;
         this._disconnected = true;
         this._disconnected = true;
 
 
@@ -179,7 +180,7 @@ export class MTProtoSender {
         /**
         /**
          * Sent states are remembered until a response is received.
          * Sent states are remembered until a response is received.
          */
          */
-        this._pendingState = new Map<string, any>();
+        this._pendingState = new PendingState();
 
 
         /**
         /**
          * Responses must be acknowledged, and we can also batch these.
          * Responses must be acknowledged, and we can also batch these.
@@ -240,20 +241,48 @@ export class MTProtoSender {
     /**
     /**
      * Connects to the specified given connection using the given auth key.
      * Connects to the specified given connection using the given auth key.
      */
      */
-    async connect(connection: Connection): Promise<boolean> {
-        const release = await this._connectMutex.acquire();
-        try {
-            if (this._userConnected) {
-                this._log.info("User is already connected!");
-                return false;
+    async connect(connection: Connection, force: boolean): Promise<boolean> {
+        this.userDisconnected = false;
+        if (this._userConnected && !force) {
+            this._log.info("User is already connected!");
+            return false;
+        }
+        this.isConnecting = true;
+        this._connection = connection;
+
+        for (let attempt = 0; attempt < this._retries; attempt++) {
+            try {
+                await this._connect();
+                if (this._updateCallback) {
+                    this._updateCallback(
+                        this._client,
+                        new UpdateConnectionState(
+                            UpdateConnectionState.connected
+                        )
+                    );
+                }
+                break;
+            } catch (err) {
+                if (this._updateCallback && attempt === 0) {
+                    this._updateCallback(
+                        this._client,
+                        new UpdateConnectionState(
+                            UpdateConnectionState.disconnected
+                        )
+                    );
+                }
+                this._log.error(
+                    `WebSocket connection failed attempt: ${attempt + 1}`
+                );
+                if (this._log.canSend(LogLevel.ERROR)) {
+                    console.error(err);
+                }
+                await sleep(this._delay);
             }
             }
-            this._connection = connection;
-            await this._connect();
-            this._userConnected = true;
-            return true;
-        } finally {
-            release();
         }
         }
+        this.isConnecting = false;
+
+        return true;
     }
     }
 
 
     isConnected() {
     isConnected() {
@@ -273,14 +302,9 @@ export class MTProtoSender {
      * all pending requests, and closes the send and receive loops.
      * all pending requests, and closes the send and receive loops.
      */
      */
     async disconnect() {
     async disconnect() {
-        const release = await this._connectMutex.acquire();
-        try {
-            await this._disconnect();
-        } catch (e: any) {
-            this._log.error(e);
-        } finally {
-            release();
-        }
+        this.userDisconnected = true;
+        this._log.warn("Disconnecting...");
+        await this._disconnect();
     }
     }
 
 
     /**
     /**
@@ -308,17 +332,17 @@ export class MTProtoSender {
      * @param request
      * @param request
      * @returns {RequestState}
      * @returns {RequestState}
      */
      */
-    send(request: Api.AnyRequest): any {
-        if (!this._userConnected) {
-            throw new Error(
-                "Cannot send requests while disconnected. You need to call .connect()"
-            );
-        }
+    send(request: Api.AnyRequest) {
         const state = new RequestState(request);
         const state = new RequestState(request);
+        this._log.debug(`Send ${request.className}`);
         this._sendQueue.append(state);
         this._sendQueue.append(state);
         return state.promise;
         return state.promise;
     }
     }
 
 
+    addStateToQueue(state: RequestState) {
+        this._sendQueue.append(state);
+    }
+
     /**
     /**
      * Performs the actual connection, retrying, generating the
      * Performs the actual connection, retrying, generating the
      * authorization key if necessary, and starting the send and
      * authorization key if necessary, and starting the send and
@@ -327,109 +351,73 @@ export class MTProtoSender {
      * @private
      * @private
      */
      */
     async _connect() {
     async _connect() {
-        this._log.info(
-            "Connecting to {0} using {1}"
-                .replace("{0}", this._connection!.toString())
-                .replace("{1}", this._connection!.socket.toString())
-        );
-        let connected = false;
-        for (let attempt = 0; attempt < this._retries; attempt++) {
-            if (!connected) {
-                connected = await this._tryConnect(attempt);
-                if (!connected) {
-                    continue;
-                }
-            }
-            if (!this.authKey.getKey()) {
-                try {
-                    if (!(await this._tryGenAuthKey(attempt))) {
-                        continue;
-                    }
-                } catch (err) {
-                    this._log.warn(
-                        `Connection error ${attempt} during auth_key gen`
-                    );
-                    if (this._log.canSend(LogLevel.ERROR)) {
-                        console.error(err);
-                    }
-                    await this._connection!.disconnect();
-                    connected = false;
-                    await sleep(this._delay);
-                    continue;
-                }
-            } else {
-                this._authenticated = true;
-                this._log.debug("Already have an auth key ...");
-            }
-            break;
-        }
-        if (!connected) {
-            throw new Error(
-                `Connection to telegram failed after ${this._retries} time(s)`
+        const connection = this._connection!;
+
+        if (!connection.isConnected()) {
+            this._log.info(
+                "Connecting to {0}...".replace("{0}", connection.toString())
             );
             );
+            await connection.connect();
+            this._log.debug("Connection success!");
         }
         }
+
         if (!this.authKey.getKey()) {
         if (!this.authKey.getKey()) {
-            const error = new Error(
-                `auth key generation failed after ${this._retries} time(s)`
-            );
-            await this._disconnect(error);
-            throw error;
-        }
+            const plain = new MTProtoPlainSender(connection, this._log);
+            this._log.debug("New auth_key attempt ...");
+            const res = await doAuthentication(plain, this._log);
+            this._log.debug("Generated new auth_key successfully");
+            await this.authKey.setKey(res.authKey);
+
+            this._state.timeOffset = res.timeOffset;
 
 
+            if (this._authKeyCallback) {
+                await this._authKeyCallback(this.authKey, this._dcId);
+            }
+        } else {
+            this._authenticated = true;
+            this._log.debug("Already have an auth key ...");
+        }
         this._userConnected = true;
         this._userConnected = true;
+        this.isReconnecting = false;
 
 
-        this._log.debug("Starting receive loop");
-        this._recvLoopHandle = this._recvLoop();
+        if (!this._sendLoopHandle) {
+            this._log.debug("Starting send loop");
+            this._sendLoopHandle = this._sendLoop();
+        }
 
 
-        this._log.debug("Starting send loop");
-        this._sendLoopHandle = this._sendLoop();
+        if (!this._recvLoopHandle) {
+            this._log.debug("Starting receive loop");
+            this._recvLoopHandle = this._recvLoop();
+        }
+
+        // _disconnected only completes after manual disconnection
+        // or errors after which the sender cannot continue such
+        // as failing to reconnect or any unexpected error.
 
 
         this._log.info(
         this._log.info(
-            "Connection to %s complete!".replace(
-                "%s",
-                this._connection!.toString()
-            )
+            "Connection to %s complete!".replace("%s", connection.toString())
         );
         );
     }
     }
+    async _disconnect() {
+        const connection = this._connection;
+        if (this._updateCallback) {
+            this._updateCallback(
+                this._client,
+                new UpdateConnectionState(UpdateConnectionState.disconnected)
+            );
+        }
 
 
-    async _disconnect(error?: Error) {
-        if (!this._connection) {
+        if (connection === undefined) {
             this._log.info("Not disconnecting (already have no connection)");
             this._log.info("Not disconnecting (already have no connection)");
             return;
             return;
         }
         }
 
 
         this._log.info(
         this._log.info(
-            "Disconnecting from %s...".replace(
-                "%s",
-                this._connection!.toString()
-            )
+            "Disconnecting from %s...".replace("%s", connection.toString())
         );
         );
         this._userConnected = false;
         this._userConnected = false;
-        try {
-            this._log.debug("Closing current connection...");
-            await this._connection!.disconnect();
-        } finally {
-            this._log.debug(
-                `Cancelling ${this._pendingState.size} pending message(s)...`
-            );
-            for (const state of this._pendingState.values()) {
-                if (error && !state.result) {
-                    state.reject(error);
-                } else {
-                    state.reject("disconnected");
-                }
-            }
-
-            this._pendingState.clear();
-            this._cancelLoops();
-            this._log.info(
-                "Disconnecting from %s complete!".replace(
-                    "%s",
-                    this._connection!.toString()
-                )
-            );
-            this._connection = undefined;
-        }
+        this._log.debug("Closing current connection...");
+        await connection.disconnect();
     }
     }
 
 
     _cancelLoops() {
     _cancelLoops() {
@@ -445,92 +433,119 @@ export class MTProtoSender {
      * @private
      * @private
      */
      */
     async _sendLoop() {
     async _sendLoop() {
-        this._cancelSend = false;
-        while (
-            this._userConnected &&
-            !this._reconnecting &&
-            !this._cancelSend
-        ) {
-            if (this._pendingAck.size) {
-                const ack = new RequestState(
-                    new Api.MsgsAck({ msgIds: Array(...this._pendingAck) })
-                );
-                this._sendQueue.append(ack);
-                this._lastAcks.push(ack);
-                if (this._lastAcks.length >= 10) {
-                    this._lastAcks.shift();
+        // Retry previous pending requests
+        this._sendQueue.prepend(this._pendingState.values());
+        this._pendingState.clear();
+
+        while (this._userConnected && !this.isReconnecting) {
+            const appendAcks = () => {
+                if (this._pendingAck.size) {
+                    const ack = new RequestState(
+                        new MsgsAck({ msgIds: Array(...this._pendingAck) })
+                    );
+                    this._sendQueue.append(ack);
+                    this._lastAcks.push(ack);
+                    if (this._lastAcks.length >= 10) {
+                        this._lastAcks.shift();
+                    }
+                    this._pendingAck.clear();
                 }
                 }
-                this._pendingAck.clear();
-            }
+            };
+
+            appendAcks();
+
             this._log.debug(
             this._log.debug(
-                "Waiting for messages to send..." + this._reconnecting
+                `Waiting for messages to send... ${this.isReconnecting}`
             );
             );
             // TODO Wait for the connection send queue to be empty?
             // TODO Wait for the connection send queue to be empty?
             // This means that while it's not empty we can wait for
             // This means that while it's not empty we can wait for
             // more messages to be added to the send queue.
             // more messages to be added to the send queue.
+            await this._sendQueue.wait();
+
+            // If we've had new ACKs appended while waiting for messages to send, add them to queue
+            appendAcks();
 
 
             const res = await this._sendQueue.get();
             const res = await this._sendQueue.get();
+
+            this._log.debug(`Got ${res?.batch.length} message(s) to send`);
+
+            if (this.isReconnecting) {
+                this._log.debug("Reconnecting");
+                this._sendLoopHandle = undefined;
+                return;
+            }
+
             if (!res) {
             if (!res) {
                 continue;
                 continue;
             }
             }
             let { data } = res;
             let { data } = res;
-
             const { batch } = res;
             const { batch } = res;
             this._log.debug(
             this._log.debug(
                 `Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`
                 `Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`
             );
             );
+            this._log.debug(
+                `Sending   ${batch.map((m) => m.request.className)}`
+            );
 
 
             data = await this._state.encryptMessageData(data);
             data = await this._state.encryptMessageData(data);
 
 
+            try {
+                await this._connection!.send(data);
+            } catch (e) {
+                this._log.debug(`Connection closed while sending data ${e}`);
+                if (this._log.canSend(LogLevel.DEBUG)) {
+                    console.error(e);
+                }
+                this._sendLoopHandle = undefined;
+                return;
+            }
             for (const state of batch) {
             for (const state of batch) {
                 if (!Array.isArray(state)) {
                 if (!Array.isArray(state)) {
                     if (state.request.classType === "request") {
                     if (state.request.classType === "request") {
-                        this._pendingState.set(state.msgId.toString(), state);
+                        this._pendingState.set(state.msgId, state);
                     }
                     }
                 } else {
                 } else {
                     for (const s of state) {
                     for (const s of state) {
                         if (s.request.classType === "request") {
                         if (s.request.classType === "request") {
-                            this._pendingState.set(s.msgId.toString(), s);
+                            this._pendingState.set(s.msgId, s);
                         }
                         }
                     }
                     }
                 }
                 }
             }
             }
-            try {
-                await this._connection!.send(data);
-            } catch (e: any) {
-                this._log.error(e);
-                this._log.info("Connection closed while sending data");
-                this._startReconnecting(e);
-                return;
-            }
-
             this._log.debug("Encrypted messages put in a queue to be sent");
             this._log.debug("Encrypted messages put in a queue to be sent");
         }
         }
+
+        this._sendLoopHandle = undefined;
     }
     }
 
 
     async _recvLoop() {
     async _recvLoop() {
         let body;
         let body;
         let message;
         let message;
 
 
-        while (this._userConnected && !this._reconnecting) {
+        while (this._userConnected && !this.isReconnecting) {
             this._log.debug("Receiving items from the network...");
             this._log.debug("Receiving items from the network...");
             try {
             try {
-                this.cancellableRecvLoopPromise = pseudoCancellable(
-                    this._connection!.recv()
-                );
-                body = await this.cancellableRecvLoopPromise;
-            } catch (e: any) {
-                if (e instanceof Cancellation) {
-                    return;
+                body = await this._connection!.recv();
+            } catch (e) {
+                /** when the server disconnects us we want to reconnect */
+                if (!this.userDisconnected) {
+                    this._log.warn("Connection closed while receiving data");
+
+                    if (this._log.canSend(LogLevel.WARN)) {
+                        console.error(e);
+                    }
+                    this.reconnect();
                 }
                 }
-                this._log.error(e);
-                this._log.warn("Connection closed while receiving data...");
-                this._startReconnecting(e);
+                this._recvLoopHandle = undefined;
                 return;
                 return;
             }
             }
+
             try {
             try {
                 message = await this._state.decryptMessageData(body);
                 message = await this._state.decryptMessageData(body);
-            } catch (e: any) {
+            } catch (e) {
+                this._log.debug(
+                    `Error while receiving items from the network ${e}`
+                );
                 if (e instanceof TypeNotFoundError) {
                 if (e instanceof TypeNotFoundError) {
                     // Received object which we don't know how to deserialize
                     // Received object which we don't know how to deserialize
                     this._log.info(
                     this._log.info(
@@ -547,24 +562,7 @@ export class MTProtoSender {
                 } else if (e instanceof InvalidBufferError) {
                 } else if (e instanceof InvalidBufferError) {
                     // 404 means that the server has "forgotten" our auth key and we need to create a new one.
                     // 404 means that the server has "forgotten" our auth key and we need to create a new one.
                     if (e.code === 404) {
                     if (e.code === 404) {
-                        this._log.warn(
-                            `Broken authorization key for dc ${this._dcId}; resetting`
-                        );
-                        if (this._updateCallback && this._isMainSender) {
-                            this._updateCallback(
-                                this._client,
-                                new UpdateConnectionState(
-                                    UpdateConnectionState.broken
-                                )
-                            );
-                        } else if (
-                            this._onConnectionBreak &&
-                            !this._isMainSender
-                        ) {
-                            // Deletes the current sender from the object
-                            this._onConnectionBreak(this._dcId);
-                        }
-                        await this._disconnect(e);
+                        this._handleBadAuthKey();
                     } else {
                     } else {
                         // this happens sometimes when telegram is having some internal issues.
                         // this happens sometimes when telegram is having some internal issues.
                         // reconnecting should be enough usually
                         // reconnecting should be enough usually
@@ -572,27 +570,63 @@ export class MTProtoSender {
                         this._log.warn(
                         this._log.warn(
                             `Invalid buffer ${e.code} for dc ${this._dcId}`
                             `Invalid buffer ${e.code} for dc ${this._dcId}`
                         );
                         );
-                        this._startReconnecting(e);
+                        this.reconnect();
                     }
                     }
+                    this._recvLoopHandle = undefined;
                     return;
                     return;
                 } else {
                 } else {
                     this._log.error("Unhandled error while receiving data");
                     this._log.error("Unhandled error while receiving data");
-                    this._log.error(e);
-                    this._startReconnecting(e);
+                    if (this._log.canSend(LogLevel.ERROR)) {
+                        console.log(e);
+                    }
+                    this.reconnect();
+                    this._recvLoopHandle = undefined;
                     return;
                     return;
                 }
                 }
             }
             }
             try {
             try {
                 await this._processMessage(message);
                 await this._processMessage(message);
-            } catch (e: any) {
-                this._log.error("Unhandled error while processing data");
-                this._log.error(e);
+            } catch (e) {
+                // `RPCError` errors except for 'AUTH_KEY_UNREGISTERED' should be handled by the client
+                if (e instanceof RPCError) {
+                    if (
+                        e.message === "AUTH_KEY_UNREGISTERED" ||
+                        e.message === "SESSION_REVOKED"
+                    ) {
+                        // 'AUTH_KEY_UNREGISTERED' for the main sender is thrown when unauthorized and should be ignored
+                        this._handleBadAuthKey(true);
+                    }
+                } else {
+                    this._log.error("Unhandled error while receiving data");
+                    if (this._log.canSend(LogLevel.ERROR)) {
+                        console.log(e);
+                    }
+                }
             }
             }
         }
         }
+
+        this._recvLoopHandle = undefined;
     }
     }
 
 
     // Response Handlers
     // Response Handlers
+    _handleBadAuthKey(shouldSkipForMain: boolean = false) {
+        if (shouldSkipForMain && this._isMainSender) {
+            return;
+        }
 
 
+        this._log.warn(
+            `Broken authorization key for dc ${this._dcId}, resetting...`
+        );
+
+        if (this._isMainSender && this._updateCallback) {
+            this._updateCallback(
+                this._client,
+                new UpdateConnectionState(UpdateConnectionState.broken)
+            );
+        } else if (!this._isMainSender && this._onConnectionBreak) {
+            this._onConnectionBreak(this._dcId);
+        }
+    }
     /**
     /**
      * Adds the given message to the list of messages that must be
      * Adds the given message to the list of messages that must be
      * acknowledged and dispatches control to different ``_handle_*``
      * acknowledged and dispatches control to different ``_handle_*``
@@ -621,25 +655,23 @@ export class MTProtoSender {
      * @private
      * @private
      */
      */
     _popStates(msgId: bigInt.BigInteger) {
     _popStates(msgId: bigInt.BigInteger) {
-        let state = this._pendingState.get(msgId.toString());
+        const state = this._pendingState.getAndDelete(msgId);
         if (state) {
         if (state) {
-            this._pendingState.delete(msgId.toString());
             return [state];
             return [state];
         }
         }
 
 
         const toPop = [];
         const toPop = [];
 
 
-        for (const state of this._pendingState.values()) {
-            if (state.containerId && state.containerId.equals(msgId)) {
-                toPop.push(state.msgId);
+        for (const pendingState of this._pendingState.values()) {
+            if (pendingState.containerId?.equals(msgId)) {
+                toPop.push(pendingState.msgId!);
             }
             }
         }
         }
 
 
         if (toPop.length) {
         if (toPop.length) {
             const temp = [];
             const temp = [];
             for (const x of toPop) {
             for (const x of toPop) {
-                temp.push(this._pendingState.get(x!.toString()));
-                this._pendingState.delete(x!.toString());
+                temp.push(this._pendingState.getAndDelete(x));
             }
             }
             return temp;
             return temp;
         }
         }
@@ -652,7 +684,6 @@ export class MTProtoSender {
 
 
         return [];
         return [];
     }
     }
-
     /**
     /**
      * Handles the result for Remote Procedure Calls:
      * Handles the result for Remote Procedure Calls:
      * rpc_result#f35c6d01 req_msg_id:long result:bytes = RpcResult;
      * rpc_result#f35c6d01 req_msg_id:long result:bytes = RpcResult;
@@ -662,14 +693,9 @@ export class MTProtoSender {
      * @private
      * @private
      */
      */
     _handleRPCResult(message: TLMessage) {
     _handleRPCResult(message: TLMessage) {
-        const RPCResult = message.obj;
-        const state = this._pendingState.get(RPCResult.reqMsgId.toString());
-        if (state) {
-            this._pendingState.delete(RPCResult.reqMsgId.toString());
-        }
-        this._log.debug(
-            `Handling RPC result for message ${RPCResult.reqMsgId}`
-        );
+        const result = message.obj;
+        const state = this._pendingState.getAndDelete(result.reqMsgId);
+        this._log.debug(`Handling RPC result for message ${result.reqMsgId}`);
 
 
         if (!state) {
         if (!state) {
             // TODO We should not get responses to things we never sent
             // TODO We should not get responses to things we never sent
@@ -677,36 +703,42 @@ export class MTProtoSender {
             // See #658, #759 and #958. They seem to happen in a container
             // See #658, #759 and #958. They seem to happen in a container
             // which contain the real response right after.
             // which contain the real response right after.
             try {
             try {
-                const reader = new BinaryReader(RPCResult.body);
+                const reader = new BinaryReader(result.body);
                 if (!(reader.tgReadObject() instanceof Api.upload.File)) {
                 if (!(reader.tgReadObject() instanceof Api.upload.File)) {
-                    throw new Error("Not an upload.File");
+                    throw new TypeNotFoundError(0, Buffer.alloc(0));
                 }
                 }
-            } catch (e: any) {
-                this._log.error(e);
+            } catch (e) {
                 if (e instanceof TypeNotFoundError) {
                 if (e instanceof TypeNotFoundError) {
                     this._log.info(
                     this._log.info(
-                        `Received response without parent request: ${RPCResult.body}`
+                        `Received response without parent request: ${result.body}`
                     );
                     );
                     return;
                     return;
-                } else {
-                    throw e;
                 }
                 }
+
+                throw e;
             }
             }
             return;
             return;
         }
         }
-        if (RPCResult.error && state.msgId) {
-            const error = RPCMessageToError(RPCResult.error, state.request);
+
+        if (result.error) {
+            // eslint-disable-next-line new-cap
+            const error = RPCMessageToError(result.error, state.request);
             this._sendQueue.append(
             this._sendQueue.append(
-                new RequestState(new Api.MsgsAck({ msgIds: [state.msgId] }))
+                new RequestState(new MsgsAck({ msgIds: [state.msgId!] }))
             );
             );
             state.reject(error);
             state.reject(error);
+            throw error;
         } else {
         } else {
             try {
             try {
-                const reader = new BinaryReader(RPCResult.body);
+                const reader = new BinaryReader(result.body);
                 const read = state.request.readResult(reader);
                 const read = state.request.readResult(reader);
+                this._log.debug(
+                    `Handling RPC result ${read?.constructor?.name}`
+                );
                 state.resolve(read);
                 state.resolve(read);
-            } catch (e) {
-                state.reject(e);
+            } catch (err) {
+                state.reject(err);
+                throw err;
             }
             }
         }
         }
     }
     }
@@ -877,34 +909,9 @@ export class MTProtoSender {
     }
     }
 
 
     /**
     /**
-     * Handles a server acknowledge about our messages. Normally
-     * these can be ignored except in the case of ``auth.logOut``:
-     *
-     *     auth.logOut#5717da40 = Bool;
-     *
-     * Telegram doesn't seem to send its result so we need to confirm
-     * it manually. No other request is known to have this behaviour.
-
-     * Since the ID of sent messages consisting of a container is
-     * never returned (unless on a bad notification), this method
-     * also removes containers messages when any of their inner
-     * messages are acknowledged.
-
-     * @param message
-     * @returns {Promise<void>}
-     * @private
+     * Handles a server acknowledge about our messages. Normally these can be ignored
      */
      */
-    async _handleAck(message: TLMessage) {
-        const ack = message.obj;
-        this._log.debug(`Handling acknowledge for ${ack.msgIds}`);
-        for (const msgId of ack.msgIds) {
-            const state = this._pendingState.get(msgId);
-            if (state && state.request instanceof Api.auth.LogOut) {
-                this._pendingState.delete(msgId);
-                state.resolve(true);
-            }
-        }
-    }
+    _handleAck() {}
 
 
     /**
     /**
      * Handles future salt results, which don't come inside a
      * Handles future salt results, which don't come inside a
@@ -916,13 +923,10 @@ export class MTProtoSender {
      * @private
      * @private
      */
      */
     async _handleFutureSalts(message: TLMessage) {
     async _handleFutureSalts(message: TLMessage) {
-        // TODO save these salts and automatically adjust to the
-        // correct one whenever the salt in use expires.
         this._log.debug(`Handling future salts for message ${message.msgId}`);
         this._log.debug(`Handling future salts for message ${message.msgId}`);
-        const state = this._pendingState.get(message.msgId.toString());
+        const state = this._pendingState.getAndDelete(message.msgId);
 
 
         if (state) {
         if (state) {
-            this._pendingState.delete(message.msgId.toString());
             state.resolve(message.obj);
             state.resolve(message.obj);
         }
         }
     }
     }
@@ -952,106 +956,53 @@ export class MTProtoSender {
      * @private
      * @private
      */
      */
     async _handleMsgAll(message: TLMessage) {}
     async _handleMsgAll(message: TLMessage) {}
-
-    async _reconnect(lastError?: any) {
-        this._log.debug("Closing current connection...");
-        await this._connection!.disconnect();
-        this._cancelLoops();
-
-        this._reconnecting = false;
-        this._state.reset();
-        let attempt;
-        let ok = true;
-        for (attempt = 0; attempt < this._retries; attempt++) {
-            try {
-                await this._connect();
-                await sleep(1000);
-                this._sendQueue.extend([...this._pendingState.values()]);
-                this._pendingState.clear();
-                if (this._autoReconnectCallback) {
-                    this._autoReconnectCallback();
-                }
-                break;
-            } catch (err: any) {
-                if (attempt == this._retries - 1) {
-                    ok = false;
-                }
-                if (err instanceof InvalidBufferError) {
-                    if (err.code === 404) {
-                        this._log.warn(
-                            `Broken authorization key for dc ${this._dcId}; resetting`
-                        );
-                        await this.authKey.setKey(undefined);
-                        if (this._authKeyCallback) {
-                            await this._authKeyCallback(undefined);
-                        }
-                        ok = false;
-                        break;
-                    } else {
-                        // this happens sometimes when telegram is having some internal issues.
-                        // since the data we sent and received is probably wrong now.
-                        this._log.warn(
-                            `Invalid buffer ${err.code} for dc ${this._dcId}`
-                        );
-                    }
-                }
-                this._log.error(
-                    `Unexpected exception reconnecting on attempt ${attempt}`
-                );
-                await sleep(this._delay);
-                lastError = err;
-            }
-        }
-        if (!ok) {
-            this._log.error(`Automatic reconnection failed ${attempt} time(s)`);
-            await this._disconnect(lastError ? lastError : undefined);
-        }
-    }
-
-    async _tryConnect(attempt: number) {
-        try {
-            this._log.debug(`Connection attempt ${attempt}...`);
-            await this._connection!.connect();
-            this._log.debug("Connection success!");
-            return true;
-        } catch (err) {
-            this._log.warn(`Attempt ${attempt} at connecting failed`);
-            if (this._log.canSend(LogLevel.ERROR)) {
-                console.error(err);
-            }
-            await sleep(this._delay);
-            return false;
+    reconnect() {
+        if (this._userConnected && !this.isReconnecting) {
+            this.isReconnecting = true;
+            // we want to wait a second between each reconnect try to not flood the server with reconnects
+            // in case of internal server issues.
+            sleep(1000).then(() => {
+                this._log.info("Started reconnecting");
+                this._reconnect();
+            });
         }
         }
     }
     }
 
 
-    async _tryGenAuthKey(attempt: number) {
-        const plain = new MTProtoPlainSender(this._connection, this._log);
+    async _reconnect() {
+        this._log.debug("Closing current connection...");
         try {
         try {
-            this._log.debug(`New auth_key attempt ${attempt}...`);
-            this._log.debug("New auth_key attempt ...");
-            const res = await doAuthentication(plain, this._log);
-            this._log.debug("Generated new auth_key successfully");
-            await this.authKey.setKey(res.authKey);
-            this._state.timeOffset = res.timeOffset;
-            if (this._authKeyCallback) {
-                await this._authKeyCallback(this.authKey, this._dcId);
-            }
-            this._log.debug("auth_key generation success!");
-            return true;
+            this._log.warn("[Reconnect] Closing current connection...");
+            await this._disconnect();
         } catch (err) {
         } catch (err) {
-            this._log.warn(`Attempt ${attempt} at generating auth key failed`);
+            this._log.warn("Error happened while disconnecting");
             if (this._log.canSend(LogLevel.ERROR)) {
             if (this._log.canSend(LogLevel.ERROR)) {
                 console.error(err);
                 console.error(err);
             }
             }
-            return false;
         }
         }
-    }
 
 
-    private _startReconnecting(error: Error) {
-        this._log.info(`Starting reconnect...`);
-        if (this._userConnected && !this._reconnecting) {
-            this._reconnecting = true;
-            this._reconnect(error);
+        this._sendQueue.clear();
+        this._state.reset();
+        const connection = this._connection!;
+
+        // For some reason reusing existing connection caused stuck requests
+        // @ts-ignore
+        const newConnection = new connection.constructor({
+            ip: connection._ip,
+            port: connection._port,
+            dcId: connection._dcId,
+            loggers: connection._log,
+            proxy: connection._proxy,
+            testServers: connection._testServers,
+            socket: this._client.networkSocket,
+        });
+        await this.connect(newConnection, true);
+
+        this.isReconnecting = false;
+        this._sendQueue.prepend(this._pendingState.values());
+        this._pendingState.clear();
+
+        if (this._autoReconnectCallback) {
+            await this._autoReconnectCallback();
         }
         }
     }
     }
 }
 }

+ 23 - 3
gramjs/network/RequestState.ts

@@ -1,4 +1,6 @@
 import bigInt from "big-integer";
 import bigInt from "big-integer";
+import Deferred from "../extensions/Deferred";
+import { Api } from "../tl";
 
 
 export class RequestState {
 export class RequestState {
     public containerId?: bigInt.BigInteger;
     public containerId?: bigInt.BigInteger;
@@ -7,19 +9,37 @@ export class RequestState {
     public data: Buffer;
     public data: Buffer;
     public after: any;
     public after: any;
     public result: undefined;
     public result: undefined;
-    promise: Promise<unknown>;
+    public finished: Deferred;
+    public promise: Promise<unknown> | undefined;
     // @ts-ignore
     // @ts-ignore
     public resolve: (value?: any) => void;
     public resolve: (value?: any) => void;
     // @ts-ignore
     // @ts-ignore
     public reject: (reason?: any) => void;
     public reject: (reason?: any) => void;
 
 
-    constructor(request: any, after = undefined) {
+    constructor(request: Api.AnyRequest | Api.MsgsAck | Api.MsgsStateInfo) {
         this.containerId = undefined;
         this.containerId = undefined;
         this.msgId = undefined;
         this.msgId = undefined;
         this.request = request;
         this.request = request;
         this.data = request.getBytes();
         this.data = request.getBytes();
-        this.after = after;
+        this.after = undefined;
         this.result = undefined;
         this.result = undefined;
+        this.finished = new Deferred();
+
+        this.resetPromise();
+    }
+
+    isReady() {
+        if (!this.after) {
+            return true;
+        }
+
+        return this.after.finished.promise;
+    }
+
+    resetPromise() {
+        // Prevent stuck await
+        this.reject?.();
+
         this.promise = new Promise((resolve, reject) => {
         this.promise = new Promise((resolve, reject) => {
             this.resolve = resolve;
             this.resolve = resolve;
             this.reject = reject;
             this.reject = reject;

+ 27 - 38
gramjs/network/connection/Connection.ts

@@ -48,7 +48,6 @@ class Connection {
     protected _obfuscation: any;
     protected _obfuscation: any;
     _sendArray: AsyncQueue;
     _sendArray: AsyncQueue;
     _recvArray: AsyncQueue;
     _recvArray: AsyncQueue;
-    recvCancel?: CancellablePromise<any>;
     sendCancel?: CancellablePromise<any>;
     sendCancel?: CancellablePromise<any>;
     socket: PromisedNetSockets | PromisedWebSockets;
     socket: PromisedNetSockets | PromisedWebSockets;
     public _testServers: boolean;
     public _testServers: boolean;
@@ -91,24 +90,19 @@ class Connection {
         await this._connect();
         await this._connect();
         this._connected = true;
         this._connected = true;
 
 
-        this._sendTask = this._sendLoop();
+        if (!this._sendTask) {
+            this._sendTask = this._sendLoop();
+        }
         this._recvTask = this._recvLoop();
         this._recvTask = this._recvLoop();
     }
     }
 
 
-    _cancelLoops() {
-        this.recvCancel!.cancel();
-        this.sendCancel!.cancel();
-    }
-
     async disconnect() {
     async disconnect() {
-        this._connected = false;
-        this._cancelLoops();
-
-        try {
-            await this.socket.close();
-        } catch (e) {
-            this._log.error("error while closing socket connection");
+        if (!this._connected) {
+            return;
         }
         }
+
+        this._connected = false;
+        void this._recvArray.push(undefined);
     }
     }
 
 
     async send(data: Buffer) {
     async send(data: Buffer) {
@@ -121,7 +115,8 @@ class Connection {
     async recv() {
     async recv() {
         while (this._connected) {
         while (this._connected) {
             const result = await this._recvArray.pop();
             const result = await this._recvArray.pop();
-            if (result && result.length) {
+            // null = sentinel value = keep trying
+            if (result) {
                 return result;
                 return result;
             }
             }
         }
         }
@@ -131,44 +126,38 @@ class Connection {
     async _sendLoop() {
     async _sendLoop() {
         try {
         try {
             while (this._connected) {
             while (this._connected) {
-                this.sendCancel = pseudoCancellable(this._sendArray.pop());
-                const data = await this.sendCancel;
+                const data = await this._sendArray.pop();
                 if (!data) {
                 if (!data) {
-                    continue;
+                    this._sendTask = undefined;
+                    return;
                 }
                 }
                 await this._send(data);
                 await this._send(data);
             }
             }
-        } catch (e: any) {
-            if (e instanceof Cancellation) {
-                return;
-            }
+        } catch (e) {
             this._log.info("The server closed the connection while sending");
             this._log.info("The server closed the connection while sending");
-            await this.disconnect();
         }
         }
     }
     }
 
 
+    isConnected() {
+        return this._connected;
+    }
+
     async _recvLoop() {
     async _recvLoop() {
         let data;
         let data;
         while (this._connected) {
         while (this._connected) {
             try {
             try {
-                this.recvCancel = pseudoCancellable(this._recv());
-                data = await this.recvCancel;
-            } catch (e: any) {
-                if (e instanceof Cancellation) {
-                    return;
-                }
-                this._log.info("The server closed the connection");
-                await this.disconnect();
-                if (!this._recvArray._queue.length) {
-                    await this._recvArray.push(undefined);
+                data = await this._recv();
+                if (!data) {
+                    throw new Error("no data received");
                 }
                 }
-                break;
-            }
-            try {
-                await this._recvArray.push(data);
             } catch (e) {
             } catch (e) {
-                break;
+                this._log.info("connection closed");
+                // await this._recvArray.push()
+
+                this.disconnect();
+                return;
             }
             }
+            await this._recvArray.push(data);
         }
         }
     }
     }
 
 

+ 12 - 1
publish_npm.js

@@ -82,6 +82,12 @@ fs.writeFileSync(
   JSON.stringify(packageJSON, null, "  "),
   JSON.stringify(packageJSON, null, "  "),
   "utf8"
   "utf8"
 );
 );
+fs.writeFileSync(
+  "gramjs/Version.ts",
+  `export const version = "${packageJSON.version}";`,
+  "utf8"
+);
+
 renameFiles("tempBrowser", "rename");
 renameFiles("tempBrowser", "rename");
 
 
 const npmi = exec("npm i");
 const npmi = exec("npm i");
@@ -146,6 +152,11 @@ npmi.on("close", (code) => {
       JSON.stringify(packageJSON, null, "  "),
       JSON.stringify(packageJSON, null, "  "),
       "utf8"
       "utf8"
     );
     );
+    fs.writeFileSync(
+      "gramjs/Version.ts",
+      `export const version = "${packageJSON.version}";`,
+      "utf8"
+    );
 
 
     const npmi = exec("npm i");
     const npmi = exec("npm i");
     npmi.on("close", (code) => {
     npmi.on("close", (code) => {
@@ -162,7 +173,7 @@ npmi.on("close", (code) => {
           fs.copyFileSync("gramjs/tl/api.d.ts", "dist/tl/api.d.ts");
           fs.copyFileSync("gramjs/tl/api.d.ts", "dist/tl/api.d.ts");
           fs.copyFileSync("gramjs/define.d.ts", "dist/define.d.ts");
           fs.copyFileSync("gramjs/define.d.ts", "dist/define.d.ts");
           renameFiles("dist", "delete");
           renameFiles("dist", "delete");
-          const npm_publish = exec("npm publish", { cwd: "dist" });
+          const npm_publish = exec("npm publish --tag next", { cwd: "dist" });
           npm_publish.stdout.on("data", function (data) {
           npm_publish.stdout.on("data", function (data) {
             console.log(data.toString());
             console.log(data.toString());
           });
           });

+ 1 - 1
tsconfig.json

@@ -2,7 +2,7 @@
   "compilerOptions": {
   "compilerOptions": {
     "module": "commonjs",
     "module": "commonjs",
     "target": "es2017",
     "target": "es2017",
-    "lib": ["dom", "es7"],
+    "lib": ["dom", "es7", "ES2019"],
     "sourceMap": false,
     "sourceMap": false,
     "downlevelIteration": true,
     "downlevelIteration": true,
     "allowJs": true,
     "allowJs": true,