소스 검색

Resend past request when reconnecting

painor 4 년 전
부모
커밋
994e82b0fc

+ 2 - 2
gramjs/events/common.ts

@@ -100,14 +100,14 @@ export class EventBuilder {
 
 interface EventCommonInterface {
     chatPeer?: EntityLike,
-    msgId?: bigInt.BigInteger,
+    msgId?: number,
     broadcast?: boolean,
 }
 
 export class EventCommon extends ChatGetter {
     _eventName = "Event";
     _entities: any;
-    _messageId?: bigInt.BigInteger;
+    _messageId?: number;
     originalUpdate: undefined;
 
     constructor({chatPeer = undefined, msgId = undefined, broadcast = undefined}: EventCommonInterface) {

+ 1 - 1
gramjs/extensions/MessagePacker.ts

@@ -31,7 +31,7 @@ export class MessagePacker {
         }
     }
 
-    extend(states: any[]) {
+    extend(states: RequestState[]) {
         for (const state of states) {
             this._queue.push(state)
         }

+ 0 - 1
gramjs/extensions/PromisedNetSockets.ts

@@ -116,7 +116,6 @@ export class PromisedNetSockets {
     async receive() {
         if (this.client) {
             this.client.on('data', async message => {
-
                 const release = await mutex.acquire();
                 try {
                     let data;

+ 0 - 1
gramjs/network/MTProtoPlainSender.ts

@@ -40,7 +40,6 @@ export class MTProtoPlainSender {
         b.writeInt32LE(body.length, 0)
 
         const res = Buffer.concat([Buffer.concat([Buffer.alloc(8), m, b]), body])
-
         await this._connection.send(res)
         body = await this._connection.recv()
         if (body.length < 8) {

+ 15 - 24
gramjs/network/MTProtoSender.ts

@@ -23,7 +23,7 @@ import {RequestState} from "./RequestState";
 import {doAuthentication} from "./Authenticator";
 import  {MTProtoPlainSender} from "./MTProtoPlainSender";
 import {BadMessageError, TypeNotFoundError,InvalidBufferError,SecurityError,RPCMessageToError} from "../errors";
-import  {UpdateConnectionState} from "./";
+import {Connection, UpdateConnectionState} from "./";
 
 interface DEFAULT_OPTIONS {
     logger: any,
@@ -56,7 +56,7 @@ export class MTProtoSender {
         isMainSender: null,
         senderCallback: null,
     };
-    private _connection: any;
+    private _connection?: Connection;
     private _log: any;
     private _dcId: number;
     private _retries: number;
@@ -253,18 +253,9 @@ export class MTProtoSender {
         if (!this._userConnected) {
             throw new Error('Cannot send requests while disconnected')
         }
-        //CONTEST
         const state = new RequestState(request);
         this._sendQueue.append(state);
         return state.promise;
-        /*
-        if (!Helpers.isArrayLike(request)) {
-            const state = new RequestState(request)
-            this._send_queue.append(state)
-            return state.promise
-        } else {
-            throw new Error('not supported')
-        }*/
     }
 
     /**
@@ -276,8 +267,8 @@ export class MTProtoSender {
      */
     async _connect() {
 
-        this._log.info('Connecting to {0}...'.replace('{0}', this._connection.toString()));
-        await this._connection.connect();
+        this._log.info('Connecting to {0}...'.replace('{0}', this._connection!.toString()));
+        await this._connection!.connect();
         this._log.debug('Connection success!');
         //process.exit(0)
         if (!this.authKey.getKey()) {
@@ -303,18 +294,18 @@ export class MTProtoSender {
         }
         this._userConnected = true;
         this._reconnecting = false;
+        this._log.debug('Starting receive loop');
+        this._recvLoopHandle = this._recvLoop();
 
         this._log.debug('Starting send loop');
         this._sendLoopHandle = this._sendLoop();
 
-        this._log.debug('Starting receive loop');
-        this._recvLoopHandle = this._recvLoop();
 
         // _disconnected only completes after manual disconnection
         // or errors after which the sender cannot continue such
         // as failing to reconnect or any unexpected error.
 
-        this._log.info('Connection to %s complete!'.replace('%s', this._connection.toString()))
+        this._log.info('Connection to %s complete!'.replace('%s', this._connection!.toString()))
     }
 
     async _disconnect(error = null) {
@@ -325,10 +316,10 @@ export class MTProtoSender {
         if (this._updateCallback) {
             this._updateCallback(-1)
         }
-        this._log.info('Disconnecting from %s...'.replace('%s', this._connection.toString()));
+        this._log.info('Disconnecting from %s...'.replace('%s', this._connection!.toString()));
         this._userConnected = false;
         this._log.debug('Closing current connection...');
-        await this._connection.disconnect()
+        await this._connection!.disconnect()
     }
 
     /**
@@ -343,6 +334,7 @@ export class MTProtoSender {
 
         while (this._userConnected && !this._reconnecting) {
             if (this._pendingAck.size) {
+
                 const ack = new RequestState(new Api.MsgsAck({msgIds: Array(...this._pendingAck)}));
                 this._sendQueue.append(ack);
                 this._lastAcks.push(ack);
@@ -353,12 +345,13 @@ export class MTProtoSender {
             // This means that while it's not empty we can wait for
             // more messages to be added to the send queue.
             const res = await this._sendQueue.get();
-
             if (this._reconnecting) {
+                this._log.debug('Reconnecting. will stop loop');
                 return
             }
 
             if (!res) {
+                this._log.debug('Empty result. will stop loop');
                 continue
             }
             let data = res.data;
@@ -368,7 +361,7 @@ export class MTProtoSender {
             data = await this._state.encryptMessageData(data);
 
             try {
-                await this._connection.send(data)
+                await this._connection!.send(data)
             } catch (e) {
                 this._log.error(e);
                 this._log.info('Connection closed while sending data');
@@ -399,7 +392,7 @@ export class MTProtoSender {
             // this._log.debug('Receiving items from the network...');
             this._log.debug('Receiving items from the network...');
             try {
-                body = await this._connection.recv()
+                body = await this._connection!.recv()
             } catch (e) {
                 // this._log.info('Connection closed while receiving data');
                 this._log.warn('Connection closed while receiving data');
@@ -556,7 +549,6 @@ export class MTProtoSender {
             return;
         }
         if (RPCResult.error && state.msgId) {
-            // eslint-disable-next-line new-cap
             const error = RPCMessageToError(RPCResult.error, state.request);
             this._sendQueue.append(new RequestState(new Api.MsgsAck({msgIds: [state.msgId]})));
             state.reject(error)
@@ -823,7 +815,6 @@ export class MTProtoSender {
         }
         // @ts-ignore
         this._sendQueue.append(null);
-
         this._state.reset();
         const retries = this._retries;
 
@@ -832,7 +823,7 @@ export class MTProtoSender {
             try {
                 await this._connect();
                 // uncomment this if you want to resend
-                //this._send_queue.extend(Object.values(this._pending_state))
+                this._sendQueue.extend(Array.from(this._pendingState.values()));
                 this._pendingState = new Map<string, RequestState>();
                 if (this._autoReconnectCallback) {
                     await this._autoReconnectCallback()

+ 2 - 0
gramjs/network/RequestState.ts

@@ -1,3 +1,5 @@
+import bigInt from "big-integer";
+
 export class RequestState {
     public containerId: undefined;
     public msgId?: bigInt.BigInteger;

+ 5 - 4
gramjs/network/connection/Connection.ts

@@ -60,10 +60,9 @@ class Connection {
         await this._connect();
         this._connected = true;
 
-        if (!this._sendTask) {
-            this._sendTask = this._sendLoop()
-        }
+        this._sendTask = this._sendLoop()
         this._recvTask = this._recvLoop()
+
     }
 
     async disconnect() {
@@ -74,6 +73,9 @@ class Connection {
 
     async send(data: Buffer) {
         if (!this._connected) {
+            // this will stop the current loop
+            // @ts-ignore
+            await this._sendArray(undefined);
             throw new Error('Not connected')
         }
         await this._sendArray.push(data)
@@ -82,7 +84,6 @@ class Connection {
     async recv() {
         while (this._connected) {
             const result = await this._recvArray.pop();
-
             // undefined = sentinel value = keep trying
             if (result && result.length) {
                 return result

+ 0 - 1
gramjs/tl/api.js

@@ -351,7 +351,6 @@ function createClasses(classesType, params) {
                 if (classesType !== 'request') {
                     throw new Error('`resolve()` called for non-request instance')
                 }
-
                 for (const arg in argsConfig) {
                     if (argsConfig.hasOwnProperty(arg)) {
                         if (!AUTO_CASTS.has(argsConfig[arg].type)) {