server.js 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. var util = require('./util');
  2. var express = require('express');
  3. var http = require('http');
  4. var EventEmitter = require('events').EventEmitter;
  5. var WebSocketServer = require('ws').Server;
  6. var url = require('url');
  /**
   * Signalling server: creates an Express app on top of an HTTP server, attaches
   * a WebSocket server at `/ws`, and starts listening immediately.
   *
   * May be called with or without `new`.
   *
   * @param {Object} [options] Overrides for the defaults below (merged with
   *     `util.extend`).
   * @param {number} [options.port=80] Port the HTTP server listens on.
   * @param {boolean} [options.debug=false] Copied onto `util.debug`.
   * @param {number} [options.timeout=5000] Milliseconds a message is held for a
   *     destination that is not connected yet (see `_handleTransmission`).
   */
  function PeerServer(options) {
    if (!(this instanceof PeerServer)) return new PeerServer(options);
    EventEmitter.call(this);

    this._app = express();
    this._httpServer = http.createServer(this._app);

    this._app.use(express.bodyParser());
    this._app.use(this._allowCrossDomain);

    this._options = util.extend({
      port: 80,
      debug: false,
      timeout: 5000
    }, options);

    util.debug = this._options.debug;

    // Listen on user-specified port and create WebSocket server as well.
    this._httpServer.listen(this._options.port);
    this._wss = new WebSocketServer({ path: '/ws', server: this._httpServer });

    // The registries below are keyed by peer-supplied ids. They are created
    // without a prototype so that ids such as "constructor" or "toString" do
    // not resolve to inherited Object.prototype members and get mistaken for
    // registered peers.

    // WebSockets that are opened, or HTTP streaming responses (the latter are
    // paired with an entry in `_timeouts`).
    this._clients = Object.create(null);

    // Timeouts for HTTP responses.
    this._timeouts = Object.create(null);

    // Connections waiting for another peer.
    this._outstandingOffers = Object.create(null);

    // Initialize WebSocket server handlers.
    this._initializeWSS();

    // Initialize HTTP routes. This is only used for the first few milliseconds
    // before a socket is opened for a Peer.
    this._initializeHTTP();
  }
  36. util.inherits(PeerServer, EventEmitter);
  /**
   * Express middleware for CORS: sets the three Access-Control-Allow-* headers
   * on the response, then hands control to the next handler.
   */
  PeerServer.prototype._allowCrossDomain = function(req, res, next) {
    var corsHeaders = [
      ['Access-Control-Allow-Origin', '*'],
      ['Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE'],
      ['Access-Control-Allow-Headers', 'Content-Type']
    ];

    corsHeaders.forEach(function(header) {
      res.setHeader(header[0], header[1]);
    });

    next();
  };
  /**
   * Initialize WebSocket server.
   *
   * For every new connection the peer id is read from the `id` query parameter
   * of the upgrade request. A missing or empty id gets a generated one. If the
   * id currently belongs to an HTTP streaming response (it has an entry in
   * `_timeouts`), the socket takes the id over and the response is ended with
   * an HTTP-SOCKET message; if it belongs to anything else the socket is told
   * ID-TAKEN and closed. Accepted sockets receive an OPEN message carrying
   * their id, any queued messages are flushed, and handlers for `close` and
   * `message` are installed.
   */
  PeerServer.prototype._initializeWSS = function() {
    var self = this;

    this._wss.on('connection', function(socket) {
      var id = url.parse(socket.upgradeReq.url, true).query.id;

      if (!id) {
        // No id, or an empty one (`?id=`): assign a fresh id rather than
        // registering the peer under an empty key.
        id = self._generateClientId();
      } else if (self._clients[id]) {
        if (self._timeouts[id]) {
          // The id is held by an HTTP streaming response: cancel its timeout
          // and end it so this socket can take over.
          clearTimeout(self._timeouts[id]);
          delete self._timeouts[id];
          self._clients[id].end(JSON.stringify({ type: 'HTTP-SOCKET' }));
        } else {
          socket.send(JSON.stringify({ type: 'ID-TAKEN', msg: 'ID is taken' }));
          socket.close();
          return;
        }
      }

      socket.send(JSON.stringify({ type: 'OPEN', id: id }));

      // Save the socket for this id.
      self._clients[id] = socket;
      self._processOutstandingOffers(id);

      // Remove the registration only while this socket still owns the id. After
      // a LEAVE the id can be claimed by a different connection, and a late
      // `close` (or second LEAVE) from this socket must not evict that peer.
      function releaseId() {
        if (self._clients[id] === socket) {
          self._removePeer(id);
        }
      }

      // Cleanup after a socket closes.
      socket.on('close', function() {
        util.log('Socket closed:', id);
        releaseId();
      });

      // Handle messages from peers.
      socket.on('message', function(data) {
        try {
          var message = JSON.parse(data);
          util.log(message);

          switch (message.type) {
            case 'LEAVE':
              // Clean up if a Peer sends a LEAVE.
              if (!message.dst) {
                releaseId();
                break;
              }
              // A LEAVE addressed to another peer is relayed like the message
              // types below (intentional fall-through).
            // ICE candidates
            case 'CANDIDATE':
            // Offer or answer between peers.
            case 'OFFER':
            case 'ANSWER':
            // Firefoxism (connectDataConnection ports)
            case 'PORT':
              // Use the ID we know to be correct to prevent spoofing.
              message.src = id;
              self._handleTransmission(message);
              break;
            default:
              util.prettyError('Message unrecognized');
          }
        } catch(e) {
          // Covers unparsable JSON as well as errors raised while relaying.
          util.log('Invalid message', data);
        }
      });
    });
  };
  /**
   * Replay the messages that were queued for peer `id` while it was not
   * connected. Queues are grouped by sender; each queued entry is an argument
   * list for `_handleTransmission`. A sender's queue is removed once all of its
   * entries have been replayed.
   *
   * @param {string} id Id of the peer that has just become reachable.
   */
  PeerServer.prototype._processOutstandingOffers = function(id) {
    var pending = this._outstandingOffers[id];
    if (pending === undefined) {
      return;
    }

    var server = this;
    Object.keys(pending).forEach(function(source) {
      pending[source].forEach(function(args) {
        server._handleTransmission.apply(server, args);
      });
      delete server._outstandingOffers[id][source];
    });
  };
  /**
   * Initialize HTTP server routes.
   *
   *   OPTIONS /*   answers 200 (CORS preflight).
   *   GET  /id     picks an unused id and starts a streaming response whose
   *                first line is `{"id":"<id>"}`.
   *   POST /id     starts a streaming response for the id given in the body;
   *                answers 400 when no id is supplied.
   *   POST /offer, /candidate, /answer, /leave, /port
   *                hand the request body to `_handleTransmission`.
   *
   * NOTE(review): unlike the WebSocket path, the relay routes take `src` from
   * the request body as-is, so it is not verified here.
   */
  PeerServer.prototype._initializeHTTP = function() {
    var self = this;

    this._app.options('/*', function(req, res, next) {
      res.send(200);
    });

    // Server sets up HTTP streaming whether you get or post an ID.
    // Retrieve guaranteed random ID.
    this._app.get('/id', function(req, res) {
      var clientId = self._generateClientId();

      self._startStreaming(res, clientId, function() {
        // Chrome hacks.
        res.write('{"id":"' + clientId + '"}\n');
      });
    });

    this._app.post('/id', function(req, res) {
      var id = req.body && req.body.id;
      if (id === undefined || id === null || id === '') {
        // Without this check the peer would be registered under the literal
        // key "undefined".
        res.send(400);
        return;
      }
      self._startStreaming(res, id);
    });

    // All signalling routes relay the posted message unchanged.
    var relay = function(req, res) {
      self._handleTransmission(req.body, res);
    };
    ['/offer', '/candidate', '/answer', '/leave', '/port'].forEach(function(route) {
      self._app.post(route, relay);
    });
  };
  /**
   * Forget the peer registered under `id`: drop its entry from `_clients` and,
   * if it has a pending timeout in `_timeouts`, cancel and drop that as well.
   *
   * @param {string} id Peer id to remove.
   */
  PeerServer.prototype._removePeer = function(id) {
    var pendingTimeout = this._timeouts[id];

    delete this._clients[id];

    if (!pendingTimeout) {
      return;
    }
    clearTimeout(pendingTimeout);
    delete this._timeouts[id];
  };
  /**
   * Turn `res` into a streaming response for peer `id`.
   *
   * Sends a 200 header, runs the optional `write` callback, then writes a line
   * of 2048 '0' characters. If `id` is not registered yet, `res` is stored in
   * `_clients`, a 30 second timeout is armed that removes the peer and ends the
   * response with HTTP-END, and queued messages for `id` are flushed. If `id`
   * is already registered, the response is ended with HTTP-ERROR instead.
   *
   * @param {Object} res HTTP response to stream on.
   * @param {string} id Peer id to register the response under.
   * @param {Function} [write] Called right after the header is written.
   */
  PeerServer.prototype._startStreaming = function(res, id, write) {
    res.writeHead(200, {'Content-Type': 'application/octet-stream'});
    if (write) {
      write();
    }

    // 2048 zeros followed by a newline.
    // NOTE(review): presumably padding to get past client-side buffering of
    // small chunks -- confirm.
    var padding = new Array(2048 + 1).join('0');
    res.write(padding + '\n');

    if (this._clients[id]) {
      res.end(JSON.stringify({ type: 'HTTP-ERROR' }));
      return;
    }

    // Save res so we can write to it.
    this._clients[id] = res;

    // Set timeout to expire.
    var server = this;
    this._timeouts[id] = setTimeout(function() {
      server._removePeer(id);
      res.end(JSON.stringify({ type: 'HTTP-END' }));
    }, 30000);

    this._processOutstandingOffers(id);
  };
  /**
   * Handles passing on a message.
   *
   * If the destination peer is registered, the JSON-encoded message is written
   * to it (newline-terminated `write` for HTTP streaming responses, `send` for
   * WebSockets). If writing throws, the destination is removed and the sender
   * is sent a LEAVE on its behalf.
   *
   * If the destination is not registered:
   *   - LEAVE without `dst` removes the sender itself;
   *   - LEAVE/EXPIRE are never queued: an undeliverable LEAVE is answered with
   *     an EXPIRE to the sender, an undeliverable EXPIRE is dropped;
   *   - any other message with a `dst` is queued for `options.timeout`
   *     milliseconds; if the queue is still pending when the timer fires it is
   *     discarded and the sender is sent an EXPIRE.
   *
   * @param {Object} message Message with `type`, `src` and `dst` fields.
   * @param {Object} [res] HTTP response of the request that carried the
   *     message; when given it always receives a status code (200 delivered,
   *     queued or sender removed; 501 delivery failed; 410 undeliverable).
   */
  PeerServer.prototype._handleTransmission = function(message, res) {
    var type = message.type;
    var src = message.src;
    var dst = message.dst;
    var data = JSON.stringify(message);

    var destination = this._clients[dst];

    if (destination) {
      try {
        if (this._timeouts[dst]) {
          // An entry in _timeouts marks an HTTP streaming response.
          destination.write(data + '\n');
        } else {
          destination.send(data);
        }
        if (res) res.send(200);
      } catch (e) {
        // This happens when a peer disconnects without closing connections and
        // the associated WebSocket has not closed.
        util.prettyError(e);
        // Tell other side to stop trying.
        this._removePeer(dst);
        this._handleTransmission({
          type: 'LEAVE',
          src: dst,
          dst: src
        });
        if (res) res.send(501);
      }
      return;
    }

    if (type === 'LEAVE' && !dst) {
      this._removePeer(src);
      if (res) res.send(200);
      return;
    }

    if (type !== 'LEAVE' && type !== 'EXPIRE' && dst) {
      // Wait for this client to connect/reconnect (XHR) for important
      // messages.
      var self = this;
      var bySource = this._outstandingOffers[dst];
      if (!bySource) {
        // No prototype: `src` is peer-controlled and must not hit inherited
        // members such as "constructor".
        bySource = this._outstandingOffers[dst] = Object.create(null);
      }

      var queue = bySource[src];
      if (!queue) {
        queue = bySource[src] = [];
        setTimeout(function() {
          var current = self._outstandingOffers[dst];
          // Expire only the queue this timer was armed for; a newer queue for
          // the same pair has its own timer.
          if (current && current[src] === queue) {
            delete current[src];
            self._handleTransmission({
              type: 'EXPIRE',
              src: dst,
              dst: src
            });
          }
        }, this._options.timeout);
      }

      // Stored as an argument list; _processOutstandingOffers replays it with
      // Function.prototype.apply.
      queue.push([message]);
      if (res) res.send(200);
      return;
    }

    // Undeliverable LEAVE/EXPIRE, or a message without a destination.
    // Assume a disconnect if the client no longer exists and let the sender
    // know -- unless the message is itself an EXPIRE notice or has no sender;
    // answering those would bounce notices between two absent peers forever.
    if (src && type !== 'EXPIRE') {
      this._handleTransmission({
        type: 'EXPIRE',
        src: dst,
        dst: src
      });
    }
    // 410: Resource not available.
    if (res) res.send(410);
  };
  /**
   * Draw ids from `util.randomId()` until one is found that has no entry in
   * `_clients`, and return it.
   *
   * @return {string} An id that is currently not registered.
   */
  PeerServer.prototype._generateClientId = function() {
    var candidate;
    do {
      candidate = util.randomId();
    } while (this._clients[candidate]);
    return candidate;
  };
  267. exports.PeerServer = PeerServer;