socket.ts 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. var util = require('./util');
  2. var EventEmitter = require('eventemitter3');
  /**
   * Signalling transport that layers a WebSocket and an XHR stream so that a
   * peer is served by whichever connection becomes usable first.
   *
   * Works with or without `new`.
   *
   * @param {boolean} secure - Truthy selects https/wss, falsy selects http/ws.
   * @param {string} host - Server host name.
   * @param {number|string} port - Port used for the HTTP URL.
   * @param {string} path - Path prefix, concatenated as-is between port and key.
   * @param {string} key - Key appended to both URLs.
   * @param {number|string} [wsport] - Port for the WebSocket URL; any falsy
   *   value falls back to `port`.
   */
  function Socket(secure, host, port, path, key, wsport) {
    if (!(this instanceof Socket)) {
      return new Socket(secure, host, port, path, key, wsport);
    }

    EventEmitter.call(this);

    // Both protocols share the same "s" suffix when the connection is secure.
    var scheme = secure ? 's://' : '://';
    var httpAuthority = host + ':' + port;
    var wsAuthority = host + ':' + (wsport || port);

    // Flipped to true when the WebSocket closes or close() is called.
    this.disconnected = false;
    // Messages queued by send() while no id has been set.
    this._queue = [];
    this._httpUrl = 'http' + scheme + httpAuthority + path + key;
    this._wsUrl = 'ws' + scheme + wsAuthority + path + 'peerjs?key=' + key;
  }

  util.inherits(Socket, EventEmitter);
  /**
   * Begin signalling: record the peer id, append id and token to both
   * endpoint URLs, then start the XHR stream followed by the WebSocket.
   *
   * @param {string} id - Peer id; stored on `this.id` and added to both URLs.
   * @param {string} token - Token added to both URLs next to the id.
   */
  Socket.prototype.start = function(id, token) {
    var httpSuffix = '/' + id + '/' + token;
    var wsSuffix = '&id=' + id + '&token=' + token;

    this.id = id;
    this._httpUrl = this._httpUrl + httpSuffix;
    this._wsUrl = this._wsUrl + wsSuffix;

    this._startXhrStream();
    this._startWebSocket();
  };
  /**
   * Open the WebSocket to `this._wsUrl` and wire up its handlers. Returns
   * immediately when a socket object already exists.
   *
   * Handlers installed:
   *  - message: JSON-decodes the payload and re-emits it as 'message';
   *    payloads that fail to decode are logged and dropped.
   *  - close: marks the socket as disconnected and emits 'disconnected'.
   *  - open: when an XHR timeout is pending, cancels it and schedules the XHR
   *    stream to be aborted 5 seconds later; then flushes queued messages.
   *
   * @param {*} id - Not read by this method.
   */
  Socket.prototype._startWebSocket = function(id) {
    if (this._socket) {
      return;
    }

    var owner = this;
    var ws = new WebSocket(owner._wsUrl);
    owner._socket = ws;

    ws.onmessage = function(event) {
      var payload;
      try {
        payload = JSON.parse(event.data);
      } catch (e) {
        util.log('Invalid server message', event.data);
        return;
      }
      owner.emit('message', payload);
    };

    ws.onclose = function(event) {
      util.log('Socket closed.');
      owner.disconnected = true;
      owner.emit('disconnected');
    };

    // Take care of the queued messages if necessary and let listeners know
    // the socket is open.
    ws.onopen = function() {
      if (owner._timeout) {
        clearTimeout(owner._timeout);
        // Stop the XHR stream a little later rather than immediately.
        setTimeout(function() {
          owner._http.abort();
          owner._http = null;
        }, 5000);
      }
      owner._sendQueuedMessages();
      util.log('Socket open');
    };
  };
  /**
   * Start one XHR streaming request and arm the timeout that later replaces it.
   *
   * The request is stored on `this._http` and carries two bookkeeping fields:
   * `_index` (starts at 1; `_handleStream` reads response lines from that
   * position) and `_streamIndex` (sent to the server as the `i` query value).
   *
   * Any exception raised while setting the request up is caught and logged
   * as XMLHttpRequest being unavailable.
   *
   * @param {number} [n] - Stream index for this request; falsy values become 0.
   */
  Socket.prototype._startXhrStream = function(n) {
    var owner = this;

    try {
      var request = new XMLHttpRequest();
      // Publish the request before configuring it, so a failure further down
      // still leaves it reachable through `_http`.
      owner._http = request;
      request._index = 1;
      request._streamIndex = n || 0;
      request.open('post', owner._httpUrl + '/id?i=' + request._streamIndex, true);

      request.onerror = function() {
        // An error means the stream is unusable: stop restarting it.
        clearTimeout(owner._timeout);
        owner.emit('disconnected');
      };

      request.onreadystatechange = function() {
        var previous = this.old;
        if (this.readyState == 2 && previous) {
          // This request has received headers; the one it replaces can go.
          previous.abort();
          delete this.old;
          return;
        }
        if (this.readyState > 2 && this.status === 200 && this.responseText) {
          owner._handleStream(this);
        }
      };

      request.send(null);
      owner._setHTTPTimeout();
    } catch (e) {
      util.log('XMLHttpRequest not available; defaulting to WebSockets');
    }
  };
  /**
   * Treat the response text of a streaming XHR as newline-delimited JSON and
   * emit a 'message' event for every line that has become complete.
   *
   * Called from the request's readystatechange handler while the request is
   * loading or done. `http._index` is the position of the next unread line
   * and `http._buffer` holds positions of lines that were seen before their
   * terminating newline had arrived.
   *
   * Every complete line available is drained in a single call, so several
   * messages delivered in one chunk are all emitted, in order. A trailing
   * line without a newline is buffered and retried on the next call. A line
   * that is newline-terminated yet not valid JSON is logged and skipped.
   *
   * @param {XMLHttpRequest} http - Streaming request; `responseText`,
   *   `_index` and `_buffer` are read, `_index` and `_buffer` are updated.
   */
  Socket.prototype._handleStream = function(http) {
    var messages = http.responseText.split('\n');
    var lastIndex = messages.length - 1;
    var parsed;

    // Retry lines that were incomplete the last time round.
    if (http._buffer) {
      while (http._buffer.length > 0) {
        var index = http._buffer.shift();
        var bufferedMessage = messages[index];
        try {
          parsed = JSON.parse(bufferedMessage);
        } catch (e) {
          if (index >= lastIndex) {
            // Still the unterminated tail of the response: keep it at the
            // head of the buffer and try again once more data has arrived.
            http._buffer.unshift(index);
            break;
          }
          // A newline follows this line, so it can never become valid.
          util.log('Invalid server message', bufferedMessage);
          continue;
        }
        this.emit('message', parsed);
      }
    }

    // Consume every unread, non-empty line rather than one per call.
    var message;
    while ((message = messages[http._index])) {
      http._index += 1;
      if (http._index === messages.length) {
        // This was the last element, i.e. the response did not end in `\n`
        // (otherwise the last element would be the empty string). The line
        // may be incomplete, so defer it to the next call.
        if (!http._buffer) {
          http._buffer = [];
        }
        http._buffer.push(http._index - 1);
      } else {
        try {
          parsed = JSON.parse(message);
        } catch (e) {
          util.log('Invalid server message', message);
          continue;
        }
        this.emit('message', parsed);
      }
    }
  };
  /**
   * Arm a 25 second timer (stored on `this._timeout`) that retires the
   * current XHR stream.
   *
   * When the timer fires and the WebSocket is open, the current request is
   * simply aborted. Otherwise a new stream is started with the next stream
   * index and the previous request is attached to it as `old`.
   */
  Socket.prototype._setHTTPTimeout = function() {
    var owner = this;

    function rotateStream() {
      var previous = owner._http;
      if (owner._wsOpen()) {
        previous.abort();
        return;
      }
      owner._startXhrStream(previous._streamIndex + 1);
      owner._http.old = previous;
    }

    owner._timeout = setTimeout(rotateStream, 25000);
  };
  /**
   * Report whether the WebSocket exists and is in the OPEN state.
   *
   * @returns {boolean} Always a strict boolean; false when no socket has been
   *   created yet.
   */
  Socket.prototype._wsOpen = function() {
    // readyState 1 is OPEN.
    return !!this._socket && this._socket.readyState === 1;
  };
  /**
   * Flush the messages queued by send().
   *
   * The queue is detached and replaced with an empty one before anything is
   * sent. send() pushes a message back onto `this._queue` when no id is set,
   * so iterating the live queue would duplicate entries, and leaving it
   * populated would re-send everything on a later flush.
   */
  Socket.prototype._sendQueuedMessages = function() {
    var pending = this._queue;
    this._queue = [];
    for (var i = 0, ii = pending.length; i < ii; i += 1) {
      this.send(pending[i]);
    }
  };
  /**
   * Send one message to the server.
   *
   *  - Dropped silently once the socket is disconnected.
   *  - Queued while no id is set.
   *  - Rejected with an 'error' event ('Invalid message') when it has no type.
   *  - Otherwise serialised to JSON and written to the WebSocket when it is
   *    open, or POSTed to `<httpUrl>/<lower-cased type>` when it is not.
   *
   * @param {Object} data - Message object; must carry a truthy `type`.
   */
  Socket.prototype.send = function(data) {
    if (this.disconnected) {
      return;
    }

    // Without an id nothing can be sent yet, so hold on to the message.
    if (!this.id) {
      this._queue.push(data);
      return;
    }

    if (!data.type) {
      this.emit('error', 'Invalid message');
      return;
    }

    var payload = JSON.stringify(data);

    if (!this._wsOpen()) {
      var request = new XMLHttpRequest();
      request.open('post', this._httpUrl + '/' + data.type.toLowerCase(), true);
      request.setRequestHeader('Content-Type', 'application/json');
      request.send(payload);
      return;
    }

    this._socket.send(payload);
  };
  /**
   * Disconnect from the server and release both transports.
   *
   * Marks the socket as disconnected, cancels the pending XHR timeout so no
   * further stream is started, aborts the current XHR stream if there is
   * one, and closes the WebSocket if one was created. This also covers a
   * socket that is still connecting or that only ever used the XHR stream.
   * Calling it again after the socket is disconnected is a no-op.
   */
  Socket.prototype.close = function() {
    if (this.disconnected) {
      return;
    }
    this.disconnected = true;

    // Stop the timer that would otherwise keep replacing the XHR stream.
    if (this._timeout) {
      clearTimeout(this._timeout);
      this._timeout = null;
    }

    // `_http` is aborted but left in place; the delayed abort scheduled by
    // the WebSocket open handler may still dereference it.
    if (this._http) {
      this._http.abort();
    }

    if (this._socket) {
      this._socket.close();
    }
  };
  185. module.exports = Socket;