// server.js (7.9 KB)
  1. var util = require('./util');
  2. var express = require('express');
  3. var http = require('http');
  4. var EventEmitter = require('events').EventEmitter;
  5. var WebSocketServer = require('ws').Server;
  6. var url = require('url');
  /**
   * Signalling server: accepts peers over WebSocket (path '/ws') or HTTP
   * streaming and relays messages between them.
   *
   * May be called with or without `new`.
   *
   * @constructor
   * @param {Object} [options] Server options, merged over the defaults.
   * @param {number} [options.port=80] Port the HTTP server listens on.
   * @param {*} [options.debug] Copied onto `util.debug`.
   */
  function PeerServer(options) {
    if (!(this instanceof PeerServer)) return new PeerServer(options);
    EventEmitter.call(this);

    this._app = express();
    this._httpServer = http.createServer(this._app);

    this._app.use(express.bodyParser());
    this._app.use(this._allowCrossDomain);

    options = util.extend({
      port: 80
    }, options);

    // NOTE(review): this writes to the shared util module, so the setting is
    // process-wide rather than per server instance.
    util.debug = options.debug;

    // Listen on user-specified port and create WebSocket server as well.
    this._httpServer.listen(options.port);
    this._wss = new WebSocketServer({ path: '/ws', server: this._httpServer });

    // Connected peers by id: either an open WebSocket or a streaming HTTP
    // response. HTTP responses are paired with an entry in _timeouts.
    this._clients = {};

    // Expiry timers for streaming HTTP responses, by peer id.
    this._timeouts = {};

    // Messages queued for peers that have not connected yet:
    // destination id -> source id -> list of queued argument lists.
    this._outstandingOffers = {};

    // Initialize WebSocket server handlers.
    this._initializeWSS();

    // Initialize HTTP routes. This is only used for the first few milliseconds
    // before a socket is opened for a Peer.
    this._initializeHTTP();
  }

  util.inherits(PeerServer, EventEmitter);
  /**
   * Handle CORS: middleware that allows any origin, then passes control on.
   *
   * @param {Object} req Incoming request (unused).
   * @param {Object} res Response on which the CORS headers are set.
   * @param {Function} next Invoked once the headers have been set.
   */
  PeerServer.prototype._allowCrossDomain = function(req, res, next) {
    res.setHeader('Access-Control-Allow-Origin', '*');
    res.setHeader('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE');
    res.setHeader('Access-Control-Allow-Headers', 'Content-Type');

    next();
  };
  /**
   * Initialize WebSocket server.
   *
   * For every new connection the peer id is read from the `id` query
   * parameter of the upgrade request:
   *   - no id: one is generated and sent back as `{ type: 'ID', id: ... }`;
   *   - id held by a streaming HTTP response (it has a timeout entry): the
   *     timer is cleared, the response is ended and the socket takes over;
   *   - id held by anything else: `{ type: 'ERROR', msg: 'ID is taken' }` is
   *     sent and the socket is not registered.
   * A registered socket has its queued offers replayed, relays its messages
   * through `_handleTransmission`, and is unregistered when it closes.
   */
  PeerServer.prototype._initializeWSS = function() {
    var self = this;

    this._wss.on('connection', function(socket) {
      var id = url.parse(socket.upgradeReq.url, true).query.id;

      if (!id) {
        id = self._generateClientId();
        socket.send(JSON.stringify({ type: 'ID', id: id }));
      } else if (self._clients[id]) {
        if (!self._timeouts[id]) {
          socket.send(JSON.stringify({ type: 'ERROR', msg: 'ID is taken' }));
          return;
        }
        // The id belongs to an HTTP streaming response: stop its expiry timer
        // and end it so this socket can replace it.
        clearTimeout(self._timeouts[id]);
        delete self._timeouts[id];
        self._clients[id].end('socket');
      }

      // Save the socket for this id.
      self._clients[id] = socket;
      self._processOutstandingOffers(id);

      // Without this a peer that disconnects without sending LEAVE would keep
      // its id reserved forever. Only evict the entry if it is still this
      // socket, so a replacement connection is left alone.
      socket.on('close', function() {
        if (self._clients[id] === socket) {
          delete self._clients[id];
        }
      });

      socket.on('message', function(data) {
        var message;
        try {
          message = JSON.parse(data);
        } catch (e) {
          util.log('invalid message');
          return;
        }
        // JSON such as `null` or `42` parses fine but is not a message.
        if (!message || typeof message !== 'object') {
          util.log('invalid message');
          return;
        }
        util.log(message);

        try {
          switch (message.type) {
            case 'LEAVE':
              // Clean up if a Peer sends a LEAVE that is not addressed to
              // anyone.
              // NOTE(review): this trusts the client-supplied `src` rather
              // than the id bound to this socket -- confirm that is intended.
              if (!message.dst) {
                delete self._clients[message.src];
                delete self._timeouts[message.src];
                break;
              }
              // An addressed LEAVE falls through and is relayed like the rest.
            // ICE candidates
            case 'CANDIDATE':
            // Offer or answer between peers.
            case 'OFFER':
            case 'ANSWER':
            // Firefoxism (connectDataConnection ports)
            case 'PORT':
              self._handleTransmission(message.type, message.src, message.dst, data);
              break;
            default:
              util.prettyError('message unrecognized');
          }
        } catch (e) {
          // A failure while relaying is not a malformed message; report the
          // real error instead of mislabelling it.
          util.prettyError(e);
        }
      });
    });
  };
  /**
   * Process outstanding peer offers.
   *
   * Replays, in queue order, every message that was stored for `id` while it
   * was not connected, then drops each source's queue. Each queued entry is
   * the argument list of the original `_handleTransmission` call.
   *
   * @param {string} id Peer id that has just connected.
   */
  PeerServer.prototype._processOutstandingOffers = function(id) {
    var offers = this._outstandingOffers[id];
    if (!offers) return;

    // Source ids are chosen by peers, so one may be called "hasOwnProperty";
    // borrow the method from Object.prototype instead of reading it off the
    // map.
    var hasOwn = Object.prototype.hasOwnProperty;

    for (var source in offers) {
      if (!hasOwn.call(offers, source)) continue;

      var messages = offers[source];
      for (var i = 0; i < messages.length; i += 1) {
        this._handleTransmission.apply(this, messages[i]);
      }
      // The (now empty) per-destination map is kept on purpose: the expiry
      // timers scheduled by _handleTransmission still look it up.
      delete offers[source];
    }
  };
  /**
   * Initialize HTTP server routes.
   *
   *   OPTIONS /*  -> 200.
   *   GET  /id    -> picks an unused random id and starts streaming to it.
   *   POST /id    -> starts streaming for the id given in the request body.
   *   POST /offer, /ice, /answer, /leave, /port
   *               -> relays the JSON body from `body.src` to `body.dst`.
   */
  PeerServer.prototype._initializeHTTP = function() {
    var self = this;

    this._app.options('/*', function(req, res, next) {
      res.send(200);
    });

    // Server sets up HTTP streaming whether you get or post an ID.
    // Retrieve guaranteed random ID.
    this._app.get('/id', function(req, res) {
      var clientId = util.randomId();
      while (!!self._clients[clientId]) {
        clientId = util.randomId();
      }
      self._startStreaming(res, clientId, function() {
        // Chrome hacks.
        res.write('{"id":"' + clientId + '"}\n');
      });
    });

    // NOTE(review): a body without `id` registers the peer under the key
    // "undefined" -- confirm whether that should be rejected instead.
    this._app.post('/id', function(req, res) {
      self._startStreaming(res, req.body.id);
    });

    // All relay endpoints differ only in path and message type.
    var relayRoutes = [
      ['/offer', 'OFFER'],
      ['/ice', 'ICE'],
      ['/answer', 'ANSWER'],
      ['/leave', 'LEAVE'],
      ['/port', 'PORT']
    ];

    relayRoutes.forEach(function(route) {
      var path = route[0];
      var type = route[1];

      self._app.post(path, function(req, res) {
        var body = req.body;
        self._handleTransmission(type, body.src, body.dst, JSON.stringify(body), res);
      });
    });
  };
  /**
   * Saves a streaming response and takes care of timeouts and headers.
   *
   * Writes a 200 header, the optional caller-supplied preamble and a padding
   * line. If `id` is free the response is registered as that peer and is
   * ended with 'end' after 10 seconds; otherwise an "ID is taken" error line
   * is written and the response is ended with 'error'.
   *
   * @param {Object} res HTTP response to stream on.
   * @param {string} id Peer id to register the response under.
   * @param {Function} [write] Called right after the header is written.
   */
  PeerServer.prototype._startStreaming = function(res, id, write) {
    var self = this;

    res.writeHead(200, {'Content-Type': 'application/octet-stream'});
    if (!!write) {
      write();
    }

    // 2048 '0' characters (1024 copies of '00').
    // NOTE(review): presumably padding so browsers start delivering the
    // stream -- confirm.
    var pad = new Array(1024 + 1).join('00');
    res.write(pad + '\n');

    if (this._clients[id]) {
      res.write(JSON.stringify({ type: 'ERROR', msg: 'ID is taken' }) + '\n');
      res.end('error');
      return;
    }

    // Save res so we can write to it.
    this._clients[id] = res;

    // Set timeout to expire.
    this._timeouts[id] = setTimeout(function() {
      // Unregister the expired response, otherwise the id stays "taken" and
      // later messages are written to an ended response. Skip this if the
      // entry has since been replaced by another connection.
      if (self._clients[id] === res) {
        delete self._clients[id];
        delete self._timeouts[id];
      }
      res.end('end');
    }, 10000);
  };
  /**
   * Handles passing on a message.
   *
   * Destination connected: `data` is sent to it (with a trailing newline when
   * the destination is an HTTP streaming response). An HTTP sender gets 200;
   * a WebSocket sender of an OFFER gets a PEER_READY message. If sending
   * fails, the error is reported and an HTTP sender gets 501.
   *
   * Destination not connected:
   *   - the first OFFER from `src` opens a queue that expires after 5 s;
   *   - ICE / CANDIDATE messages are appended to an existing queue;
   *   - anything else is treated as a disconnect: a LEAVE is sent back to
   *     `src` (if it is connected) and an HTTP sender gets 410.
   *
   * @param {string} type Message type ('OFFER', 'ANSWER', 'ICE', ...).
   * @param {string} src Id of the sending peer.
   * @param {string} dst Id of the receiving peer.
   * @param {string} data Serialized message to forward.
   * @param {Object} [res] HTTP response of the sender, when sent over HTTP.
   */
  PeerServer.prototype._handleTransmission = function(type, src, dst, data, res) {
    var self = this;
    // Kept verbatim so a queued message can be replayed with the same call.
    var args = Array.prototype.slice.call(arguments);
    var destination = this._clients[dst];

    if (destination) {
      try {
        // HTTP streaming destinations are newline-delimited.
        destination.send(this._timeouts[dst] ? data + '\n' : data);
      } catch (e) {
        util.prettyError(e);
        // This really shouldn't happen given correct client browsers.
        // 501: Server does not support this functionality.
        if (res) res.send(501);
        return;
      }

      // Acknowledge only after delivery, so a failed send can never be
      // followed by a second status on the same response.
      if (res) {
        // Every HTTP sender gets an answer; otherwise the request hangs.
        res.send(200);
      } else if (type === 'OFFER' && !this._timeouts[src] && this._clients[src]) {
        // We have to let the source peer know that the offer was sent
        // successfully so that ice can start being processed.
        try {
          this._clients[src].send(JSON.stringify({ type: 'PEER_READY', src: dst, dst: src }));
        } catch (e) {
          util.prettyError(e);
        }
      }
      return;
    }

    var pending = this._outstandingOffers[dst];
    var queue = pending && pending[src];

    if (type === 'OFFER' && !queue) {
      // Wait 5 seconds for this client to connect.
      if (!pending) {
        pending = this._outstandingOffers[dst] = {};
      }
      queue = pending[src] = [args];
      setTimeout(function() {
        // Only drop the queue this timer was created for; a newer offer from
        // the same source owns its own timer.
        var current = self._outstandingOffers[dst];
        if (current && current[src] === queue) {
          delete current[src];
        }
      }, 5000);
    } else if ((type === 'ICE' || type === 'CANDIDATE') && queue) {
      // Candidates are labelled 'ICE' over HTTP and 'CANDIDATE' over
      // WebSocket.
      queue.push(args);
    } else {
      // Assume a disconnect if the client no longer exists.
      util.log('destination does not exist');
      // Notify the sender only if it is connected; bouncing to a missing
      // sender would recurse between the two ids forever.
      if (this._clients[src]) {
        this._handleTransmission('LEAVE', dst, src, JSON.stringify({ type: 'LEAVE', dst: src, src: dst }));
      }
      // 410: Resource not available.
      if (res) res.send(410);
    }
  };
  /**
   * Picks a random id that no currently registered client is using.
   *
   * @returns {string} An id from `util.randomId()` with no entry in
   *     `this._clients`.
   */
  PeerServer.prototype._generateClientId = function() {
    var clientId = util.randomId();
    // Re-roll on collision with a registered client.
    while (this._clients[clientId]) {
      clientId = util.randomId();
    }
    return clientId;
  };
  // Public entry point of this module.
  exports.PeerServer = PeerServer;