MessagePacker.js 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. const Helpers = require("../utils/Helpers");
  2. const MessageContainer = require("../tl/core/MessageContainer");
  3. const TLMessage = require("../tl/core/TLMessage");
  4. const {TLRequest} = require("../tl/tlobject");
  5. const BinaryWriter = require("../extensions/BinaryWriter");
  6. class MessagePacker {
  7. constructor(state, logger) {
  8. this._state = state;
  9. this._queue = [];
  10. this._ready = false;
  11. this._log = logger;
  12. }
  13. append(state) {
  14. this._queue.push(state);
  15. this._ready = true;
  16. }
  17. extend(states) {
  18. for (let state of states) {
  19. this._queue.push(state);
  20. }
  21. this._ready = true;
  22. }
  23. async get() {
  24. if (!this._queue.length) {
  25. this._ready = false;
  26. while (!this._ready) {
  27. await Helpers.sleep(100);
  28. }
  29. }
  30. let data;
  31. let buffer = new BinaryWriter(Buffer.alloc(0));
  32. let batch = [];
  33. let size = 0;
  34. while (this._queue.length && batch.length <= MessageContainer.MAXIMUM_LENGTH) {
  35. let state = this._queue.shift();
  36. size += state.data.length + TLMessage.SIZE_OVERHEAD;
  37. if (size <= MessageContainer.MAXIMUM_SIZE) {
  38. let afterId;
  39. if (state.after) {
  40. afterId = state.after.msgId;
  41. }
  42. state.msgId = await this._state.writeDataAsMessage(
  43. buffer, state.data, state.request instanceof TLRequest,
  44. afterId
  45. );
  46. this._log.debug(`Assigned msgId = ${state.msgId} to ${state.request.constructor.name}`);
  47. batch.push(state);
  48. continue;
  49. }
  50. if (batch.length) {
  51. this._queue.unshift(state);
  52. break;
  53. }
  54. this._log.warning(`Message payload for ${state.request.constructor.name} is too long ${state.data.length} and cannot be sent`);
  55. state.promise.reject("Request Payload is too big");
  56. size = 0;
  57. continue
  58. }
  59. if (!batch.length) {
  60. return null;
  61. }
  62. if (batch.length > 1) {
  63. data = Buffer.concat([struct.pack(
  64. '<Ii', MessageContainer.CONSTRUCTOR_ID, batch.length
  65. ), buffer.getValue()]);
  66. let containerId = await this._state.writeDataAsMessage(
  67. buffer, data, false
  68. );
  69. for (let s of batch) {
  70. s.containerId = containerId;
  71. }
  72. }
  73. data = buffer.getValue();
  74. return {batch, data}
  75. }
  76. }
  77. module.exports = MessagePacker;