WebSocketConnection.js 7.7 KB


  1. const isBrowser = (typeof window !== 'undefined');
  2. const utils = {
  3. sleep: (ms) => { return new Promise(resolve => setTimeout(resolve, ms)); }
  4. };
  5. const cleanPeriod = 5*1000;//5 секунд
  6. class WebSocketConnection {
  7. //messageLifeTime в секундах (проверка каждый cleanPeriod интервал)
  8. constructor(url, openTimeoutSecs = 10, messageLifeTimeSecs = 30) {
  9. this.WebSocket = (isBrowser ? WebSocket : require('ws'));
  10. this.url = url;
  11. this.ws = null;
  12. this.listeners = [];
  13. this.messageQueue = [];
  14. this.messageLifeTime = messageLifeTimeSecs*1000;
  15. this.openTimeout = openTimeoutSecs*1000;
  16. this.requestId = 0;
  17. this.wsErrored = false;
  18. this.closed = false;
  19. this.connecting = false;
  20. this.periodicClean();//no await
  21. }
  22. //рассылаем сообщение и удаляем те обработчики, которые его получили
  23. emit(mes, isError) {
  24. const len = this.listeners.length;
  25. if (len > 0) {
  26. let newListeners = [];
  27. for (const listener of this.listeners) {
  28. let emitted = false;
  29. if (isError) {
  30. listener.onError(mes);
  31. emitted = true;
  32. } else {
  33. if ( (listener.requestId && mes.requestId && listener.requestId === mes.requestId) ||
  34. (!listener.requestId && !mes.requestId) ) {
  35. listener.onMessage(mes);
  36. emitted = true;
  37. }
  38. }
  39. if (!emitted)
  40. newListeners.push(listener);
  41. }
  42. this.listeners = newListeners;
  43. }
  44. return this.listeners.length != len;
  45. }
  46. get isOpen() {
  47. return (this.ws && this.ws.readyState == this.WebSocket.OPEN);
  48. }
  49. processMessageQueue() {
  50. let newMessageQueue = [];
  51. for (const message of this.messageQueue) {
  52. if (!this.emit(message.mes)) {
  53. newMessageQueue.push(message);
  54. }
  55. }
  56. this.messageQueue = newMessageQueue;
  57. }
  58. _open() {
  59. return new Promise((resolve, reject) => { (async() => {
  60. if (this.closed)
  61. reject(new Error('Этот экземпляр класса уничтожен. Пожалуйста, создайте новый.'));
  62. if (this.connecting) {
  63. let i = this.openTimeout/100;
  64. while (i-- > 0 && this.connecting) {
  65. await utils.sleep(100);
  66. }
  67. }
  68. //проверим подключение, и если нет, то подключимся заново
  69. if (this.isOpen) {
  70. resolve(this.ws);
  71. } else {
  72. this.connecting = true;
  73. this.terminate();
  74. if (isBrowser) {
  75. const protocol = (window.location.protocol == 'https:' ? 'wss:' : 'ws:');
  76. const url = this.url || `${protocol}//${window.location.host}/ws`;
  77. this.ws = new this.WebSocket(url);
  78. } else {
  79. this.ws = new this.WebSocket(this.url);
  80. }
  81. const onopen = (e) => {
  82. this.connecting = false;
  83. resolve(this.ws);
  84. };
  85. const onmessage = (data) => {
  86. try {
  87. if (isBrowser)
  88. data = data.data;
  89. const mes = JSON.parse(data);
  90. this.messageQueue.push({regTime: Date.now(), mes});
  91. this.processMessageQueue();
  92. } catch (e) {
  93. this.emit(e.message, true);
  94. }
  95. };
  96. const onerror = (e) => {
  97. this.emit(e.message, true);
  98. reject(new Error(e.message));
  99. };
  100. const onclose = (e) => {
  101. this.emit(e.message, true);
  102. reject(new Error(e.message));
  103. };
  104. if (isBrowser) {
  105. this.ws.onopen = onopen;
  106. this.ws.onmessage = onmessage;
  107. this.ws.onerror = onerror;
  108. this.ws.onclose = onclose;
  109. } else {
  110. this.ws.on('open', onopen);
  111. this.ws.on('message', onmessage);
  112. this.ws.on('error', onerror);
  113. this.ws.on('close', onclose);
  114. }
  115. await utils.sleep(this.openTimeout);
  116. reject(new Error('Соединение не удалось'));
  117. }
  118. })() });
  119. }
  120. //timeout в секундах (проверка каждый cleanPeriod интервал)
  121. message(requestId, timeoutSecs = 4) {
  122. return new Promise((resolve, reject) => {
  123. this.listeners.push({
  124. regTime: Date.now(),
  125. requestId,
  126. timeout: timeoutSecs*1000,
  127. onMessage: (mes) => {
  128. resolve(mes);
  129. },
  130. onError: (mes) => {
  131. reject(new Error(mes));
  132. }
  133. });
  134. this.processMessageQueue();
  135. });
  136. }
  137. async send(req, timeoutSecs = 4) {
  138. await this._open();
  139. if (this.isOpen) {
  140. this.requestId = (this.requestId < 1000000 ? this.requestId + 1 : 1);
  141. const requestId = this.requestId;//реентерабельность!!!
  142. this.ws.send(JSON.stringify(Object.assign({requestId, _rpo: 1}, req)));//_rpo: 1 - ждем в ответ _rok: 1
  143. let resp = {};
  144. try {
  145. resp = await this.message(requestId, timeoutSecs);
  146. } catch(e) {
  147. this.terminate();
  148. throw new Error('WebSocket не отвечает');
  149. }
  150. if (resp._rok) {
  151. return requestId;
  152. } else {
  153. throw new Error('Запрос не принят сервером');
  154. }
  155. } else {
  156. throw new Error('WebSocket коннект закрыт');
  157. }
  158. }
  159. terminate() {
  160. if (this.ws) {
  161. if (isBrowser) {
  162. this.ws.close();
  163. } else {
  164. this.ws.terminate();
  165. }
  166. }
  167. this.ws = null;
  168. }
  169. close() {
  170. this.terminate();
  171. this.closed = true;
  172. }
  173. async periodicClean() {
  174. while (!this.closed) {
  175. try {
  176. const now = Date.now();
  177. //чистка listeners
  178. let newListeners = [];
  179. for (const listener of this.listeners) {
  180. if (now - listener.regTime < listener.timeout) {
  181. newListeners.push(listener);
  182. } else {
  183. if (listener.onError)
  184. listener.onError('Время ожидания ответа истекло');
  185. }
  186. }
  187. this.listeners = newListeners;
  188. //чистка messageQueue
  189. let newMessageQueue = [];
  190. for (const message of this.messageQueue) {
  191. if (now - message.regTime < this.messageLifeTime) {
  192. newMessageQueue.push(message);
  193. }
  194. }
  195. this.messageQueue = newMessageQueue;
  196. } catch(e) {
  197. //
  198. }
  199. await utils.sleep(cleanPeriod);
  200. }
  201. }
  202. }
  203. module.exports = WebSocketConnection;