// MessagePacker.ts (3.2 KB)
  1. import {MessageContainer} from '../tl/core';
  2. import {TLMessage} from '../tl/core';
  3. import {BinaryWriter} from './BinaryWriter';
  4. import type{MTProtoState} from "../network/MTProtoState";
  5. import type{RequestState} from "../network/RequestState";
  6. export class MessagePacker {
  7. private _state: any;
  8. private _queue: any[];
  9. private _ready: Promise<unknown>;
  10. private setReady: ((value?: any) => void) | undefined;
  11. private _log: any;
  12. constructor(state: MTProtoState, logger: any) {
  13. this._state = state;
  14. this._queue = [];
  15. this._ready = new Promise((resolve => {
  16. this.setReady = resolve
  17. }));
  18. this._log = logger
  19. }
  20. values() {
  21. return this._queue
  22. }
  23. append(state: RequestState) {
  24. this._queue.push(state);
  25. if (this.setReady) {
  26. this.setReady(true);
  27. }
  28. }
  29. extend(states: any[]) {
  30. for (const state of states) {
  31. this._queue.push(state)
  32. }
  33. if (this.setReady) {
  34. this.setReady(true);
  35. }
  36. }
  37. async get() {
  38. if (!this._queue.length) {
  39. this._ready = new Promise((resolve => {
  40. this.setReady = resolve
  41. }));
  42. await this._ready
  43. }
  44. if (!this._queue[this._queue.length - 1]) {
  45. this._queue = [];
  46. return
  47. }
  48. let data;
  49. let buffer = new BinaryWriter(Buffer.alloc(0));
  50. const batch = [];
  51. let size = 0;
  52. while (this._queue.length && batch.length <= MessageContainer.MAXIMUM_LENGTH) {
  53. const state = this._queue.shift();
  54. size += state.data.length + TLMessage.SIZE_OVERHEAD;
  55. if (size <= MessageContainer.MAXIMUM_SIZE) {
  56. let afterId;
  57. if (state.after) {
  58. afterId = state.after.msgId
  59. }
  60. state.msgId = await this._state.writeDataAsMessage(
  61. buffer, state.data, state.request.classType === 'request',
  62. afterId,
  63. );
  64. this._log.debug(`Assigned msgId = ${state.msgId} to ${state.request.className || state.request.constructor.name}`);
  65. batch.push(state);
  66. continue
  67. }
  68. if (batch.length) {
  69. this._queue.unshift(state);
  70. break
  71. }
  72. this._log.warn(`Message payload for ${state.request.className || state.request.constructor.name} is too long ${state.data.length} and cannot be sent`);
  73. state.promise.reject('Request Payload is too big');
  74. size = 0;
  75. }
  76. if (!batch.length) {
  77. return null
  78. }
  79. if (batch.length > 1) {
  80. const b = Buffer.alloc(8);
  81. b.writeUInt32LE(MessageContainer.CONSTRUCTOR_ID, 0);
  82. b.writeInt32LE(batch.length, 4);
  83. data = Buffer.concat([b, buffer.getValue()]);
  84. buffer = new BinaryWriter(Buffer.alloc(0));
  85. const containerId = await this._state.writeDataAsMessage(
  86. buffer, data, false,
  87. );
  88. for (const s of batch) {
  89. s.containerId = containerId
  90. }
  91. }
  92. data = buffer.getValue();
  93. return {batch, data}
  94. }
  95. }