socket.ts 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. import { util } from "./util";
  2. import { EventEmitter } from "eventemitter3";
  3. import { SocketEventType } from "./enums";
  /**
   * A single streaming XMLHttpRequest together with the bookkeeping needed to
   * read its growing response as a sequence of newline-separated messages.
   *
   * Once `abort()` has been called the instance is inert: state queries return
   * `false`, `getMessages()` returns an empty list and further `abort()` calls
   * are no-ops.
   */
  class HttpRequest {
    /**
     * Position, within the newline-split response text, of the next message to
     * read. It starts at 1, so the first line of the response is never read
     * through this cursor.
     */
    index = 1;
    /** FIFO of message positions deferred via `pushIndexToBuffer()`. */
    readonly buffer: number[] = [];
    /** Request this one supersedes; aborted once this one reaches readyState 2. */
    previousRequest: HttpRequest | null = null;
    /** Underlying transport; `null` after `abort()`. */
    private _xmlHttpRequest: XMLHttpRequest | null;
    private _onError: (sender: HttpRequest) => void = () => {};
    private _onSuccess: (sender: HttpRequest) => void = () => {};

    /** Registers the callback invoked when the underlying XHR fires `error`. */
    set onError(handler: (sender: HttpRequest) => void) {
      this._onError = handler;
    }

    /** Registers the callback invoked on each state change for which `isSuccess()` holds. */
    set onSuccess(handler: (sender: HttpRequest) => void) {
      this._onSuccess = handler;
    }

    /**
     * @param streamIndex Value sent to the server as the `i` query parameter.
     * @param _httpUrl Base URL; `/id?i=<streamIndex>` is appended when sending.
     */
    constructor(readonly streamIndex: number, private readonly _httpUrl: string) {
      const xhr = new XMLHttpRequest();

      xhr.onerror = () => {
        this._onError(this);
      };

      xhr.onreadystatechange = () => {
        // Reaching readyState 2 takes priority: the superseded request is
        // dropped first and success is not reported for that event.
        if (this.needsClearPreviousRequest()) {
          this.clearPreviousRequest();
        } else if (this.isSuccess()) {
          this._onSuccess(this);
        }
      };

      this._xmlHttpRequest = xhr;
    }

    /**
     * Opens and sends the asynchronous POST that starts the stream.
     *
     * @throws Error if the request has already been aborted.
     */
    send(): void {
      const xhr = this._xmlHttpRequest;
      if (!xhr) {
        throw new Error("Cannot send an aborted HttpRequest");
      }
      xhr.open("post", this._httpUrl + "/id?i=" + this.streamIndex, true);
      xhr.send(null);
    }

    /**
     * Aborts the underlying XHR and releases it. Safe to call more than once.
     * Any request still referenced by `previousRequest` is aborted as well, so
     * a superseded stream cannot outlive the request that replaced it.
     */
    abort(): void {
      const xhr = this._xmlHttpRequest;
      if (xhr) {
        // Abort before dropping the reference: the XHR may fire a state change
        // synchronously and the handler reads it through this object.
        xhr.abort();
        this._xmlHttpRequest = null;
      }
      this.clearPreviousRequest();
    }

    /** True when readyState is past 2, the status is 200 and some response text exists. */
    isSuccess(): boolean {
      const xhr = this._xmlHttpRequest;
      return !!xhr && xhr.readyState > 2 && xhr.status === 200 && !!xhr.responseText;
    }

    /** True when this request is at readyState 2 and still holds a previous request. */
    needsClearPreviousRequest(): boolean {
      const xhr = this._xmlHttpRequest;
      return !!xhr && xhr.readyState === 2 && !!this.previousRequest;
    }

    /** Aborts and forgets the previous request, if there is one. */
    clearPreviousRequest(): void {
      const previous = this.previousRequest;
      if (!previous) return;
      // Detach first so a cyclic link cannot cause unbounded recursion.
      this.previousRequest = null;
      previous.abort();
    }

    /** Response text received so far, split on "\n"; empty once aborted. */
    getMessages(): string[] {
      const xhr = this._xmlHttpRequest;
      return xhr ? xhr.responseText.split("\n") : [];
    }

    /** Whether any deferred message positions are waiting. */
    hasBufferedIndices(): boolean {
      return this.buffer.length > 0;
    }

    /** Removes and returns the oldest deferred message position. */
    popBufferedIndex(): number {
      return this.buffer.shift();
    }

    /** Appends a message position to the deferred queue. */
    pushIndexToBuffer(index: number) {
      this.buffer.push(index);
    }
  }
  /**
   * An abstraction on top of WebSockets and XHR streaming to provide fastest
   * possible connection for peers.
   *
   * `start()` opens an XHR stream and a WebSocket side by side. Incoming
   * messages from either transport are parsed as JSON and re-emitted as
   * `SocketEventType.Message`. Once the WebSocket is open, the XHR stream is
   * shut down after a short grace period and outgoing messages go over the
   * WebSocket; until then they are POSTed over HTTP.
   */
  export class Socket extends EventEmitter {
    /** How long one XHR stream runs before it is rotated or dropped, in ms. */
    private readonly HTTP_TIMEOUT = 25000;
    /** Delay between WebSocket heartbeats, in ms. */
    private readonly WEB_SOCKET_PING_INTERVAL = 20000;
    /** Grace period between the WebSocket opening and the XHR stream being aborted, in ms. */
    private readonly XHR_SHUTDOWN_DELAY = 5000;

    private _disconnected = false;
    private _id: string;
    /** Messages passed to `send()` before an id was known. */
    private _messagesQueue: Array<any> = [];
    private _httpUrl: string;
    private _wsUrl: string;
    private _socket: WebSocket;
    private _wsPingTimer: any;
    private _httpRequest: HttpRequest;
    /** Timer that rotates (or drops) the current XHR stream. */
    private _timeout: any;
    /** Timer that aborts the XHR stream after the WebSocket has opened. */
    private _xhrShutdownTimer: any;

    /**
     * @param secure Truthy to use https/wss, falsy for http/ws.
     * @param host Server host name.
     * @param port Port used for HTTP requests.
     * @param path Path prefix, concatenated verbatim between port and key.
     * @param key API key appended to both URLs.
     * @param wsport Port used for the WebSocket; defaults to `port`.
     */
    constructor(
      secure: any,
      host: string,
      port: number,
      path: string,
      key: string,
      wsport = port
    ) {
      super();

      const httpProtocol = secure ? "https://" : "http://";
      const wsProtocol = secure ? "wss://" : "ws://";

      this._httpUrl = httpProtocol + host + ":" + port + path + key;
      this._wsUrl = wsProtocol + host + ":" + wsport + path + "peerjs?key=" + key;
    }

    /**
     * Check in with ID or get one from server.
     *
     * Appends the id and token to both URLs, then starts the XHR stream and the
     * WebSocket.
     */
    start(id: string, token: string): void {
      this._id = id;

      this._httpUrl += "/" + id + "/" + token;
      this._wsUrl += "&id=" + id + "&token=" + token;

      this._startXhrStream();
      this._startWebSocket();
    }

    /** Start up websocket communications. Does nothing if a socket already exists. */
    private _startWebSocket(): void {
      if (this._socket) {
        return;
      }

      const socket = new WebSocket(this._wsUrl);
      this._socket = socket;

      socket.onmessage = (event) => {
        let data;
        try {
          data = JSON.parse(event.data);
        } catch (e) {
          util.log("Invalid server message", event.data);
          return;
        }
        this.emit(SocketEventType.Message, data);
      };

      socket.onclose = (event) => {
        util.log("Socket closed.", event);
        this._disconnected = true;
        this.emit(SocketEventType.Disconnected);
      };

      // Take care of the queue of connections if necessary and make sure Peer knows
      // socket is open.
      socket.onopen = () => {
        if (this._timeout) {
          // An XHR stream was started: stop rotating it and abort it after a
          // grace period. The timer is tracked so close() can cancel it.
          clearTimeout(this._timeout);
          this._xhrShutdownTimer = setTimeout(() => {
            this._xhrShutdownTimer = undefined;
            this._abortXhrStream();
          }, this.XHR_SHUTDOWN_DELAY);
        }
        this._sendQueuedMessages();
        util.log("Socket open");
        this._scheduleHeartbeat();
      };
    }

    /**
     * Start XHR streaming.
     *
     * @param streamIndex Index passed to the new `HttpRequest`.
     */
    private _startXhrStream(streamIndex: number = 0) {
      try {
        // Construction is inside the try so that an environment without
        // XMLHttpRequest falls through to the log below instead of throwing.
        const newRequest = new HttpRequest(streamIndex, this._httpUrl);
        this._httpRequest = newRequest;

        newRequest.onError = () => {
          // If we get an error, likely something went wrong.
          // Stop streaming.
          clearTimeout(this._timeout);
          this.emit(SocketEventType.Disconnected);
        };

        newRequest.onSuccess = () => {
          this._handleStream(newRequest);
        };

        newRequest.send();
        this._setHTTPTimeout();
      } catch (e) {
        util.log("XMLHttpRequest not available; defaulting to WebSockets");
      }
    }

    /** Aborts and forgets the current XHR stream, if any. */
    private _abortXhrStream(): void {
      const current = this._httpRequest;
      if (!current) {
        return;
      }
      this._httpRequest = null;
      current.abort();
    }

    /** Arms the timer for the next heartbeat. */
    private _scheduleHeartbeat(): void {
      this._wsPingTimer = setTimeout(() => {
        this._sendHeartbeat();
      }, this.WEB_SOCKET_PING_INTERVAL);
    }

    /** Sends a HEARTBEAT over the WebSocket and schedules the next one; stops once the socket is not open. */
    private _sendHeartbeat(): void {
      if (!this._wsOpen()) {
        util.log(`Cannot send heartbeat, because socket closed`);
        return;
      }

      const message = JSON.stringify({ type: 'HEARTBEAT' });
      this._socket.send(message);

      this._scheduleHeartbeat();
    }

    /**
     * Handles onreadystatechange response as a stream.
     *
     * The response text is split on "\n". A line is known to be complete only
     * when another element follows it in the split result; the final element is
     * either empty (the response ended with "\n") or a partial message.
     */
    private _handleStream(httpRequest: HttpRequest) {
      const messages = httpRequest.getMessages();
      const lastPosition = messages.length - 1;

      // Check to see if anything needs to be processed on buffer.
      while (httpRequest.hasBufferedIndices()) {
        const index = httpRequest.popBufferedIndex();
        const raw = messages[index];
        let parsed;
        try {
          parsed = JSON.parse(raw);
        } catch (e) {
          if (index < lastPosition) {
            // The line is newline-terminated, so it will never become valid:
            // report it and move on instead of retrying it forever.
            util.log("Invalid server message", raw);
            continue;
          }
          // Still the trailing, possibly partial line: retry on the next event.
          httpRequest.buffer.unshift(index);
          break;
        }
        this.emit(SocketEventType.Message, parsed);
      }

      // Drain every new line that is available, not just one per event, so
      // messages delivered in the same chunk are not held back.
      for (;;) {
        const raw = messages[httpRequest.index];
        if (!raw) {
          break;
        }

        httpRequest.index += 1;

        // Buffering--this message is incomplete and we'll get to it next time.
        // This checks if the httpResponse ended in a `\n`, in which case the last
        // element of messages should be the empty string.
        if (httpRequest.index === messages.length) {
          httpRequest.pushIndexToBuffer(httpRequest.index - 1);
          break;
        }

        let parsed;
        try {
          parsed = JSON.parse(raw);
        } catch (e) {
          util.log("Invalid server message", raw);
          continue;
        }
        this.emit(SocketEventType.Message, parsed);
      }
    }

    /**
     * Arms the XHR rotation timer. When it fires: if the WebSocket is not open
     * a new stream with the next index replaces the current one (which is kept
     * as its `previousRequest`); otherwise the XHR stream is dropped.
     */
    private _setHTTPTimeout() {
      this._timeout = setTimeout(() => {
        if (this._wsOpen()) {
          this._abortXhrStream();
          return;
        }

        const oldHttp = this._httpRequest;
        this._startXhrStream(oldHttp ? oldHttp.streamIndex + 1 : 0);

        // Link only a genuinely new request; if the new stream could not be
        // created, _httpRequest still refers to the old one.
        const current = this._httpRequest;
        if (oldHttp && current && current !== oldHttp) {
          current.previousRequest = oldHttp;
        }
      }, this.HTTP_TIMEOUT);
    }

    /** Is the websocket currently open? */
    private _wsOpen(): boolean {
      return !!this._socket && this._socket.readyState === WebSocket.OPEN;
    }

    /** Send queued messages. */
    private _sendQueuedMessages(): void {
      // Work on a snapshot and reset the queue first: send() pushes a message
      // back onto the queue when it still cannot be sent.
      const pending = this._messagesQueue;
      this._messagesQueue = [];

      for (const message of pending) {
        this.send(message);
      }
    }

    /**
     * Exposed send for DC & Peer.
     *
     * Silently ignored after disconnection. Queued while no id is known.
     * Emits `SocketEventType.Error` with "Invalid message" when `data` has no
     * truthy `type`. Otherwise sent over the WebSocket when it is open, or
     * POSTed as JSON to `<httpUrl>/<type in lower case>`.
     */
    send(data: any): void {
      if (this._disconnected) {
        return;
      }

      // If we didn't get an ID yet, we can't yet send anything so we should queue
      // up these messages.
      if (!this._id) {
        this._messagesQueue.push(data);
        return;
      }

      if (!data || !data.type) {
        this.emit(SocketEventType.Error, "Invalid message");
        return;
      }

      const message = JSON.stringify(data);

      if (this._wsOpen()) {
        this._socket.send(message);
        return;
      }

      const http = new XMLHttpRequest();
      const url = this._httpUrl + "/" + data.type.toLowerCase();
      http.open("post", url, true);
      http.setRequestHeader("Content-Type", "application/json");
      http.send(message);
    }

    /**
     * Shuts the connection down regardless of which transport is in use:
     * cancels all timers, aborts the XHR stream and closes the WebSocket if it
     * is open or still connecting. Subsequent calls are no-ops.
     */
    close(): void {
      if (this._disconnected) {
        return;
      }
      this._disconnected = true;

      clearTimeout(this._wsPingTimer);
      clearTimeout(this._timeout);
      clearTimeout(this._xhrShutdownTimer);
      this._xhrShutdownTimer = undefined;

      this._abortXhrStream();

      const socket = this._socket;
      if (
        socket &&
        (socket.readyState === WebSocket.CONNECTING ||
          socket.readyState === WebSocket.OPEN)
      ) {
        socket.close();
      }
    }
  }
  273. }