|
@@ -4,8 +4,6 @@ const Helpers = require("../utils/Helpers");
|
|
const {MsgsAck} = require("../tl/types");
|
|
const {MsgsAck} = require("../tl/types");
|
|
const AuthKey = require("../crypto/AuthKey");
|
|
const AuthKey = require("../crypto/AuthKey");
|
|
const doAuthentication = require("./Authenticator");
|
|
const doAuthentication = require("./Authenticator");
|
|
-const AES = require("../crypto/AES");
|
|
|
|
-const {RPCError} = require("../errors/RPCBaseErrors");
|
|
|
|
const RPCResult = require("../tl/core/RPCResult");
|
|
const RPCResult = require("../tl/core/RPCResult");
|
|
const MessageContainer = require("../tl/core/MessageContainer");
|
|
const MessageContainer = require("../tl/core/MessageContainer");
|
|
const GZIPPacked = require("../tl/core/GZIPPacked");
|
|
const GZIPPacked = require("../tl/core/GZIPPacked");
|
|
@@ -13,9 +11,6 @@ const TLMessage = require("../tl/core/TLMessage");
|
|
const RequestState = require("./RequestState");
|
|
const RequestState = require("./RequestState");
|
|
const format = require('string-format');
|
|
const format = require('string-format');
|
|
const {TypeNotFoundError} = require("../errors");
|
|
const {TypeNotFoundError} = require("../errors");
|
|
-const {BadMessageError} = require("../errors");
|
|
|
|
-const {InvalidDCError} = require("../errors");
|
|
|
|
-const {gzip, ungzip} = require('node-gzip');
|
|
|
|
const MessagePacker = require("../extensions/MessagePacker");
|
|
const MessagePacker = require("../extensions/MessagePacker");
|
|
const Pong = require("../tl/core/GZIPPacked");
|
|
const Pong = require("../tl/core/GZIPPacked");
|
|
const BadServerSalt = require("../tl/core/GZIPPacked");
|
|
const BadServerSalt = require("../tl/core/GZIPPacked");
|
|
@@ -27,6 +22,9 @@ const FutureSalts = require("../tl/core/GZIPPacked");
|
|
const MsgsStateReq = require("../tl/core/GZIPPacked");
|
|
const MsgsStateReq = require("../tl/core/GZIPPacked");
|
|
const MsgResendReq = require("../tl/core/GZIPPacked");
|
|
const MsgResendReq = require("../tl/core/GZIPPacked");
|
|
const MsgsAllInfo = require("../tl/core/GZIPPacked");
|
|
const MsgsAllInfo = require("../tl/core/GZIPPacked");
|
|
|
|
+const {SecurityError} = require("../errors/Common");
|
|
|
|
+const {InvalidBufferError} = require("../errors/Common");
|
|
|
|
+const {LogOutRequest} = require("../tl/functions/auth");
|
|
//const {tlobjects} = require("../gramjs/tl/alltlobjects");
|
|
//const {tlobjects} = require("../gramjs/tl/alltlobjects");
|
|
format.extend(String.prototype, {});
|
|
format.extend(String.prototype, {});
|
|
|
|
|
|
@@ -50,7 +48,7 @@ class MTProtoSender {
|
|
* @param opt
|
|
* @param opt
|
|
*/
|
|
*/
|
|
constructor(authKey, opt = {
|
|
constructor(authKey, opt = {
|
|
- loggers: null,
|
|
|
|
|
|
+ logger: null,
|
|
retries: 5,
|
|
retries: 5,
|
|
delay: 1,
|
|
delay: 1,
|
|
autoReconnect: true,
|
|
autoReconnect: true,
|
|
@@ -60,8 +58,7 @@ class MTProtoSender {
|
|
autoReconnectCallback: null
|
|
autoReconnectCallback: null
|
|
}) {
|
|
}) {
|
|
this._connection = null;
|
|
this._connection = null;
|
|
- this._loggers = opt.loggers;
|
|
|
|
- this._log = opt.loggers;
|
|
|
|
|
|
+ this._log = opt.logger;
|
|
this._retries = opt.retries;
|
|
this._retries = opt.retries;
|
|
this._delay = opt.delay;
|
|
this._delay = opt.delay;
|
|
this._autoReconnect = opt.autoReconnect;
|
|
this._autoReconnect = opt.autoReconnect;
|
|
@@ -99,7 +96,7 @@ class MTProtoSender {
|
|
* Note that here we're also storing their ``_RequestState``.
|
|
* Note that here we're also storing their ``_RequestState``.
|
|
*/
|
|
*/
|
|
this._send_queue = new MessagePacker(this._state,
|
|
this._send_queue = new MessagePacker(this._state,
|
|
- this._loggers);
|
|
|
|
|
|
+ this._log);
|
|
|
|
|
|
/**
|
|
/**
|
|
* Sent states are remembered until a response is received.
|
|
* Sent states are remembered until a response is received.
|
|
@@ -154,11 +151,8 @@ class MTProtoSender {
|
|
this._log.info('User is already connected!');
|
|
this._log.info('User is already connected!');
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
- console.log("connecting sender");
|
|
|
|
this._connection = connection;
|
|
this._connection = connection;
|
|
await this._connect();
|
|
await this._connect();
|
|
- console.log("finished connecting sender");
|
|
|
|
- this._user_connected = true;
|
|
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -207,7 +201,7 @@ class MTProtoSender {
|
|
if (!Helpers.isArrayLike(request)) {
|
|
if (!Helpers.isArrayLike(request)) {
|
|
let state = new RequestState(request);
|
|
let state = new RequestState(request);
|
|
this._send_queue.append(state);
|
|
this._send_queue.append(state);
|
|
- return state;
|
|
|
|
|
|
+ return state.promise;
|
|
} else {
|
|
} else {
|
|
throw new Error("not supported");
|
|
throw new Error("not supported");
|
|
}
|
|
}
|
|
@@ -221,14 +215,12 @@ class MTProtoSender {
|
|
* @private
|
|
* @private
|
|
*/
|
|
*/
|
|
async _connect() {
|
|
async _connect() {
|
|
- //this._log.info('Connecting to {0}...'.replace("{0}", this._connection));
|
|
|
|
|
|
+ this._log.info('Connecting to {0}...'.replace("{0}", this._connection));
|
|
await this._connection.connect();
|
|
await this._connection.connect();
|
|
- console.log("Connection success");
|
|
|
|
- //this._log.debug("Connection success!");
|
|
|
|
- console.log("auth key is ", this.authKey);
|
|
|
|
|
|
+ this._log.debug("Connection success!");
|
|
if (!this.authKey._key) {
|
|
if (!this.authKey._key) {
|
|
- console.log("creating authKey");
|
|
|
|
let plain = new MtProtoPlainSender(this._connection, this._loggers);
|
|
let plain = new MtProtoPlainSender(this._connection, this._loggers);
|
|
|
|
+ this._log.debug('New auth_key attempt ...');
|
|
let res = await doAuthentication(plain);
|
|
let res = await doAuthentication(plain);
|
|
this.authKey.key = res.authKey;
|
|
this.authKey.key = res.authKey;
|
|
this._state.time_offset = res.timeOffset;
|
|
this._state.time_offset = res.timeOffset;
|
|
@@ -244,23 +236,27 @@ class MTProtoSender {
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ } else {
|
|
|
|
+ this._log.debug('Already have an auth key ...');
|
|
}
|
|
}
|
|
- //this._log.debug('Starting send loop');
|
|
|
|
|
|
+ this._user_connected = true;
|
|
|
|
+
|
|
|
|
+ this._log.debug('Starting send loop');
|
|
this._send_loop_handle = this._send_loop();
|
|
this._send_loop_handle = this._send_loop();
|
|
|
|
|
|
- //this._log.debug('Starting receive loop');
|
|
|
|
|
|
+ this._log.debug('Starting receive loop');
|
|
this._recv_loop_handle = this._recv_loop();
|
|
this._recv_loop_handle = this._recv_loop();
|
|
|
|
|
|
// _disconnected only completes after manual disconnection
|
|
// _disconnected only completes after manual disconnection
|
|
// or errors after which the sender cannot continue such
|
|
// or errors after which the sender cannot continue such
|
|
// as failing to reconnect or any unexpected error.
|
|
// as failing to reconnect or any unexpected error.
|
|
|
|
|
|
- //this._log.info('Connection to %s complete!', this._connection)
|
|
|
|
|
|
+ this._log.info('Connection to %s complete!', this._connection)
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
- async _disconnected(error = null) {
|
|
|
|
|
|
+ async _disconnect(error = null) {
|
|
if (this._connection === null) {
|
|
if (this._connection === null) {
|
|
this._log.info('Not disconnecting (already have no connection)');
|
|
this._log.info('Not disconnecting (already have no connection)');
|
|
return
|
|
return
|
|
@@ -281,21 +277,24 @@ class MTProtoSender {
|
|
*/
|
|
*/
|
|
async _send_loop() {
|
|
async _send_loop() {
|
|
while (this._user_connected && !this._reconnecting) {
|
|
while (this._user_connected && !this._reconnecting) {
|
|
- if (this._pending_ack) {
|
|
|
|
- let ack = new RequestState(new MsgsAck(Array(this._pending_ack)));
|
|
|
|
|
|
+ if (this._pending_ack.size) {
|
|
|
|
+ let ack = new RequestState(new MsgsAck({msgIds: Array(this._pending_ack)}));
|
|
this._send_queue.append(ack);
|
|
this._send_queue.append(ack);
|
|
this._last_acks.push(ack);
|
|
this._last_acks.push(ack);
|
|
this._pending_ack.clear()
|
|
this._pending_ack.clear()
|
|
}
|
|
}
|
|
- this._log.debug('Waiting for messages to send...');
|
|
|
|
|
|
+ //this._log.debug('Waiting for messages to send...');
|
|
|
|
+ this._log.debug("Waiting for messages to send...");
|
|
// TODO Wait for the connection send queue to be empty?
|
|
// TODO Wait for the connection send queue to be empty?
|
|
// This means that while it's not empty we can wait for
|
|
// This means that while it's not empty we can wait for
|
|
// more messages to be added to the send queue.
|
|
// more messages to be added to the send queue.
|
|
- let {batch, data} = await this._send_queue.get();
|
|
|
|
-
|
|
|
|
- if (!data) {
|
|
|
|
|
|
+ let res = await this._send_queue.get();
|
|
|
|
+ if (!res) {
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
|
|
+ let data = res.data;
|
|
|
|
+ let batch = res.batch;
|
|
|
|
+ //this._log.debug(`Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`);
|
|
this._log.debug(`Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`);
|
|
this._log.debug(`Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`);
|
|
|
|
|
|
data = this._state.encryptMessageData(data);
|
|
data = this._state.encryptMessageData(data);
|
|
@@ -306,6 +305,10 @@ class MTProtoSender {
|
|
this._log.info('Connection closed while sending data');
|
|
this._log.info('Connection closed while sending data');
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
+ for (let state of batch) {
|
|
|
|
+ this._pending_state[state.msgId] = state;
|
|
|
|
+ }
|
|
|
|
+ this._log.debug('Encrypted messages put in a queue to be sent');
|
|
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -316,10 +319,12 @@ class MTProtoSender {
|
|
|
|
|
|
while (this._user_connected && !this._reconnecting) {
|
|
while (this._user_connected && !this._reconnecting) {
|
|
//this._log.debug('Receiving items from the network...');
|
|
//this._log.debug('Receiving items from the network...');
|
|
|
|
+ this._log.debug('Receiving items from the network...');
|
|
try {
|
|
try {
|
|
body = await this._connection.recv();
|
|
body = await this._connection.recv();
|
|
} catch (e) {
|
|
} catch (e) {
|
|
//this._log.info('Connection closed while receiving data');
|
|
//this._log.info('Connection closed while receiving data');
|
|
|
|
+ this._log.debug('Connection closed while receiving data');
|
|
return
|
|
return
|
|
}
|
|
}
|
|
try {
|
|
try {
|
|
@@ -329,28 +334,29 @@ class MTProtoSender {
|
|
|
|
|
|
if (e instanceof TypeNotFoundError) {
|
|
if (e instanceof TypeNotFoundError) {
|
|
// Received object which we don't know how to deserialize
|
|
// Received object which we don't know how to deserialize
|
|
- //this._log.info(`Type ${e.invalidConstructorId} not found, remaining data ${e.remaining}`);
|
|
|
|
|
|
+ this._log.info(`Type ${e.invalidConstructorId} not found, remaining data ${e.remaining}`);
|
|
|
|
+ continue
|
|
} else if (e instanceof SecurityError) {
|
|
} else if (e instanceof SecurityError) {
|
|
// A step while decoding had the incorrect data. This message
|
|
// A step while decoding had the incorrect data. This message
|
|
// should not be considered safe and it should be ignored.
|
|
// should not be considered safe and it should be ignored.
|
|
- //this._log.warning(`Security error while unpacking a received message: ${e}`);
|
|
|
|
|
|
+ this._log.warning(`Security error while unpacking a received message: ${e}`);
|
|
continue
|
|
continue
|
|
} else if (e instanceof InvalidBufferError) {
|
|
} else if (e instanceof InvalidBufferError) {
|
|
- //this._log.info('Broken authorization key; resetting');
|
|
|
|
|
|
+ this._log.info('Broken authorization key; resetting');
|
|
this.authKey.key = null;
|
|
this.authKey.key = null;
|
|
if (this._authKeyCallback) {
|
|
if (this._authKeyCallback) {
|
|
this._authKeyCallback(null)
|
|
this._authKeyCallback(null)
|
|
}
|
|
}
|
|
return
|
|
return
|
|
} else {
|
|
} else {
|
|
- //this._log.exception('Unhandled error while receiving data');
|
|
|
|
|
|
+ this._log.exception('Unhandled error while receiving data');
|
|
return
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
try {
|
|
try {
|
|
await this._processMessage(message)
|
|
await this._processMessage(message)
|
|
} catch (e) {
|
|
} catch (e) {
|
|
- //this._log.exception('Unhandled error while receiving data');
|
|
|
|
|
|
+ this._log.exception('Unhandled error while receiving data');
|
|
console.log(e);
|
|
console.log(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -369,7 +375,11 @@ class MTProtoSender {
|
|
*/
|
|
*/
|
|
async _processMessage(message) {
|
|
async _processMessage(message) {
|
|
this._pending_ack.add(message.msgId);
|
|
this._pending_ack.add(message.msgId);
|
|
- let handler = this._handlers.get(message.obj.CONSTRUCTOR_ID, this.handleUpdate);
|
|
|
|
|
|
+ let handler = this._handlers.get(message.obj.CONSTRUCTOR_ID);
|
|
|
|
+ if (!handler) {
|
|
|
|
+ handler = this._handleUpdate
|
|
|
|
+ }
|
|
|
|
+
|
|
await handler(message);
|
|
await handler(message);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -449,7 +459,7 @@ class MTProtoSender {
|
|
);
|
|
);
|
|
} else {
|
|
} else {
|
|
let reader = new BinaryReader(RPCResult.body);
|
|
let reader = new BinaryReader(RPCResult.body);
|
|
- let result = state.request.readResult(reader);
|
|
|
|
|
|
+ state.resolve(state.request.readResult(reader));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -485,7 +495,7 @@ class MTProtoSender {
|
|
|
|
|
|
async _handleUpdate(message) {
|
|
async _handleUpdate(message) {
|
|
if (message.obj.SUBCLASS_OF_ID !== 0x8af52aac) { // crc32(b'Updates')
|
|
if (message.obj.SUBCLASS_OF_ID !== 0x8af52aac) { // crc32(b'Updates')
|
|
- this._log.warning(`Note: %s is not an update, not dispatching it ${message.obj}`);
|
|
|
|
|
|
+ this._log.warning(`Note: ${message.obj} is not an update, not dispatching it ${message.obj}`);
|
|
return
|
|
return
|
|
}
|
|
}
|
|
this._log.debug('Handling update %s', message.obj.constructor.name);
|
|
this._log.debug('Handling update %s', message.obj.constructor.name);
|
|
@@ -508,7 +518,7 @@ class MTProtoSender {
|
|
let state = this._pending_state.pop(pong.msgId, null);
|
|
let state = this._pending_state.pop(pong.msgId, null);
|
|
// Todo Check result
|
|
// Todo Check result
|
|
if (state) {
|
|
if (state) {
|
|
- state.future.set_result(pong)
|
|
|
|
|
|
+ state.resolve(pong)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -527,7 +537,7 @@ class MTProtoSender {
|
|
this._state.salt = badSalt.newServerSalt;
|
|
this._state.salt = badSalt.newServerSalt;
|
|
let states = this._popStates(badSalt.badMsgId);
|
|
let states = this._popStates(badSalt.badMsgId);
|
|
this._send_queue.extend(states);
|
|
this._send_queue.extend(states);
|
|
- this._log.debug('%d message(s) will be resent', states.length);
|
|
|
|
|
|
+ this._log.debug(`${states.length} message(s) will be resent`);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -638,7 +648,7 @@ class MTProtoSender {
|
|
let state = this._pending_state[msgId];
|
|
let state = this._pending_state[msgId];
|
|
if (state && state.request instanceof LogOutRequest) {
|
|
if (state && state.request instanceof LogOutRequest) {
|
|
delete this._pending_state[msgId];
|
|
delete this._pending_state[msgId];
|
|
- state.future.set_result(true)
|
|
|
|
|
|
+ state.resolve(true)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -659,7 +669,7 @@ class MTProtoSender {
|
|
this._log.debug(`Handling future salts for message ${message.msgId}`);
|
|
this._log.debug(`Handling future salts for message ${message.msgId}`);
|
|
let state = self._pending_state.pop(message.msgId, null);
|
|
let state = self._pending_state.pop(message.msgId, null);
|
|
if (state) {
|
|
if (state) {
|
|
- state.future.set_result(message.obj)
|
|
|
|
|
|
+ state.resolve(message.obj)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -685,327 +695,7 @@ class MTProtoSender {
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Adds an update handler (a method with one argument, the received
|
|
|
|
- * TLObject) that is fired when there are updates available
|
|
|
|
- * @param handler {function}
|
|
|
|
- */
|
|
|
|
- addUpdateHandler(handler) {
|
|
|
|
- let firstHandler = Boolean(this.onUpdateHandlers.length);
|
|
|
|
- this.onUpdateHandlers.push(handler);
|
|
|
|
- // If this is the first added handler,
|
|
|
|
- // we must start receiving updates
|
|
|
|
- if (firstHandler) {
|
|
|
|
- this.setListenForUpdates(true);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Removes an update handler (a method with one argument, the received
|
|
|
|
- * TLObject) that is fired when there are updates available
|
|
|
|
- * @param handler {function}
|
|
|
|
- */
|
|
|
|
- removeUpdateHandler(handler) {
|
|
|
|
- let index = this.onUpdateHandlers.indexOf(handler);
|
|
|
|
- if (index !== -1) this.onUpdateHandlers.splice(index, 1);
|
|
|
|
- if (!Boolean(this.onUpdateHandlers.length)) {
|
|
|
|
- this.setListenForUpdates(false);
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- *
|
|
|
|
- * @param confirmed {boolean}
|
|
|
|
- * @returns {number}
|
|
|
|
- */
|
|
|
|
- generateSequence(confirmed) {
|
|
|
|
- if (confirmed) {
|
|
|
|
- let result = this.session.sequence * 2 + 1;
|
|
|
|
- this.session.sequence += 1;
|
|
|
|
- return result;
|
|
|
|
- } else {
|
|
|
|
- return this.session.sequence * 2;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- *
|
|
|
|
- * @param request
|
|
|
|
- */
|
|
|
|
- async receive(request) {
|
|
|
|
- try {
|
|
|
|
- //Try until we get an update
|
|
|
|
- while (!request.confirmReceived) {
|
|
|
|
- let {seq, body} = await this.transport.receive();
|
|
|
|
- let {message, remoteMsgId, remoteSequence} = this.decodeMsg(body);
|
|
|
|
- console.log("processing msg");
|
|
|
|
- await this.processMsg(remoteMsgId, remoteSequence, message, 0, request);
|
|
|
|
- console.log("finished processing msg");
|
|
|
|
- }
|
|
|
|
- } finally {
|
|
|
|
- // Todo
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // region Low level processing
|
|
|
|
- /**
|
|
|
|
- * Sends the given packet bytes with the additional
|
|
|
|
- * information of the original request.
|
|
|
|
- * @param packet
|
|
|
|
- * @param request
|
|
|
|
- */
|
|
|
|
- async sendPacket(packet, request) {
|
|
|
|
- request.msgId = this.session.getNewMsgId();
|
|
|
|
- // First Calculate plainText to encrypt it
|
|
|
|
- let first = Buffer.alloc(8);
|
|
|
|
- let second = Buffer.alloc(8);
|
|
|
|
- let third = Buffer.alloc(8);
|
|
|
|
- let forth = Buffer.alloc(4);
|
|
|
|
- let fifth = Buffer.alloc(4);
|
|
|
|
- first.writeBigUInt64LE(this.session.salt, 0);
|
|
|
|
- second.writeBigUInt64LE(this.session.id, 0);
|
|
|
|
- third.writeBigUInt64LE(request.msgId, 0);
|
|
|
|
- forth.writeInt32LE(this.generateSequence(request.confirmed), 0);
|
|
|
|
- fifth.writeInt32LE(packet.length, 0);
|
|
|
|
- let plain = Buffer.concat([
|
|
|
|
- first,
|
|
|
|
- second,
|
|
|
|
- third,
|
|
|
|
- forth,
|
|
|
|
- fifth,
|
|
|
|
- packet
|
|
|
|
- ]);
|
|
|
|
- let msgKey = Helpers.calcMsgKey(plain);
|
|
|
|
- let {key, iv} = Helpers.calcKey(this.session.authKey.key, msgKey, true);
|
|
|
|
-
|
|
|
|
- let cipherText = AES.encryptIge(plain, key, iv);
|
|
|
|
-
|
|
|
|
- //And then finally send the encrypted packet
|
|
|
|
-
|
|
|
|
- first = Buffer.alloc(8);
|
|
|
|
- first.writeBigUInt64LE(this.session.authKey.keyId, 0);
|
|
|
|
- let cipher = Buffer.concat([
|
|
|
|
- first,
|
|
|
|
- msgKey,
|
|
|
|
- cipherText,
|
|
|
|
- ]);
|
|
|
|
- await this.transport.send(cipher);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- *
|
|
|
|
- * @param body {Buffer}
|
|
|
|
- * @returns {{remoteMsgId: number, remoteSequence: BigInt, message: Buffer}}
|
|
|
|
- */
|
|
|
|
- decodeMsg(body) {
|
|
|
|
- if (body.length < 8) {
|
|
|
|
- throw Error("Can't decode packet");
|
|
|
|
- }
|
|
|
|
- let remoteAuthKeyId = body.readBigInt64LE(0);
|
|
|
|
- let offset = 8;
|
|
|
|
- let msgKey = body.slice(offset, offset + 16);
|
|
|
|
- offset += 16;
|
|
|
|
- let {key, iv} = Helpers.calcKey(this.session.authKey.key, msgKey, false);
|
|
|
|
- let plainText = AES.decryptIge(body.slice(offset, body.length), key, iv);
|
|
|
|
- offset = 0;
|
|
|
|
- let remoteSalt = plainText.readBigInt64LE(offset);
|
|
|
|
- offset += 8;
|
|
|
|
- let remoteSessionId = plainText.readBigInt64LE(offset);
|
|
|
|
- offset += 8;
|
|
|
|
- let remoteMsgId = plainText.readBigInt64LE(offset);
|
|
|
|
- offset += 8;
|
|
|
|
- let remoteSequence = plainText.readInt32LE(offset);
|
|
|
|
- offset += 4;
|
|
|
|
- let msgLen = plainText.readInt32LE(offset);
|
|
|
|
- offset += 4;
|
|
|
|
- let message = plainText.slice(offset, offset + msgLen);
|
|
|
|
- return {message, remoteMsgId, remoteSequence}
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- async processMsg(msgId, sequence, reader, offset, request = undefined) {
|
|
|
|
- this.needConfirmation.push(msgId);
|
|
|
|
- let code = reader.readUInt32LE(offset);
|
|
|
|
- console.log("code is ", code);
|
|
|
|
- // The following codes are "parsed manually"
|
|
|
|
- if (code === 0xf35c6d01) { //rpc_result, (response of an RPC call, i.e., we sent a request)
|
|
|
|
- console.log("got rpc result");
|
|
|
|
- return await this.handleRpcResult(msgId, sequence, reader, offset, request);
|
|
|
|
- }
|
|
|
|
|
|
|
|
- if (code === 0x73f1f8dc) { //msg_container
|
|
|
|
- return this.handleContainer(msgId, sequence, reader, offset, request);
|
|
|
|
- }
|
|
|
|
- if (code === 0x3072cfa1) { //gzip_packed
|
|
|
|
- return this.handleGzipPacked(msgId, sequence, reader, offset, request);
|
|
|
|
- }
|
|
|
|
- if (code === 0xedab447b) { //bad_server_salt
|
|
|
|
- return await this.handleBadServerSalt(msgId, sequence, reader, offset, request);
|
|
|
|
- }
|
|
|
|
- if (code === 0xa7eff811) { //bad_msg_notification
|
|
|
|
- console.log("bad msg notification");
|
|
|
|
- return this.handleBadMsgNotification(msgId, sequence, reader, offset);
|
|
|
|
- }
|
|
|
|
- /**
|
|
|
|
- * If the code is not parsed manually, then it was parsed by the code generator!
|
|
|
|
- * In this case, we will simply treat the incoming TLObject as an Update,
|
|
|
|
- * if we can first find a matching TLObject
|
|
|
|
- */
|
|
|
|
- console.log("code", code);
|
|
|
|
- if (code === 0x9ec20908) {
|
|
|
|
- return this.handleUpdate(msgId, sequence, reader, offset);
|
|
|
|
- } else {
|
|
|
|
-
|
|
|
|
- if (tlobjects.contains(code)) {
|
|
|
|
- return this.handleUpdate(msgId, sequence, reader);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- console.log("Unknown message");
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // region Message handling
|
|
|
|
-
|
|
|
|
- handleUpdate(msgId, sequence, reader, offset = 0) {
|
|
|
|
- let tlobject = Helpers.tgReadObject(reader, offset);
|
|
|
|
- for (let handler of this.onUpdateHandlers) {
|
|
|
|
- handler(tlobject);
|
|
|
|
- }
|
|
|
|
- return Float32Array
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- async handleContainer(msgId, sequence, reader, offset, request) {
|
|
|
|
- let code = reader.readUInt32LE(offset);
|
|
|
|
- offset += 4;
|
|
|
|
- let size = reader.readInt32LE(offset);
|
|
|
|
- offset += 4;
|
|
|
|
- for (let i = 0; i < size; i++) {
|
|
|
|
- let innerMsgId = reader.readBigUInt64LE(offset);
|
|
|
|
- offset += 8;
|
|
|
|
- let innerSequence = reader.readInt32LE(offset);
|
|
|
|
- offset += 4;
|
|
|
|
- let innerLength = reader.readInt32LE(offset);
|
|
|
|
- offset += 4;
|
|
|
|
- if (!(await this.processMsg(innerMsgId, sequence, reader, offset, request))) {
|
|
|
|
- offset += innerLength;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return false;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- async handleBadServerSalt(msgId, sequence, reader, offset, request) {
|
|
|
|
- let code = reader.readUInt32LE(offset);
|
|
|
|
- offset += 4;
|
|
|
|
- let badMsgId = reader.readBigUInt64LE(offset);
|
|
|
|
- offset += 8;
|
|
|
|
- let badMsgSeqNo = reader.readInt32LE(offset);
|
|
|
|
- offset += 4;
|
|
|
|
- let errorCode = reader.readInt32LE(offset);
|
|
|
|
- offset += 4;
|
|
|
|
- let newSalt = reader.readBigUInt64LE(offset);
|
|
|
|
- offset += 8;
|
|
|
|
- this.session.salt = newSalt;
|
|
|
|
-
|
|
|
|
- if (!request) {
|
|
|
|
- throw Error("Tried to handle a bad server salt with no request specified");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- //Resend
|
|
|
|
- await this.send(request, true);
|
|
|
|
- return true;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- handleBadMsgNotification(msgId, sequence, reader, offset) {
|
|
|
|
- let code = reader.readUInt32LE(offset);
|
|
|
|
- offset += 4;
|
|
|
|
- let requestId = reader.readUInt32LE(offset);
|
|
|
|
- offset += 4;
|
|
|
|
- let requestSequence = reader.readInt32LE(offset);
|
|
|
|
- offset += 4;
|
|
|
|
- let errorCode = reader.readInt32LE(offset);
|
|
|
|
- return new BadMessageError(errorCode);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- async handleRpcResult(msgId, sequence, reader, offset, request) {
|
|
|
|
- if (!request) {
|
|
|
|
- throw Error("RPC results should only happen after a request was sent");
|
|
|
|
- }
|
|
|
|
- let buffer = Buffer.alloc(0);
|
|
|
|
- let code = reader.readUInt32LE(offset);
|
|
|
|
- offset += 4;
|
|
|
|
- let requestId = reader.readUInt32LE(offset);
|
|
|
|
- offset += 4;
|
|
|
|
- let innerCode = reader.readUInt32LE(offset);
|
|
|
|
- offset += 4;
|
|
|
|
- if (requestId === request.msgId) {
|
|
|
|
- request.confirmReceived = true;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (innerCode === 0x2144ca19) { // RPC Error
|
|
|
|
- console.log("Got an error");
|
|
|
|
- let errorCode = reader.readInt32LE(offset);
|
|
|
|
- offset += 4;
|
|
|
|
- let errorMessage = Helpers.tgReadString(reader, offset);
|
|
|
|
- offset = errorMessage.offset;
|
|
|
|
- errorMessage = errorMessage.data;
|
|
|
|
- let error = new RPCError(errorCode, errorMessage);
|
|
|
|
- if (error.mustResend) {
|
|
|
|
- request.confirmReceived = false;
|
|
|
|
- }
|
|
|
|
- if (error.message.startsWith("FLOOD_WAIT_")) {
|
|
|
|
- console.log("Should wait {}s. Sleeping until then.".format(error.additionalData));
|
|
|
|
- await Helpers.sleep();
|
|
|
|
- } else if (error.message.startsWith("PHONE_MIGRATE_")) {
|
|
|
|
- throw new InvalidDCError(error.additionalData);
|
|
|
|
- } else {
|
|
|
|
- throw error;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- } else {
|
|
|
|
- console.log("no errors");
|
|
|
|
- if (innerCode === 0x3072cfa1) { //GZip packed
|
|
|
|
- console.log("Gzipped data");
|
|
|
|
- let res = Helpers.tgReadByte(reader, offset);
|
|
|
|
- let unpackedData = await ungzip(res.data);
|
|
|
|
- offset = res.offset;
|
|
|
|
- res = request.onResponse(unpackedData, offset);
|
|
|
|
- buffer = res.data;
|
|
|
|
- offset = res.offset;
|
|
|
|
- } else {
|
|
|
|
- console.log("plain data");
|
|
|
|
- offset -= 4;
|
|
|
|
- let res = request.onResponse(reader, offset);
|
|
|
|
- buffer = res.data;
|
|
|
|
- offset = res.offset;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return {buffer, offset}
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- handleGzipPacked(msgId, sequence, reader, offset, request) {
|
|
|
|
- throw Error("not implemented");
|
|
|
|
- // TODO
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- setListenForUpdates(enabled) {
|
|
|
|
-
|
|
|
|
- if (enabled) {
|
|
|
|
- console.log("Enabled updates");
|
|
|
|
- } else {
|
|
|
|
- console.log("Disabled updates");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- updatesListenMethod() {
|
|
|
|
- while (true) {
|
|
|
|
- let {seq, body} = this.transport.receive();
|
|
|
|
- let {message, remoteMsgId, remoteSequence} = this.decodeMsg(body);
|
|
|
|
- this.processMsg(remoteMsgId, remoteSequence, message);
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
module.exports = MTProtoSender;
|
|
module.exports = MTProtoSender;
|