123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- var util = require('./util');
- var EventEmitter = require('eventemitter3');
- /**
- * An abstraction on top of WebSockets and XHR streaming to provide fastest
- * possible connection for peers.
- */
- function Socket(secure, host, port, path, key, wsport) {
- if (!(this instanceof Socket)) return new Socket(secure, host, port, path, key, wsport);
- wsport = wsport || port;
- EventEmitter.call(this);
- // Disconnected manually.
- this.disconnected = false;
- this._queue = [];
- var httpProtocol = secure ? 'https://' : 'http://';
- var wsProtocol = secure ? 'wss://' : 'ws://';
- this._httpUrl = httpProtocol + host + ':' + port + path + key;
- this._wsUrl = wsProtocol + host + ':' + wsport + path + 'peerjs?key=' + key;
- }
- util.inherits(Socket, EventEmitter);
- /** Check in with ID or get one from server. */
- Socket.prototype.start = function(id, token) {
- this.id = id;
- this._httpUrl += '/' + id + '/' + token;
- this._wsUrl += '&id=' + id + '&token=' + token;
- this._startXhrStream();
- this._startWebSocket();
- }
- /** Start up websocket communications. */
- Socket.prototype._startWebSocket = function(id) {
- var self = this;
- if (this._socket) {
- return;
- }
- this._socket = new WebSocket(this._wsUrl);
- this._socket.onmessage = function(event) {
- try {
- var data = JSON.parse(event.data);
- } catch(e) {
- util.log('Invalid server message', event.data);
- return;
- }
- self.emit('message', data);
- };
- this._socket.onclose = function(event) {
- util.log('Socket closed.');
- self.disconnected = true;
- self.emit('disconnected');
- };
- // Take care of the queue of connections if necessary and make sure Peer knows
- // socket is open.
- this._socket.onopen = function() {
- if (self._timeout) {
- clearTimeout(self._timeout);
- setTimeout(function(){
- self._http.abort();
- self._http = null;
- }, 5000);
- }
- self._sendQueuedMessages();
- util.log('Socket open');
- };
- }
- /** Start XHR streaming. */
- Socket.prototype._startXhrStream = function(n) {
- try {
- var self = this;
- this._http = new XMLHttpRequest();
- this._http._index = 1;
- this._http._streamIndex = n || 0;
- this._http.open('post', this._httpUrl + '/id?i=' + this._http._streamIndex, true);
- this._http.onerror = function() {
- // If we get an error, likely something went wrong.
- // Stop streaming.
- clearTimeout(self._timeout);
- self.emit('disconnected');
- }
- this._http.onreadystatechange = function() {
- if (this.readyState == 2 && this.old) {
- this.old.abort();
- delete this.old;
- } else if (this.readyState > 2 && this.status === 200 && this.responseText) {
- self._handleStream(this);
- }
- };
- this._http.send(null);
- this._setHTTPTimeout();
- } catch(e) {
- util.log('XMLHttpRequest not available; defaulting to WebSockets');
- }
- }
- /** Handles onreadystatechange response as a stream. */
- Socket.prototype._handleStream = function(http) {
- // 3 and 4 are loading/done state. All others are not relevant.
- var messages = http.responseText.split('\n');
- // Check to see if anything needs to be processed on buffer.
- if (http._buffer) {
- while (http._buffer.length > 0) {
- var index = http._buffer.shift();
- var bufferedMessage = messages[index];
- try {
- bufferedMessage = JSON.parse(bufferedMessage);
- } catch(e) {
- http._buffer.shift(index);
- break;
- }
- this.emit('message', bufferedMessage);
- }
- }
- var message = messages[http._index];
- if (message) {
- http._index += 1;
- // Buffering--this message is incomplete and we'll get to it next time.
- // This checks if the httpResponse ended in a `\n`, in which case the last
- // element of messages should be the empty string.
- if (http._index === messages.length) {
- if (!http._buffer) {
- http._buffer = [];
- }
- http._buffer.push(http._index - 1);
- } else {
- try {
- message = JSON.parse(message);
- } catch(e) {
- util.log('Invalid server message', message);
- return;
- }
- this.emit('message', message);
- }
- }
- }
- Socket.prototype._setHTTPTimeout = function() {
- var self = this;
- this._timeout = setTimeout(function() {
- var old = self._http;
- if (!self._wsOpen()) {
- self._startXhrStream(old._streamIndex + 1);
- self._http.old = old;
- } else {
- old.abort();
- }
- }, 25000);
- }
- /** Is the websocket currently open? */
- Socket.prototype._wsOpen = function() {
- return this._socket && this._socket.readyState == 1;
- }
- /** Send queued messages. */
- Socket.prototype._sendQueuedMessages = function() {
- for (var i = 0, ii = this._queue.length; i < ii; i += 1) {
- this.send(this._queue[i]);
- }
- }
- /** Exposed send for DC & Peer. */
- Socket.prototype.send = function(data) {
- if (this.disconnected) {
- return;
- }
- // If we didn't get an ID yet, we can't yet send anything so we should queue
- // up these messages.
- if (!this.id) {
- this._queue.push(data);
- return;
- }
- if (!data.type) {
- this.emit('error', 'Invalid message');
- return;
- }
- var message = JSON.stringify(data);
- if (this._wsOpen()) {
- this._socket.send(message);
- } else {
- var http = new XMLHttpRequest();
- var url = this._httpUrl + '/' + data.type.toLowerCase();
- http.open('post', url, true);
- http.setRequestHeader('Content-Type', 'application/json');
- http.send(message);
- }
- }
- Socket.prototype.close = function() {
- if (!this.disconnected && this._wsOpen()) {
- this._socket.close();
- this.disconnected = true;
- }
- }
- module.exports = Socket;
|