MessagePacker.js 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. const MessageContainer = require('../tl/core/MessageContainer')
  2. const TLMessage = require('../tl/core/TLMessage')
  3. const BinaryWriter = require('../extensions/BinaryWriter')
  4. class MessagePacker {
  5. constructor(state, logger) {
  6. this._state = state
  7. this._queue = []
  8. this._ready = new Promise(((resolve) => {
  9. this.setReady = resolve
  10. }))
  11. this._log = logger
  12. }
  13. values() {
  14. return this._queue
  15. }
  16. append(state) {
  17. this._queue.push(state)
  18. this.setReady(true)
  19. }
  20. extend(states) {
  21. for (const state of states) {
  22. this._queue.push(state)
  23. }
  24. this.setReady(true)
  25. }
  26. async get() {
  27. if (!this._queue.length) {
  28. this._ready = new Promise(((resolve) => {
  29. this.setReady = resolve
  30. }))
  31. await this._ready
  32. }
  33. if (!this._queue[this._queue.length - 1]) {
  34. this._queue = []
  35. return
  36. }
  37. let data
  38. let buffer = new BinaryWriter(Buffer.alloc(0))
  39. const batch = []
  40. let size = 0
  41. while (this._queue.length && batch.length <= MessageContainer.MAXIMUM_LENGTH) {
  42. const state = this._queue.shift()
  43. size += state.data.length + TLMessage.SIZE_OVERHEAD
  44. if (size <= MessageContainer.MAXIMUM_SIZE) {
  45. let afterId
  46. if (state.after) {
  47. afterId = state.after.msgId
  48. }
  49. state.msgId = await this._state.writeDataAsMessage(
  50. buffer, state.data, state.request.classType === 'request',
  51. afterId,
  52. )
  53. this._log.debug(`Assigned msgId = ${state.msgId} to ${state.request.className || state.request.constructor.name}`)
  54. batch.push(state)
  55. continue
  56. }
  57. if (batch.length) {
  58. this._queue.unshift(state)
  59. break
  60. }
  61. this._log.warn(`Message payload for ${state.request.className || state.request.constructor.name} is too long ${state.data.length} and cannot be sent`)
  62. state.promise.reject('Request Payload is too big')
  63. size = 0
  64. continue
  65. }
  66. if (!batch.length) {
  67. return null
  68. }
  69. if (batch.length > 1) {
  70. const b = Buffer.alloc(8)
  71. b.writeUInt32LE(MessageContainer.CONSTRUCTOR_ID, 0)
  72. b.writeInt32LE(batch.length, 4)
  73. data = Buffer.concat([b, buffer.getValue()])
  74. buffer = new BinaryWriter(Buffer.alloc(0))
  75. const containerId = await this._state.writeDataAsMessage(
  76. buffer, data, false,
  77. )
  78. for (const s of batch) {
  79. s.containerId = containerId
  80. }
  81. }
  82. data = buffer.getValue()
  83. return { batch, data }
  84. }
  85. }
  86. module.exports = MessagePacker