Procházet zdrojové kódy

Allow Concurrent Peer Connections #14

XiZhao před 12 roky
rodič
revize
a64f69ab38
1 změnil soubory, kde provedl 44 přidání a 24 odebrání
  1. 44 24
      lib/server.js

+ 44 - 24
lib/server.js

@@ -16,6 +16,7 @@ function PeerServer(options) {
     key: 'peerjs',
     ip_limit: 5000,
     concurrent_limit: 5000,
+    concurrent_ids: false,
     ssl: {}
   }, options);
 
@@ -73,33 +74,32 @@ PeerServer.prototype._initializeWSS = function() {
       socket.close();
       return;
     }
-    
+
     if (!self._clients[key] || !self._clients[key][id]) {
       self._checkKey(key, ip, function(err) {
         if (!err) {
           if (!self._clients[key][id]) {
-            self._clients[key][id] = { token: token, ip: ip };
-            self._ips[ip]++;
+            self._clients[key][id] = { token: token, instances: [] };
             socket.send(JSON.stringify({ type: 'OPEN' }));
           }
-          self._configureWS(socket, key, id, token);
+          self._configureWS(socket, key, id, ip, token);
         } else {
           socket.send(JSON.stringify({ type: 'ERROR', payload: { msg: err } }));
         }
       });
     } else {
-      self._configureWS(socket, key, id, token);
+      self._configureWS(socket, key, id, ip, token);
     }
   });
 };
 
-PeerServer.prototype._configureWS = function(socket, key, id, token) {
+PeerServer.prototype._configureWS = function(socket, key, id, ip, token) {
   var self = this;
   var client = this._clients[key][id];
 
-  if (token === client.token) {
+  if (token === client.token || this._options.concurrent_ids) {
     // res 'close' event will delete client.res for us
-    client.socket = socket;
+    client.instances.push({ conn: socket, ip: ip });
     // Client already exists
     if (client.res) {
       client.res.end();
@@ -116,9 +116,7 @@ PeerServer.prototype._configureWS = function(socket, key, id, token) {
   // Cleanup after a socket closes.
   socket.on('close', function() {
     util.log('Socket closed:', id);
-    if (client.socket == socket) {
-      self._removePeer(key, id);
-    }
+    self._removePeerInstance(key, id, socket);
   });
 
   // Handle messages from peers.
@@ -130,7 +128,7 @@ PeerServer.prototype._configureWS = function(socket, key, id, token) {
         case 'LEAVE':
           // Clean up if a Peer sends a LEAVE.
           if (!message.dst) {
-            self._removePeer(key, id);
+            self._removePeerInstance(key, id, socket);
             break;
           }
         // ICE candidates
@@ -156,7 +154,6 @@ PeerServer.prototype._configureWS = function(socket, key, id, token) {
   });
 }
 
-
 PeerServer.prototype._checkKey = function(key, ip, cb) {
   if (key == this._options.key) {
     if (!this._clients[key]) {
@@ -207,9 +204,12 @@ PeerServer.prototype._initializeHTTP = function() {
 
     if (!self._clients[key] || !self._clients[key][id]) {
       self._checkKey(key, ip, function(err) {
-        if (!err && !self._clients[key][id]) {
-          self._clients[key][id] = { token: token, ip: ip };
-          self._ips[ip]++;
+        if (!err && (!self._clients[key][id] || this._options.concurrent_ids)) {
+          if (!self._clients[key][id])
+          {
+            self._clients[key][id] = { token: token, instances: [] };
+            self._ips[ip]++;
+          }
           self._startStreaming(res, key, id, token, true);
         } else {
           res.send(JSON.stringify({ type: 'HTTP-ERROR' }));
@@ -287,7 +287,7 @@ PeerServer.prototype._startStreaming = function(res, key, id, token, open) {
     // Client already exists
     res.on('close', function() {
       if (client.res === res) {
-        if (!client.socket) {
+        if (client.instances.length == 0) {
           // No new request yet, peer dead
           self._removePeer(key, id);
           return;
@@ -307,7 +307,7 @@ PeerServer.prototype._pruneOutstanding = function() {
   var keys = Object.keys(this._outstanding);
   for (var k = 0, kk = keys.length; k < kk; k += 1) {
     var key = keys[k];
-    var dsts = Object.keys(this._outstanding[key]);  
+    var dsts = Object.keys(this._outstanding[key]);
     for (var i = 0, ii = dsts.length; i < ii; i += 1) {
       var offers = this._outstanding[key][dsts[i]];
       var seen = {};
@@ -358,13 +358,33 @@ PeerServer.prototype._processOutstanding = function(key, id) {
 
 PeerServer.prototype._removePeer = function(key, id) {
   if (this._clients[key] && this._clients[key][id]) {
-    this._ips[this._clients[key][id].ip]--;
     delete this._clients[key][id];
   }
 };
 
+PeerServer.prototype._removePeerInstance = function(key, id, instance) {
+  if (this._clients[key] && this._clients[key][id]) {
+    for (var i=0; i<this._clients[key][id].instances; i++)
+    {
+      if (this._clients[key][id].instances[i] == instance)
+        this._clients[key][id].instances.splice(i, 1);
+        this._ips[instance.ip]--;
+        break;
+    }
+    if (this._clients[key][id].instances == 0)
+      this._removePeer(key, id);
+  }
+};
+
+PeerServer.prototype._multicast = function(msg, instances)
+{
+  for (var i=0; i<instances.length; i++) {
+    instances[i].conn.send(msg);
+  }
+}
+
 /** Handles passing on a message. */
-PeerServer.prototype._handleTransmission = function(key, message) {
+PeerServer.prototype._handleTransmission = function(key, message, conn) {
   var type = message.type;
   var src = message.src;
   var dst = message.dst;
@@ -377,25 +397,25 @@ PeerServer.prototype._handleTransmission = function(key, message) {
     try {
       util.log(type, 'from', src, 'to', dst);
       if (destination.socket) {
-        destination.socket.send(data);
+        this._multicast(destination.instances, data));
       } else if (destination.res) {
         data += '\n';
         destination.res.write(data);
       } else {
         // Neither socket no res available. Peer dead?
-        throw "Peer dead"
+        throw "Peer dead";
       }
     } catch (e) {
       // This happens when a peer disconnects without closing connections and
       // the associated WebSocket has not closed.
       util.prettyError(e);
       // Tell other side to stop trying.
-      this._removePeer(key, dst);
+      this._removePeerInstance(key, dst, conn);
       this._handleTransmission(key, {
         type: 'LEAVE',
         src: dst,
         dst: src
-      });
+      }, conn);
     }
   } else {
     // Wait for this client to connect/reconnect (XHR) for important