ericz hace 12 años
padre
commit
d4ccb5d7df
Se han modificado 2 ficheros con 162 adiciones y 147 borrados
  1. 155 147
      lib/server.js
  2. 7 0
      lib/util.js

+ 155 - 147
lib/server.js

@@ -13,8 +13,6 @@ function PeerServer(options) {
 
   this._app = express();
   this._httpServer = http.createServer(this._app);
-  this._app.use(express.bodyParser());
-  this._app.use(this._allowCrossDomain);
 
   this._options = util.extend({
     port: 80,
@@ -24,17 +22,11 @@ function PeerServer(options) {
 
   util.debug = this._options.debug;
 
-  // Listen on user-specified port and create WebSocket server as well.
-  this._httpServer.listen(this._options.port);
-  this._wss = new WebSocketServer({ path: '/ws', server: this._httpServer });
-
-  // WebSockets that are opened or HTTP responses (which are paired with
-  // something in timeouts.
+  // Connected clients
   this._clients = {};
-  // Timeouts for HTTP responses.
-  this._timeouts = {};
-  // Connections waiting for another peer.
-  this._outstandingOffers = {};
+  
+  // Messages waiting for another peer.
+  this._outstanding = {};
 
   // Initailize WebSocket server handlers.
   this._initializeWSS();
@@ -46,46 +38,53 @@ function PeerServer(options) {
 
 util.inherits(PeerServer, EventEmitter);
 
-/** Handle CORS */
-PeerServer.prototype._allowCrossDomain = function(req, res, next) {
-  res.setHeader('Access-Control-Allow-Origin', '*');
-  res.setHeader('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE');
-  res.setHeader('Access-Control-Allow-Headers', 'Content-Type');
-
-  next();
-}
 
 /** Initialize WebSocket server. */
 PeerServer.prototype._initializeWSS = function() {
   var self = this;
+  
+  // Create WebSocket server as well.
+  this._wss = new WebSocketServer({ path: '/ws', server: this._httpServer });
+
+  
   this._wss.on('connection', function(socket) {
-    var id = url.parse(socket.upgradeReq.url, true).query.id;
-    if (!!id && !!self._clients[id]) {
-      // If response client and timeout exist, overwrite and clear.
-      if (!!self._timeouts[id]) {
-        clearTimeout(self._timeouts[id]);
-        delete self._timeouts[id];
-        self._clients[id].end(JSON.stringify({ type: 'HTTP-SOCKET' }));
-      } else {
-        socket.send(JSON.stringify({ type: 'ID-TAKEN', msg: 'ID is taken' }));
-        socket.close();
-        return;
+    var query = url.parse(socket.upgradeReq.url, true).query;
+    var id = query.id || self.generateClientId();
+    var token = query.token;
+   
+    var client = self._clients[id];
+    
+    if (!client) {
+      // New client, save info
+      client = { token: token };
+      self._clients[id] = client;
+      socket.send(JSON.stringify({ type: 'OPEN', id: id }));
+    } 
+    
+    if (token === client.token) {
+      // res 'close' event will delete client.res for us
+      client.socket = socket;
+      // Client already exists
+      if (client.res) {
+        client.res.end();
       }
-    } else if (id === undefined) {
-      id = self._generateClientId();
+    } else {
+      // ID-taken, invalid token
+      socket.send(JSON.stringify({ type: 'ID-TAKEN', payload: { msg: 'ID is taken' } }));
+      socket.close();
+      return;
     }
-    socket.send(JSON.stringify({ type: 'OPEN', id: id }));
 
-    // Save the socket for this id.
-    self._clients[id] = socket;
-
-    self._processOutstandingOffers(id);
+    self._processOutstanding(id);
 
     // Cleanup after a socket closes.
     socket.on('close', function() {
       util.log('Socket closed:', id);
-      self._removePeer(id);
+      if (client.socket == socket) {
+        self._removePeer(id);
+      }
     });
+    
     // Handle messages from peers.
     socket.on('message', function(data) {
       try {
@@ -105,10 +104,14 @@ PeerServer.prototype._initializeWSS = function() {
           case 'OFFER':
           case 'ANSWER':
           // Firefoxism (connectDataConnection ports)
-          case 'PORT':
+          // case 'PORT':
             // Use the ID we know to be correct to prevent spoofing.
-            message.src = id;
-            self._handleTransmission(message);
+            self._handleTransmission({
+              type: message.type,
+              src: id,
+              dst: message.dst,
+              payload: msg.payload
+            });
             break;
           default:
             util.prettyError('Message unrecognized');
@@ -121,26 +124,13 @@ PeerServer.prototype._initializeWSS = function() {
 };
 
 
-/** Process outstanding peer offers. */
-PeerServer.prototype._processOutstandingOffers = function(id) {
-  var offers = this._outstandingOffers[id];
-  if (offers === undefined) {
-    return;
-  }
-  var sources = Object.keys(offers);
-  for (var i = 0, ii = sources.length; i < ii; i += 1) {
-    var messages = offers[sources[i]];
-    for (var j = 0, jj = messages.length; j < jj; j += 1) {
-      this._handleTransmission.apply(this, messages[j]);
-    }
-    delete this._outstandingOffers[id][sources[i]];
-  }
-};
-
 /** Initialize HTTP server routes. */
 PeerServer.prototype._initializeHTTP = function() {
   var self = this;
-
+  
+  this._app.use(express.bodyParser());
+  this._app.use(util.allowCrossDomain);
+  
   this._app.options('/*', function(req, res, next) {
     res.send(200);
   });
@@ -148,82 +138,122 @@ PeerServer.prototype._initializeHTTP = function() {
   // Server sets up HTTP streaming whether you get or post an ID.
   // Retrieve guaranteed random ID.
   this._app.get('/id', function(req, res) {
-    var clientId = util.randomId();
-    while (!!self._clients[clientId]) {
-      clientId = util.randomId();
-    }
-    self._startStreaming(res, clientId, function() {
-      // Chrome hacks.
-      res.write('{"id":"' + clientId + '"}\n');
-    });
+    var id = self.generateClientId();
+    var token = req.query.token;
+    self._startStreaming(res, id, token);
   });
 
   this._app.post('/id', function(req, res) {
-    var id = req.body.id;
-    self._startStreaming(res, id);
-  });
-
-  this._app.post('/offer', function(req, res) {
-    self._handleTransmission(req.body, res);
+    self._startStreaming(res, req.body.id, req.body.token);
   });
 
-  this._app.post('/candidate', function(req, res) {
-    self._handleTransmission(req.body, res);
-  });
+    
+  var handle = function(req, res){
+    var client = self._clients[req.body.id];
+    // Auth the req
+    if (!client || req.body.token !== client.token) {
+      res.send(401);
+      return;
+    } else {
+      self._handleTransmission({
+        type: req.body.type,
+        src: client.id,
+        dst: req.body.dst,
+        payload: req.body.payload
+      });
+      res.send(200);
+    }
+  };
+  
+  this._app.post('/offer', handle);
 
-  this._app.post('/answer', function(req, res) {
-    self._handleTransmission(req.body, res);
-  });
+  this._app.post('/candidate', handle);
 
-  this._app.post('/leave', function(req, res) {
-    self._handleTransmission(req.body, res);
-  });
+  this._app.post('/answer', handle);
 
-  this._app.post('/port', function(req, res) {
-    self._handleTransmission(req.body, res);
-  });
-};
+  this._app.post('/leave', handle);
 
-PeerServer.prototype._removePeer = function(id) {
-  delete this._clients[id];
-  if (this._timeouts[id]) {
-    clearTimeout(this._timeouts[id]);
-    delete this._timeouts[id];
-  }
+  this._app.post('/port', handle);
+  
+  // Listen on user-specified port and 
+  this._httpServer.listen(this._options.port);
+  
 };
 
 /** Saves a streaming response and takes care of timeouts and headers. */
-PeerServer.prototype._startStreaming = function(res, id, write) {
+PeerServer.prototype._startStreaming = function(res, id, token) {
   res.writeHead(200, {'Content-Type': 'application/octet-stream'});
-  if (!!write) {
-    write();
-  }
 
+  var client = this._clients[id];
+  
+  // Save res so we can write to it.
+  if (!client) {
+    // New client, save info
+    client = { token: token };
+    this._clients[id] = client;
+    res.write(JSON.stringify({ type: 'OPEN', id: id }) + '\n');
+  }
+    
   var pad = '00';
-  var iterations = 10;
-  for (var i = 0; i < iterations; i++) {
+  for (var i = 0; i < 10; i++) {
     pad += pad;
   }
   res.write(pad + '\n');
 
-  // Save res so we can write to it.
-  if (!this._clients[id]) {
-    this._clients[id] = res;
-    // Set timeout to expire.
-    var self = this;
-    this._timeouts[id] = setTimeout(function() {
-      self._removePeer(id);
-      res.end(JSON.stringify({ type: 'HTTP-END' }));
-    }, 30000);
-    this._processOutstandingOffers(id);
+  if (token === client.token) {
+    // Client already exists
+    res.on('close', function(){
+      if (client.res === res) {
+        if (!client.socket) {
+          // No new request yet, peer dead
+          self._remotePeer(id);
+          return;
+        }
+        delete client.res;
+      }
+    });
+    client.res = res;
+    this._processOutstanding(id);
   } else {
+    // ID-taken, invalid token
     res.end(JSON.stringify({ type: 'HTTP-ERROR' }));
   }
+};
+
+PeerServer.prototype._pruneOutstanding = function() {
+  var dsts = Object.keys(this._outstanding);
+  for (var i = 0, ii = dsts.length; i < ii; i++) {
+    var offers = this._outstanding[dsts[i]];
+    var seen = {};
+    for (var j = 0, jj = offers.length; j < jj; j++) {
+      var message = offers[j];
+      if (!seen[message.src]) {
+        this._handleTransmission({ type: 'EXPIRE', src: message.dst, dst: message.src });
+        seen[message.src] = true;
+      }
+    }
+  }
+  this._outstanding = [];
+}
 
+/** Process outstanding peer offers. */
+PeerServer.prototype._processOutstanding = function(id) {
+  var offers = this._outstanding[id];
+  if (!offers) {
+    return;
+  }
+  for (var j = 0, jj = offers.length; j < jj; j += 1) {
+    this._handleTransmission(offers[j]);
+  }
+  delete this._outstanding[id];
+};
+
+PeerServer.prototype._removePeer = function(id) {
+  delete this._clients[id];
 };
 
 /** Handles passing on a message. */
-PeerServer.prototype._handleTransmission = function(message, res) {
+PeerServer.prototype._handleTransmission = function(message) {
   var type = message.type;
   var src = message.src;
   var dst = message.dst;
@@ -231,16 +261,17 @@ PeerServer.prototype._handleTransmission = function(message, res) {
 
   var destination = this._clients[dst];
 
-  if (!!destination) {
+  // User is connected!
+  if (destination) {
     try {
-      if (this._timeouts[dst]) {
+      if (destination.socket) {
+        destination.socket.send(data);
+      } else if (destination.res) {
         data += '\n';
-        destination.write(data);
+        destination.res.write(data);
       } else {
-        destination.send(data);
-      }
-      if (!!res) {
-        res.send(200);
+        // Neither socket no res available. Peer dead?
+        throw "Peer dead"
       }
     } catch (e) {
       // This happens when a peer disconnects without closing connections and
@@ -253,44 +284,21 @@ PeerServer.prototype._handleTransmission = function(message, res) {
         src: dst,
         dst: src
       });
-      if (!!res) res.send(501);
     }
   } else {
-    // Wait 5 seconds for this client to connect/reconnect (XHR) for important
+    // Wait for this client to connect/reconnect (XHR) for important
     // messages.
-    if (type !== 'LEAVE') {
+    if (type !== 'LEAVE' && type !== 'EXPIRE' && !!dst) {
       var self = this;
-      if (!this._outstandingOffers[dst]) {
-        this._outstandingOffers[dst] = {};
-      }
-      if (!this._outstandingOffers[dst][src]) {
-        this._outstandingOffers[dst][src] = [];
-        setTimeout(function() {
-          if(!!self._outstandingOffers[dst][src]) {
-            delete self._outstandingOffers[dst][src];
-            self._handleTransmission({
-              type: 'EXPIRE',
-              src: dst,
-              dst: src
-            });
-          }
-        }, this._options.timeout);
+      if (!this._outstanding[dst]) {
+        this._outstanding[dst] = [];
       }
-      this._outstandingOffers[dst][src].push([message]);
-      if (!!res) res.send(200);
+      this._outstanding[dst].push(message);
     } else if (type === 'LEAVE' && !dst) {
       this._removePeer(src);
-      if (!!res) res.send(200);
-    } else if (src) {
-      // Assume a disconnect if the client no longer exists.
-      // Unless it's a message from the server.
-      this._handleTransmission({
-        type: 'EXPIRE',
-        src: dst,
-        dst: src
-      });
-      // 410: Resource not available.
-      if (!!res) res.send(410);
+    } else {
+      // Unavailable destination specified with message LEAVE or EXPIRE
+      // Ignore
     }
   }
 };

+ 7 - 0
lib/util.js

@@ -36,6 +36,13 @@ var util = {
       }
       console.log.apply(console, copy);
     }
+  },
+  _allowCrossDomain: function(req, res, next) {
+    res.setHeader('Access-Control-Allow-Origin', '*');
+    res.setHeader('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE');
+    res.setHeader('Access-Control-Allow-Headers', 'Content-Type');
+
+    next();
   }
 };