// socket.ts (5.6 KB)
  1. import { util } from "./util";
  2. import { EventEmitter } from "eventemitter3";
  /**
   * Signalling transport that combines WebSockets with XHR streaming so a peer
   * gets a working connection as quickly as possible.
   *
   * Can be invoked with or without `new`; a plain call re-dispatches to `new`.
   *
   * @param secure - truthy selects https:// and wss://, otherwise http:// and ws://.
   * @param host - server host name used in both URLs.
   * @param port - port used for the HTTP URL.
   * @param path - path segment placed right after the port in both URLs.
   * @param key - appended to the HTTP URL and sent as the `key` query value on
   *   the WebSocket URL.
   * @param wsport - port for the WebSocket URL; `port` is used when this is falsy.
   */
  export function Socket(secure, host, port, path, key, wsport) {
    if (!(this instanceof Socket)) {
      return new Socket(secure, host, port, path, key, wsport);
    }

    EventEmitter.call(this);

    // Flipped to true once the socket has been closed.
    this.disconnected = false;
    // Messages waiting to be sent (see send / _sendQueuedMessages).
    this._queue = [];

    // Both protocols share the same "secure" suffix, so build it once.
    const schemeTail = secure ? "s://" : "://";
    const socketPort = wsport || port;

    this._httpUrl = "http" + schemeTail + host + ":" + port + path + key;
    this._wsUrl =
      "ws" + schemeTail + host + ":" + socketPort + path + "peerjs?key=" + key;
  }

  // Wire Socket up as a subclass of EventEmitter via the project's util helper.
  util.inherits(Socket, EventEmitter);
  /**
   * Check in with the given ID (or let the server hand one out) and open both
   * transports.
   *
   * Records `id` on the socket, appends `id` and `token` to the HTTP and
   * WebSocket URLs, then starts XHR streaming followed by the WebSocket.
   *
   * @param id - peer identifier stored on `this.id` and added to both URLs.
   * @param token - token added to both URLs next to the id.
   */
  Socket.prototype.start = function(id, token) {
    const httpSuffix = "/" + id + "/" + token;
    const wsSuffix = "&id=" + id + "&token=" + token;

    this.id = id;
    this._httpUrl += httpSuffix;
    this._wsUrl += wsSuffix;

    // XHR streaming is started first, then the WebSocket.
    this._startXhrStream();
    this._startWebSocket();
  };
  /**
   * Start up websocket communications.
   *
   * Does nothing when a WebSocket has already been created. Otherwise opens one
   * against `this._wsUrl` and installs its message/close/open handlers.
   *
   * @param id - not used by this method.
   */
  Socket.prototype._startWebSocket = function(id) {
    if (this._socket) {
      return;
    }

    const ws = new WebSocket(this._wsUrl);
    this._socket = ws;

    // Forward every well-formed JSON frame as a "message" event.
    ws.onmessage = (event) => {
      let payload;
      try {
        payload = JSON.parse(event.data);
      } catch (e) {
        util.log("Invalid server message", event.data);
        return;
      }
      this.emit("message", payload);
    };

    ws.onclose = (event) => {
      util.log("Socket closed.");
      this.disconnected = true;
      this.emit("disconnected");
    };

    // Once the WebSocket is up, flush anything that was queued and retire the
    // XHR stream if one was running.
    ws.onopen = () => {
      if (this._timeout) {
        clearTimeout(this._timeout);
        // The XHR request is aborted after a 5 second delay rather than
        // immediately.
        setTimeout(() => {
          this._http.abort();
          this._http = null;
        }, 5000);
      }
      this._sendQueuedMessages();
      util.log("Socket open");
    };
  };
  /**
   * Start XHR streaming.
   *
   * Opens an asynchronous POST to `<httpUrl>/id?i=<n>`, stores the request on
   * `this._http`, and arms the HTTP timeout. Any exception raised while setting
   * the request up is caught and logged.
   *
   * @param n - stream index sent as the `i` query value; 0 when falsy.
   */
  Socket.prototype._startXhrStream = function(n) {
    try {
      const socket = this;
      const xhr = new XMLHttpRequest();
      this._http = xhr;

      // _index is the next line of the response to consume; it starts at 1.
      xhr._index = 1;
      xhr._streamIndex = n || 0;
      xhr.open("post", this._httpUrl + "/id?i=" + xhr._streamIndex, true);

      // A request error stops the timeout and reports a disconnect.
      xhr.onerror = function() {
        clearTimeout(socket._timeout);
        socket.emit("disconnected");
      };

      xhr.onreadystatechange = function() {
        // At readyState 2, abort and drop the request attached as `old`.
        if (this.readyState == 2 && this.old) {
          this.old.abort();
          delete this.old;
          return;
        }
        const hasPayload =
          this.readyState > 2 && this.status === 200 && this.responseText;
        if (hasPayload) {
          socket._handleStream(this);
        }
      };

      xhr.send(null);
      this._setHTTPTimeout();
    } catch (e) {
      util.log("XMLHttpRequest not available; defaulting to WebSockets");
    }
  };
  /**
   * Handles an onreadystatechange notification by treating the XHR response
   * as a stream of newline-separated JSON messages.
   *
   * Each call re-splits the whole `responseText`. `http._index` records the
   * next line to consume and `http._buffer` holds the indices of lines that
   * were incomplete when first seen. A line is treated as incomplete when it
   * is the last element after splitting, i.e. the response did not yet end in
   * a newline. At most one new line is consumed per call.
   *
   * Emits a "message" event for every line that parses as JSON.
   *
   * @param http - the streaming request; must expose `responseText` and
   *   `_index`, and may carry a `_buffer` array created by earlier calls.
   */
  Socket.prototype._handleStream = function(http) {
    const messages = http.responseText.split("\n");

    // Retry lines that were still incomplete on an earlier call.
    if (http._buffer) {
      while (http._buffer.length > 0) {
        const index = http._buffer.shift();
        let bufferedMessage = messages[index];
        try {
          bufferedMessage = JSON.parse(bufferedMessage);
        } catch (e) {
          // Still incomplete: put the index back at the front so it is retried
          // on the next call. (shift() ignores its argument and would drop it.)
          http._buffer.unshift(index);
          break;
        }
        this.emit("message", bufferedMessage);
      }
    }

    let message = messages[http._index];
    if (message) {
      http._index += 1;
      // Buffering--this message is incomplete and we'll get to it next time.
      // This checks if the httpResponse ended in a `\n`, in which case the last
      // element of messages should be the empty string.
      if (http._index === messages.length) {
        if (!http._buffer) {
          http._buffer = [];
        }
        http._buffer.push(http._index - 1);
      } else {
        try {
          message = JSON.parse(message);
        } catch (e) {
          util.log("Invalid server message", message);
          return;
        }
        this.emit("message", message);
      }
    }
  };
  /**
   * Arms a 25 second timer, stored on `this._timeout`, that rotates the XHR
   * stream.
   *
   * When the timer fires and the WebSocket is open, the current request is
   * simply aborted. Otherwise a new stream is started with the next stream
   * index and the previous request is attached to it as `old`.
   */
  Socket.prototype._setHTTPTimeout = function() {
    this._timeout = setTimeout(() => {
      const previous = this._http;

      if (this._wsOpen()) {
        previous.abort();
        return;
      }

      this._startXhrStream(previous._streamIndex + 1);
      this._http.old = previous;
    }, 25000);
  };
  /**
   * Is the websocket currently open?
   *
   * @returns the falsy `_socket` value itself when no WebSocket exists,
   *   otherwise whether its `readyState` equals 1.
   */
  Socket.prototype._wsOpen = function() {
    const ws = this._socket;
    if (!ws) {
      return ws;
    }
    return ws.readyState == 1;
  };
  /**
   * Send queued messages.
   *
   * The queue is detached and replaced with an empty array before anything is
   * sent. This way flushed messages are not retained and resent by a later
   * flush, and a message that `send` decides to queue again ends up in the
   * fresh queue exactly once instead of being duplicated.
   */
  Socket.prototype._sendQueuedMessages = function() {
    const pending = this._queue;
    this._queue = [];
    for (const message of pending) {
      this.send(message);
    }
  };
  /**
   * Exposed send for DC & Peer.
   *
   * Behaviour, in order:
   *  - dropped silently once the socket is marked disconnected;
   *  - queued while no id has been set yet;
   *  - rejected with an "error" event ("Invalid message") if `data.type` is
   *    falsy;
   *  - otherwise serialised to JSON and sent over the WebSocket when it is
   *    open, or POSTed to `<httpUrl>/<lower-cased type>` when it is not.
   *
   * @param data - message object; must carry a string `type`.
   */
  Socket.prototype.send = function(data) {
    if (this.disconnected) {
      return;
    }

    // Without an ID nothing can be sent yet, so hold the message for later.
    if (!this.id) {
      this._queue.push(data);
      return;
    }

    if (!data.type) {
      this.emit("error", "Invalid message");
      return;
    }

    const payload = JSON.stringify(data);

    if (!this._wsOpen()) {
      // WebSocket unavailable: fall back to a one-off HTTP POST.
      const request = new XMLHttpRequest();
      const endpoint = this._httpUrl + "/" + data.type.toLowerCase();
      request.open("post", endpoint, true);
      request.setRequestHeader("Content-Type", "application/json");
      request.send(payload);
      return;
    }

    this._socket.send(payload);
  };
  /**
   * Closes the WebSocket and marks the socket as disconnected.
   *
   * Does nothing when the socket is already marked disconnected or when the
   * WebSocket is not currently open.
   */
  Socket.prototype.close = function() {
    if (this.disconnected || !this._wsOpen()) {
      return;
    }
    this._socket.close();
    this.disconnected = true;
  };
  193. };