socket.ts 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. import { util } from "./util";
  2. import { EventEmitter } from "eventemitter3";
  3. import { SocketEventType } from "./enums";
  /**
   * One segment of the XHR stream: a thin wrapper around a single
   * XMLHttpRequest together with the bookkeeping `Socket` uses to read the
   * growing response body as newline-separated messages.
   */
  class HttpRequest {
    /**
     * Position, within the newline-split response, of the next line to read.
     * Starts at 1, so line 0 is never read (presumably a padding line sent by
     * the server; confirm against the server implementation).
     */
    index = 1;

    /** Positions of lines that were seen incomplete and must be re-read later. */
    readonly buffer: number[] = [];

    /**
     * The request this one supersedes. It is aborted as soon as this request
     * reaches readyState 2, or when this request is itself aborted.
     */
    previousRequest: HttpRequest | null = null;

    /** Underlying XHR; `null` once this request has been aborted. */
    private _xmlHttpRequest: XMLHttpRequest | null;

    private _onError: (sender: HttpRequest) => void = () => {};
    private _onSuccess: (sender: HttpRequest) => void = () => {};

    /** Handler invoked with this request when the underlying XHR reports an error. */
    set onError(handler: (sender: HttpRequest) => void) {
      this._onError = handler;
    }

    /** Handler invoked with this request on every state change for which `isSuccess()` holds. */
    set onSuccess(handler: (sender: HttpRequest) => void) {
      this._onSuccess = handler;
    }

    /**
     * @param streamIndex Sequence number of this stream segment; sent as the `i` query parameter.
     * @param _httpUrl Base URL the `/id` endpoint is appended to.
     */
    constructor(readonly streamIndex: number, private readonly _httpUrl: string) {
      const xhr = new XMLHttpRequest();
      this._xmlHttpRequest = xhr;

      xhr.onerror = () => {
        this._onError(this);
      };

      xhr.onreadystatechange = () => {
        if (this.needsClearPreviousRequest()) {
          this.clearPreviousRequest();
        } else if (this.isSuccess()) {
          this._onSuccess(this);
        }
      };
    }

    /**
     * Opens and sends the asynchronous POST for this stream segment.
     *
     * @throws Error if the request has already been aborted.
     */
    send(): void {
      const xhr = this._xmlHttpRequest;
      if (!xhr) {
        throw new Error("Cannot send an aborted HttpRequest");
      }
      xhr.open("post", this._httpUrl + "/id?i=" + this.streamIndex, true);
      xhr.send(null);
    }

    /**
     * Aborts the underlying XHR and any request this one was still holding on
     * to. Safe to call more than once.
     */
    abort(): void {
      // Cascade first: if this request never reached readyState 2, the request
      // it superseded is still alive and would otherwise never be aborted.
      this.clearPreviousRequest();

      const xhr = this._xmlHttpRequest;
      if (!xhr) {
        return;
      }
      // Drop the reference before aborting so that the readystatechange event
      // fired from inside abort() sees an already-aborted request.
      this._xmlHttpRequest = null;
      xhr.abort();
    }

    /** True when the response is loading or done, the status is 200 and the body is non-empty. */
    isSuccess(): boolean {
      const xhr = this._xmlHttpRequest;
      return !!xhr && xhr.readyState > 2 && xhr.status === 200 && !!xhr.responseText;
    }

    /** True when this request has just reached readyState 2 and still holds a superseded request. */
    needsClearPreviousRequest(): boolean {
      const xhr = this._xmlHttpRequest;
      return !!xhr && xhr.readyState === 2 && !!this.previousRequest;
    }

    /** Aborts and forgets the superseded request, if there is one. */
    clearPreviousRequest(): void {
      const previous = this.previousRequest;
      if (!previous) {
        return;
      }
      // Detach before aborting so a re-entrant call cannot abort it twice.
      this.previousRequest = null;
      previous.abort();
    }

    /** Splits the response received so far on "\n"; returns an empty list after abort. */
    getMessages(): string[] {
      const xhr = this._xmlHttpRequest;
      return xhr ? xhr.responseText.split("\n") : [];
    }

    /** Whether any line positions are waiting to be re-read. */
    hasBufferedIndices(): boolean {
      return this.buffer.length > 0;
    }

    /** Removes and returns the oldest buffered position; call only after `hasBufferedIndices()` returned true. */
    popBufferedIndex(): number {
      return this.buffer.shift() as number;
    }

    /** Remembers a line position whose content was incomplete when first seen. */
    pushIndexToBuffer(index: number) {
      this.buffer.push(index);
    }
  }
  /**
   * An abstraction on top of WebSockets and XHR streaming to provide fastest
   * possible connection for peers.
   *
   * `start()` opens an XHR stream and a WebSocket side by side. Incoming
   * messages from either transport are emitted as `SocketEventType.Message`.
   * Once the WebSocket opens, the XHR stream is shut down after a short delay.
   */
  export class Socket extends EventEmitter {
    /** Lifetime (ms) of one XHR stream segment before it is replaced or dropped. */
    private readonly HTTP_TIMEOUT = 25000;
    /** Delay (ms) between the WebSocket opening and the XHR stream being aborted. */
    private readonly XHR_SHUTDOWN_DELAY = 5000;
    /** WebSocket `readyState` value for a connection still being established. */
    private readonly WS_CONNECTING = 0;
    /** WebSocket `readyState` value for an open connection. */
    private readonly WS_OPEN = 1;

    private _disconnected = false;
    private _id: string | undefined;
    private _messagesQueue: Array<any> = [];
    private _httpUrl: string;
    private _wsUrl: string;
    private _socket: WebSocket | undefined;
    private _httpRequest: HttpRequest | null = null;
    private _timeout: any;

    /**
     * @param secure Truthy to use https/wss, falsy for http/ws.
     * @param host Server host name.
     * @param port Port used for the HTTP endpoints.
     * @param path Path prefix; concatenated as-is, so it must carry its own slashes.
     * @param key API key appended to the path.
     * @param wsport Port used for the WebSocket; defaults to `port`.
     */
    constructor(
      secure: any,
      host: string,
      port: number,
      path: string,
      key: string,
      wsport = port
    ) {
      super();

      const httpProtocol = secure ? "https://" : "http://";
      const wsProtocol = secure ? "wss://" : "ws://";

      this._httpUrl = httpProtocol + host + ":" + port + path + key;
      this._wsUrl = wsProtocol + host + ":" + wsport + path + "peerjs?key=" + key;
    }

    /**
     * Check in with ID or get one from server.
     *
     * Appends `id` and `token` to both URLs, then starts the XHR stream and
     * the WebSocket.
     */
    start(id: string, token: string): void {
      this._id = id;

      this._httpUrl += "/" + id + "/" + token;
      this._wsUrl += "&id=" + id + "&token=" + token;

      this._startXhrStream();
      this._startWebSocket();
    }

    /** Start up websocket communications. Does nothing if a socket already exists. */
    private _startWebSocket(): void {
      if (this._socket) {
        return;
      }

      const socket = new WebSocket(this._wsUrl);
      this._socket = socket;

      socket.onmessage = event => {
        let data;
        try {
          data = JSON.parse(event.data);
        } catch (e) {
          util.log("Invalid server message", event.data);
          return;
        }
        this.emit(SocketEventType.Message, data);
      };

      socket.onclose = () => {
        util.log("Socket closed.");
        this._disconnected = true;
        this.emit(SocketEventType.Disconnected);
      };

      // Take care of the queue of connections if necessary and make sure Peer knows
      // socket is open.
      socket.onopen = () => {
        if (this._timeout) {
          // Stop the stream from being restarted, then let the current segment
          // run a little longer before dropping it.
          clearTimeout(this._timeout);
          setTimeout(() => this._stopXhrStream(), this.XHR_SHUTDOWN_DELAY);
        }
        this._sendQueuedMessages();
        util.log("Socket open");
      };
    }

    /**
     * Start XHR streaming.
     *
     * Any failure to create or send the request (for example when
     * XMLHttpRequest does not exist) is logged and swallowed so the WebSocket
     * can still be used.
     */
    private _startXhrStream(streamIndex: number = 0) {
      try {
        // Constructing the request is inside the try because that is where a
        // missing XMLHttpRequest surfaces.
        const newRequest = new HttpRequest(streamIndex, this._httpUrl);
        this._httpRequest = newRequest;

        newRequest.onError = () => {
          // If we get an error, likely something went wrong.
          // Stop streaming.
          clearTimeout(this._timeout);
          this.emit(SocketEventType.Disconnected);
        };

        newRequest.onSuccess = () => {
          this._handleStream(newRequest);
        };

        newRequest.send();
        this._setHTTPTimeout();
      } catch (e) {
        util.log("XMLHttpRequest not available; defaulting to WebSockets");
      }
    }

    /** Cancels the pending stream restart and aborts the current XHR stream, if any. */
    private _stopXhrStream(): void {
      clearTimeout(this._timeout);
      this._timeout = undefined;

      const request = this._httpRequest;
      if (!request) {
        return;
      }
      this._httpRequest = null;
      // Also drop a superseded request that has not been cleaned up yet.
      request.clearPreviousRequest();
      request.abort();
    }

    /**
     * Handles onreadystatechange response as a stream.
     *
     * Emits every complete message that has arrived since the previous call,
     * in order. A trailing line that is not yet terminated by "\n" is
     * remembered in the request's buffer and retried on the next call.
     */
    private _handleStream(httpRequest: HttpRequest) {
      const messages = httpRequest.getMessages();

      // Check to see if anything needs to be processed on buffer.
      while (httpRequest.hasBufferedIndices()) {
        const index = httpRequest.popBufferedIndex();
        let bufferedMessage;
        try {
          bufferedMessage = JSON.parse(messages[index]);
        } catch (e) {
          // Still incomplete: put it back and try again on the next call.
          httpRequest.buffer.unshift(index);
          break;
        }
        this.emit(SocketEventType.Message, bufferedMessage);
      }

      // Drain every new line, not just one: a single chunk may carry several
      // messages and there may be no further event to pick up the rest.
      while (messages[httpRequest.index]) {
        const raw = messages[httpRequest.index];
        httpRequest.index += 1;

        // Buffering--this message is incomplete and we'll get to it next time.
        // This checks if the httpResponse ended in a `\n`, in which case the last
        // element of messages should be the empty string.
        if (httpRequest.index === messages.length) {
          httpRequest.pushIndexToBuffer(httpRequest.index - 1);
          break;
        }

        let message;
        try {
          message = JSON.parse(raw);
        } catch (e) {
          util.log("Invalid server message", raw);
          continue;
        }
        this.emit(SocketEventType.Message, message);
      }
    }

    /**
     * Schedules the end of the current stream segment: after HTTP_TIMEOUT the
     * stream is dropped if the WebSocket is open, otherwise replaced by the
     * next segment.
     */
    private _setHTTPTimeout() {
      this._timeout = setTimeout(() => {
        if (this._wsOpen()) {
          this._stopXhrStream();
          return;
        }

        const oldHttp = this._httpRequest;
        this._startXhrStream(oldHttp ? oldHttp.streamIndex + 1 : 0);

        // Link only when a new request really replaced the old one; otherwise
        // the old request would end up as its own predecessor.
        const current = this._httpRequest;
        if (oldHttp && current && current !== oldHttp) {
          current.previousRequest = oldHttp;
        }
      }, this.HTTP_TIMEOUT);
    }

    /** Is the websocket currently open? */
    private _wsOpen(): boolean {
      return !!this._socket && this._socket.readyState === this.WS_OPEN;
    }

    /** Send queued messages. */
    private _sendQueuedMessages(): void {
      // Work on a copy and clear the queue first, because send() may push a
      // message back onto the queue.
      const copiedQueue = [...this._messagesQueue];
      this._messagesQueue = [];

      for (const message of copiedQueue) {
        this.send(message);
      }
    }

    /**
     * Exposed send for DC & Peer.
     *
     * Ignored after disconnect, and queued until an ID is known. A message
     * without a `type` is rejected by emitting `SocketEventType.Error`. Other
     * messages go over the WebSocket when it is open and over a one-off HTTP
     * POST otherwise.
     */
    send(data: any): void {
      if (this._disconnected) {
        return;
      }

      // If we didn't get an ID yet, we can't yet send anything so we should queue
      // up these messages.
      if (!this._id) {
        this._messagesQueue.push(data);
        return;
      }

      if (!data || !data.type) {
        this.emit(SocketEventType.Error, "Invalid message");
        return;
      }

      const message = JSON.stringify(data);

      if (this._socket && this._wsOpen()) {
        this._socket.send(message);
        return;
      }

      const http = new XMLHttpRequest();
      const url = this._httpUrl + "/" + data.type.toLowerCase();
      http.open("post", url, true);
      http.setRequestHeader("Content-Type", "application/json");
      http.send(message);
    }

    /**
     * Shuts the socket down: marks it disconnected, stops the XHR stream and
     * its restart timer, and closes the WebSocket if it is connecting or open.
     * Calling it again has no effect.
     */
    close(): void {
      if (this._disconnected) {
        return;
      }
      this._disconnected = true;

      this._stopXhrStream();

      const socket = this._socket;
      if (
        socket &&
        (socket.readyState === this.WS_CONNECTING || socket.readyState === this.WS_OPEN)
      ) {
        socket.close();
      }
    }
  }