浏览代码

No more self and no more managers :D

Michelle Bu 12 年之前
父节点
当前提交
8102fc75ba
共有 4 个文件被更改,包括 165 次插入160 次删除
  1. 13 10
      lib/dataconnection.js
  2. 23 6
      lib/mediaconnection.js
  3. 119 138
      lib/negotiator.js
  4. 10 6
      lib/peer.js

+ 13 - 10
lib/dataconnection.js

@@ -21,14 +21,17 @@ function DataConnection(peer, provider, options) {
   this.serialization = this.options.serialization;
   this.reliable = this.options.reliable;
 
-  this.id = this.options._id || DataConnection._idPrefix + util.randomToken();
-
-  this._pc = Negotiator.startConnection(
-    this.type,
-    this.peer,
-    this.id,
-    this.provider,
-    this.options._payload || {originator: true}
+  this.id = this.options.connection_id || DataConnection._idPrefix + util.randomToken();
+
+  this.pc = Negotiator.startConnection(
+    this,
+    this.options._payload || {
+      originator: true,
+      multiplex: this.options.multiplex // I don't think multiplex should be a
+                                        // top-level property because it only
+                                        // applies to the originator--otherwise
+                                        // we'd just have an options.pc to use.
+    }
   );
 }
 
@@ -163,11 +166,11 @@ DataConnection.prototype.handleMessage = function(message) {
       // TODO: assert sdp exists.
       // Should we pass `this`?
       // Forward to negotiator
-      Negotiator.handleSDP(this.peer, this.id, payload.sdp, message.type);
+      Negotiator.handleSDP(message.type, this, payload.sdp);
       break;
     case 'CANDIDATE':
       // TODO
-      Negotiator.handleCandidate(this.peer, this.id, payload.candidate);
+      Negotiator.handleCandidate(this, payload.candidate);
       break;
     default:
       util.warn('Unrecognized message type:', message.type, 'from peer:', this.peer);

+ 23 - 6
lib/mediaconnection.js

@@ -15,13 +15,10 @@ function MediaConnection(peer, provider, options) {
   this.metadata = this.options.metadata;
   this.localStream = this.options._stream;
 
-  this.id = this.options._id || MediaConnection._idPrefix + util.randomToken();
+  this.id = this.options.connection_id || MediaConnection._idPrefix + util.randomToken();
   if (this.localStream) {
-    this._pc = Negotiator.startConnection(
-      this.type,
-      this.peer,
-      this.id,
-      this.provider,
+    this.pc = Negotiator.startConnection(
+      this,
       {_stream: this.localStream, originator: true}
     )
   }
@@ -39,6 +36,26 @@ MediaConnection.prototype.addStream = function(remoteStream) {
   this.open = true;
 };
 
+MediaConnection.prototype.handleMessage = function(message) {
+  var payload = message.payload;
+
+  switch (message.type) {
+    case 'ANSWER':
+      // TODO: assert sdp exists.
+      // Should we pass `this`?
+      // Forward to negotiator
+      Negotiator.handleSDP(message.type, this, payload.sdp);
+      break;
+    case 'CANDIDATE':
+      // TODO
+      Negotiator.handleCandidate(this, payload.candidate);
+      break;
+    default:
+      util.warn('Unrecognized message type:', message.type, 'from peer:', this.peer);
+      break;
+  }
+}
+
 MediaConnection.prototype.answer = function(stream) {
   if (this.localStream) {
     // Throw some error.

+ 119 - 138
lib/negotiator.js

@@ -4,75 +4,107 @@
 // TODO: LOCKS.
 // TODO: FIREFOX new PC after offer made for DC.
 var Negotiator = {
-  pcs: {}, // pc id => pc.
-  providers: {}, // provider's id => providers (there may be multiple providers/client.
-  options: {},
+  pcs: {
+    data: {},
+    media: {}
+  }, // type => {peer_id: {pc_id: pc}}.
+  //providers: {}, // provider's id => providers (there may be multiple providers/client.
   queue: [] // connections that are delayed due to a PC being in use.
 }
 
 Negotiator._idPrefix = 'pc_'
 
-Negotiator.startConnection = function(type, peer, connection, provider, options) {
-  Negotiator._addProvider(peer, provider);
+/** Returns a PeerConnection object set up correctly (for data, media). */
+// Options preceeded with _ are ones we add artificially.
+Negotiator.startConnection = function(connection, options) {
+  //Negotiator._addProvider(provider);
+  var pc = Negotiator._getPeerConnection(connection, options);
 
-  var pc;
-  // options.pc is the PC's ID.
-  pc = Negotiator.pcs[options.pc]
-  if (!pc || pc.signalingState !== 'stable') {
-    pc = Negotiator._startPeerConnection(peer, provider);
-  }
-
-  if (type === 'media' && options._stream) {
+  if (connection.type === 'media' && options._stream) {
     // Add the stream.
     pc.addStream(options._stream);
   }
 
   // What do we need to do now?
   if (options.originator) {
-    if (type === 'data') {
+    if (connection.type === 'data') {
       // Create the datachannel.
-      dc = pc.createDataChannel(options.label, {reliable: reliable});
-      connection = provider.getConnection(peer, connection);
+      var dc = pc.createDataChannel(options.label, {reliable: reliable});
       connection.initialize(dc);
     }
 
     if (!util.supports.onnegotiationneeded) {
-      Negotiator._makeOffer(peer, connection, options);
+      Negotiator._makeOffer(connection);
     }
   } else {
-    Negotiator._handleSDP(peer, connection, options);
+    Negotiator.handleSDP('OFFER', connection, options.sdp);
+  }
+
+  return pc;
+}
+
+Negotiator._getPeerConnection = function(connection, options) {
+  if (!Negotiator.pcs[connection.type]) {
+    util.error(connection.type + ' is not a valid connection type. Maybe you overrode the `type` property somewhere.');
+  }
+
+  if (!Negotiator.pcs[connection.type][connection.peer]) {
+    Negotiator.pcs[connection.type][connection.peer] = {};
+  }
+  peer_connections = Negotiator.pcs[connection.type][connection.peer];
+
+  var pc;
+  if (options.multiplex) {
+    // Find an existing PC to use.
+    ids = Object.keys(peer_connections);
+    for (var i = 0, ii = ids.length; i < ii; i += 1) {
+      pc = peer_connections[ids[i]];
+      if (pc.signalingState === 'stable') {
+        break; // We can go ahead and use this PC.
+      }
+    }
+  } else if (options.pc) { // Simplest case: PC id already provided for us.
+    pc = Negotiator.pcs[connection.type][connection.peer][options.pc];
   }
 
+  if (!pc || pc.signalingState !== 'stable') {
+    pc = Negotiator._startPeerConnection(connection);
+  }
   return pc;
 }
 
-Negotiator._addProvider = function(peer, provider) {
+/*
+Negotiator._addProvider = function(provider) {
   if ((!provider.id && !provider.disconnected) || !provider.socket.open) {
     // Wait for provider to obtain an ID.
     provider.on('open', function(id) {
-      Negotiator._addProvider(peer, provider);
+      Negotiator._addProvider(provider);
     });
   } else {
     Negotiator.providers[provider.id] = provider;
   }
-}
+}*/
 
 
 /** Start a PC. */
-Negotiator._startPeerConnection = function(peer, provider) {
+Negotiator._startPeerConnection = function(connection) {
   util.log('Creating RTCPeerConnection.');
 
   var id = Negotiator._idPrefix + util.randomToken();
-  pc = new RTCPeerConnection(provider.options.config, {optional: [{RtpDataChannels: true}]});
-  Negotiator.pcs[id] = pc;
+  pc = new RTCPeerConnection(connection.provider.options.config, {optional: [{RtpDataChannels: true}]});
+  Negotiator.pcs[connection.type][connection.peer][id] = pc;
 
-  Negotiator._startListeners(peer, provider, pc, id);
+  Negotiator._setupListeners(connection, pc, id);
 
   return pc;
 }
 
 /** Set up various WebRTC listeners. */
-Negotiator._setupListeners = function(peer, provider, pc, id) {
+Negotiator._setupListeners = function(connection, pc, pc_id) {
+  var peer_id = connection.peer;
+  var connection_id = connection.id;
+  var provider = connection.provider;
+
   // ICE CANDIDATES.
   util.log('Listening for ICE candidates.');
   pc.onicecandidate = function(evt) {
@@ -82,9 +114,9 @@ Negotiator._setupListeners = function(peer, provider, pc, id) {
         type: 'CANDIDATE',
         payload: {
           candidate: evt.candidate,
-          pc: id  // Send along this PC's ID.
+          connection_id: connection.id
         },
-        dst: peer,
+        dst: peer_id,
       });
     }
   };
@@ -92,11 +124,11 @@ Negotiator._setupListeners = function(peer, provider, pc, id) {
   pc.oniceconnectionstatechange = function() {
     switch (pc.iceConnectionState) {
       case 'failed':
-        util.log('iceConnectionState is disconnected, closing connections to ' + self.peer);
+        util.log('iceConnectionState is disconnected, closing connections to ' + peer_id);
         Negotiator._cleanup();
         break;
       case 'completed':
-        pc.onicecandidate = null;
+        pc.onicecandidate = util.noop;
         break;
     }
   };
@@ -108,7 +140,7 @@ Negotiator._setupListeners = function(peer, provider, pc, id) {
   util.log('Listening for `negotiationneeded`');
   pc.onnegotiationneeded = function() {
     util.log('`negotiationneeded` triggered');
-    Negotiator._makeOffer();
+    Negotiator._makeOffer(connection);
   };
 
   // DATACONNECTION.
@@ -118,7 +150,7 @@ Negotiator._setupListeners = function(peer, provider, pc, id) {
   pc.ondatachannel = function(evt) {
     util.log('Received data channel');
     var dc = evt.channel;
-    connection = provider.getConnection(peer, connection);
+    var connection = provider.getConnection(peer_id, connection_id);
     connection.initialize(dc);
   };
 
@@ -127,173 +159,122 @@ Negotiator._setupListeners = function(peer, provider, pc, id) {
   pc.onaddstream = function(evt) {
     util.log('Received remote stream');
     var stream = evt.stream;
-    provider.getConnection(peer, id).receiveStream(stream);
+    provider.getConnection(peer_id, id).receiveStream(stream);
   };
 }
 
-Negotiator._cleanup = function() {
+Negotiator._cleanup = function(provider, peer_id, connection_id) {
   // TODO
+  util.log('Cleanup PeerConnection for ' + peer_id);
+  if (!!this.pc && (this.pc.readyState !== 'closed' || this.pc.signalingState !== 'closed')) {
+    this.pc.close();
+    this.pc = null;
+  }
+
+  provider.socket.send({
+    type: 'LEAVE',
+    dst: peer_id
+  });
 }
 
-Negotiator._makeOffer = function() {
-  // TODO
+Negotiator._makeOffer = function(connection) {
+  var pc = connection.pc;
+
   pc.createOffer(function(offer) {
     util.log('Created offer.');
-    // Firefox currently does not support multiplexing once an offer is made.
-    self.firefoxSingular = true;
 
-    if (util.browserisms === 'Webkit') {
+    if (!util.supports.reliable) {
       //offer.sdp = Reliable.higherBandwidthSDP(offer.sdp);
     }
 
-    self.pc.setLocalDescription(offer, function() {
+    pc.setLocalDescription(offer, function() {
       util.log('Set localDescription to offer');
-      self._socket.send({
+      connection.provider.socket.send({
         type: 'OFFER',
         payload: {
           sdp: offer,
-          config: self._options.config,
-          labels: self.labels,
-          call: !!self._call
+          connection_id: connection.id
         },
-        dst: self.peer,
-        manager: self._managerId
+        dst: connection.peer,
       });
-      // We can now reset labels because all info has been communicated.
-      self.labels = {};
     }, function(err) {
-      self.emit('error', err);
+      connection.provider.emit('error', err);
       util.log('Failed to setLocalDescription, ', err);
     });
   }, function(err) {
-    self.emit('error', err);
+    connection.provider.emit('error', err);
     util.log('Failed to createOffer, ', err);
   });
 }
 
-Negotiator._makeAnswer = function() {
-  // TODO
-}
+Negotiator._makeAnswer = function(connection) {
+  var pc = connection.pc;
 
-/** Create an answer for PC. */
-ConnectionManager.prototype._makeAnswer = function() {
-  var self = this;
-  this.pc.createAnswer(function(answer) {
+  pc.createAnswer(function(answer) {
     util.log('Created answer.');
 
-    if (util.browserisms === 'Webkit') {
+    if (!util.supports.reliable) {
+      // TODO
       //answer.sdp = Reliable.higherBandwidthSDP(answer.sdp);
     }
 
-    self.pc.setLocalDescription(answer, function() {
+    pc.setLocalDescription(answer, function() {
       util.log('Set localDescription to answer.');
-      self._socket.send({
+      connection.provider.socket.send({
         type: 'ANSWER',
         payload: {
-          sdp: answer
+          sdp: answer,
+          connection_id: connection.id
         },
-        dst: self.peer,
-        manager: self._managerId
+        dst: connection.peer
       });
     }, function(err) {
-      self.emit('error', err);
+      connection.provider.emit('error', err);
       util.log('Failed to setLocalDescription, ', err);
     });
   }, function(err) {
-    self.emit('error', err);
+    connection.provider.emit('error', err);
     util.log('Failed to create answer, ', err);
   });
 }
 
-/** Clean up PC, close related DCs. */
-ConnectionManager.prototype._cleanup = function() {
-  util.log('Cleanup ConnectionManager for ' + this.peer);
-  if (!!this.pc && (this.pc.readyState !== 'closed' || this.pc.signalingState !== 'closed')) {
-    this.pc.close();
-    this.pc = null;
-  }
-
-  var self = this;
-  this._socket.send({
-    type: 'LEAVE',
-    dst: self.peer
-  });
-
-  this.destroyed = true;
-  this.emit('close');
-}
-
 /** Handle an SDP. */
-ConnectionManager.prototype.handleSDP = function(sdp, type, call) {
+Negotiator.handleSDP = function(type, connection, sdp) {
   sdp = new RTCSessionDescription(sdp);
+  var pc = connection.pc;
 
-  var self = this;
-  this.pc.setRemoteDescription(sdp, function() {
+  pc.setRemoteDescription(sdp, function() {
     util.log('Set remoteDescription: ' + type);
+
     if (type === 'OFFER') {
-      if (call && !self._call) {
-        self._call = new MediaConnection(self.peer);
-        self._call.on('answer', function(stream){
-          if (stream) {
-            self.pc.addStream(stream);
-          }
-          self._makeAnswer();
-          util.setZeroTimeout(function(){
-            // Add remote streams
-            self._call.receiveStream(self.pc.getRemoteStreams()[0]);
-          });
+      if (connection.type === 'media') {
+        if (connection.localStream) {
+          // Add local stream (from answer).
+          pc.addStream(connection.localStream);
+        }
+        util.setZeroTimeout(function(){
+          // Add remote streams
+          connection.addStream(pc.getRemoteStreams()[0]);
         });
-        self.emit('call', self._call);
-      } else {
-        self._makeAnswer();
       }
-    } else {
-      // Got answer from remote
-      self._lock = false;
+      // TODO. also, why setZeroTimeout up there?
+      Negotiator._makeAnswer();
     }
   }, function(err) {
-    self.emit('error', err);
+    connection.provider.emit('error', err);
     util.log('Failed to setRemoteDescription, ', err);
   });
 }
 
 /** Handle a candidate. */
-ConnectionManager.prototype.handleCandidate = function(message) {
-  var candidate = new RTCIceCandidate(message.candidate);
-  this.pc.addIceCandidate(candidate);
+Negotiator.handleCandidate = function(connection, candidate) {
+  var candidate = new RTCIceCandidate(candidate);
+  connection.pc.addIceCandidate(candidate);
   util.log('Added ICE candidate.');
 }
 
-/** Updates label:[serialization, reliable, metadata] pairs from offer. */
-ConnectionManager.prototype.handleUpdate = function(updates) {
-  var labels = Object.keys(updates);
-  for (var i = 0, ii = labels.length; i < ii; i += 1) {
-    var label = labels[i];
-    this.labels[label] = updates[label];
-  }
-}
-
 /** Handle peer leaving. */
-ConnectionManager.prototype.handleLeave = function() {
-  util.log('Peer ' + this.peer + ' disconnected.');
-  this.close();
-}
-
-/** Closes manager and all related connections. */
-ConnectionManager.prototype.close = function() {
-  if (this.destroyed) {
-    this.emit('error', new Error('Connections to ' + this.peer + 'are already closed.'));
-    return;
-  }
-
-  var labels = Object.keys(this.connections);
-  for (var i = 0, ii = labels.length; i < ii; i += 1) {
-    var label = labels[i];
-    var connection = this.connections[label];
-    connection.close();
-  }
-
-  // TODO: close the call.
-  this.connections = null;
-  this._cleanup();
+Negotiator.handleLeave = function(connection) {
+  util.log('Peer ' + connection.peer + ' disconnected.');
+  // TODO: clean up PC if this is the last connection on that PC.
 }

+ 10 - 6
lib/peer.js

@@ -169,17 +169,17 @@ Peer.prototype._handleMessage = function(message) {
       this.emit('error', new Error('Could not connect to peer ' + peer));
       break;
     case 'OFFER': // we should consider switching this to CALL/CONNECT, but this is the least breaking option.
-      var connectionId = payload.connectionId;
-      var connection = this.getConnection(peer, connectionId);
+      var connection_id = payload.connection_id;
+      var connection = this.getConnection(peer, connection_id);
 
       if (connection) {
-        util.warn('Offer received for existing Connection ID:', connectionId);
+        util.warn('Offer received for existing Connection ID:', connection_id);
         //connection.handleMessage(message);
       } else {
         // Create a new connection.
         if (payload.type === 'call') {
           var call = new MediaConnection(peer, this, {
-            _id: connectionId,
+            connection_id: connection_id,
             _payload: payload, // A regular *Connection would have no payload.
             metadata: payload.metadata,
           });
@@ -187,7 +187,7 @@ Peer.prototype._handleMessage = function(message) {
           this.emit('call', call);
         } else if (payload.type === 'connect') {
           var connection = new DataConnection(peer, this, {
-            _id: connectionId,
+            connection_id: connection_id,
             _payload: payload,
             metadata: payload.metadata,
             label: payload.label,
@@ -202,7 +202,11 @@ Peer.prototype._handleMessage = function(message) {
       }
       break;
     default:
-      var id = message.id;
+      if (!payload) {
+        util.warn('You received a malformed message from ' + peer);
+      }
+
+      var id = payload.connection_id;
       var connection = this.getConnection(peer, id);
 
       if (connection) {