LimitedQueue.js 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. class LimitedQueue {
  2. constructor(enqueueAfter = 10, size = 100, timeout = 60*60*1000) {//timeout в ms
  3. this.size = size;
  4. this.timeout = timeout;
  5. this.abortCount = 0;
  6. this.enqueueAfter = enqueueAfter;
  7. this.freed = enqueueAfter;
  8. this.listeners = [];
  9. }
  10. _addListener(listener) {
  11. this.listeners.push(listener);
  12. }
  13. //отсылаем сообщение первому ожидающему и удаляем его из списка
  14. _emitFree() {
  15. if (this.listeners.length > 0) {
  16. let listener = this.listeners.shift();
  17. listener.onFree();
  18. for (let i = 0; i < this.listeners.length; i++) {
  19. this.listeners[i].onPlaceChange(i + 1);
  20. }
  21. }
  22. }
  23. get(onPlaceChange) {
  24. return new Promise((resolve, reject) => {
  25. if (this.destroyed)
  26. reject('destroyed');
  27. const take = () => {
  28. if (this.freed <= 0)
  29. throw new Error('Ошибка получения ресурсов в очереди ожидания');
  30. this.freed--;
  31. this.resetTimeout();
  32. let aCount = this.abortCount;
  33. return {
  34. ret: () => {
  35. if (aCount == this.abortCount) {
  36. this.freed++;
  37. this._emitFree();
  38. aCount = -1;
  39. this.resetTimeout();
  40. }
  41. },
  42. abort: () => {
  43. return (aCount != this.abortCount);
  44. },
  45. resetTimeout: this.resetTimeout.bind(this)
  46. };
  47. };
  48. if (this.freed > 0) {
  49. resolve(take());
  50. } else {
  51. if (this.listeners.length < this.size) {
  52. this._addListener({
  53. onFree: () => {
  54. resolve(take());
  55. },
  56. onError: (err) => {
  57. reject(err);
  58. },
  59. onPlaceChange: (i) => {
  60. if (onPlaceChange)
  61. onPlaceChange(i);
  62. }
  63. });
  64. if (onPlaceChange)
  65. onPlaceChange(this.listeners.length);
  66. } else {
  67. reject('Превышен размер очереди ожидания');
  68. }
  69. }
  70. });
  71. }
  72. resetTimeout() {
  73. if (this.timer)
  74. clearTimeout(this.timer);
  75. this.timer = setTimeout(() => { this.clean(); }, this.timeout);
  76. }
  77. clean() {
  78. this.timer = null;
  79. if (this.freed < this.enqueueAfter) {
  80. this.abortCount++;
  81. //чистка listeners
  82. for (const listener of this.listeners) {
  83. listener.onError('Время ожидания в очереди истекло');
  84. }
  85. this.listeners = [];
  86. this.freed = this.enqueueAfter;
  87. }
  88. }
  89. destroy() {
  90. if (this.timer) {
  91. clearTimeout(this.timer);
  92. this.timer = null;
  93. }
  94. for (const listener of this.listeners) {
  95. listener.onError('destroy');
  96. }
  97. this.listeners = [];
  98. this.abortCount++;
  99. this.destroyed = true;
  100. }
  101. }
  102. module.exports = LimitedQueue;