Pārlūkot izejas kodu

added delayed peer handling, TODO: free up client ids safely

Michelle Bu 12 gadi atpakaļ
vecāks
revīzija
7498c00c95
1 mainītis faili ar 88 papildinājumiem un 21 dzēšanām
  1. 88 21
      lib/server.js

+ 88 - 21
lib/server.js

@@ -34,9 +34,13 @@ function PeerServer(options) {
   this._httpServer.listen(options.port);
   this._wss = new WebSocketServer({ path: '/ws', server: this._httpServer });
 
-  // WebSockets that are opened.
+  // WebSockets that are opened or HTTP responses (which are paired with
+  // something in timeouts.
   this._clients = {};
+  // Timeouts for HTTP responses.
   this._timeouts = {};
+  // Connections waiting for another peer.
+  this._outstandingOffers = {};
 
   // Initailize WebSocket server handlers.
   this._initializeWSS();
@@ -53,26 +57,33 @@ PeerServer.prototype._initializeWSS = function() {
   var self = this;
   this._wss.on('connection', function(socket) {
     var id = url.parse(socket.upgradeReq.url, true).query.id;
-
-    // Save the socket for this id.
-    if (!!id && (!self._clients[id] || !!self._timeouts[id])) {
+    if (!!id && !!self._clients[id]) {
+      // If response client and timeout exist, overwrite and clear.
       if (!!self._timeouts[id]) {
         clearTimeout(self._timeouts[id]);
         delete self._timeouts[id];
         self._clients[id].end('socket');
+      } else {
+        socket.send(JSON.stringify({ type: 'ERROR', msg: 'ID is taken' }));
+        return;
       }
     } else if (!id) {
       id = self._generateClientId();
       socket.send(JSON.stringify({ type: 'ID', id: id }));
     }
+
+    // Save the socket for this id.
     self._clients[id] = socket;
 
+    self._processOutstandingOffers(id);
+
     socket.on('message', function(data) {
       try {
         var message = JSON.parse(data);
         util.log(message);
 
         switch (message.type) {
+          case 'LEAVE':
           // ICE candidates
           case 'CANDIDATE':
           // Offer or answer between peers.
@@ -98,6 +109,21 @@ PeerServer.prototype._initializeWSS = function() {
   });
 };
 
+
+/** Process outstanding peer offers. */
+PeerServer.prototype._processOutstandingOffers = function(id) {
+  var offers = this._outstandingOffers[id];
+  for (var source in offers) {
+    if (offers.hasOwnProperty(source)) {
+      var messages = offers[source]
+      for (var i = 0; i < messages.length; i += 1) 
+        this._handleTransmission.apply(this, messages[i]);
+
+      delete this._outstandingOffers[id][source];
+    }
+  }
+};
+
 /** Initialize HTTP server routes. */
 PeerServer.prototype._initializeHTTP = function() {
   var self = this;
@@ -125,26 +151,33 @@ PeerServer.prototype._initializeHTTP = function() {
   });
 
   this._app.post('/offer', function(req, res) {
-    // TODO: if offer person does not exist, set a timeout for 10s. may need to
-    // change switch.
     var src = req.body.src;
     var dst = req.body.dst;
-    self._handleTransmission('OFFER', src, dst, JSON.stringify(req.body));
-    res.send('success');
+    self._handleTransmission('OFFER', src, dst, JSON.stringify(req.body), res);
   });
 
   this._app.post('/ice', function(req, res) {
     var src = req.body.src;
     var dst = req.body.dst;
-    self._handleTransmission('ICE', src, dst, JSON.stringify(req.body));
-    res.send('success');
+    self._handleTransmission('ICE', src, dst, JSON.stringify(req.body), res);
   });
 
   this._app.post('/answer', function(req, res) {
     var src = req.body.src;
     var dst = req.body.dst;
-    self._handleTransmission('ANSWER', src, dst, JSON.stringify(req.body));
-    res.send('success');
+    self._handleTransmission('ANSWER', src, dst, JSON.stringify(req.body), res);
+  });
+
+  this._app.post('/leave', function(req, res) {
+    var src = req.body.src;
+    var dst = req.body.dst;
+    self._handleTransmission('LEAVE', src, dst, JSON.stringify(req.body), res);
+  });
+
+  this._app.post('/port', function(req, res) {
+    var src = req.body.src;
+    var dst = req.body.dst;
+    self._handleTransmission('PORT', src, dst, JSON.stringify(req.body), res);
   });
 };
 
@@ -163,29 +196,63 @@ PeerServer.prototype._startStreaming = function(res, id, write) {
   res.write(pad + '\n');
 
   // Save res so we can write to it.
-  this._clients[id] = res;
+  if (!this._clients[id]) {
+    this._clients[id] = res;
+    // Set timeout to expire.
+    this._timeouts[id] = setTimeout(function() { res.end('end') }, 10000);
+  } else {
+    res.write(JSON.stringify({ type: 'ERROR', msg: 'ID is taken' }) + '\n');
+    res.end('error');
+  }
 
-  // Set timeout to expire.
-  this._timeouts[id] = setTimeout(function() { res.end('end') }, 10000);
 };
 
-// TODO: fix for streaming
 /** Handles passing on a message. */
-PeerServer.prototype._handleTransmission = function(type, src, dst, data) {
+PeerServer.prototype._handleTransmission = function(type, src, dst, data, res) {
   var destination = this._clients[dst];
 
   if (!!destination) {
     try {
-      if (this._timeouts[dst])
+      if (this._timeouts[dst]) {
         data += '\n';
-
+      }
+      // We have to let the source peer know that the offer was sent
+      // successfully so that ice can start being processed.
+      if (type === 'OFFER') {
+        if (!!res) {
+          res.send(200);
+        } else if (!this._timeouts[src] && !!this._clients[src]) {
+          this._clients[src].send(JSON.stringify({ type: 'PEER_READY', src: dst, dst: src }));
+        }
+      }
       destination.send(data);
     } catch (e) {
       util.prettyError(e);
+      // This really shouldn't happen given correct client browsers.
+      // 501: Server does not support this functionality.
+      if (!!res) res.send(501);
     }
   } else {
-    // TODO: IF OFFER: Place in queue for 10 seconds.
-    util.log('TODO/handle: destination does not exist');
+    if (type === 'OFFER' && (!this._outstandingOffers[dst] || !this._outstandingOffers[dst][src])) {
+      // Wait 5 seconds for this client to connect.
+      var self = this;
+      if (!this._outstandingOffers[dst])
+        this._outstandingOffers[dst] = {};
+      this._outstandingOffers[dst][src] = [];
+      this._outstandingOffers[dst][src].push(Array.prototype.slice.apply(arguments));
+
+      setTimeout(function() {
+        delete self._outstandingOffers[dst][src]
+      }, 5000);
+    } else if (type === 'ICE' && !!this._outstandingOffers[dst][src]) {
+      this._outstandingOffers[dst][src].push(Array.prototype.slice.apply(arguments));
+    } else {
+      // Assume a disconnect if the client no longer exists.
+      util.log('destination does not exist');
+      this._handleTransmission('LEAVE', dst, src, JSON.stringify({ type: 'LEAVE', dst: src, src: dst }));
+      // 410: Resource not available.
+      if (!!res) res.send(410);
+    }
   }
 };