123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- const isBrowser = (typeof window !== 'undefined');
- const utils = {
- sleep: (ms) => { return new Promise(resolve => setTimeout(resolve, ms)); }
- };
- const cleanPeriod = 5*1000;//5 секунд
- class WebSocketConnection {
- //messageLifeTime в секундах (проверка каждый cleanPeriod интервал)
- constructor(url, openTimeoutSecs = 10, messageLifeTimeSecs = 30, webSocketOptions = {}) {
- this.WebSocket = (isBrowser ? WebSocket : require('ws'));
- this.url = url;
- this.webSocketOptions = webSocketOptions;
- this.ws = null;
- this.listeners = [];
- this.messageQueue = [];
- this.messageLifeTime = messageLifeTimeSecs*1000;
- this.openTimeout = openTimeoutSecs*1000;
- this.requestId = 0;
- this.wsErrored = false;
- this.closed = false;
- this.connecting = false;
- this.periodicClean();//no await
- }
- //рассылаем сообщение и удаляем те обработчики, которые его получили
- emit(mes, isError) {
- const len = this.listeners.length;
- if (len > 0) {
- let newListeners = [];
- for (const listener of this.listeners) {
- let emitted = false;
- if (isError) {
- listener.onError(mes);
- emitted = true;
- } else {
- if ( (listener.requestId && mes.requestId && listener.requestId === mes.requestId) ||
- (!listener.requestId && !mes.requestId) ) {
- listener.onMessage(mes);
- emitted = true;
- }
- }
- if (!emitted)
- newListeners.push(listener);
- }
- this.listeners = newListeners;
- }
-
- return this.listeners.length != len;
- }
- get isOpen() {
- return (this.ws && this.ws.readyState == this.WebSocket.OPEN);
- }
- processMessageQueue() {
- let newMessageQueue = [];
- for (const message of this.messageQueue) {
- if (!this.emit(message.mes)) {
- newMessageQueue.push(message);
- }
- }
- this.messageQueue = newMessageQueue;
- }
- _open() {
- return new Promise((resolve, reject) => { (async() => {
- if (this.closed)
- reject(new Error('Этот экземпляр класса уничтожен. Пожалуйста, создайте новый.'));
- if (this.connecting) {
- let i = this.openTimeout/100;
- while (i-- > 0 && this.connecting) {
- await utils.sleep(100);
- }
- }
- //проверим подключение, и если нет, то подключимся заново
- if (this.isOpen) {
- resolve(this.ws);
- } else {
- this.connecting = true;
- this.terminate();
- if (isBrowser) {
- const protocol = (window.location.protocol == 'https:' ? 'wss:' : 'ws:');
- const url = this.url || `${protocol}//${window.location.host}/ws`;
- this.ws = new this.WebSocket(url);
- } else {
- this.ws = new this.WebSocket(this.url, this.webSocketOptions);
- }
- const onopen = () => {
- this.connecting = false;
- resolve(this.ws);
- };
- const onmessage = (data) => {
- try {
- if (isBrowser)
- data = data.data;
- const mes = JSON.parse(data);
- this.messageQueue.push({regTime: Date.now(), mes});
- this.processMessageQueue();
- } catch (e) {
- this.emit(e.message, true);
- }
- };
- const onerror = (e) => {
- this.emit(e.message, true);
- reject(new Error(e.message));
- };
- const onclose = (e) => {
- this.emit(e.message, true);
- reject(new Error(e.message));
- };
- if (isBrowser) {
- this.ws.onopen = onopen;
- this.ws.onmessage = onmessage;
- this.ws.onerror = onerror;
- this.ws.onclose = onclose;
- } else {
- this.ws.on('open', onopen);
- this.ws.on('message', onmessage);
- this.ws.on('error', onerror);
- this.ws.on('close', onclose);
- }
- await utils.sleep(this.openTimeout);
- reject(new Error('Соединение не удалось'));
- }
- })() });
- }
- //timeout в секундах (проверка каждый cleanPeriod интервал)
- message(requestId, timeoutSecs = 4) {
- return new Promise((resolve, reject) => {
- this.listeners.push({
- regTime: Date.now(),
- requestId,
- timeout: timeoutSecs*1000,
- onMessage: (mes) => {
- resolve(mes);
- },
- onError: (mes) => {
- reject(new Error(mes));
- }
- });
-
- this.processMessageQueue();
- });
- }
- async send(req, timeoutSecs = 4) {
- await this._open();
- if (this.isOpen) {
- this.requestId = (this.requestId < 1000000 ? this.requestId + 1 : 1);
- const requestId = this.requestId;//реентерабельность!!!
- this.ws.send(JSON.stringify(Object.assign({requestId}, req)));
- let resp = {};
- try {
- resp = await this.message(requestId, timeoutSecs);
- } catch(e) {
- this.terminate();
- throw new Error('WebSocket не отвечает');
- }
- if (resp._rok) {
- return requestId;
- } else {
- throw new Error('Запрос не принят сервером');
- }
- } else {
- throw new Error('WebSocket коннект закрыт');
- }
- }
- terminate() {
- if (this.ws) {
- if (isBrowser) {
- this.ws.close();
- } else {
- this.ws.terminate();
- }
- }
- this.ws = null;
- }
- close() {
- this.terminate();
- this.closed = true;
- }
- async periodicClean() {
- while (!this.closed) {
- try {
- const now = Date.now();
- //чистка listeners
- let newListeners = [];
- for (const listener of this.listeners) {
- if (now - listener.regTime < listener.timeout) {
- newListeners.push(listener);
- } else {
- if (listener.onError)
- listener.onError('Время ожидания ответа истекло');
- }
- }
- this.listeners = newListeners;
- //чистка messageQueue
- let newMessageQueue = [];
- for (const message of this.messageQueue) {
- if (now - message.regTime < this.messageLifeTime) {
- newMessageQueue.push(message);
- }
- }
- this.messageQueue = newMessageQueue;
- } catch(e) {
- //
- }
- await utils.sleep(cleanPeriod);
- }
- }
- }
- module.exports = WebSocketConnection;
|