|
@@ -10,6 +10,7 @@ const RequestState = require('./RequestState')
|
|
|
const { MsgsAck, upload, MsgsStateInfo, Pong } = require('../tl').constructors
|
|
|
const MessagePacker = require('../extensions/MessagePacker')
|
|
|
const BinaryReader = require('../extensions/BinaryReader')
|
|
|
+const { UpdateConnectionState } = require("./index");
|
|
|
const { BadMessageError } = require("../errors/Common")
|
|
|
const {
|
|
|
BadServerSalt,
|
|
@@ -45,8 +46,8 @@ const { TypeNotFoundError } = require('../errors/Common')
|
|
|
class MTProtoSender {
|
|
|
static DEFAULT_OPTIONS = {
|
|
|
logger: null,
|
|
|
- retries: 5,
|
|
|
- delay: 1,
|
|
|
+ retries: Infinity,
|
|
|
+ delay: 2000,
|
|
|
autoReconnect: true,
|
|
|
connectTimeout: null,
|
|
|
authKeyCallback: null,
|
|
@@ -145,15 +146,30 @@ class MTProtoSender {
|
|
|
/**
|
|
|
* Connects to the specified given connection using the given auth key.
|
|
|
* @param connection
|
|
|
+ * @param eventDispatch {function}
|
|
|
* @returns {Promise<boolean>}
|
|
|
*/
|
|
|
- async connect(connection) {
|
|
|
+ async connect(connection, eventDispatch=null) {
|
|
|
if (this._user_connected) {
|
|
|
this._log.info('User is already connected!')
|
|
|
return false
|
|
|
}
|
|
|
this._connection = connection
|
|
|
- await this._connect()
|
|
|
+
|
|
|
+ const retries = this._retries
|
|
|
+
|
|
|
+ for (let attempt = 0; attempt < retries; attempt++) {
|
|
|
+ try {
|
|
|
+ await this._connect()
|
|
|
+ break
|
|
|
+ } catch (e) {
|
|
|
+ if (attempt===0 && eventDispatch!==null){
|
|
|
+ eventDispatch({ update: new UpdateConnectionState(-1) })
|
|
|
+ }
|
|
|
+ this._log.error("WebSocket connection failed attempt : "+(attempt+1))
|
|
|
+ await Helpers.sleep(this._delay)
|
|
|
+ }
|
|
|
+ }
|
|
|
return true
|
|
|
}
|
|
|
|
|
@@ -166,6 +182,7 @@ class MTProtoSender {
|
|
|
* all pending requests, and closes the send and receive loops.
|
|
|
*/
|
|
|
async disconnect() {
|
|
|
+
|
|
|
await this._disconnect()
|
|
|
}
|
|
|
|
|
@@ -236,7 +253,7 @@ class MTProtoSender {
|
|
|
* switch to different data centers.
|
|
|
*/
|
|
|
if (this._authKeyCallback) {
|
|
|
- await this._authKeyCallback(this.authKey,this._dcId)
|
|
|
+ await this._authKeyCallback(this.authKey, this._dcId)
|
|
|
}
|
|
|
} else {
|
|
|
this._log.debug('Already have an auth key ...')
|
|
@@ -262,6 +279,9 @@ class MTProtoSender {
|
|
|
this._log.info('Not disconnecting (already have no connection)')
|
|
|
return
|
|
|
}
|
|
|
+ if (this._updateCallback){
|
|
|
+ this._updateCallback(-1)
|
|
|
+ }
|
|
|
this._log.info('Disconnecting from %s...'.replace('%s', this._connection.toString()))
|
|
|
this._user_connected = false
|
|
|
this._log.debug('Closing current connection...')
|
|
@@ -276,6 +296,8 @@ class MTProtoSender {
|
|
|
* @private
|
|
|
*/
|
|
|
async _sendLoop() {
|
|
|
+ this._send_queue = new MessagePacker(this._state, this._log)
|
|
|
+
|
|
|
while (this._user_connected && !this._reconnecting) {
|
|
|
if (this._pending_ack.size) {
|
|
|
const ack = new RequestState(new MsgsAck({ msgIds: Array(...this._pending_ack) }))
|
|
@@ -283,14 +305,14 @@ class MTProtoSender {
|
|
|
this._last_acks.push(ack)
|
|
|
this._pending_ack.clear()
|
|
|
}
|
|
|
- this._log.debug('Waiting for messages to send...')
|
|
|
+ this._log.debug('Waiting for messages to send...'+this._reconnecting)
|
|
|
// TODO Wait for the connection send queue to be empty?
|
|
|
// This means that while it's not empty we can wait for
|
|
|
// more messages to be added to the send queue.
|
|
|
const res = await this._send_queue.get()
|
|
|
|
|
|
if (this._reconnecting) {
|
|
|
- return;
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
if (!res) {
|
|
@@ -338,6 +360,7 @@ class MTProtoSender {
|
|
|
} catch (e) {
|
|
|
// this._log.info('Connection closed while receiving data');
|
|
|
this._log.warn('Connection closed while receiving data')
|
|
|
+ this._startReconnect()
|
|
|
return
|
|
|
}
|
|
|
try {
|
|
@@ -353,14 +376,7 @@ class MTProtoSender {
|
|
|
// A step while decoding had the incorrect data. This message
|
|
|
// should not be considered safe and it should be ignored.
|
|
|
this._log.warn(`Security error while unpacking a received message: ${e}`)
|
|
|
-
|
|
|
- // TODO Reconnecting does not work properly: all subsequent requests hang.
|
|
|
- // this.authKey.key = null
|
|
|
- // if (this._authKeyCallback) {
|
|
|
- // await this._authKeyCallback(null)
|
|
|
- // }
|
|
|
- // this._startReconnect()
|
|
|
- // return
|
|
|
+ continue
|
|
|
} else if (e instanceof InvalidBufferError) {
|
|
|
this._log.info('Broken authorization key; resetting')
|
|
|
await this.authKey.setKey(null)
|
|
@@ -368,11 +384,12 @@ class MTProtoSender {
|
|
|
if (this._authKeyCallback) {
|
|
|
await this._authKeyCallback(null)
|
|
|
}
|
|
|
-
|
|
|
- this._startReconnect()
|
|
|
+ await this.disconnect()
|
|
|
return
|
|
|
} else {
|
|
|
this._log.error('Unhandled error while receiving data')
|
|
|
+ console.log(e)
|
|
|
+ this._startReconnect()
|
|
|
return
|
|
|
}
|
|
|
}
|
|
@@ -600,8 +617,8 @@ class MTProtoSender {
|
|
|
this._state._sequence -= 16
|
|
|
} else {
|
|
|
|
|
|
- for (const state of states){
|
|
|
- state.reject(new BadMessageError(state.request,badMsg.errorCode))
|
|
|
+ for (const state of states) {
|
|
|
+ state.reject(new BadMessageError(state.request, badMsg.errorCode))
|
|
|
}
|
|
|
|
|
|
return
|
|
@@ -733,30 +750,40 @@ class MTProtoSender {
|
|
|
this._reconnecting = true
|
|
|
// TODO Should we set this?
|
|
|
// this._user_connected = false
|
|
|
- await this._reconnect()
|
|
|
+ this._log.info("Started reconnecting")
|
|
|
+ this._reconnect()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
async _reconnect() {
|
|
|
this._log.debug('Closing current connection...')
|
|
|
try {
|
|
|
- await this._connection.disconnect()
|
|
|
+ await this.disconnect()
|
|
|
} catch (err) {
|
|
|
console.warn(err)
|
|
|
}
|
|
|
+ this._send_queue.append(null)
|
|
|
+
|
|
|
this._state.reset()
|
|
|
const retries = this._retries
|
|
|
+
|
|
|
+
|
|
|
for (let attempt = 0; attempt < retries; attempt++) {
|
|
|
try {
|
|
|
await this._connect()
|
|
|
- this._send_queue.extend(Object.values(this._pending_state))
|
|
|
+ // uncomment this if you want to resend
|
|
|
+ //this._send_queue.extend(Object.values(this._pending_state))
|
|
|
this._pending_state = {}
|
|
|
if (this._autoReconnectCallback) {
|
|
|
await this._autoReconnectCallback()
|
|
|
}
|
|
|
+ if (this._updateCallback){
|
|
|
+ this._updateCallback(1)
|
|
|
+ }
|
|
|
+
|
|
|
break
|
|
|
} catch (e) {
|
|
|
- this._log.error(e)
|
|
|
+ this._log.error("WebSocket connection failed attempt : "+(attempt+1))
|
|
|
await Helpers.sleep(this._delay)
|
|
|
}
|
|
|
}
|