MTProtoSender.ts 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958
  1. /**
  2. * MTProto Mobile Protocol sender
  3. * (https://core.telegram.org/mtproto/description)
  4. * This class is responsible for wrapping requests into `TLMessage`'s,
  5. * sending them over the network and receiving them in a safe manner.
  6. *
  7. * Automatic reconnection due to temporary network issues is a concern
  8. * for this class as well, including retry of messages that could not
  9. * be sent successfully.
  10. *
  11. * A new authorization key will be generated on connection if no other
  12. * key exists yet.
  13. */
  14. import { AuthKey } from "../crypto/AuthKey";
  15. import { MTProtoState } from "./MTProtoState";
  16. import { BinaryReader, Logger } from "../extensions";
  17. import { MessagePacker } from "../extensions";
  18. import { GZIPPacked, MessageContainer, RPCResult, TLMessage } from "../tl/core";
  19. import { Api } from "../tl";
  20. import bigInt from "big-integer";
  21. import { sleep } from "../Helpers";
  22. import { RequestState } from "./RequestState";
  23. import { doAuthentication } from "./Authenticator";
  24. import { MTProtoPlainSender } from "./MTProtoPlainSender";
  25. import {
  26. BadMessageError,
  27. TypeNotFoundError,
  28. InvalidBufferError,
  29. SecurityError,
  30. RPCMessageToError,
  31. } from "../errors";
  32. import { Connection, UpdateConnectionState } from "./";
  33. import type { TelegramClient } from "..";
  34. import { LogLevel } from "../extensions/Logger";
  35. interface DEFAULT_OPTIONS {
  36. logger: any;
  37. retries: number;
  38. delay: number;
  39. autoReconnect: boolean;
  40. connectTimeout: any;
  41. authKeyCallback: any;
  42. updateCallback?: any;
  43. autoReconnectCallback?: any;
  44. isMainSender: boolean;
  45. dcId: number;
  46. senderCallback?: any;
  47. client: TelegramClient;
  48. onConnectionBreak?: CallableFunction;
  49. securityChecks: boolean;
  50. }
  51. export class MTProtoSender {
  52. static DEFAULT_OPTIONS = {
  53. logger: null,
  54. retries: Infinity,
  55. delay: 2000,
  56. autoReconnect: true,
  57. connectTimeout: null,
  58. authKeyCallback: null,
  59. updateCallback: null,
  60. autoReconnectCallback: null,
  61. isMainSender: null,
  62. senderCallback: null,
  63. onConnectionBreak: undefined,
  64. securityChecks: true,
  65. };
  66. _connection?: Connection;
  67. private readonly _log: Logger;
  68. private _dcId: number;
  69. private readonly _retries: number;
  70. private readonly _delay: number;
  71. private _connectTimeout: null;
  72. private _autoReconnect: boolean;
  73. private readonly _authKeyCallback: any;
  74. private readonly _updateCallback: (
  75. client: TelegramClient,
  76. update: UpdateConnectionState
  77. ) => void;
  78. private readonly _autoReconnectCallback?: any;
  79. private readonly _senderCallback: any;
  80. private readonly _isMainSender: boolean;
  81. _userConnected: boolean;
  82. _reconnecting: boolean;
  83. _disconnected: boolean;
  84. private _sendLoopHandle: any;
  85. private _recvLoopHandle: any;
  86. readonly authKey: AuthKey;
  87. private readonly _state: MTProtoState;
  88. private _sendQueue: MessagePacker;
  89. private _pendingState: Map<string, RequestState>;
  90. private readonly _pendingAck: Set<any>;
  91. private readonly _lastAcks: any[];
  92. private readonly _handlers: any;
  93. private readonly _client: TelegramClient;
  94. private readonly _onConnectionBreak?: CallableFunction;
  95. userDisconnected: boolean;
  96. isConnecting: boolean;
  97. _authenticated: boolean;
  98. private _securityChecks: boolean;
  99. /**
  100. * @param authKey
  101. * @param opts
  102. */
  103. constructor(authKey: undefined | AuthKey, opts: DEFAULT_OPTIONS) {
  104. const args = {
  105. ...MTProtoSender.DEFAULT_OPTIONS,
  106. ...opts,
  107. };
  108. this._connection = undefined;
  109. this._log = args.logger;
  110. this._dcId = args.dcId;
  111. this._retries = args.retries;
  112. this._delay = args.delay;
  113. this._autoReconnect = args.autoReconnect;
  114. this._connectTimeout = args.connectTimeout;
  115. this._authKeyCallback = args.authKeyCallback;
  116. this._updateCallback = args.updateCallback;
  117. this._autoReconnectCallback = args.autoReconnectCallback;
  118. this._isMainSender = args.isMainSender;
  119. this._senderCallback = args.senderCallback;
  120. this._client = args.client;
  121. this._onConnectionBreak = args.onConnectionBreak;
  122. this._securityChecks = args.securityChecks;
  123. /**
  124. * whether we disconnected ourself or telegram did it.
  125. */
  126. this.userDisconnected = false;
  127. /**
  128. * If a disconnection happens for any other reason and it
  129. * was *not* user action then the pending messages won't
  130. * be cleared but on explicit user disconnection all the
  131. * pending futures should be cancelled.
  132. */
  133. this.isConnecting = false;
  134. this._authenticated = false;
  135. this._userConnected = false;
  136. this._reconnecting = false;
  137. this._disconnected = true;
  138. /**
  139. * We need to join the loops upon disconnection
  140. */
  141. this._sendLoopHandle = null;
  142. this._recvLoopHandle = null;
  143. /**
  144. * Preserving the references of the AuthKey and state is important
  145. */
  146. this.authKey = authKey || new AuthKey();
  147. this._state = new MTProtoState(
  148. this.authKey,
  149. this._log,
  150. this._securityChecks
  151. );
  152. /**
  153. * Outgoing messages are put in a queue and sent in a batch.
  154. * Note that here we're also storing their ``_RequestState``.
  155. */
  156. this._sendQueue = new MessagePacker(this._state, this._log);
  157. /**
  158. * Sent states are remembered until a response is received.
  159. */
  160. this._pendingState = new Map<string, any>();
  161. /**
  162. * Responses must be acknowledged, and we can also batch these.
  163. */
  164. this._pendingAck = new Set();
  165. /**
  166. * Similar to pending_messages but only for the last acknowledges.
  167. * These can't go in pending_messages because no acknowledge for them
  168. * is received, but we may still need to resend their state on bad salts.
  169. */
  170. this._lastAcks = [];
  171. /**
  172. * Jump table from response ID to method that handles it
  173. */
  174. this._handlers = {
  175. [RPCResult.CONSTRUCTOR_ID.toString()]:
  176. this._handleRPCResult.bind(this),
  177. [MessageContainer.CONSTRUCTOR_ID.toString()]:
  178. this._handleContainer.bind(this),
  179. [GZIPPacked.CONSTRUCTOR_ID.toString()]:
  180. this._handleGzipPacked.bind(this),
  181. [Api.Pong.CONSTRUCTOR_ID.toString()]: this._handlePong.bind(this),
  182. [Api.BadServerSalt.CONSTRUCTOR_ID.toString()]:
  183. this._handleBadServerSalt.bind(this),
  184. [Api.BadMsgNotification.CONSTRUCTOR_ID.toString()]:
  185. this._handleBadNotification.bind(this),
  186. [Api.MsgDetailedInfo.CONSTRUCTOR_ID.toString()]:
  187. this._handleDetailedInfo.bind(this),
  188. [Api.MsgNewDetailedInfo.CONSTRUCTOR_ID.toString()]:
  189. this._handleNewDetailedInfo.bind(this),
  190. [Api.NewSessionCreated.CONSTRUCTOR_ID.toString()]:
  191. this._handleNewSessionCreated.bind(this),
  192. [Api.MsgsAck.CONSTRUCTOR_ID.toString()]: this._handleAck.bind(this),
  193. [Api.FutureSalts.CONSTRUCTOR_ID.toString()]:
  194. this._handleFutureSalts.bind(this),
  195. [Api.MsgsStateReq.CONSTRUCTOR_ID.toString()]:
  196. this._handleStateForgotten.bind(this),
  197. [Api.MsgResendReq.CONSTRUCTOR_ID.toString()]:
  198. this._handleStateForgotten.bind(this),
  199. [Api.MsgsAllInfo.CONSTRUCTOR_ID.toString()]:
  200. this._handleMsgAll.bind(this),
  201. };
  202. }
  203. set dcId(dcId: number) {
  204. this._dcId = dcId;
  205. }
  206. get dcId() {
  207. return this._dcId;
  208. }
  209. // Public API
  210. /**
  211. * Connects to the specified given connection using the given auth key.
  212. */
  213. async connect(connection: Connection, force?: boolean) {
  214. if (this._userConnected && !force) {
  215. this._log.info("User is already connected!");
  216. return false;
  217. }
  218. this.isConnecting = true;
  219. this._connection = connection;
  220. for (let attempt = 0; attempt < this._retries; attempt++) {
  221. try {
  222. await this._connect();
  223. if (this._updateCallback) {
  224. this._updateCallback(
  225. this._client,
  226. new UpdateConnectionState(
  227. UpdateConnectionState.connected
  228. )
  229. );
  230. }
  231. break;
  232. } catch (err) {
  233. if (this._updateCallback && attempt === 0) {
  234. this._updateCallback(
  235. this._client,
  236. new UpdateConnectionState(
  237. UpdateConnectionState.disconnected
  238. )
  239. );
  240. }
  241. this._log.error(
  242. `WebSocket connection failed attempt: ${attempt + 1}`
  243. );
  244. if (this._log.canSend(LogLevel.ERROR)) {
  245. console.error(err);
  246. }
  247. await sleep(this._delay);
  248. }
  249. }
  250. this.isConnecting = false;
  251. return true;
  252. }
  253. isConnected() {
  254. return this._userConnected;
  255. }
  256. /**
  257. * Cleanly disconnects the instance from the network, cancels
  258. * all pending requests, and closes the send and receive loops.
  259. */
  260. async disconnect() {
  261. this.userDisconnected = true;
  262. await this._disconnect();
  263. }
  264. /**
  265. *
  266. This method enqueues the given request to be sent. Its send
  267. state will be saved until a response arrives, and a ``Future``
  268. that will be resolved when the response arrives will be returned:
  269. .. code-block:: javascript
  270. async def method():
  271. # Sending (enqueued for the send loop)
  272. future = sender.send(request)
  273. # Receiving (waits for the receive loop to read the result)
  274. result = await future
  275. Designed like this because Telegram may send the response at
  276. any point, and it can send other items while one waits for it.
  277. Once the response for this future arrives, it is set with the
  278. received result, quite similar to how a ``receive()`` call
  279. would otherwise work.
  280. Since the receiving part is "built in" the future, it's
  281. impossible to await receive a result that was never sent.
  282. * @param request
  283. * @returns {RequestState}
  284. */
  285. send(request: Api.AnyRequest): any {
  286. if (!this._userConnected) {
  287. throw new Error(
  288. "Cannot send requests while disconnected. You need to call .connect()"
  289. );
  290. }
  291. const state = new RequestState(request);
  292. this._sendQueue.append(state);
  293. return state.promise;
  294. }
  295. /**
  296. * Performs the actual connection, retrying, generating the
  297. * authorization key if necessary, and starting the send and
  298. * receive loops.
  299. * @returns {Promise<void>}
  300. * @private
  301. */
  302. async _connect() {
  303. this._log.info(
  304. "Connecting to {0} using {1}"
  305. .replace("{0}", this._connection!.toString())
  306. .replace("{1}", this._connection!.socket.toString())
  307. );
  308. await this._connection!.connect();
  309. this._log.debug("Connection success!");
  310. if (!this.authKey.getKey()) {
  311. const plain = new MTProtoPlainSender(this._connection, this._log);
  312. this._log.debug("New auth_key attempt ...");
  313. const res = await doAuthentication(plain, this._log);
  314. this._log.debug("Generated new auth_key successfully");
  315. await this.authKey.setKey(res.authKey);
  316. this._state.timeOffset = res.timeOffset;
  317. /**
  318. * This is *EXTREMELY* important since we don't control
  319. * external references to the authorization key, we must
  320. * notify whenever we change it. This is crucial when we
  321. * switch to different data centers.
  322. */
  323. if (this._authKeyCallback) {
  324. await this._authKeyCallback(this.authKey, this._dcId);
  325. }
  326. } else {
  327. this._authenticated = true;
  328. this._log.debug("Already have an auth key ...");
  329. }
  330. this._userConnected = true;
  331. this._reconnecting = false;
  332. this._log.debug("Starting receive loop");
  333. this._recvLoopHandle = this._recvLoop();
  334. this._log.debug("Starting send loop");
  335. this._sendLoopHandle = this._sendLoop();
  336. // _disconnected only completes after manual disconnection
  337. // or errors after which the sender cannot continue such
  338. // as failing to reconnect or any unexpected error.
  339. this._log.info(
  340. "Connection to %s complete!".replace(
  341. "%s",
  342. this._connection!.toString()
  343. )
  344. );
  345. }
  346. async _disconnect(error = null) {
  347. this._sendQueue.rejectAll();
  348. if (this._connection === null) {
  349. this._log.info("Not disconnecting (already have no connection)");
  350. return;
  351. }
  352. if (this._updateCallback) {
  353. this._updateCallback(
  354. this._client,
  355. new UpdateConnectionState(UpdateConnectionState.disconnected)
  356. );
  357. }
  358. this._log.info(
  359. "Disconnecting from %s...".replace(
  360. "%s",
  361. this._connection!.toString()
  362. )
  363. );
  364. this._userConnected = false;
  365. this._log.debug("Closing current connection...");
  366. await this._connection!.disconnect();
  367. }
  368. /**
  369. * This loop is responsible for popping items off the send
  370. * queue, encrypting them, and sending them over the network.
  371. * Besides `connect`, only this method ever sends data.
  372. * @returns {Promise<void>}
  373. * @private
  374. */
  375. async _sendLoop() {
  376. this._sendQueue = new MessagePacker(this._state, this._log);
  377. while (this._userConnected && !this._reconnecting) {
  378. if (this._pendingAck.size) {
  379. const ack = new RequestState(
  380. new Api.MsgsAck({ msgIds: Array(...this._pendingAck) })
  381. );
  382. this._sendQueue.append(ack);
  383. this._lastAcks.push(ack);
  384. if (this._lastAcks.length >= 10) {
  385. this._lastAcks.shift();
  386. }
  387. this._pendingAck.clear();
  388. }
  389. this._log.debug(
  390. "Waiting for messages to send..." + this._reconnecting
  391. );
  392. // TODO Wait for the connection send queue to be empty?
  393. // This means that while it's not empty we can wait for
  394. // more messages to be added to the send queue.
  395. const res = await this._sendQueue.get();
  396. if (this._reconnecting) {
  397. this._log.debug("Reconnecting. will stop loop");
  398. return;
  399. }
  400. if (!res) {
  401. this._log.debug("Empty result. will not stop loop");
  402. continue;
  403. }
  404. let { data } = res;
  405. const { batch } = res;
  406. this._log.debug(
  407. `Encrypting ${batch.length} message(s) in ${data.length} bytes for sending`
  408. );
  409. data = await this._state.encryptMessageData(data);
  410. try {
  411. await this._connection!.send(data);
  412. } catch (e: any) {
  413. this._log.error(e);
  414. this._log.info("Connection closed while sending data");
  415. return;
  416. }
  417. for (const state of batch) {
  418. if (!Array.isArray(state)) {
  419. if (state.request.classType === "request") {
  420. this._pendingState.set(state.msgId.toString(), state);
  421. }
  422. } else {
  423. for (const s of state) {
  424. if (s.request.classType === "request") {
  425. this._pendingState.set(s.msgId.toString(), s);
  426. }
  427. }
  428. }
  429. }
  430. this._log.debug("Encrypted messages put in a queue to be sent");
  431. }
  432. }
  433. async _recvLoop() {
  434. let body;
  435. let message;
  436. while (this._userConnected && !this._reconnecting) {
  437. // this._log.debug('Receiving items from the network...');
  438. this._log.debug("Receiving items from the network...");
  439. try {
  440. body = await this._connection!.recv();
  441. } catch (e: any) {
  442. /** when the server disconnects us we want to reconnect */
  443. if (!this.userDisconnected) {
  444. this._log.error(e);
  445. this._log.warn("Connection closed while receiving data");
  446. this.reconnect();
  447. }
  448. return;
  449. }
  450. try {
  451. message = await this._state.decryptMessageData(body);
  452. } catch (e: any) {
  453. if (e instanceof TypeNotFoundError) {
  454. // Received object which we don't know how to deserialize
  455. this._log.info(
  456. `Type ${e.invalidConstructorId} not found, remaining data ${e.remaining}`
  457. );
  458. continue;
  459. } else if (e instanceof SecurityError) {
  460. // A step while decoding had the incorrect data. This message
  461. // should not be considered safe and it should be ignored.
  462. this._log.warn(
  463. `Security error while unpacking a received message: ${e}`
  464. );
  465. continue;
  466. } else if (e instanceof InvalidBufferError) {
  467. // 404 means that the server has "forgotten" our auth key and we need to create a new one.
  468. if (e.code === 404) {
  469. this._log.warn(
  470. `Broken authorization key for dc ${this._dcId}; resetting`
  471. );
  472. if (this._updateCallback && this._isMainSender) {
  473. this._updateCallback(
  474. this._client,
  475. new UpdateConnectionState(
  476. UpdateConnectionState.broken
  477. )
  478. );
  479. } else if (
  480. this._onConnectionBreak &&
  481. !this._isMainSender
  482. ) {
  483. // Deletes the current sender from the object
  484. this._onConnectionBreak(this._dcId);
  485. }
  486. } else {
  487. // this happens sometimes when telegram is having some internal issues.
  488. // reconnecting should be enough usually
  489. // since the data we sent and received is probably wrong now.
  490. this._log.warn(
  491. `Invalid buffer ${e.code} for dc ${this._dcId}`
  492. );
  493. this.reconnect();
  494. }
  495. return;
  496. } else {
  497. this._log.error("Unhandled error while receiving data");
  498. this._log.error(e);
  499. this.reconnect();
  500. return;
  501. }
  502. }
  503. try {
  504. await this._processMessage(message);
  505. } catch (e: any) {
  506. this._log.error("Unhandled error while receiving data");
  507. this._log.error(e);
  508. }
  509. }
  510. }
  511. // Response Handlers
  512. /**
  513. * Adds the given message to the list of messages that must be
  514. * acknowledged and dispatches control to different ``_handle_*``
  515. * method based on its type.
  516. * @param message
  517. * @returns {Promise<void>}
  518. * @private
  519. */
  520. async _processMessage(message: TLMessage) {
  521. this._pendingAck.add(message.msgId);
  522. message.obj = await message.obj;
  523. let handler = this._handlers[message.obj.CONSTRUCTOR_ID.toString()];
  524. if (!handler) {
  525. handler = this._handleUpdate.bind(this);
  526. }
  527. await handler(message);
  528. }
  529. /**
  530. * Pops the states known to match the given ID from pending messages.
  531. * This method should be used when the response isn't specific.
  532. * @param msgId
  533. * @returns {*[]}
  534. * @private
  535. */
  536. _popStates(msgId: bigInt.BigInteger) {
  537. let state = this._pendingState.get(msgId.toString());
  538. if (state) {
  539. this._pendingState.delete(msgId.toString());
  540. return [state];
  541. }
  542. const toPop = [];
  543. for (const state of Object.values(this._pendingState)) {
  544. if (state.containerId && state.containerId.equals(msgId)) {
  545. toPop.push(state.msgId);
  546. }
  547. }
  548. if (toPop.length) {
  549. const temp = [];
  550. for (const x of toPop) {
  551. temp.push(this._pendingState.get(x));
  552. this._pendingState.delete(x);
  553. }
  554. return temp;
  555. }
  556. for (const ack of this._lastAcks) {
  557. if (ack.msgId === msgId) {
  558. return [ack];
  559. }
  560. }
  561. return [];
  562. }
  563. /**
  564. * Handles the result for Remote Procedure Calls:
  565. * rpc_result#f35c6d01 req_msg_id:long result:bytes = RpcResult;
  566. * This is where the future results for sent requests are set.
  567. * @param message
  568. * @returns {Promise<void>}
  569. * @private
  570. */
  571. _handleRPCResult(message: TLMessage) {
  572. const RPCResult = message.obj;
  573. const state = this._pendingState.get(RPCResult.reqMsgId.toString());
  574. if (state) {
  575. this._pendingState.delete(RPCResult.reqMsgId.toString());
  576. }
  577. this._log.debug(
  578. `Handling RPC result for message ${RPCResult.reqMsgId}`
  579. );
  580. if (!state) {
  581. // TODO We should not get responses to things we never sent
  582. // However receiving a File() with empty bytes is "common".
  583. // See #658, #759 and #958. They seem to happen in a container
  584. // which contain the real response right after.
  585. try {
  586. const reader = new BinaryReader(RPCResult.body);
  587. if (!(reader.tgReadObject() instanceof Api.upload.File)) {
  588. throw new Error("Not an upload.File");
  589. }
  590. } catch (e: any) {
  591. this._log.error(e);
  592. if (e instanceof TypeNotFoundError) {
  593. this._log.info(
  594. `Received response without parent request: ${RPCResult.body}`
  595. );
  596. return;
  597. } else {
  598. throw e;
  599. }
  600. }
  601. return;
  602. }
  603. if (RPCResult.error && state.msgId) {
  604. const error = RPCMessageToError(RPCResult.error, state.request);
  605. this._sendQueue.append(
  606. new RequestState(new Api.MsgsAck({ msgIds: [state.msgId] }))
  607. );
  608. state.reject(error);
  609. } else {
  610. const reader = new BinaryReader(RPCResult.body);
  611. const read = state.request.readResult(reader);
  612. state.resolve(read);
  613. }
  614. }
  615. /**
  616. * Processes the inner messages of a container with many of them:
  617. * msg_container#73f1f8dc messages:vector<%Message> = MessageContainer;
  618. * @param message
  619. * @returns {Promise<void>}
  620. * @private
  621. */
  622. async _handleContainer(message: TLMessage) {
  623. this._log.debug("Handling container");
  624. for (const innerMessage of message.obj.messages) {
  625. await this._processMessage(innerMessage);
  626. }
  627. }
  628. /**
  629. * Unpacks the data from a gzipped object and processes it:
  630. * gzip_packed#3072cfa1 packed_data:bytes = Object;
  631. * @param message
  632. * @returns {Promise<void>}
  633. * @private
  634. */
  635. async _handleGzipPacked(message: TLMessage) {
  636. this._log.debug("Handling gzipped data");
  637. const reader = new BinaryReader(message.obj.data);
  638. message.obj = reader.tgReadObject();
  639. await this._processMessage(message);
  640. }
  641. async _handleUpdate(message: TLMessage) {
  642. if (message.obj.SUBCLASS_OF_ID !== 0x8af52aac) {
  643. // crc32(b'Updates')
  644. this._log.warn(
  645. `Note: ${message.obj.className} is not an update, not dispatching it`
  646. );
  647. return;
  648. }
  649. this._log.debug("Handling update " + message.obj.className);
  650. if (this._updateCallback) {
  651. this._updateCallback(this._client, message.obj);
  652. }
  653. }
  654. /**
  655. * Handles pong results, which don't come inside a ``RPCResult``
  656. * but are still sent through a request:
  657. * pong#347773c5 msg_id:long ping_id:long = Pong;
  658. * @param message
  659. * @returns {Promise<void>}
  660. * @private
  661. */
  662. async _handlePong(message: TLMessage) {
  663. const pong = message.obj;
  664. this._log.debug(`Handling pong for message ${pong.msgId}`);
  665. const state = this._pendingState.get(pong.msgId.toString());
  666. this._pendingState.delete(pong.msgId.toString());
  667. // Todo Check result
  668. if (state) {
  669. state.resolve(pong);
  670. }
  671. }
  672. /**
  673. * Corrects the currently used server salt to use the right value
  674. * before enqueuing the rejected message to be re-sent:
  675. * bad_server_salt#edab447b bad_msg_id:long bad_msg_seqno:int
  676. * error_code:int new_server_salt:long = BadMsgNotification;
  677. * @param message
  678. * @returns {Promise<void>}
  679. * @private
  680. */
  681. async _handleBadServerSalt(message: TLMessage) {
  682. const badSalt = message.obj;
  683. this._log.debug(`Handling bad salt for message ${badSalt.badMsgId}`);
  684. this._state.salt = badSalt.newServerSalt;
  685. const states = this._popStates(badSalt.badMsgId);
  686. this._sendQueue.extend(states);
  687. this._log.debug(`${states.length} message(s) will be resent`);
  688. }
  689. /**
  690. * Adjusts the current state to be correct based on the
  691. * received bad message notification whenever possible:
  692. * bad_msg_notification#a7eff811 bad_msg_id:long bad_msg_seqno:int
  693. * error_code:int = BadMsgNotification;
  694. * @param message
  695. * @returns {Promise<void>}
  696. * @private
  697. */
  698. async _handleBadNotification(message: TLMessage) {
  699. const badMsg = message.obj;
  700. const states = this._popStates(badMsg.badMsgId);
  701. this._log.debug(`Handling bad msg ${JSON.stringify(badMsg)}`);
  702. if ([16, 17].includes(badMsg.errorCode)) {
  703. // Sent msg_id too low or too high (respectively).
  704. // Use the current msg_id to determine the right time offset.
  705. const to = this._state.updateTimeOffset(bigInt(message.msgId));
  706. this._log.info(`System clock is wrong, set time offset to ${to}s`);
  707. } else if (badMsg.errorCode === 32) {
  708. // msg_seqno too low, so just pump it up by some "large" amount
  709. // TODO A better fix would be to start with a new fresh session ID
  710. this._state._sequence += 64;
  711. } else if (badMsg.errorCode === 33) {
  712. // msg_seqno too high never seems to happen but just in case
  713. this._state._sequence -= 16;
  714. } else {
  715. for (const state of states) {
  716. state.reject(
  717. new BadMessageError(state.request, badMsg.errorCode)
  718. );
  719. }
  720. return;
  721. }
  722. // Messages are to be re-sent once we've corrected the issue
  723. this._sendQueue.extend(states);
  724. this._log.debug(
  725. `${states.length} messages will be resent due to bad msg`
  726. );
  727. }
  728. /**
  729. * Updates the current status with the received detailed information:
  730. * msg_detailed_info#276d3ec6 msg_id:long answer_msg_id:long
  731. * bytes:int status:int = MsgDetailedInfo;
  732. * @param message
  733. * @returns {Promise<void>}
  734. * @private
  735. */
  736. async _handleDetailedInfo(message: TLMessage) {
  737. // TODO https://goo.gl/VvpCC6
  738. const msgId = message.obj.answerMsgId;
  739. this._log.debug(`Handling detailed info for message ${msgId}`);
  740. this._pendingAck.add(msgId);
  741. }
  742. /**
  743. * Updates the current status with the received detailed information:
  744. * msg_new_detailed_info#809db6df answer_msg_id:long
  745. * bytes:int status:int = MsgDetailedInfo;
  746. * @param message
  747. * @returns {Promise<void>}
  748. * @private
  749. */
  750. async _handleNewDetailedInfo(message: TLMessage) {
  751. // TODO https://goo.gl/VvpCC6
  752. const msgId = message.obj.answerMsgId;
  753. this._log.debug(`Handling new detailed info for message ${msgId}`);
  754. this._pendingAck.add(msgId);
  755. }
  756. /**
  757. * Updates the current status with the received session information:
  758. * new_session_created#9ec20908 first_msg_id:long unique_id:long
  759. * server_salt:long = NewSession;
  760. * @param message
  761. * @returns {Promise<void>}
  762. * @private
  763. */
  764. async _handleNewSessionCreated(message: TLMessage) {
  765. // TODO https://goo.gl/LMyN7A
  766. this._log.debug("Handling new session created");
  767. this._state.salt = message.obj.serverSalt;
  768. }
  769. /**
  770. * Handles a server acknowledge about our messages. Normally
  771. * these can be ignored except in the case of ``auth.logOut``:
  772. *
  773. * auth.logOut#5717da40 = Bool;
  774. *
  775. * Telegram doesn't seem to send its result so we need to confirm
  776. * it manually. No other request is known to have this behaviour.
  777. * Since the ID of sent messages consisting of a container is
  778. * never returned (unless on a bad notification), this method
  779. * also removes containers messages when any of their inner
  780. * messages are acknowledged.
  781. * @param message
  782. * @returns {Promise<void>}
  783. * @private
  784. */
  785. async _handleAck(message: TLMessage) {
  786. const ack = message.obj;
  787. this._log.debug(`Handling acknowledge for ${ack.msgIds}`);
  788. for (const msgId of ack.msgIds) {
  789. const state = this._pendingState.get(msgId);
  790. if (state && state.request instanceof Api.auth.LogOut) {
  791. this._pendingState.delete(msgId);
  792. state.resolve(true);
  793. }
  794. }
  795. }
  796. /**
  797. * Handles future salt results, which don't come inside a
  798. * ``rpc_result`` but are still sent through a request:
  799. * future_salts#ae500895 req_msg_id:long now:int
  800. * salts:vector<future_salt> = FutureSalts;
  801. * @param message
  802. * @returns {Promise<void>}
  803. * @private
  804. */
  805. async _handleFutureSalts(message: TLMessage) {
  806. // TODO save these salts and automatically adjust to the
  807. // correct one whenever the salt in use expires.
  808. this._log.debug(`Handling future salts for message ${message.msgId}`);
  809. const state = this._pendingState.get(message.msgId.toString());
  810. if (state) {
  811. this._pendingState.delete(message.msgId.toString());
  812. state.resolve(message.obj);
  813. }
  814. }
  815. /**
  816. * Handles both :tl:`MsgsStateReq` and :tl:`MsgResendReq` by
  817. * enqueuing a :tl:`MsgsStateInfo` to be sent at a later point.
  818. * @param message
  819. * @returns {Promise<void>}
  820. * @private
  821. */
  822. async _handleStateForgotten(message: TLMessage) {
  823. this._sendQueue.append(
  824. new RequestState(
  825. new Api.MsgsStateInfo({
  826. reqMsgId: message.msgId,
  827. info: String.fromCharCode(1).repeat(message.obj.msgIds),
  828. })
  829. )
  830. );
  831. }
  832. /**
  833. * Handles :tl:`MsgsAllInfo` by doing nothing (yet).
  834. * @param message
  835. * @returns {Promise<void>}
  836. * @private
  837. */
  838. async _handleMsgAll(message: TLMessage) {}
  839. reconnect() {
  840. if (this._userConnected && !this._reconnecting) {
  841. this._reconnecting = true;
  842. // TODO Should we set this?
  843. // this._user_connected = false
  844. // we want to wait a second between each reconnect try to not flood the server with reconnects
  845. // in case of internal server issues.
  846. sleep(1000).then(() => {
  847. this._log.info("Started reconnecting");
  848. this._reconnect();
  849. });
  850. }
  851. }
  852. async _reconnect() {
  853. this._log.debug("Closing current connection...");
  854. try {
  855. await this.disconnect();
  856. } catch (err: any) {
  857. this._log.warn(err);
  858. }
  859. // @ts-ignore
  860. this._sendQueue.append(undefined);
  861. this._state.reset();
  862. // For some reason reusing existing connection caused stuck requests
  863. const constructor = this._connection!
  864. .constructor as unknown as typeof Connection;
  865. const newConnection = new constructor({
  866. ip: this._connection!._ip,
  867. port: this._connection!._port,
  868. dcId: this._connection!._dcId,
  869. loggers: this._connection!._log,
  870. proxy: this._connection!._proxy,
  871. testServers: this._connection!._testServers,
  872. socket: this._connection!.socket,
  873. });
  874. await this.connect(newConnection, true);
  875. this._reconnecting = false;
  876. this._sendQueue.extend(Array.from(this._pendingState.values()));
  877. this._pendingState = new Map<string, RequestState>();
  878. if (this._autoReconnectCallback) {
  879. this._autoReconnectCallback();
  880. }
  881. }
  882. }