Procházet zdrojové kódy

Try to fix reconnects when downloading.

Painor před 8 měsíci
rodič
revize
b0d317c8ba

+ 1 - 1
gramjs/Version.ts

@@ -1 +1 @@
-export const version = "2.23.8";
+export const version = "2.24.8";

+ 1 - 0
gramjs/client/TelegramClient.ts

@@ -1416,6 +1416,7 @@ export class TelegramClient extends TelegramBaseClient {
                 client: this,
                 securityChecks: this._securityChecks,
                 autoReconnectCallback: this._handleReconnect.bind(this),
+                _exportedSenderPromises: this._exportedSenderPromises,
             });
         }
 

+ 11 - 2
gramjs/client/telegramBaseClient.ts

@@ -212,7 +212,7 @@ export abstract class TelegramBaseClient {
         [ReturnType<typeof setTimeout>, Api.TypeUpdate[]]
     >();
     /** @hidden */
-    private _exportedSenderPromises = new Map<number, Promise<MTProtoSender>>();
+    public _exportedSenderPromises = new Map<number, Promise<MTProtoSender>>();
     /** @hidden */
     private _exportedSenderReleaseTimeouts = new Map<
         number,
@@ -540,7 +540,15 @@ export abstract class TelegramBaseClient {
             dcId,
             setTimeout(() => {
                 this._exportedSenderReleaseTimeouts.delete(dcId);
-                sender.disconnect();
+                if (sender._pendingState.values().length) {
+                    console.log(
+                        "sender already has some hanging states. reconnecting"
+                    );
+                    sender._reconnect();
+                    this._borrowExportedSender(dcId, false, sender);
+                } else {
+                    sender.disconnect();
+                }
             }, EXPORTED_SENDER_RELEASE_TIMEOUT)
         );
 
@@ -561,6 +569,7 @@ export abstract class TelegramBaseClient {
             onConnectionBreak: this._cleanupExportedSender.bind(this),
             client: this as unknown as TelegramClient,
             securityChecks: this._securityChecks,
+            _exportedSenderPromises: this._exportedSenderPromises,
         });
     }
 

+ 2 - 2
gramjs/extensions/MessagePacker.ts

@@ -13,7 +13,7 @@ const USE_INVOKE_AFTER_WITH = new Set([
 
 export class MessagePacker {
     private _state: MTProtoState;
-    private _pendingStates: RequestState[];
+    public _pendingStates: RequestState[];
     private _queue: any[];
     private _ready: Promise<unknown>;
     private setReady: ((value?: any) => void) | undefined;
@@ -79,7 +79,7 @@ export class MessagePacker {
             this._pendingStates.push(state);
             state
                 .promise! // Using finally causes triggering `unhandledrejection` event
-                .catch(() => {})
+                .catch((err) => {})
                 .finally(() => {
                     this._pendingStates = this._pendingStates.filter(
                         (s) => s !== state

+ 47 - 17
gramjs/network/MTProtoSender.ts

@@ -53,6 +53,7 @@ interface DEFAULT_OPTIONS {
     client: TelegramClient;
     onConnectionBreak?: CallableFunction;
     securityChecks: boolean;
+    _exportedSenderPromises: Map<number, Promise<MTProtoSender>>;
 }
 
 export class MTProtoSender {
@@ -94,7 +95,7 @@ export class MTProtoSender {
     readonly authKey: AuthKey;
     private readonly _state: MTProtoState;
     private _sendQueue: MessagePacker;
-    private _pendingState: PendingState;
+    _pendingState: PendingState;
     private readonly _pendingAck: Set<any>;
     private readonly _lastAcks: any[];
     private readonly _handlers: any;
@@ -108,6 +109,7 @@ export class MTProtoSender {
     private _cancelSend: boolean;
     cancellableRecvLoopPromise?: CancellablePromise<any>;
     private _finishedConnecting: boolean;
+    private _exportedSenderPromises = new Map<number, Promise<MTProtoSender>>();
 
     /**
      * @param authKey
@@ -137,7 +139,7 @@ export class MTProtoSender {
         this._securityChecks = args.securityChecks;
 
         this._connectMutex = new Mutex();
-
+        this._exportedSenderPromises = args._exportedSenderPromises;
         /**
          * whether we disconnected ourself or telegram did it.
          */
@@ -404,6 +406,7 @@ export class MTProtoSender {
             "Connection to %s complete!".replace("%s", connection.toString())
         );
     }
+
     async _disconnect() {
         const connection = this._connection;
         if (this._updateCallback) {
@@ -494,17 +497,6 @@ export class MTProtoSender {
             );
 
             data = await this._state.encryptMessageData(data);
-
-            try {
-                await this._connection!.send(data);
-            } catch (e) {
-                this._log.debug(`Connection closed while sending data ${e}`);
-                if (this._log.canSend(LogLevel.DEBUG)) {
-                    console.error(e);
-                }
-                this._sendLoopHandle = undefined;
-                return;
-            }
             for (const state of batch) {
                 if (!Array.isArray(state)) {
                     if (state.request.classType === "request") {
@@ -518,6 +510,17 @@ export class MTProtoSender {
                     }
                 }
             }
+            try {
+                await this._connection!.send(data);
+            } catch (e) {
+                this._log.debug(`Connection closed while sending data ${e}`);
+                if (this._log.canSend(LogLevel.DEBUG)) {
+                    console.error(e);
+                }
+                this._sendLoopHandle = undefined;
+                return;
+            }
+
             this._log.debug("Encrypted messages put in a queue to be sent");
         }
 
@@ -639,6 +642,7 @@ export class MTProtoSender {
             this._onConnectionBreak(this._dcId);
         }
     }
+
     /**
      * Adds the given message to the list of messages that must be
      * acknowledged and dispatches control to different ``_handle_*``
@@ -696,6 +700,7 @@ export class MTProtoSender {
 
         return [];
     }
+
     /**
      * Handles the result for Remote Procedure Calls:
      * rpc_result#f35c6d01 req_msg_id:long result:bytes = RpcResult;
@@ -744,9 +749,7 @@ export class MTProtoSender {
             try {
                 const reader = new BinaryReader(result.body);
                 const read = state.request.readResult(reader);
-                this._log.debug(
-                    `Handling RPC result ${read?.constructor?.name}`
-                );
+                this._log.debug(`Handling RPC result ${read?.className}`);
                 state.resolve(read);
             } catch (err) {
                 state.reject(err);
@@ -968,9 +971,26 @@ export class MTProtoSender {
      * @private
      */
     async _handleMsgAll(message: TLMessage) {}
+
     reconnect() {
         if (this._userConnected && !this.isReconnecting) {
             this.isReconnecting = true;
+            if (this._isMainSender) {
+                this._log.debug("Reconnecting all senders");
+                for (const promise of this._exportedSenderPromises.values()) {
+                    promise
+                        .then((sender) => {
+                            if (sender && !sender._isMainSender) {
+                                sender.reconnect();
+                            }
+                        })
+                        .catch((error) => {
+                            this._log.warn(
+                                "Error getting sender to reconnect to"
+                            );
+                        });
+                }
+            }
             // we want to wait a second between each reconnect try to not flood the server with reconnects
             // in case of internal server issues.
             sleep(1000).then(() => {
@@ -981,7 +1001,6 @@ export class MTProtoSender {
     }
 
     async _reconnect() {
-        this._log.debug("Closing current connection...");
         try {
             this._log.warn("[Reconnect] Closing current connection...");
             await this._disconnect();
@@ -994,6 +1013,17 @@ export class MTProtoSender {
                 console.error(err);
             }
         }
+        this._log.debug(
+            `Adding ${this._sendQueue._pendingStates.length} old request to resend`
+        );
+        for (let i = 0; i < this._sendQueue._pendingStates.length; i++) {
+            if (this._sendQueue._pendingStates[i].msgId != undefined) {
+                this._pendingState.set(
+                    this._sendQueue._pendingStates[i].msgId!,
+                    this._sendQueue._pendingStates[i]
+                );
+            }
+        }
 
         this._sendQueue.clear();
         this._state.reset();

+ 1 - 1
gramjs/network/connection/TCPFull.ts

@@ -44,7 +44,7 @@ export class FullPacketCodec extends PacketCodec {
             // # It has been observed that the length and seq can be -429,
             // # followed by the body of 4 bytes also being -429.
             // # See https://github.com/LonamiWebs/Telethon/issues/4042.
-            const body = await reader.readExactly(4)
+            const body = await reader.readExactly(4);
             throw new InvalidBufferError(body);
         }
         let body = await reader.readExactly(packetLen - 8);

+ 3 - 1
gramjs/sessions/StoreSession.ts

@@ -10,7 +10,9 @@ export class StoreSession extends MemorySession {
     constructor(sessionName: string, divider = ":") {
         super();
         if (sessionName === "session") {
-            throw new Error("Session name can't be 'session'. Please use a different name.");
+            throw new Error(
+                "Session name can't be 'session'. Please use a different name."
+            );
         }
         if (typeof localStorage === "undefined" || localStorage === null) {
             const LocalStorage = require("./localStorage").LocalStorage;

+ 2 - 2
package-lock.json

@@ -1,12 +1,12 @@
 {
   "name": "telegram",
-  "version": "2.23.8",
+  "version": "2.24.9",
   "lockfileVersion": 2,
   "requires": true,
   "packages": {
     "": {
       "name": "telegram",
-      "version": "2.23.8",
+      "version": "2.24.9",
       "license": "MIT",
       "dependencies": {
         "@cryptography/aes": "^0.1.1",

+ 2 - 2
package.json

@@ -1,6 +1,6 @@
 {
   "name": "telegram",
-  "version": "2.23.8",
+  "version": "2.24.9",
   "description": "NodeJS/Browser MTProto API Telegram client library,",
   "main": "index.js",
   "types": "index.d.ts",
@@ -70,4 +70,4 @@
     "node-localstorage": "^2.2.1",
     "socks": "^2.6.2"
   }
-}
+}