LimitedQueue.js 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. const cleanPeriod = 60*1000;//1 минута
  2. const cleanTimeout = 60;//timeout в минутах (cleanPeriod)
  3. class LimitedQueue {
  4. constructor(enqueueAfter = 10, size = 100, timeout = cleanTimeout) {//timeout в минутах (cleanPeriod)
  5. this.size = size;
  6. this.timeout = timeout;
  7. this.freed = enqueueAfter;
  8. this.listeners = [];
  9. this.timer = setTimeout(() => { this.periodicClean(); }, cleanPeriod);
  10. }
  11. _addListener(listener) {
  12. this.listeners.push(Object.assign({regTime: Date.now()}, listener));
  13. }
  14. //отсылаем сообщение первому ожидающему и удаляем его из списка
  15. _emitFree() {
  16. if (this.listeners.length > 0) {
  17. let listener = this.listeners.shift();
  18. listener.onFree();
  19. const now = Date.now();
  20. for (let i = 0; i < this.listeners.length; i++) {
  21. listener = this.listeners[i];
  22. listener.regTime = now;
  23. listener.onPlaceChange(i + 1);
  24. }
  25. }
  26. }
  27. get(onPlaceChange) {
  28. return new Promise((resolve, reject) => {
  29. if (this.destroyed)
  30. reject('destroyed');
  31. const take = () => {
  32. if (this.freed <= 0)
  33. throw new Error('Ошибка получения ресурсов в очереди ожидания');
  34. this.freed--;
  35. let returned = false;
  36. return {
  37. ret: () => {
  38. if (!returned) {
  39. this.freed++;
  40. this._emitFree();
  41. returned = true;
  42. }
  43. }
  44. };
  45. };
  46. if (this.freed > 0) {
  47. resolve(take());
  48. } else {
  49. if (this.listeners.length < this.size) {
  50. this._addListener({
  51. onFree: () => {
  52. resolve(take());
  53. },
  54. onError: (err) => {
  55. reject(err);
  56. },
  57. onPlaceChange: (i) => {
  58. if (onPlaceChange)
  59. onPlaceChange(i);
  60. }
  61. });
  62. if (onPlaceChange)
  63. onPlaceChange(this.listeners.length);
  64. } else {
  65. reject('Превышен размер очереди ожидания');
  66. }
  67. }
  68. });
  69. }
  70. destroy() {
  71. if (this.timer) {
  72. clearTimeout(this.timer);
  73. this.timer = null;
  74. }
  75. for (const listener of this.listeners) {
  76. listener.onError('destroy');
  77. }
  78. this.listeners = [];
  79. this.destroyed = true;
  80. }
  81. periodicClean() {
  82. try {
  83. this.timer = null;
  84. const now = Date.now();
  85. //чистка listeners, убираем зависшие в очереди на одном месте
  86. let newListeners = [];
  87. for (const listener of this.listeners) {
  88. if (now - listener.regTime < this.timeout*cleanPeriod - 50) {
  89. newListeners.push(listener);
  90. } else {
  91. listener.onError('Время ожидания в очереди истекло');
  92. }
  93. }
  94. this.listeners = newListeners;
  95. } finally {
  96. if (!this.destroyed) {
  97. this.timer = setTimeout(() => { this.periodicClean(); }, cleanPeriod);
  98. }
  99. }
  100. }
  101. }
  102. module.exports = LimitedQueue;