Bläddra i källkod

Fix reconnects.
Fix download and upload workers.

painor 3 år sedan
förälder
incheckning
7474e57e1f

+ 11 - 3
gramjs/Helpers.ts

@@ -38,6 +38,7 @@ export function generateRandomBigInt() {
 export function escapeRegex(string: string) {
     return string.replace(/[-\/\\^$*+?.()|[\]{}]/g, "\\$&");
 }
+
 export function groupBy(list: any[], keyGetter: Function) {
     const map = new Map();
     list.forEach((item) => {
@@ -433,11 +434,18 @@ export const sleep = (ms: number) =>
     new Promise((resolve) => setTimeout(resolve, ms));
 
 /**
- * Checks if the obj is an array
- * @param obj
- * @returns {boolean}
+ * Helper to export two buffers of same length
+ * @returns {Buffer}
  */
 
+export function bufferXor(a: Buffer, b: Buffer) {
+    const res = [];
+    for (let i = 0; i < a.length; i++) {
+        res.push(a[i] ^ b[i]);
+    }
+    return Buffer.from(res);
+}
+
 // Taken from https://stackoverflow.com/questions/18638900/javascript-crc32/18639999#18639999
 function makeCRCTable() {
     let c;

+ 57 - 106
gramjs/client/TelegramClient.ts

@@ -20,9 +20,9 @@ import { MTProtoSender, UpdateConnectionState } from "../network";
 import { LAYER } from "../tl/AllTLObjects";
 import { betterConsoleLog, IS_NODE } from "../Helpers";
 import { DownloadMediaInterface } from "./downloads";
-import type { Message } from "../tl/custom/message";
+import type { Message } from "../tl/patched";
 import { NewMessage, NewMessageEvent } from "../events";
-import { _dispatchUpdate, _handleUpdate, _updateLoop } from "./updates";
+import { _handleUpdate, _updateLoop } from "./updates";
 import { Session } from "../sessions";
 import { inspect } from "util";
 import { Album, AlbumEvent } from "../events/Album";
@@ -1115,19 +1115,25 @@ export class TelegramClient extends TelegramBaseClient {
 
     async connect() {
         await this._initSession();
-
-        this._sender = new MTProtoSender(this.session.getAuthKey(), {
-            logger: this._log,
-            dcId: this.session.dcId || 4,
-            retries: this._connectionRetries,
-            delay: this._retryDelay,
-            autoReconnect: this._autoReconnect,
-            connectTimeout: this._timeout,
-            authKeyCallback: this._authKeyCallback.bind(this),
-            updateCallback: _handleUpdate.bind(this),
-            isMainSender: true,
-            client: this,
-        });
+        if (this._sender === undefined) {
+            this._sender = new MTProtoSender(this.session.getAuthKey(), {
+                logger: this._log,
+                dcId: this.session.dcId || 4,
+                retries: this._connectionRetries,
+                delay: this._retryDelay,
+                autoReconnect: this._autoReconnect,
+                connectTimeout: this._timeout,
+                authKeyCallback: this._authKeyCallback.bind(this),
+                updateCallback: _handleUpdate.bind(this),
+                isMainSender: true,
+                client: this,
+            });
+        }
+        // set defaults vars
+        this._sender.userDisconnected = false;
+        this._sender._userConnected = false;
+        this._sender._reconnecting = false;
+        this._sender._disconnected = true;
 
         const connection = new this._connection(
             this.session.serverAddress,
@@ -1135,20 +1141,17 @@ export class TelegramClient extends TelegramBaseClient {
             this.session.dcId,
             this._log
         );
-        if (
-            !(await this._sender.connect(
-                connection,
-                (updateState: UpdateConnectionState) => {
-                    _dispatchUpdate(this, {
-                        update: updateState,
-                    });
-                }
-            ))
-        ) {
+        const newConnection = await this._sender.connect(connection);
+        if (!newConnection) {
+            // we're already connected so no need to reset auth key.
+            if (!this._loopStarted) {
+                _updateLoop(this);
+                this._loopStarted = true;
+            }
             return;
         }
+
         this.session.setAuthKey(this._sender.authKey);
-        await this.session.save();
         this._initRequest.query = new Api.help.GetConfig();
         await this._sender.send(
             new Api.InvokeWithLayer({
@@ -1157,8 +1160,11 @@ export class TelegramClient extends TelegramBaseClient {
             })
         );
 
-        _dispatchUpdate(this, { update: new UpdateConnectionState(1) });
-        _updateLoop(this);
+        if (!this._loopStarted) {
+            _updateLoop(this);
+            this._loopStarted = true;
+        }
+        this._reconnecting = false;
     }
 
     //endregion
@@ -1170,72 +1176,23 @@ export class TelegramClient extends TelegramBaseClient {
         this.session.setDC(newDc, DC.ipAddress, DC.port);
         // authKey's are associated with a server, which has now changed
         // so it's not valid anymore. Set to undefined to force recreating it.
-        await this._sender!.authKey.setKey();
-        this.session.setAuthKey();
+        await this._sender!.authKey.setKey(undefined);
+        this.session.setAuthKey(undefined);
+        this._reconnecting = true;
         await this.disconnect();
         return this.connect();
     }
 
-    /** @hidden */
-    async _createExportedSender(
-        dcId: number,
-        retries: number
-    ): Promise<MTProtoSender> {
-        const dc = await this.getDC(dcId);
-        const sender = new MTProtoSender(this.session.getAuthKey(dcId), {
-            logger: this._log,
-            dcId: dcId,
-            retries: this._connectionRetries,
-            delay: this._retryDelay,
-            autoReconnect: this._autoReconnect,
-            connectTimeout: this._timeout,
-            authKeyCallback: this._authKeyCallback.bind(this),
-            isMainSender: dcId === this.session.dcId,
-            senderCallback: this._removeSender.bind(this),
-            client: this,
-        });
-        for (let i = 0; i < retries; i++) {
-            try {
-                await sender.connect(
-                    new this._connection(dc.ipAddress, dc.port, dcId, this._log)
-                );
-                if (this.session.dcId !== dcId) {
-                    this._log.info(
-                        `Exporting authorization for data center ${dc.ipAddress}`
-                    );
-                    const auth = await this.invoke(
-                        new Api.auth.ExportAuthorization({ dcId: dcId })
-                    );
-                    this._initRequest.query = new Api.auth.ImportAuthorization({
-                        id: auth.id,
-                        bytes: auth.bytes,
-                    });
-                    const req = new Api.InvokeWithLayer({
-                        layer: LAYER,
-                        query: this._initRequest,
-                    });
-                    await sender.send(req);
-                }
-                sender.dcId = dcId;
-                return sender;
-            } catch (e) {
-                // we can't create sender for our own main DC
-                if (e.errorMessage == "DC_ID_INVALID") {
-                    return this._sender!;
-                }
-                await sender.disconnect();
-            }
-        }
-        throw new Error("Could not create sender for DC " + dcId);
-    }
-
     /**
      * Returns the DC ip in case of node or the DC web address in case of browser.<br/>
      * This will do an API request to fill the cache if it's the first time it's called.
      * @param dcId The DC ID.
+     * @param downloadDC whether to use -1 DCs or not
+     * (These only support downloading/uploading and not creating a new AUTH key)
      */
     async getDC(
-        dcId: number
+        dcId: number,
+        downloadDC = false
     ): Promise<{ id: number; ipAddress: string; port: number }> {
         this._log.debug(`Getting DC ${dcId}`);
         if (!IS_NODE) {
@@ -1243,31 +1200,41 @@ export class TelegramClient extends TelegramBaseClient {
                 case 1:
                     return {
                         id: 1,
-                        ipAddress: "pluto.web.telegram.org",
+                        ipAddress: `pluto${
+                            downloadDC ? "-1" : ""
+                        }.web.telegram.org`,
                         port: 443,
                     };
                 case 2:
                     return {
                         id: 2,
-                        ipAddress: "venus.web.telegram.org",
+                        ipAddress: `venus${
+                            downloadDC ? "-1" : ""
+                        }.web.telegram.org`,
                         port: 443,
                     };
                 case 3:
                     return {
                         id: 3,
-                        ipAddress: "aurora.web.telegram.org",
+                        ipAddress: `aurora${
+                            downloadDC ? "-1" : ""
+                        }.web.telegram.org`,
                         port: 443,
                     };
                 case 4:
                     return {
                         id: 4,
-                        ipAddress: "vesta.web.telegram.org",
+                        ipAddress: `vesta${
+                            downloadDC ? "-1" : ""
+                        }.web.telegram.org`,
                         port: 443,
                     };
                 case 5:
                     return {
                         id: 5,
-                        ipAddress: "flora.web.telegram.org",
+                        ipAddress: `flora${
+                            downloadDC ? "-1" : ""
+                        }.web.telegram.org`,
                         port: 443,
                     };
                 default:
@@ -1296,23 +1263,6 @@ export class TelegramClient extends TelegramBaseClient {
         delete this._borrowedSenderPromises[dcId];
     }
 
-    /** @hidden */
-    async _borrowExportedSender(dcId: number, retries = this._requestRetries) {
-        this._log.debug(`Borrowing client for DC ${dcId}`);
-        let senderPromise = this._borrowedSenderPromises[dcId];
-        if (!senderPromise) {
-            senderPromise = this._createExportedSender(dcId, retries);
-            this._borrowedSenderPromises[dcId] = senderPromise;
-
-            senderPromise.then((sender: any) => {
-                if (!sender) {
-                    delete this._borrowedSenderPromises[dcId];
-                }
-            });
-        }
-        return senderPromise;
-    }
-
     /** @hidden */
     _getResponseMessage(req: any, result: any, inputChat: any) {
         return parseMethods._getResponseMessage(this, req, result, inputChat);
@@ -1329,5 +1279,6 @@ export class TelegramClient extends TelegramBaseClient {
     static get events() {
         return require("../events");
     }
+
     // endregion
 }

+ 0 - 1
gramjs/client/chats.ts

@@ -170,7 +170,6 @@ export class _ParticipantsIter extends RequestIter {
         this.requests = [];
         if (ty == helpers._EntityType.CHANNEL) {
             if (showTotal) {
-                console.log("called me!");
                 const channel = await this.client.invoke(
                     new Api.channels.GetFullChannel({
                         channel: entity,

+ 53 - 54
gramjs/client/downloads.ts

@@ -2,10 +2,9 @@ import { Api } from "../tl";
 import type { TelegramClient } from "./TelegramClient";
 import { getAppropriatedPartSize, strippedPhotoToJpg } from "../Utils";
 import { sleep } from "../Helpers";
-import { MTProtoSender } from "../network";
-import type { Message } from "../tl/custom/message";
+import { Message } from "../tl/patched";
 import { EntityLike } from "../define";
-import { utils } from "../";
+import { errors, utils } from "../";
 
 /**
  * progress callback that will be called each time a new chunk is downloaded.
@@ -66,6 +65,7 @@ const MIN_CHUNK_SIZE = 4096;
 const DEFAULT_CHUNK_SIZE = 64; // kb
 const ONE_MB = 1024 * 1024;
 const REQUEST_TIMEOUT = 15000;
+const DISCONNECT_SLEEP = 1000;
 
 /** @hidden */
 export async function downloadFile(
@@ -73,7 +73,8 @@ export async function downloadFile(
     inputLocation: Api.TypeInputFileLocation,
     fileParams: DownloadFileParams
 ) {
-    let { partSizeKb, fileSize, workers = 1, end } = fileParams;
+    let { partSizeKb, end } = fileParams;
+    const { fileSize, workers = 1 } = fileParams;
     const { dcId, progressCallback, start = 0 } = fileParams;
 
     end = end && end < fileSize ? end : fileSize - 1;
@@ -86,32 +87,13 @@ export async function downloadFile(
 
     const partSize = partSizeKb * 1024;
     const partsCount = end ? Math.ceil((end - start) / partSize) : 1;
+
     if (partSize % MIN_CHUNK_SIZE !== 0) {
         throw new Error(
             `The part size must be evenly divisible by ${MIN_CHUNK_SIZE}`
         );
     }
 
-    let sender: MTProtoSender;
-
-    if (dcId) {
-        try {
-            sender = await client._borrowExportedSender(dcId);
-            client._log.debug(`Finished creating sender for ${dcId}`);
-        } catch (e) {
-            // This should never raise
-            if (e.errorMessage === "DC_ID_INVALID") {
-                // Can't export a sender for the ID we are currently in
-                sender = client._sender!;
-            } else {
-                client._log.error(e);
-                throw e;
-            }
-        }
-    } else {
-        sender = client._sender!;
-    }
-
     client._log.info(`Downloading file in chunks of ${partSize} bytes`);
 
     const foreman = new Foreman(workers);
@@ -125,6 +107,10 @@ export async function downloadFile(
         progressCallback(progress);
     }
 
+    // Preload sender
+    await client.getSender(dcId);
+
+    // eslint-disable-next-line no-constant-condition
     while (true) {
         let limit = partSize;
         let isPrecise = false;
@@ -140,47 +126,58 @@ export async function downloadFile(
         await foreman.requestWorker();
 
         if (hasEnded) {
-            await foreman.releaseWorker();
+            foreman.releaseWorker();
             break;
         }
+        // eslint-disable-next-line no-loop-func
         promises.push(
-            (async () => {
-                try {
-                    const result = await Promise.race([
-                        await sender.send(
+            (async (offsetMemo: number) => {
+                // eslint-disable-next-line no-constant-condition
+                while (true) {
+                    let sender;
+                    try {
+                        sender = await client.getSender(dcId);
+                        const result = await sender.send(
                             new Api.upload.GetFile({
                                 location: inputLocation,
-                                offset,
+                                offset: offsetMemo,
                                 limit,
                                 precise: isPrecise || undefined,
                             })
-                        ),
-                        sleep(REQUEST_TIMEOUT).then(() =>
-                            Promise.reject(new Error("REQUEST_TIMEOUT"))
-                        ),
-                    ]);
-
-                    if (progressCallback) {
-                        if (progressCallback.isCanceled) {
-                            throw new Error("USER_CANCELED");
+                        );
+
+                        if (progressCallback) {
+                            if (progressCallback.isCanceled) {
+                                throw new Error("USER_CANCELED");
+                            }
+
+                            progress += 1 / partsCount;
+                            progressCallback(progress);
                         }
 
-                        progress += 1 / partsCount;
-                        progressCallback(progress);
-                    }
+                        if (!end && result.bytes.length < limit) {
+                            hasEnded = true;
+                        }
+
+                        foreman.releaseWorker();
+
+                        return result.bytes;
+                    } catch (err) {
+                        if (sender && !sender.isConnected()) {
+                            await sleep(DISCONNECT_SLEEP);
+                            continue;
+                        } else if (err instanceof errors.FloodWaitError) {
+                            await sleep(err.seconds * 1000);
+                            continue;
+                        }
+
+                        foreman.releaseWorker();
 
-                    if (!end && result.bytes.length < limit) {
                         hasEnded = true;
+                        throw err;
                     }
-
-                    return result.bytes;
-                } catch (err) {
-                    hasEnded = true;
-                    throw err;
-                } finally {
-                    foreman.releaseWorker();
                 }
-            })()
+            })(offset)
         );
 
         offset += limit;
@@ -189,7 +186,6 @@ export async function downloadFile(
             break;
         }
     }
-
     const results = await Promise.all(promises);
     const buffers = results.filter(Boolean);
     const totalLength = end ? end + 1 - start : undefined;
@@ -257,7 +253,11 @@ export async function downloadMedia(
 ): Promise<Buffer> {
     let date;
     let media;
-    if (messageOrMedia instanceof Api.Message) {
+
+    if (
+        messageOrMedia instanceof Message ||
+        messageOrMedia instanceof Api.Message
+    ) {
         media = messageOrMedia.media;
     } else {
         media = messageOrMedia;
@@ -265,7 +265,6 @@ export async function downloadMedia(
     if (typeof media == "string") {
         throw new Error("not implemented");
     }
-
     if (media instanceof Api.MessageMediaWebPage) {
         if (media.webpage instanceof Api.WebPage) {
             media = media.webpage.document || media.webpage.photo;

+ 183 - 2
gramjs/client/telegramBaseClient.ts

@@ -1,5 +1,5 @@
-import { version } from "../";
-import { IS_NODE } from "../Helpers";
+import { TelegramClient, version } from "../";
+import { IS_NODE, sleep } from "../Helpers";
 import {
     ConnectionTCPFull,
     ConnectionTCPObfuscated,
@@ -15,6 +15,10 @@ import type { ParseInterface } from "./messageParse";
 import type { EventBuilder } from "../events/common";
 import { MarkdownParser } from "../extensions/markdown";
 import { MTProtoSender } from "../network";
+import { LAYER } from "../tl/AllTLObjects";
+
+const EXPORTED_SENDER_RECONNECT_TIMEOUT = 1000; // 1 sec
+const EXPORTED_SENDER_RELEASE_TIMEOUT = 30000; // 30 sec
 
 const DEFAULT_DC_ID = 4;
 const DEFAULT_IPV4_IP = IS_NODE ? "149.154.167.91" : "vesta.web.telegram.org";
@@ -175,6 +179,14 @@ export class TelegramBaseClient {
         string,
         [ReturnType<typeof setTimeout>, Api.TypeUpdate[]]
     >();
+    private _exportedSenderPromises = new Map<number, Promise<MTProtoSender>>();
+    private _exportedSenderReleaseTimeouts = new Map<
+        number,
+        ReturnType<typeof setTimeout>
+    >();
+    protected _loopStarted: boolean;
+    _reconnecting: boolean;
+    _destroyed: boolean;
 
     constructor(
         session: string | Session,
@@ -235,6 +247,11 @@ export class TelegramBaseClient {
         this._selfInputPeer = undefined;
         this.useWSS = clientParams.useWSS!;
         this._entityCache = new EntityCache();
+        // These will be set later
+        this._config = undefined;
+        this._loopStarted = false;
+        this._reconnecting = false;
+        this._destroyed = false;
 
         // parse mode
         this._parseMode = MarkdownParser;
@@ -272,12 +289,38 @@ export class TelegramBaseClient {
         if (this._sender) {
             await this._sender.disconnect();
         }
+        await Promise.all(
+            Object.values(this._exportedSenderPromises).map(
+                (promise: Promise<MTProtoSender>) => {
+                    return (
+                        promise &&
+                        promise.then((sender: MTProtoSender) => {
+                            if (sender) {
+                                return sender.disconnect();
+                            }
+                            return undefined;
+                        })
+                    );
+                }
+            )
+        );
+
+        this._exportedSenderPromises = new Map<
+            number,
+            Promise<MTProtoSender>
+        >();
     }
 
     get disconnected() {
         return !this._sender || this._sender._disconnected;
     }
 
+    /**
+     * Disconnects all senders and removes all handlers
+     * @remarks
+     * This will also delete your session (not log out) so be careful with usage.
+     * Disconnect is safer as it will do almost the same while keeping your session file/
+     */
     async destroy() {
         await Promise.all([
             this.disconnect(),
@@ -297,5 +340,143 @@ export class TelegramBaseClient {
         await this.session.save();
     }
 
+    async _cleanupExportedSender(dcId: number) {
+        if (this.session.dcId !== dcId) {
+            this.session.setAuthKey(undefined, dcId);
+        }
+        let sender = await this._exportedSenderPromises.get(dcId);
+        this._exportedSenderPromises.delete(dcId);
+        await sender?.disconnect();
+    }
+
+    async _connectSender(sender: MTProtoSender, dcId: number) {
+        // if we don't already have an auth key we want to use normal DCs not -1
+        const dc = await this.getDC(dcId, !!sender.authKey.getKey());
+
+        while (true) {
+            try {
+                await sender.connect(
+                    new this._connection(dc.ipAddress, dc.port, dcId, this._log)
+                );
+
+                if (this.session.dcId !== dcId && !sender._authenticated) {
+                    this._log.info(
+                        `Exporting authorization for data center ${dc.ipAddress}`
+                    );
+                    const auth = await this.invoke(
+                        new Api.auth.ExportAuthorization({ dcId: dcId })
+                    );
+                    this._initRequest.query = new Api.auth.ImportAuthorization({
+                        id: auth.id,
+                        bytes: auth.bytes,
+                    });
+                    const req = new Api.InvokeWithLayer({
+                        layer: LAYER,
+                        query: this._initRequest,
+                    });
+                    await sender.send(req);
+                    sender._authenticated = true;
+                }
+                sender.dcId = dcId;
+                sender.userDisconnected = false;
+
+                return sender;
+            } catch (err) {
+                if (err.errorMessage === "DC_ID_INVALID") {
+                    sender._authenticated = true;
+                    sender.userDisconnected = false;
+                    return sender;
+                }
+                if (this._log.canSend("error")) {
+                    console.error(err);
+                }
+
+                await sleep(1000);
+                await sender.disconnect();
+            }
+        }
+    }
+
+    async _borrowExportedSender(
+        dcId: number,
+        shouldReconnect?: boolean,
+        existingSender?: MTProtoSender
+    ): Promise<MTProtoSender> {
+        if (!this._exportedSenderPromises.get(dcId) || shouldReconnect) {
+            this._exportedSenderPromises.set(
+                dcId,
+                this._connectSender(
+                    existingSender || this._createExportedSender(dcId),
+                    dcId
+                )
+            );
+        }
+
+        let sender: MTProtoSender;
+        try {
+            sender = await this._exportedSenderPromises.get(dcId)!;
+
+            if (!sender.isConnected()) {
+                if (sender.isConnecting) {
+                    await sleep(EXPORTED_SENDER_RECONNECT_TIMEOUT);
+                    return this._borrowExportedSender(dcId, false, sender);
+                } else {
+                    return this._borrowExportedSender(dcId, true, sender);
+                }
+            }
+        } catch (err) {
+            if (this._log.canSend("error")) {
+                console.error(err);
+            }
+            return this._borrowExportedSender(dcId, true);
+        }
+
+        if (this._exportedSenderReleaseTimeouts.get(dcId)) {
+            clearTimeout(this._exportedSenderReleaseTimeouts.get(dcId)!);
+            this._exportedSenderReleaseTimeouts.delete(dcId);
+        }
+
+        this._exportedSenderReleaseTimeouts.set(
+            dcId,
+            setTimeout(() => {
+                this._exportedSenderReleaseTimeouts.delete(dcId);
+                sender.disconnect();
+            }, EXPORTED_SENDER_RELEASE_TIMEOUT)
+        );
+
+        return sender;
+    }
+
+    _createExportedSender(dcId: number) {
+        return new MTProtoSender(this.session.getAuthKey(dcId), {
+            logger: this._log,
+            dcId,
+            retries: this._connectionRetries,
+            delay: this._retryDelay,
+            autoReconnect: this._autoReconnect,
+            connectTimeout: this._timeout,
+            authKeyCallback: this._authKeyCallback.bind(this),
+            isMainSender: dcId === this.session.dcId,
+            onConnectionBreak: this._cleanupExportedSender.bind(this),
+            client: this as unknown as TelegramClient,
+        });
+    }
+
+    getSender(dcId: number): Promise<MTProtoSender> {
+        return dcId
+            ? this._borrowExportedSender(dcId)
+            : Promise.resolve(this._sender!);
+    }
+
     // endregion
+    async getDC(
+        dcId: number,
+        download: boolean
+    ): Promise<{ id: number; ipAddress: string; port: number }> {
+        throw new Error("Cannot be called from here!");
+    }
+
+    invoke<R extends Api.AnyRequest>(request: R): Promise<R["__response"]> {
+        throw new Error("Cannot be called from here!");
+    }
 }

+ 72 - 18
gramjs/client/updates.ts

@@ -1,11 +1,17 @@
 import type { EventBuilder } from "../events/common";
 import { Api } from "../tl";
-import { helpers } from "../";
 import type { TelegramClient } from "../";
 import bigInt from "big-integer";
 import { UpdateConnectionState } from "../network";
 import type { Raw } from "../events";
 import { utils } from "../index";
+import { getRandomInt, sleep } from "../Helpers";
+
+const PING_INTERVAL = 3000; // 3 sec
+const PING_TIMEOUT = 5000; // 5 sec
+const PING_FAIL_ATTEMPTS = 3;
+const PING_FAIL_INTERVAL = 100; // ms
+const PING_DISCONNECT_DELAY = 60000; // 1 min
 
 /** @hidden */
 export function on(client: TelegramClient, event?: EventBuilder) {
@@ -149,22 +155,42 @@ export async function _dispatchUpdate(
 
 /** @hidden */
 export async function _updateLoop(client: TelegramClient): Promise<void> {
-    while (client.connected) {
-        const rnd = helpers.getRandomInt(
-            Number.MIN_SAFE_INTEGER,
-            Number.MAX_SAFE_INTEGER
-        );
-        await helpers.sleep(1000 * 60);
-        // We don't care about the result we just want to send it every
-        // 60 seconds so telegram doesn't stop the connection
+    while (!client._destroyed) {
+        await sleep(PING_INTERVAL);
+        if (client._reconnecting) {
+            continue;
+        }
+
         try {
-            client._sender!.send(
-                new Api.Ping({
-                    pingId: bigInt(rnd),
-                })
+            await attempts(
+                () => {
+                    return timeout(
+                        client._sender!.send(
+                            new Api.PingDelayDisconnect({
+                                pingId: bigInt(
+                                    getRandomInt(
+                                        Number.MIN_SAFE_INTEGER,
+                                        Number.MAX_SAFE_INTEGER
+                                    )
+                                ),
+                                disconnectDelay: PING_DISCONNECT_DELAY,
+                            })
+                        ),
+                        PING_TIMEOUT
+                    );
+                },
+                PING_FAIL_ATTEMPTS,
+                PING_FAIL_INTERVAL
             );
-        } catch (e) {
-            //await client.disconnect()
+        } catch (err) {
+            // eslint-disable-next-line no-console
+            client._log.error(err);
+            if (client._reconnecting) {
+                continue;
+            }
+
+            await client.disconnect();
+            await client.connect();
         }
 
         // We need to send some content-related request at least hourly
@@ -173,12 +199,40 @@ export async function _updateLoop(client: TelegramClient): Promise<void> {
 
         // TODO Call getDifference instead since it's more relevant
         if (
-            !client._lastRequest ||
-            new Date().getTime() - client._lastRequest > 30 * 60 * 1000
+            new Date().getTime() - (client._lastRequest || 0) >
+            30 * 60 * 1000
         ) {
             try {
                 await client.invoke(new Api.updates.GetState());
-            } catch (e) {}
+            } catch (e) {
+                // we don't care about errors here
+            }
         }
     }
+    await client.disconnect();
+}
+
+/** @hidden */
+async function attempts(cb: CallableFunction, times: number, pause: number) {
+    for (let i = 0; i < times; i++) {
+        try {
+            // We need to `return await` here so it can be caught locally
+            return await cb();
+        } catch (err) {
+            if (i === times - 1) {
+                throw err;
+            }
+
+            await sleep(pause);
+        }
+    }
+    return undefined;
+}
+
+/** @hidden */
+function timeout(promise: Promise<any>, ms: number) {
+    return Promise.race([
+        promise,
+        sleep(ms).then(() => Promise.reject(new Error("TIMEOUT"))),
+    ]);
 }

+ 49 - 42
gramjs/client/uploads.ts

@@ -7,7 +7,7 @@ import { getAppropriatedPartSize } from "../Utils";
 import { EntityLike, FileLike, MarkupLike, MessageIDLike } from "../define";
 import path from "path";
 import { promises as fs } from "fs";
-import { utils } from "../index";
+import { errors, utils } from "../index";
 import { _parseMessageText } from "./messageParse";
 
 interface OnProgress {
@@ -58,6 +58,7 @@ export class CustomFile {
 const KB_TO_BYTES = 1024;
 const LARGE_FILE_THRESHOLD = 10 * 1024 * 1024;
 const UPLOAD_TIMEOUT = 15 * 1000;
+const DISCONNECT_SLEEP = 1000;
 
 /** @hidden */
 export async function uploadFile(
@@ -75,8 +76,8 @@ export async function uploadFile(
     const partCount = Math.floor((size + partSize - 1) / partSize);
     const buffer = Buffer.from(await fileToBuffer(file));
 
-    // We always upload from the DC we are in.
-    const sender = await client._borrowExportedSender(client.session.dcId);
+    // Make sure a new sender can be created before starting upload
+    await client.getSender(client.session.dcId);
 
     if (!workers || !size) {
         workers = 1;
@@ -91,7 +92,7 @@ export async function uploadFile(
     }
 
     for (let i = 0; i < partCount; i += workers) {
-        let sendingParts = [];
+        const sendingParts = [];
         let end = i + workers;
         if (end > partCount) {
             end = partCount;
@@ -100,51 +101,58 @@ export async function uploadFile(
         for (let j = i; j < end; j++) {
             const bytes = buffer.slice(j * partSize, (j + 1) * partSize);
 
+            // eslint-disable-next-line no-loop-func
             sendingParts.push(
-                (async () => {
-                    await sender.send(
-                        isLarge
-                            ? new Api.upload.SaveBigFilePart({
-                                  fileId,
-                                  filePart: j,
-                                  fileTotalParts: partCount,
-                                  bytes,
-                              })
-                            : new Api.upload.SaveFilePart({
-                                  fileId,
-                                  filePart: j,
-                                  bytes,
-                              })
-                    );
-
-                    if (onProgress) {
-                        if (onProgress.isCanceled) {
-                            throw new Error("USER_CANCELED");
+                (async (jMemo: number, bytesMemo: Buffer) => {
+                    while (true) {
+                        let sender;
+                        try {
+                            // We always upload from the DC we are in
+                            sender = await client.getSender(
+                                client.session.dcId
+                            );
+                            await sender.send(
+                                isLarge
+                                    ? new Api.upload.SaveBigFilePart({
+                                          fileId,
+                                          filePart: jMemo,
+                                          fileTotalParts: partCount,
+                                          bytes: bytesMemo,
+                                      })
+                                    : new Api.upload.SaveFilePart({
+                                          fileId,
+                                          filePart: jMemo,
+                                          bytes: bytesMemo,
+                                      })
+                            );
+                        } catch (err) {
+                            if (sender && !sender.isConnected()) {
+                                await sleep(DISCONNECT_SLEEP);
+                                continue;
+                            } else if (err instanceof errors.FloodWaitError) {
+                                await sleep(err.seconds * 1000);
+                                continue;
+                            }
+                            throw err;
                         }
 
-                        progress += 1 / partCount;
-                        onProgress(progress);
+                        if (onProgress) {
+                            if (onProgress.isCanceled) {
+                                throw new Error("USER_CANCELED");
+                            }
+
+                            progress += 1 / partCount;
+                            onProgress(progress);
+                        }
+                        break;
                     }
-                })()
+                })(j, bytes)
             );
         }
-        try {
-            await Promise.race([
-                await Promise.all(sendingParts),
-                sleep(UPLOAD_TIMEOUT * workers).then(() =>
-                    Promise.reject(new Error("TIMEOUT"))
-                ),
-            ]);
-        } catch (err) {
-            if (err.error === "TIMEOUT") {
-                console.warn("Upload timeout. Retrying...");
-                i -= workers;
-                continue;
-            }
 
-            throw err;
-        }
+        await Promise.all(sendingParts);
     }
+
     return isLarge
         ? new Api.InputFileBig({
               id: fileId,
@@ -158,7 +166,6 @@ export async function uploadFile(
               md5Checksum: "", // This is not a "flag", so not sure if we can make it optional.
           });
 }
-
 /**
  * Interface for sending files to a chat.
  */

+ 13 - 45
gramjs/crypto/RSA.ts

@@ -1,3 +1,4 @@
+import bigInt from "big-integer";
 import {
     generateRandomBytes,
     modExp,
@@ -5,62 +6,29 @@ import {
     readBufferFromBigInt,
     sha1,
 } from "../Helpers";
-import bigInt from "big-integer";
 
 const PUBLIC_KEYS = [
     {
-        fingerprint: [
-            40, 85, 94, 156, 117, 240, 61, 22, 65, 244, 169, 2, 33, 107, 232,
-            108, 2, 43, 180, 195,
-        ],
-        n: bigInt(
-            "24403446649145068056824081744112065346446136066297307473868293895086332508101251964919587745984311372853053253457835208829824428441874946556659953519213382748319518214765985662663680818277989736779506318868003755216402538945900388706898101286548187286716959100102939636333452457308619454821845196109544157601096359148241435922125602449263164512290854366930013825808102403072317738266383237191313714482187326643144603633877219028262697593882410403273959074350849923041765639673335775605842311578109726403165298875058941765362622936097839775380070572921007586266115476975819175319995527916042178582540628652481530373407"
-        ),
-        e: 65537,
-    },
-    {
-        fingerprint: [
-            140, 171, 9, 34, 146, 246, 166, 50, 10, 170, 229, 247, 155, 114, 28,
-            177, 29, 106, 153, 154,
-        ],
-        n: bigInt(
-            "25081407810410225030931722734886059247598515157516470397242545867550116598436968553551465554653745201634977779380884774534457386795922003815072071558370597290368737862981871277312823942822144802509055492512145589734772907225259038113414940384446493111736999668652848440655603157665903721517224934142301456312994547591626081517162758808439979745328030376796953660042629868902013177751703385501412640560275067171555763725421377065095231095517201241069856888933358280729674273422117201596511978645878544308102076746465468955910659145532699238576978901011112475698963666091510778777356966351191806495199073754705289253783"
-        ),
-        e: 65537,
-    },
-    {
-        fingerprint: [
-            243, 218, 109, 239, 16, 202, 176, 78, 167, 8, 255, 209, 120, 234,
-            205, 112, 111, 42, 91, 176,
-        ],
-        n: bigInt(
-            "22347337644621997830323797217583448833849627595286505527328214795712874535417149457567295215523199212899872122674023936713124024124676488204889357563104452250187725437815819680799441376434162907889288526863223004380906766451781702435861040049293189979755757428366240570457372226323943522935844086838355728767565415115131238950994049041950699006558441163206523696546297006014416576123345545601004508537089192869558480948139679182328810531942418921113328804749485349441503927570568778905918696883174575510385552845625481490900659718413892216221539684717773483326240872061786759868040623935592404144262688161923519030977"
-        ),
-        e: 65537,
-    },
-    {
-        fingerprint: [
-            128, 80, 214, 72, 77, 244, 98, 7, 201, 250, 37, 244, 227, 51, 96,
-            199, 182, 37, 224, 113,
-        ],
+        fingerprint: bigInt("-3414540481677951611"),
         n: bigInt(
-            "24573455207957565047870011785254215390918912369814947541785386299516827003508659346069416840622922416779652050319196701077275060353178142796963682024347858398319926119639265555410256455471016400261630917813337515247954638555325280392998950756512879748873422896798579889820248358636937659872379948616822902110696986481638776226860777480684653756042166610633513404129518040549077551227082262066602286208338952016035637334787564972991208252928951876463555456715923743181359826124083963758009484867346318483872552977652588089928761806897223231500970500186019991032176060579816348322451864584743414550721639495547636008351"
+            "2937959817066933702298617714945612856538843112005886376816255642404751219133084745514657634448776440866" +
+                "1701890505066208632169112269581063774293102577308490531282748465986139880977280302242772832972539403531" +
+                "3160108704012876427630091361567343395380424193887227773571344877461690935390938502512438971889287359033" +
+                "8945177273024525306296338410881284207988753897636046529094613963869149149606209957083647645485599631919" +
+                "2747663615955633778034897140982517446405334423701359108810182097749467210509584293428076654573384828809" +
+                "574217079944388301239431309115013843331317877374435868468779972014486325557807783825502498215169806323"
         ),
         e: 65537,
     },
 ];
 
-const _serverKeys = new Map();
+export const _serverKeys = new Map<
+    string,
+    { n: bigInt.BigInteger; e: number }
+>();
 
 PUBLIC_KEYS.forEach(({ fingerprint, ...keyInfo }) => {
-    _serverKeys.set(
-        readBigIntFromBuffer(
-            Buffer.from(fingerprint.slice(-8)),
-            true,
-            true
-        ).toString(),
-        keyInfo
-    );
+    _serverKeys.set(fingerprint.toString(), keyInfo);
 });
 
 /**

+ 2 - 2
gramjs/errors/Common.ts

@@ -57,8 +57,8 @@ export class InvalidChecksumError extends Error {
  * For instance, 404 means "forgotten/broken authorization key", while
  */
 export class InvalidBufferError extends Error {
-    private code?: number;
-    private payload: Buffer;
+    code?: number;
+    payload: Buffer;
 
     constructor(payload: Buffer) {
         let code = undefined;

+ 20 - 17
gramjs/extensions/MessagePacker.ts

@@ -14,7 +14,8 @@ const USE_INVOKE_AFTER_WITH = [
 ];
 
 export class MessagePacker {
-    private _state: any;
+    private _state: MTProtoState;
+    private _pendingStates: RequestState[];
     private _queue: any[];
     private _ready: Promise<unknown>;
     private setReady: ((value?: any) => void) | undefined;
@@ -23,6 +24,8 @@ export class MessagePacker {
     constructor(state: MTProtoState, logger: any) {
         this._state = state;
         this._queue = [];
+        this._pendingStates = [];
+
         this._ready = new Promise((resolve) => {
             this.setReady = resolve;
         });
@@ -34,27 +37,22 @@ export class MessagePacker {
     }
 
     append(state: RequestState) {
-        /* TODO later. still need fixes
-        // we need to check if there is already a request with the same name that we should send after.
-        if (USE_INVOKE_AFTER_WITH.includes(state.request.className)) {
-            // we now need to check if there is any request in queue already.
-            for (let i = this._queue.length - 1; i >= 0; i--) {
-                if (
-                    USE_INVOKE_AFTER_WITH.includes(
-                        this._queue[i].request.className
-                    )
-                ) {
-                    state.after = this._queue[i];
-                    break;
-                }
-            }
-        }
-        */
         this._queue.push(state);
 
         if (this.setReady) {
             this.setReady(true);
         }
+        if (state) {
+            this._pendingStates.push(state);
+            state.promise
+                // Using finally causes triggering `unhandledrejection` event
+                .catch(() => {})
+                .finally(() => {
+                    this._pendingStates = this._pendingStates.filter(
+                        (s) => s !== state
+                    );
+                });
+        }
     }
 
     extend(states: RequestState[]) {
@@ -140,4 +138,9 @@ export class MessagePacker {
         data = buffer.getValue();
         return { batch, data };
     }
+    rejectAll() {
+        this._pendingStates.forEach((requestState) => {
+            requestState.reject(new Error("Disconnect"));
+        });
+    }
 }

+ 99 - 47
gramjs/network/Authenticator.ts

@@ -4,66 +4,128 @@
  * @param log
  * @returns {Promise<{authKey: *, timeOffset: *}>}
  */
+import { MTProtoPlainSender } from "./MTProtoPlainSender";
+import {
+    bufferXor,
+    generateKeyDataFromNonce,
+    generateRandomBytes,
+    getByteArray,
+    modExp,
+    readBigIntFromBuffer,
+    readBufferFromBigInt,
+    sha1,
+    sha256,
+    toSignedLittleBuffer,
+} from "../Helpers";
 import { Api } from "../tl";
 import { SecurityError } from "../errors";
 import { Factorizator } from "../crypto/Factorizator";
+import { _serverKeys } from "../crypto/RSA";
 import { IGE } from "../crypto/IGE";
+import bigInt from "big-integer";
 import { BinaryReader } from "../extensions";
 import { AuthKey } from "../crypto/AuthKey";
-import { helpers } from "../";
-import { encrypt } from "../crypto/RSA";
-import bigInt from "big-integer";
-import type { MTProtoPlainSender } from "./MTProtoPlainSender";
+
+const RETRIES = 20;
 
 export async function doAuthentication(sender: MTProtoPlainSender, log: any) {
     // Step 1 sending: PQ Request, endianness doesn't matter since it's random
-    let bytes = helpers.generateRandomBytes(16);
+    let bytes = generateRandomBytes(16);
 
-    const nonce = helpers.readBigIntFromBuffer(bytes, false, true);
-    const resPQ = await sender.send(new Api.ReqPqMulti({ nonce: nonce }));
+    const nonce = readBigIntFromBuffer(bytes, false, true);
+    const resPQ = await sender.send(new Api.ReqPqMulti({ nonce }));
     log.debug("Starting authKey generation step 1");
 
     if (!(resPQ instanceof Api.ResPQ)) {
-        throw new Error(`Step 1 answer was ${resPQ}`);
+        throw new SecurityError(`Step 1 answer was ${resPQ}`);
     }
     if (resPQ.nonce.neq(nonce)) {
         throw new SecurityError("Step 1 invalid nonce from server");
     }
-    const pq = helpers.readBigIntFromBuffer(resPQ.pq, false, true);
+    const pq = readBigIntFromBuffer(resPQ.pq, false, true);
     log.debug("Finished authKey generation step 1");
-    log.debug("Starting authKey generation step 2");
     // Step 2 sending: DH Exchange
-    let { p, q } = Factorizator.factorize(pq);
+    const { p, q } = Factorizator.factorize(pq);
 
-    const pBuffer = helpers.getByteArray(p);
-    const qBuffer = helpers.getByteArray(q);
+    const pBuffer = getByteArray(p);
+    const qBuffer = getByteArray(q);
 
-    bytes = helpers.generateRandomBytes(32);
-    const newNonce = helpers.readBigIntFromBuffer(bytes, true, true);
+    bytes = generateRandomBytes(32);
+    const newNonce = readBigIntFromBuffer(bytes, true, true);
     const pqInnerData = new Api.PQInnerData({
-        pq: helpers.getByteArray(pq), // unsigned
+        pq: getByteArray(pq), // unsigned
         p: pBuffer,
         q: qBuffer,
         nonce: resPQ.nonce,
         serverNonce: resPQ.serverNonce,
-        newNonce: newNonce,
-    });
-
-    // sha_digest + data + random_bytes
-    let cipherText = undefined;
-    let targetFingerprint = undefined;
+        newNonce,
+    }).getBytes();
+    if (pqInnerData.length > 144) {
+        throw new SecurityError("Step 1 invalid nonce from server");
+    }
+    let targetFingerprint;
+    let targetKey;
     for (const fingerprint of resPQ.serverPublicKeyFingerprints) {
-        cipherText = await encrypt(fingerprint, pqInnerData.getBytes());
-        if (cipherText !== undefined) {
+        targetKey = _serverKeys.get(fingerprint.toString());
+        if (targetKey !== undefined) {
             targetFingerprint = fingerprint;
             break;
         }
     }
-    if (cipherText === undefined) {
+    if (targetFingerprint === undefined || targetKey === undefined) {
         throw new SecurityError(
             "Step 2 could not find a valid key for fingerprints"
         );
     }
+    // Value should be padded to be made 192 exactly
+    const padding = generateRandomBytes(192 - pqInnerData.length);
+    const dataWithPadding = Buffer.concat([pqInnerData, padding]);
+    const dataPadReversed = Buffer.from(dataWithPadding).reverse();
+
+    let encryptedData;
+    for (let i = 0; i < RETRIES; i++) {
+        const tempKey = generateRandomBytes(32);
+        const shaDigestKeyWithData = await sha256(
+            Buffer.concat([tempKey, dataWithPadding])
+        );
+        const dataWithHash = Buffer.concat([
+            dataPadReversed,
+            shaDigestKeyWithData,
+        ]);
+
+        const ige = new IGE(tempKey, Buffer.alloc(32));
+        const aesEncrypted = ige.encryptIge(dataWithHash);
+        const tempKeyXor = bufferXor(tempKey, await sha256(aesEncrypted));
+
+        const keyAesEncrypted = Buffer.concat([tempKeyXor, aesEncrypted]);
+        const keyAesEncryptedInt = readBigIntFromBuffer(
+            keyAesEncrypted,
+            false,
+            false
+        );
+        if (keyAesEncryptedInt.greaterOrEquals(targetKey.n)) {
+            log.debug("Aes key greater than RSA. retrying");
+            continue;
+        }
+        const encryptedDataBuffer = modExp(
+            keyAesEncryptedInt,
+            bigInt(targetKey.e),
+            targetKey.n
+        );
+        encryptedData = readBufferFromBigInt(
+            encryptedDataBuffer,
+            256,
+            false,
+            false
+        );
+
+        break;
+    }
+    if (encryptedData === undefined) {
+        throw new SecurityError("Step 2 could create a secure encrypted key");
+    }
+    log.debug("Step 2 : Generated a secure aes encrypted data");
+
     const serverDhParams = await sender.send(
         new Api.ReqDHParams({
             nonce: resPQ.nonce,
@@ -71,7 +133,7 @@ export async function doAuthentication(sender: MTProtoPlainSender, log: any) {
             p: pBuffer,
             q: qBuffer,
             publicKeyFingerprint: targetFingerprint,
-            encryptedData: cipherText,
+            encryptedData,
         })
     );
 
@@ -92,10 +154,8 @@ export async function doAuthentication(sender: MTProtoPlainSender, log: any) {
     }
 
     if (serverDhParams instanceof Api.ServerDHParamsFail) {
-        const sh = await helpers.sha1(
-            helpers.toSignedLittleBuffer(newNonce, 32).slice(4, 20)
-        );
-        const nnh = helpers.readBigIntFromBuffer(sh, true, true);
+        const sh = await sha1(toSignedLittleBuffer(newNonce, 32).slice(4, 20));
+        const nnh = readBigIntFromBuffer(sh, true, true);
         if (serverDhParams.newNonceHash.neq(nnh)) {
             throw new SecurityError("Step 2 invalid DH fail nonce from server");
         }
@@ -107,7 +167,7 @@ export async function doAuthentication(sender: MTProtoPlainSender, log: any) {
     log.debug("Starting authKey generation step 3");
 
     // Step 3 sending: Complete DH Exchange
-    const { key, iv } = await helpers.generateKeyDataFromNonce(
+    const { key, iv } = await generateKeyDataFromNonce(
         resPQ.serverNonce,
         newNonce
     );
@@ -132,32 +192,24 @@ export async function doAuthentication(sender: MTProtoPlainSender, log: any) {
             "Step 3 Invalid server nonce in encrypted answer"
         );
     }
-    const dhPrime = helpers.readBigIntFromBuffer(
-        serverDhInner.dhPrime,
-        false,
-        false
-    );
-    const ga = helpers.readBigIntFromBuffer(serverDhInner.gA, false, false);
+    const dhPrime = readBigIntFromBuffer(serverDhInner.dhPrime, false, false);
+    const ga = readBigIntFromBuffer(serverDhInner.gA, false, false);
     const timeOffset =
         serverDhInner.serverTime - Math.floor(new Date().getTime() / 1000);
-    const b = helpers.readBigIntFromBuffer(
-        helpers.generateRandomBytes(256),
-        false,
-        false
-    );
-    const gb = helpers.modExp(bigInt(serverDhInner.g), b, dhPrime);
-    const gab = helpers.modExp(ga, b, dhPrime);
+    const b = readBigIntFromBuffer(generateRandomBytes(256), false, false);
+    const gb = modExp(bigInt(serverDhInner.g), b, dhPrime);
+    const gab = modExp(ga, b, dhPrime);
 
     // Prepare client DH Inner Data
     const clientDhInner = new Api.ClientDHInnerData({
         nonce: resPQ.nonce,
         serverNonce: resPQ.serverNonce,
         retryId: bigInt.zero, // TODO Actual retry ID
-        gB: helpers.getByteArray(gb, false),
+        gB: getByteArray(gb, false),
     }).getBytes();
 
     const clientDdhInnerHashed = Buffer.concat([
-        await helpers.sha1(clientDhInner),
+        await sha1(clientDhInner),
         clientDhInner,
     ]);
     // Encryption
@@ -192,7 +244,7 @@ export async function doAuthentication(sender: MTProtoPlainSender, log: any) {
         );
     }
     const authKey = new AuthKey();
-    await authKey.setKey(helpers.getByteArray(gab));
+    await authKey.setKey(getByteArray(gab));
 
     const nonceNumber = 1 + nonceTypesString.indexOf(dhGen.className);
 

+ 112 - 72
gramjs/network/MTProtoSender.ts

@@ -13,7 +13,7 @@
  */
 import { AuthKey } from "../crypto/AuthKey";
 import { MTProtoState } from "./MTProtoState";
-import { BinaryReader } from "../extensions";
+import { BinaryReader, Logger } from "../extensions";
 import { MessagePacker } from "../extensions";
 import { GZIPPacked, MessageContainer, RPCResult, TLMessage } from "../tl/core";
 import { Api } from "../tl";
@@ -45,6 +45,7 @@ interface DEFAULT_OPTIONS {
     dcId: number;
     senderCallback?: any;
     client: TelegramClient;
+    onConnectionBreak?: CallableFunction;
 }
 
 export class MTProtoSender {
@@ -59,9 +60,10 @@ export class MTProtoSender {
         autoReconnectCallback: null,
         isMainSender: null,
         senderCallback: null,
+        onConnectionBreak: undefined,
     };
     private _connection?: Connection;
-    private readonly _log: any;
+    private readonly _log: Logger;
     private _dcId: number;
     private readonly _retries: number;
     private readonly _delay: number;
@@ -70,13 +72,13 @@ export class MTProtoSender {
     private readonly _authKeyCallback: any;
     private readonly _updateCallback: (
         client: TelegramClient,
-        update: Api.TypeUpdate | number
+        update: UpdateConnectionState
     ) => void;
     private readonly _autoReconnectCallback?: any;
     private readonly _senderCallback: any;
     private readonly _isMainSender: boolean;
-    private _userConnected: boolean;
-    private _reconnecting: boolean;
+    _userConnected: boolean;
+    _reconnecting: boolean;
     _disconnected: boolean;
     private _sendLoopHandle: any;
     private _recvLoopHandle: any;
@@ -88,6 +90,10 @@ export class MTProtoSender {
     private readonly _lastAcks: any[];
     private readonly _handlers: any;
     private readonly _client: TelegramClient;
+    private readonly _onConnectionBreak?: CallableFunction;
+    userDisconnected: boolean;
+    isConnecting: boolean;
+    _authenticated: boolean;
 
     /**
      * @param authKey
@@ -108,15 +114,21 @@ export class MTProtoSender {
         this._isMainSender = args.isMainSender;
         this._senderCallback = args.senderCallback;
         this._client = args.client;
+        this._onConnectionBreak = args.onConnectionBreak;
+
+        /**
+         * whether we disconnected ourself or telegram did it.
+         */
+        this.userDisconnected = false;
 
         /**
-         * Whether the user has explicitly connected or disconnected.
-         *
          * If a disconnection happens for any other reason and it
          * was *not* user action then the pending messages won't
          * be cleared but on explicit user disconnection all the
          * pending futures should be cancelled.
          */
+        this.isConnecting = false;
+        this._authenticated = false;
         this._userConnected = false;
         this._reconnecting = false;
         this._disconnected = true;
@@ -202,36 +214,46 @@ export class MTProtoSender {
 
     /**
      * Connects to the specified given connection using the given auth key.
-     * @param connection
-     * @param eventDispatch {function}
-     * @returns {Promise<boolean>}
      */
-    async connect(connection: any, eventDispatch?: any) {
-        if (this._userConnected) {
+    async connect(connection: Connection, force?: boolean) {
+        if (this._userConnected && !force) {
             this._log.info("User is already connected!");
             return false;
         }
+        this.isConnecting = true;
         this._connection = connection;
 
-        const retries = this._retries;
-
-        for (let attempt = 0; attempt < retries; attempt++) {
+        for (let attempt = 0; attempt < this._retries; attempt++) {
             try {
                 await this._connect();
+                if (this._updateCallback) {
+                    this._updateCallback(
+                        this._client,
+                        new UpdateConnectionState(
+                            UpdateConnectionState.connected
+                        )
+                    );
+                }
                 break;
-            } catch (e) {
-                if (attempt === 0 && eventDispatch) {
-                    eventDispatch({ update: new UpdateConnectionState(-1) });
+            } catch (err) {
+                if (this._updateCallback && attempt === 0) {
+                    this._updateCallback(
+                        this._client,
+                        new UpdateConnectionState(
+                            UpdateConnectionState.disconnected
+                        )
+                    );
                 }
                 this._log.error(
-                    "WebSocket connection failed attempt : " + (attempt + 1)
+                    `WebSocket connection failed attempt: ${attempt + 1}`
                 );
                 if (this._log.canSend("error")) {
-                    console.error(e);
+                    console.error(err);
                 }
                 await sleep(this._delay);
             }
         }
+        this.isConnecting = false;
         return true;
     }
 
@@ -244,6 +266,7 @@ export class MTProtoSender {
      * all pending requests, and closes the send and receive loops.
      */
     async disconnect() {
+        this.userDisconnected = true;
         await this._disconnect();
     }
 
@@ -296,7 +319,7 @@ export class MTProtoSender {
         );
         await this._connection!.connect();
         this._log.debug("Connection success!");
-        //process.exit(0)
+
         if (!this.authKey.getKey()) {
             const plain = new MTProtoPlainSender(this._connection, this._log);
             this._log.debug("New auth_key attempt ...");
@@ -316,6 +339,7 @@ export class MTProtoSender {
                 await this._authKeyCallback(this.authKey, this._dcId);
             }
         } else {
+            this._authenticated = true;
             this._log.debug("Already have an auth key ...");
         }
         this._userConnected = true;
@@ -339,12 +363,17 @@ export class MTProtoSender {
     }
 
     async _disconnect(error = null) {
+        this._sendQueue.rejectAll();
+
         if (this._connection === null) {
             this._log.info("Not disconnecting (already have no connection)");
             return;
         }
         if (this._updateCallback) {
-            this._updateCallback(this._client, -1);
+            this._updateCallback(
+                this._client,
+                new UpdateConnectionState(UpdateConnectionState.disconnected)
+            );
         }
         this._log.info(
             "Disconnecting from %s...".replace(
@@ -389,11 +418,11 @@ export class MTProtoSender {
             }
 
             if (!res) {
-                this._log.debug("Empty result. will stop loop");
+                this._log.debug("Empty result. will not stop loop");
                 continue;
             }
-            let data = res.data;
-            const batch = res.batch;
+            let { data } = res;
+            const { batch } = res;
             this._log.debug(
                 `Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`
             );
@@ -434,9 +463,13 @@ export class MTProtoSender {
             try {
                 body = await this._connection!.recv();
             } catch (e) {
-                // this._log.info('Connection closed while receiving data');
+                /** when the server disconnects us we want to reconnect */
                 this._log.warn("Connection closed while receiving data");
-                this._startReconnect();
+                if (!this.userDisconnected) {
+                    this._log.error(e);
+                    this._log.warn("Connection closed while receiving data");
+                    this.reconnect();
+                }
                 return;
             }
             try {
@@ -456,29 +489,39 @@ export class MTProtoSender {
                     );
                     continue;
                 } else if (e instanceof InvalidBufferError) {
-                    this._log.info("Broken authorization key; resetting");
-                    if (this._updateCallback && this._isMainSender) {
-                        // 0 == broken
-                        this._updateCallback(this._client, 0);
-                    } else if (this._senderCallback && !this._isMainSender) {
-                        // Deletes the current sender from the object
-                        this._senderCallback(this._dcId);
+                    // 404 means that the server has "forgotten" our auth key and we need to create a new one.
+                    if (e.code === 404) {
+                        this._log.warn(
+                            `Broken authorization key for dc ${this._dcId}; resetting`
+                        );
+                        if (this._updateCallback && this._isMainSender) {
+                            this._updateCallback(
+                                this._client,
+                                new UpdateConnectionState(
+                                    UpdateConnectionState.broken
+                                )
+                            );
+                        } else if (
+                            this._onConnectionBreak &&
+                            !this._isMainSender
+                        ) {
+                            // Deletes the current sender from the object
+                            this._onConnectionBreak(this._dcId);
+                        }
+                    } else {
+                        // this happens sometimes when telegram is having some internal issues.
+                        // reconnecting should be enough usually
+                        // since the data we sent and received is probably wrong now.
+                        this._log.warn(
+                            `Invalid buffer ${e.code} for dc ${this._dcId}`
+                        );
+                        this.reconnect();
                     }
-
-                    // We don't really need to do this if we're going to sign in again
-                    /*await this.authKey.setKey(null)
-
-                    if (this._authKeyCallback) {
-                        await this._authKeyCallback(null)
-                    }*/
-                    // We can disconnect at sign in
-                    /* await this.disconnect()
-                     */
                     return;
                 } else {
                     this._log.error("Unhandled error while receiving data");
                     this._log.error(e);
-                    this._startReconnect();
+                    this.reconnect();
                     return;
                 }
             }
@@ -849,13 +892,17 @@ export class MTProtoSender {
      */
     async _handleMsgAll(message: TLMessage) {}
 
-    async _startReconnect() {
+    reconnect() {
         if (this._userConnected && !this._reconnecting) {
             this._reconnecting = true;
             // TODO Should we set this?
             // this._user_connected = false
-            this._log.info("Started reconnecting");
-            this._reconnect();
+            // we want to wait a second between each reconnect try to not flood the server with reconnects
+            // in case of internal server issues.
+            sleep(1000).then(() => {
+                this._log.info("Started reconnecting");
+                this._reconnect();
+            });
         }
     }
 
@@ -867,33 +914,26 @@ export class MTProtoSender {
             this._log.warn(err);
         }
         // @ts-ignore
-        this._sendQueue.append(null);
+        this._sendQueue.append(undefined);
         this._state.reset();
-        const retries = this._retries;
 
-        for (let attempt = 0; attempt < retries; attempt++) {
-            try {
-                await this._connect();
-                // uncomment this if you want to resend
-                this._sendQueue.extend(Array.from(this._pendingState.values()));
-                this._pendingState = new Map<string, RequestState>();
-                if (this._autoReconnectCallback) {
-                    await this._autoReconnectCallback();
-                }
-                if (this._updateCallback) {
-                    this._updateCallback(this._client, 1);
-                }
+        // For some reason reusing existing connection caused stuck requests
+        const constructor = this._connection!
+            .constructor as unknown as typeof Connection;
 
-                break;
-            } catch (e) {
-                this._log.error(
-                    "WebSocket connection failed attempt : " + (attempt + 1)
-                );
-                if (this._log.canSend("error")) {
-                    console.error(e);
-                }
-                await sleep(this._delay);
-            }
+        const newConnection = new constructor(
+            this._connection!._ip,
+            this._connection!._port,
+            this._connection!._dcId,
+            this._connection!._log
+        );
+        await this.connect(newConnection, true);
+
+        this._reconnecting = false;
+        this._sendQueue.extend(Array.from(this._pendingState.values()));
+        this._pendingState = new Map<string, RequestState>();
+        if (this._autoReconnectCallback) {
+            await this._autoReconnectCallback();
         }
     }
 }

+ 1 - 1
gramjs/network/MTProtoState.ts

@@ -110,7 +110,7 @@ export class MTProtoState {
         buffer: BinaryWriter,
         data: Buffer,
         contentRelated: boolean,
-        afterId: bigInt.BigInteger
+        afterId?: bigInt.BigInteger
     ) {
         const msgId = this._getNewMsgId();
         const seqNo = this._getSeqNo(contentRelated);

+ 4 - 4
gramjs/network/connection/Connection.ts

@@ -16,10 +16,10 @@ import { IS_NODE } from "../../Helpers";
 class Connection {
     // @ts-ignore
     PacketCodecClass: any; //"typeof AbridgedPacketCodec|typeof FullPacketCodec|typeof ObfuscatedConnection as "
-    private readonly _ip: string;
-    private readonly _port: number;
-    private _dcId: number;
-    private _log: any;
+    readonly _ip: string;
+    readonly _port: number;
+    _dcId: number;
+    _log: any;
     private _connected: boolean;
     private _sendTask?: Promise<void>;
     private _recvTask?: Promise<void>;

+ 6 - 6
gramjs/network/index.ts

@@ -9,12 +9,12 @@ interface states {
 }
 
 export class UpdateConnectionState {
-    static states = {
-        disconnected: -1,
-        connected: 1,
-        broken: 0,
-    };
-    private state: number;
+    static disconnected = -1;
+
+    static connected = 1;
+
+    static broken = 0;
+    state: number;
 
     constructor(state: number) {
         this.state = state;

+ 5 - 2
gramjs/tl/custom/message.ts

@@ -8,7 +8,10 @@ import { Forward } from "./forward";
 import type { File } from "./file";
 import { Mixin } from "ts-mixer";
 import { EditMessageParams, SendMessageParams } from "../../client/messages";
-import { DownloadFileParams } from "../../client/downloads";
+import {
+    DownloadFileParams,
+    DownloadMediaInterface,
+} from "../../client/downloads";
 import { inspect } from "util";
 import { betterConsoleLog } from "../../Helpers";
 import { _selfId } from "../../client/users";
@@ -794,7 +797,7 @@ export class Message extends Mixin(SenderGetter, ChatGetter) {
         }
     }
 
-    async downloadMedia(params: DownloadFileParams) {
+    async downloadMedia(params: DownloadMediaInterface) {
         if (this._client) return this._client.downloadMedia(this, params);
     }
 

+ 1 - 1
gramjs/tl/generationHelpers.ts

@@ -245,7 +245,7 @@ const parseTl = function* (
     // Once all objects have been parsed, replace the
     // string type from the arguments with references
     for (const obj of objAll) {
-        //console.log(obj)
+
         if (AUTH_KEY_TYPES.has(obj.constructorId)) {
             for (const arg in obj.argsConfig) {
                 if (obj.argsConfig[arg].type === "string") {