// server.js
  var util = require('./util');
  var express = require('express');
  var http = require('http');
  var EventEmitter = require('events').EventEmitter;
  var WebSocket = require('ws');
  var WebSocketServer = require('ws').Server;
  var url = require('url');
  /**
   * Creates the server: an Express app on a Node HTTP server, plus a WebSocket
   * server mounted on the same HTTP server at `/ws`.
   *
   * May be called with or without `new`.
   *
   * @param {Object} [options] - Merged over the defaults via `util.extend`.
   * @param {number} [options.port=80] - Port the HTTP server listens on.
   * @param {*} [options.debug] - Copied verbatim onto `util.debug`.
   */
  function PeerServer(options) {
    if (!(this instanceof PeerServer)) return new PeerServer(options);
    EventEmitter.call(this);

    this._app = express();
    this._httpServer = http.createServer(this._app);

    var settings = util.extend({
      port: 80
    }, options);

    util.debug = settings.debug;

    // Listen on user-specified port and create WebSocket server as well.
    this._httpServer.listen(settings.port);
    this._wss = new WebSocketServer({ path: '/ws', server: this._httpServer });

    // WebSockets (or pending HTTP streaming responses) that are opened, by id.
    this._clients = {};
    // Expiry timers for pending HTTP streaming responses, by id.
    this._timeouts = {};
    // The route and transmission handlers read these two maps; they must exist
    // up front, otherwise the first lookup throws a TypeError.
    this._requests = {};
    this._ice = {};

    // Initialize WebSocket server handlers.
    this._initializeWSS();

    // Initialize HTTP routes. This is only used for the first few milliseconds
    // before a socket is opened for a Peer.
    this._initializeHTTP();
  }
  // Make PeerServer an EventEmitter subtype (the constructor already invokes
  // EventEmitter.call(this)).
  util.inherits(PeerServer, EventEmitter);
  /**
   * Initialize WebSocket server.
   *
   * For each new connection the socket is stored in `_clients` under the `id`
   * query parameter of the upgrade URL, unless a WebSocket is already stored
   * for that id. Anything else stored under the id (e.g. a response saved by
   * `_startStreaming`) has its timeout cleared and is ended first.
   * Incoming messages are parsed as JSON and relayed with
   * `_handleTransmission`.
   */
  PeerServer.prototype._initializeWSS = function() {
    var self = this;

    this._wss.on('connection', function(socket) {
      var id = url.parse(socket.upgradeReq.url, true).query.id;

      // Save the socket for this id.
      if (id) {
        var existing = self._clients[id];
        if (!(existing instanceof WebSocket)) {
          // Only a stored non-WebSocket entry can be ended; when the id is
          // simply unused there is nothing to clean up.
          if (existing) {
            clearTimeout(self._timeouts[id]);
            delete self._timeouts[id];
            existing.end('socket');
          }
          self._clients[id] = socket;
        }
      } else {
        // TODO: assign id.
      }

      socket.on('message', function(data) {
        var message;
        // Socket data is untrusted: a malformed payload must not take the
        // whole server down with an uncaught exception.
        try {
          message = JSON.parse(data);
        } catch (e) {
          util.prettyError(e);
          return;
        }
        if (!message || typeof message !== 'object') {
          util.prettyError('message unrecognized');
          return;
        }

        var ice = false;
        util.log(message);

        // Save the socket.
        if (!self._clients[message.src]) {
          self._clients[message.src] = socket;
        }

        switch (message.type) {
          // ICE candidates
          case 'CANDIDATE':
            ice = true;
            // Intentional fall-through: candidates are relayed like the rest.
          // Peer is going away; relay the notice, then clean up below.
          case 'LEAVE':
          // Offer or answer between peers.
          case 'OFFER':
          case 'ANSWER':
          // Firefoxism (connectDataConnection ports)
          case 'PORT':
            self._handleTransmission(message.src, message.dst, data, ice);

            // Clean up.
            if (message.type === 'LEAVE') {
              delete self._clients[message.src];
              if (self._requests) {
                delete self._requests[message.src];
              }
            }
            break;
          default:
            util.prettyError('message unrecognized');
        }
      });
    });
  };
  /**
   * Initialize HTTP server routes.
   *
   * Routes:
   *  - GET  /id     picks an unused random id, starts a streaming response and
   *                 writes the id to it.
   *  - POST /id     starts a streaming response for the id given in the body.
   *  - POST /offer, POST /answer
   *                 relay `req.body.data` from `data.src` to `data.dst`.
   *
   * NOTE(review): the POST routes read `req.body`; no body-parsing middleware
   * is registered in the code visible here, so confirm one is installed.
   */
  PeerServer.prototype._initializeHTTP = function() {
    var self = this;

    // Ends `res` with the given status code and no body.
    function finish(res, status) {
      res.statusCode = status;
      res.end();
    }

    // True when `candidate` is already used as a client or request id.
    function isTaken(candidate) {
      return !!self._clients[candidate] ||
          !!(self._requests && self._requests[candidate]);
    }

    // Shared handler for /offer and /answer: relay the posted message.
    function relay(req, res) {
      var data = req.body && req.body.data;
      if (!data) {
        finish(res, 400);
        return;
      }
      self._handleTransmission(data.src, data.dst, JSON.stringify(data));
      // Now expecting ice from same dst.
      // Always answer the POST so the client's request does not hang.
      finish(res, 200);
    }

    // Server sets up HTTP streaming whether you get or post an ID.
    // Retrieve guaranteed random ID.
    this._app.get('/id', function(req, res) {
      var clientId = util.randomId();
      while (isTaken(clientId)) {
        clientId = util.randomId();
      }
      self._startStreaming(res, clientId, function() {
        res.write(clientId);
      });
    });

    this._app.post('/id', function(req, res) {
      var id = req.body && req.body.id;
      if (!id) {
        finish(res, 400);
        return;
      }
      // Checked in with ID, now waiting for an offer.
      self._startStreaming(res, id);
    });

    // TODO: if offer person does not exist, set a timeout for 10s. may need to
    // change switch.
    this._app.post('/offer', relay);

    this._app.post('/answer', relay);
  };
  /**
   * Saves a streaming response and takes care of timeouts and headers.
   *
   * Writes a 200 text/plain header, optionally lets the caller write an
   * initial chunk, stores `res` in `_clients[id]`, and arms a 10 second timer
   * (stored in `_timeouts[id]`) that ends the response with 'end'.
   *
   * @param {Object} res - HTTP response to keep open.
   * @param {string} id - Peer id the response is saved under.
   * @param {Function} [write] - Called once after the headers are written.
   */
  PeerServer.prototype._startStreaming = function(res, id, write) {
    var self = this;

    res.writeHead(200, {'Content-Type': 'text/plain'});
    if (write) {
      write();
    }

    // Save res so we can write to it.
    this._clients[id] = res;

    // Set timeout to expire.
    var timer = setTimeout(function() {
      res.end('end');
      // Drop the bookkeeping for the finished response, but only if nothing
      // newer has been registered under the same id in the meantime.
      if (self._clients[id] === res) {
        delete self._clients[id];
      }
      if (self._timeouts[id] === timer) {
        delete self._timeouts[id];
      }
    }, 10000);
    this._timeouts[id] = timer;
  };
  // TODO: fix for streaming
  /**
   * Handles passing on a message.
   *
   * Looks the destination up in `_clients[dst]`, falling back to
   * `_requests[dst][src]` and then `_requests[dst]['offer']`. Non-ICE messages
   * are sent to the destination (send errors are logged, not thrown). ICE
   * messages are only appended to `_ice[dst][src]` when that queue exists.
   *
   * NOTE(review): the two-level `[dst][src]` layout is the assumed intent of
   * the previous `[dst, src]` expressions, which JavaScript evaluates as
   * `[src]` alone (comma operator). Nothing visible here populates either map;
   * confirm the layout against whatever code fills them.
   *
   * @param {string} src - Id of the sending peer.
   * @param {string} dst - Id of the receiving peer.
   * @param {string} data - Serialized message to deliver.
   * @param {boolean} [ice] - True when `data` is an ICE candidate.
   */
  PeerServer.prototype._handleTransmission = function(src, dst, data, ice) {
    var destination = this._clients[dst];

    if (!destination) {
      var pending = this._requests && this._requests[dst];
      if (pending) {
        // For ICE, ports, and answers this should be here.
        // Otherwise it's a new offer.
        destination = pending[src] || pending['offer'];
      }
    }

    if (!destination) {
      // TODO: place in queue for 10 seconds.
      return;
    }

    if (!ice) {
      try {
        destination.send(data);
      } catch (e) {
        util.prettyError(e);
      }
      return;
    }

    var queues = this._ice && this._ice[dst];
    var queue = queues && queues[src];
    if (queue) {
      // TODO: see if we can save less.
      queue.push(data);
    }
  };
  // Public entry point of this module.
  exports.PeerServer = PeerServer;