소스 검색

Add DC switching
Fix Session saving bugs
Add more logging

painor 5 년 전
부모
커밋
fd9e6614ce

+ 0 - 2
examples/main.js

@@ -18,6 +18,4 @@ console.log(message.bytes.toString('hex'));
     await client.connect()
     await client.connect()
 
 
     console.log('You should now be connected.')
     console.log('You should now be connected.')
-    await client.disconnect()
-    await client.connect()
 })()
 })()

+ 15 - 13
gramjs/client/TelegramClient.js

@@ -99,6 +99,13 @@ class TelegramClient {
         this._config = null
         this._config = null
         this._sender = new MTProtoSender(this.session.authKey, {
         this._sender = new MTProtoSender(this.session.authKey, {
             logger: this._log,
             logger: this._log,
+            retries: this._connectionRetries,
+            delay: this._retryDelay,
+            autoReconnect: this._autoReconnect,
+            connectTimeout: this._timeout,
+            authKeyCallback: this._authKeyCallback.bind(this),
+            updateCallback: this._handleUpdate.bind(this),
+
         })
         })
         this.phoneCodeHashes = []
         this.phoneCodeHashes = []
     }
     }
@@ -113,13 +120,14 @@ class TelegramClient {
      * @returns {Promise<void>}
      * @returns {Promise<void>}
      */
      */
     async connect() {
     async connect() {
-
         const connection = new this._connection(this.session.serverAddress
         const connection = new this._connection(this.session.serverAddress
             , this.session.port, this.session.dcId, this._log)
             , this.session.port, this.session.dcId, this._log)
         if (!await this._sender.connect(connection)) {
         if (!await this._sender.connect(connection)) {
+            console.log('already connected returning')
             return
             return
         }
         }
         this.session.authKey = this._sender.authKey
         this.session.authKey = this._sender.authKey
+        console.log('auth key is ', this.session.authKey)
         await this.session.save()
         await this.session.save()
         await this._sender.send(this._initWith(
         await this._sender.send(this._initWith(
             new GetConfigRequest(),
             new GetConfigRequest(),
@@ -141,21 +149,18 @@ class TelegramClient {
     async _switchDC(newDc) {
     async _switchDC(newDc) {
         this._log.info(`Reconnecting to new data center ${newDc}`)
         this._log.info(`Reconnecting to new data center ${newDc}`)
         const DC = await this._getDC(newDc)
         const DC = await this._getDC(newDc)
-        console.log('dc is ?????????')
         this.session.setDC(DC.id, DC.ipAddress, DC.port)
         this.session.setDC(DC.id, DC.ipAddress, DC.port)
-        console.log('the dc is ', DC)
         // authKey's are associated with a server, which has now changed
         // authKey's are associated with a server, which has now changed
         // so it's not valid anymore. Set to None to force recreating it.
         // so it's not valid anymore. Set to None to force recreating it.
         this._sender.authKey.key = null
         this._sender.authKey.key = null
         this.session.authKey = null
         this.session.authKey = null
         await this.session.save()
         await this.session.save()
         await this.disconnect()
         await this.disconnect()
-        console.log('hayyyyyyyyyy')
         return await this.connect()
         return await this.connect()
     }
     }
 
 
     async _authKeyCallback(authKey) {
     async _authKeyCallback(authKey) {
-        this.session.auth_key = authKey
+        this.session.authKey = authKey
         await this.session.save()
         await this.session.save()
     }
     }
 
 
@@ -164,11 +169,10 @@ class TelegramClient {
     // region Working with different connections/Data Centers
     // region Working with different connections/Data Centers
 
 
     async _getDC(dcId, cdn = false) {
     async _getDC(dcId, cdn = false) {
-        console.log('hi dc ?')
         if (!this._config) {
         if (!this._config) {
             this._config = await this.invoke(new functions.help.GetConfigRequest())
             this._config = await this.invoke(new functions.help.GetConfigRequest())
+
         }
         }
-        console.log('h')
         if (cdn && !this._cdnConfig) {
         if (cdn && !this._cdnConfig) {
             this._cdnConfig = await this.invoke(new functions.help.GetCdnConfigRequest())
             this._cdnConfig = await this.invoke(new functions.help.GetCdnConfigRequest())
             for (const pk of this._cdnConfig.publicKeys) {
             for (const pk of this._cdnConfig.publicKeys) {
@@ -177,8 +181,7 @@ class TelegramClient {
         }
         }
         console.log('ok')
         console.log('ok')
         for (const DC of this._config.dcOptions) {
         for (const DC of this._config.dcOptions) {
-            console.log(DC)
-            if (DC.id === dcId && DC.ipv6 === this._useIPV6 && DC.cdn === cdn) {
+            if (DC.id === dcId && Boolean(DC.ipv6) === this._useIPV6 && Boolean(DC.cdn) === cdn) {
                 return DC
                 return DC
             }
             }
         }
         }
@@ -215,13 +218,12 @@ class TelegramClient {
         }
         }
         this._last_request = new Date().getTime()
         this._last_request = new Date().getTime()
         let attempt = 0
         let attempt = 0
+        console.log('request retries is ,', this._requestRetries)
         for (attempt = 0; attempt < this._requestRetries; attempt++) {
         for (attempt = 0; attempt < this._requestRetries; attempt++) {
             try {
             try {
-                console.log('sending promise')
+                console.log('curernt attepmt is ', attempt)
                 const promise = this._sender.send(request)
                 const promise = this._sender.send(request)
-                console.log(promise)
                 const result = await promise
                 const result = await promise
-                console.log('the res is : ', result)
                 this.session.processEntities(result)
                 this.session.processEntities(result)
                 this._entityCache.add(result)
                 this._entityCache.add(result)
                 return result
                 return result
@@ -245,6 +247,7 @@ class TelegramClient {
                     if (shouldRaise && await this.isUserAuthorized()) {
                     if (shouldRaise && await this.isUserAuthorized()) {
                         throw e
                         throw e
                     }
                     }
+                    await Helpers.sleep(1000)
                     await this._switchDC(e.newDc)
                     await this._switchDC(e.newDc)
                 } else {
                 } else {
                     throw e
                     throw e
@@ -374,7 +377,6 @@ class TelegramClient {
                 this._processUpdate(u, update.updates)
                 this._processUpdate(u, update.updates)
             }
             }
         } else if (update instanceof types.UpdateShort) {
         } else if (update instanceof types.UpdateShort) {
-
             this._processUpdate(update.update, null)
             this._processUpdate(update.update, null)
         } else {
         } else {
             this._processUpdate(update, null)
             this._processUpdate(update, null)

+ 3 - 2
gramjs/errors/Common.js

@@ -45,7 +45,8 @@ class InvalidChecksumError extends Error {
  */
  */
 class InvalidBufferError extends Error {
 class InvalidBufferError extends Error {
     constructor(payload) {
     constructor(payload) {
-        const [code] = -struct.unpack('<i', payload)
+        console.log('payload is ', payload.toString('hex'))
+        const code = -(struct.unpack('<i', payload)[0])
         if (payload.length === 4) {
         if (payload.length === 4) {
             super(`Invalid response buffer (HTTP code ${code})`)
             super(`Invalid response buffer (HTTP code ${code})`)
         } else {
         } else {
@@ -130,7 +131,7 @@ class BadMessageError extends Error {
             'the correct salt, and the message is to be re-sent with it).',
             'the correct salt, and the message is to be re-sent with it).',
 
 
         64: 'Invalid container.',
         64: 'Invalid container.',
-    };
+    }
 
 
     constructor(code) {
     constructor(code) {
         super(BadMessageError.ErrorMessages[code] || `Unknown error code (this should not happen): ${code}.`)
         super(BadMessageError.ErrorMessages[code] || `Unknown error code (this should not happen): ${code}.`)

+ 7 - 3
gramjs/extensions/MessagePacker.js

@@ -14,6 +14,10 @@ class MessagePacker {
         this._log = logger
         this._log = logger
     }
     }
 
 
+    values() {
+        return this._queue
+    }
+
     append(state) {
     append(state) {
         this._queue.push(state)
         this._queue.push(state)
         this.setReady(true)
         this.setReady(true)
@@ -49,7 +53,7 @@ class MessagePacker {
                 }
                 }
                 state.msgId = await this._state.writeDataAsMessage(
                 state.msgId = await this._state.writeDataAsMessage(
                     buffer, state.data, state.request instanceof TLRequest,
                     buffer, state.data, state.request instanceof TLRequest,
-                    afterId
+                    afterId,
                 )
                 )
 
 
                 this._log.debug(`Assigned msgId = ${state.msgId} to ${state.request.constructor.name}`)
                 this._log.debug(`Assigned msgId = ${state.msgId} to ${state.request.constructor.name}`)
@@ -70,11 +74,11 @@ class MessagePacker {
         }
         }
         if (batch.length > 1) {
         if (batch.length > 1) {
             data = Buffer.concat([struct.pack(
             data = Buffer.concat([struct.pack(
-                '<Ii', MessageContainer.CONSTRUCTOR_ID, batch.length
+                '<Ii', MessageContainer.CONSTRUCTOR_ID, batch.length,
             ), buffer.getValue()])
             ), buffer.getValue()])
 
 
             const containerId = await this._state.writeDataAsMessage(
             const containerId = await this._state.writeDataAsMessage(
-                buffer, data, false
+                buffer, data, false,
             )
             )
             for (const s of batch) {
             for (const s of batch) {
                 s.containerId = containerId
                 s.containerId = containerId

+ 34 - 2
gramjs/network/MTProtoSender.js

@@ -48,7 +48,6 @@ format.extend(String.prototype, {})
  * key exists yet.
  * key exists yet.
  */
  */
 class MTProtoSender {
 class MTProtoSender {
-
     static DEFAULT_OPTIONS = {
     static DEFAULT_OPTIONS = {
         logger: null,
         logger: null,
         retries: 5,
         retries: 5,
@@ -225,7 +224,7 @@ class MTProtoSender {
         await this._connection.connect()
         await this._connection.connect()
         this._log.debug('Connection success!')
         this._log.debug('Connection success!')
         if (!this.authKey._key) {
         if (!this.authKey._key) {
-            const plain = new MtProtoPlainSender(this._connection, this._loggers)
+            const plain = new MtProtoPlainSender(this._connection, this._log)
             this._log.debug('New auth_key attempt ...')
             this._log.debug('New auth_key attempt ...')
             const res = await doAuthentication(plain, this._log)
             const res = await doAuthentication(plain, this._log)
             this._log.debug('Generated new auth_key successfully')
             this._log.debug('Generated new auth_key successfully')
@@ -348,6 +347,8 @@ class MTProtoSender {
                     if (this._authKeyCallback) {
                     if (this._authKeyCallback) {
                         this._authKeyCallback(null)
                         this._authKeyCallback(null)
                     }
                     }
+                    this._startReconnect(e)
+
                     return
                     return
                 } else {
                 } else {
                     this._log.error('Unhandled error while receiving data')
                     this._log.error('Unhandled error while receiving data')
@@ -706,6 +707,37 @@ class MTProtoSender {
      */
      */
     async _handleMsgAll(message) {
     async _handleMsgAll(message) {
     }
     }
+
+    _startReconnect(e) {
+        if (this._user_connected && !this._reconnecting) {
+            this._reconnecting = true
+            this._reconnect(e)
+
+        }
+
+    }
+
+    async _reconnect(e) {
+        this._log.debug('Closing current connection...')
+        await this._connection.disconnect()
+        this._reconnecting = false
+        this._state.reset()
+        const retries = this._retries
+        for (let attempt = 0; attempt < retries; attempt++) {
+            try {
+                await this._connect()
+                this._send_queue.extend(Object.values(this._pending_state))
+                this._pending_state = {}
+                if (this._autoReconnectCallback) {
+                    this._autoReconnectCallback()
+                }
+                break
+            } catch (e) {
+                this._log.error(e)
+                await Helpers.sleep(this._delay)
+            }
+        }
+    }
 }
 }
 
 
 module.exports = MTProtoSender
 module.exports = MTProtoSender

+ 1 - 0
gramjs/network/connection/Connection.js

@@ -98,6 +98,7 @@ class Connection {
             } catch (e) {
             } catch (e) {
                 console.log(e)
                 console.log(e)
                 this._log.info('an error occured')
                 this._log.info('an error occured')
+                return
             }
             }
             await this._recvArray.push(data)
             await this._recvArray.push(data)
         }
         }

+ 6 - 4
gramjs/sessions/Session.js

@@ -15,7 +15,7 @@ class Session {
     constructor(sessionUserId) {
     constructor(sessionUserId) {
         this.sessionUserId = sessionUserId
         this.sessionUserId = sessionUserId
         this._serverAddress = null
         this._serverAddress = null
-        this.__dcId = 0
+        this._dcId = 0
         this._port = null
         this._port = null
         // this.serverAddress = "localhost";
         // this.serverAddress = "localhost";
         // this.port = 21;
         // this.port = 21;
@@ -40,7 +40,7 @@ class Session {
                     return value
                     return value
                 }
                 }
             })
             })
-
+            console.log('current auth key will be ', this.authKey)
             await fs.writeFile(`${this.sessionUserId}.session`, str)
             await fs.writeFile(`${this.sessionUserId}.session`, str)
         }
         }
     }
     }
@@ -82,8 +82,10 @@ class Session {
 
 
                 const authKey = new AuthKey(Buffer.from(ob.authKey._key.data))
                 const authKey = new AuthKey(Buffer.from(ob.authKey._key.data))
                 const session = new Session(ob.sessionUserId)
                 const session = new Session(ob.sessionUserId)
-                session._serverAddress = ob.serverAddress
-                session._port = ob.port
+                session._serverAddress = ob._serverAddress
+                session._port = ob._port
+                session._dcId = ob._dcId
+
                 // this.serverAddress = "localhost";
                 // this.serverAddress = "localhost";
                 // this.port = 21;
                 // this.port = 21;
                 session.authKey = authKey
                 session.authKey = authKey