WebSocketConnection.js 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. const isBrowser = (typeof window !== 'undefined');
  2. const utils = {
  3. sleep: (ms) => { return new Promise(resolve => setTimeout(resolve, ms)); }
  4. };
  5. const cleanPeriod = 5*1000;//5 секунд
  6. class WebSocketConnection {
  7. //messageLifeTime в секундах (проверка каждый cleanPeriod интервал)
  8. constructor(url, openTimeoutSecs = 10, messageLifeTimeSecs = 30, webSocketOptions = {}) {
  9. this.WebSocket = (isBrowser ? WebSocket : require('ws'));
  10. this.url = url;
  11. this.webSocketOptions = webSocketOptions;
  12. this.ws = null;
  13. this.listeners = [];
  14. this.messageQueue = [];
  15. this.messageLifeTime = messageLifeTimeSecs*1000;
  16. this.openTimeout = openTimeoutSecs*1000;
  17. this.requestId = 0;
  18. this.wsErrored = false;
  19. this.closed = false;
  20. this.connecting = false;
  21. this.periodicClean();//no await
  22. }
  23. //рассылаем сообщение и удаляем те обработчики, которые его получили
  24. emit(mes, isError) {
  25. const len = this.listeners.length;
  26. if (len > 0) {
  27. let newListeners = [];
  28. for (const listener of this.listeners) {
  29. let emitted = false;
  30. if (isError) {
  31. listener.onError(mes);
  32. emitted = true;
  33. } else {
  34. if ( (listener.requestId && mes.requestId && listener.requestId === mes.requestId) ||
  35. (!listener.requestId && !mes.requestId) ) {
  36. listener.onMessage(mes);
  37. emitted = true;
  38. }
  39. }
  40. if (!emitted)
  41. newListeners.push(listener);
  42. }
  43. this.listeners = newListeners;
  44. }
  45. return this.listeners.length != len;
  46. }
  47. get isOpen() {
  48. return (this.ws && this.ws.readyState == this.WebSocket.OPEN);
  49. }
  50. processMessageQueue() {
  51. let newMessageQueue = [];
  52. for (const message of this.messageQueue) {
  53. if (!this.emit(message.mes)) {
  54. newMessageQueue.push(message);
  55. }
  56. }
  57. this.messageQueue = newMessageQueue;
  58. }
  59. _open() {
  60. return new Promise((resolve, reject) => { (async() => {
  61. if (this.closed)
  62. reject(new Error('Этот экземпляр класса уничтожен. Пожалуйста, создайте новый.'));
  63. if (this.connecting) {
  64. let i = this.openTimeout/100;
  65. while (i-- > 0 && this.connecting) {
  66. await utils.sleep(100);
  67. }
  68. }
  69. //проверим подключение, и если нет, то подключимся заново
  70. if (this.isOpen) {
  71. resolve(this.ws);
  72. } else {
  73. this.connecting = true;
  74. this.terminate();
  75. if (isBrowser) {
  76. const protocol = (window.location.protocol == 'https:' ? 'wss:' : 'ws:');
  77. const url = this.url || `${protocol}//${window.location.host}/ws`;
  78. this.ws = new this.WebSocket(url);
  79. } else {
  80. this.ws = new this.WebSocket(this.url, this.webSocketOptions);
  81. }
  82. const onopen = () => {
  83. this.connecting = false;
  84. resolve(this.ws);
  85. };
  86. const onmessage = (data) => {
  87. try {
  88. if (isBrowser)
  89. data = data.data;
  90. const mes = JSON.parse(data);
  91. this.messageQueue.push({regTime: Date.now(), mes});
  92. this.processMessageQueue();
  93. } catch (e) {
  94. this.emit(e.message, true);
  95. }
  96. };
  97. const onerror = (e) => {
  98. this.emit(e.message, true);
  99. reject(new Error(e.message));
  100. };
  101. const onclose = (e) => {
  102. this.emit(e.message, true);
  103. reject(new Error(e.message));
  104. };
  105. if (isBrowser) {
  106. this.ws.onopen = onopen;
  107. this.ws.onmessage = onmessage;
  108. this.ws.onerror = onerror;
  109. this.ws.onclose = onclose;
  110. } else {
  111. this.ws.on('open', onopen);
  112. this.ws.on('message', onmessage);
  113. this.ws.on('error', onerror);
  114. this.ws.on('close', onclose);
  115. }
  116. await utils.sleep(this.openTimeout);
  117. reject(new Error('Соединение не удалось'));
  118. }
  119. })() });
  120. }
  121. //timeout в секундах (проверка каждый cleanPeriod интервал)
  122. message(requestId, timeoutSecs = 4) {
  123. return new Promise((resolve, reject) => {
  124. this.listeners.push({
  125. regTime: Date.now(),
  126. requestId,
  127. timeout: timeoutSecs*1000,
  128. onMessage: (mes) => {
  129. resolve(mes);
  130. },
  131. onError: (mes) => {
  132. reject(new Error(mes));
  133. }
  134. });
  135. this.processMessageQueue();
  136. });
  137. }
  138. async send(req, timeoutSecs = 4) {
  139. await this._open();
  140. if (this.isOpen) {
  141. this.requestId = (this.requestId < 1000000 ? this.requestId + 1 : 1);
  142. const requestId = this.requestId;//реентерабельность!!!
  143. this.ws.send(JSON.stringify(Object.assign({requestId}, req)));
  144. let resp = {};
  145. try {
  146. resp = await this.message(requestId, timeoutSecs);
  147. } catch(e) {
  148. this.terminate();
  149. throw new Error('WebSocket не отвечает');
  150. }
  151. if (resp._rok) {
  152. return requestId;
  153. } else {
  154. throw new Error('Запрос не принят сервером');
  155. }
  156. } else {
  157. throw new Error('WebSocket коннект закрыт');
  158. }
  159. }
  160. terminate() {
  161. if (this.ws) {
  162. if (isBrowser) {
  163. this.ws.close();
  164. } else {
  165. this.ws.terminate();
  166. }
  167. }
  168. this.ws = null;
  169. }
  170. close() {
  171. this.terminate();
  172. this.closed = true;
  173. }
  174. async periodicClean() {
  175. while (!this.closed) {
  176. try {
  177. const now = Date.now();
  178. //чистка listeners
  179. let newListeners = [];
  180. for (const listener of this.listeners) {
  181. if (now - listener.regTime < listener.timeout) {
  182. newListeners.push(listener);
  183. } else {
  184. if (listener.onError)
  185. listener.onError('Время ожидания ответа истекло');
  186. }
  187. }
  188. this.listeners = newListeners;
  189. //чистка messageQueue
  190. let newMessageQueue = [];
  191. for (const message of this.messageQueue) {
  192. if (now - message.regTime < this.messageLifeTime) {
  193. newMessageQueue.push(message);
  194. }
  195. }
  196. this.messageQueue = newMessageQueue;
  197. } catch(e) {
  198. //
  199. }
  200. await utils.sleep(cleanPeriod);
  201. }
  202. }
  203. }
  204. module.exports = WebSocketConnection;