Pārlūkot izejas kodu

reliable, and build

Michelle Bu 12 gadi atpakaļ
vecāks
revīzija
3b21f1c4ed
7 mainītis faili ar 372 papildinājumiem un 19 dzēšanām
  1. 3 0
      .gitmodules
  2. 1 0
      bin/build.js
  3. 1 0
      deps/reliable
  4. 339 14
      dist/peer.js
  5. 0 0
      dist/peer.min.js
  6. 27 5
      lib/connection.js
  7. 1 0
      lib/peer.js

+ 3 - 0
.gitmodules

@@ -4,3 +4,6 @@
 [submodule "deps/js-binarypack"]
 	path = deps/js-binarypack
 	url = git://github.com/peers/js-binarypack.git
+[submodule "deps/reliable"]
+	path = deps/reliable
+	url = git@github.com:michellebu/reliable.git

+ 1 - 0
bin/build.js

@@ -42,6 +42,7 @@ var base = [
   , '../deps/js-binarypack/lib/binarypack.js'
   , '../deps/EventEmitter/EventEmitter.js'
   , 'util.js'
+  , '../deps/reliable/lib/reliable.js'
   , 'adapter.js' 
   , 'peer.js'
   , 'connection.js'

+ 1 - 0
deps/reliable

@@ -0,0 +1 @@
+Subproject commit c20f141e461045e33e31f84eb49d3d1f331d535a

+ 339 - 14
dist/peer.js

@@ -63,8 +63,8 @@ exports.BinaryPack = {
     var unpacker = new Unpacker(data);
     return unpacker.unpack();
   },
-  pack: function(data){
-    var packer = new Packer();
+  pack: function(data, utf8){
+    var packer = new Packer(utf8);
     var buffer = packer.pack(data);
     return buffer;
   }
@@ -308,7 +308,8 @@ Unpacker.prototype.read = function(length){
   }
 }
   
-function Packer (){
+function Packer(utf8){
+  this.utf8 = utf8;
   this.bufferBuilder = new BufferBuilder();
 }
 
@@ -386,7 +387,13 @@ Packer.prototype.pack_bin = function(blob){
 }
 
 Packer.prototype.pack_string = function(str){
-  var length = str.length;
+  var length;
+  if (this.utf8) {
+    var blob = new Blob([str]);
+    length = blob.size;
+  } else {
+    length = str.length;
+  }
   if (length <= 0x0f){
     this.pack_uint8(0xb0 + length);
   } else if (length <= 0xffff){
@@ -589,7 +596,7 @@ EventEmitter.prototype.addListener = function(type, listener, scope, once) {
     // Adding the second element, need to change to array.
     this._events[type] = [this._events[type], listener];
   }
-  
+  return this;
 };
 
 EventEmitter.prototype.on = EventEmitter.prototype.addListener;
@@ -807,6 +814,301 @@ var util = {
     return Math.random().toString(36).substr(2);
   }
 };
+/**
+ * Reliable transfer for Chrome Canary DataChannel impl.
+ * Author: @michellebu
+ */
+function Reliable(dc, debug) {
+  if (!(this instanceof Reliable)) return new Reliable(dc);
+  this._dc = dc;
+
+  util.debug = debug;
+
+  // Messages sent/received so far.
+  // id: { ack: n, chunks: [...] }
+  this._outgoing = {};
+  // id: { ack: ['ack', id, n], chunks: [...] }
+  this._incoming = {};
+  this._received = {};
+
+  // Window size.
+  this._window = 20;
+  // MTU.
+  this._mtu = 500;
+  // Interval for setInterval. In ms.
+  this._interval = 30;
+  /**
+   * TIMES for 5KB:
+   * Time - wi | mtu | interval
+   * 4441 - 20 | 500 | 50
+   * 4317 - 100| 600 | 20
+   * 4158 - 50 | 600 | 100
+   * 4151 - 20 | 500 | 30
+   * 4086 - 20 | 600 | 20
+   * 4080 - 50 | 600 | 20
+   * 4049 - 50 | 600 | 10
+   * 3561 - 50 | 600 | 50
+   * 2198 - 20 | 800 | 50 - Breaks for bigger files.
+   *              800 MTU seems to throttle even at 200ms for larger files
+   *
+   * TIMES for 23KB:
+   * 38421 - 20 | 600 | 50
+   * 29904 - 20 | 500 | 30
+   * 37906 - 50 | 500 | 30
+   */
+
+  // Messages sent.
+  this._count = 0;
+
+  // Outgoing message queue.
+  this._queue = [];
+
+  this._setupDC();
+};
+
+// Send a message reliably.
+Reliable.prototype.send = function(msg) {
+  // Determine if chunking is necessary.
+  var bl = util.pack(msg);
+  if (bl.size < this._mtu) {
+    this._handleSend(['no', bl]);
+    return;
+  }
+
+  this._outgoing[this._count] = {
+    ack: 0,
+    chunks: this._chunk(bl)
+  };
+
+  if (util.debug) {
+    this._outgoing[this._count].timer = new Date();
+  }
+
+  // Send prelim window.
+  this._sendWindowedChunks(this._count);
+  this._count += 1;
+};
+
+// Overwritten, typically.
+Reliable.prototype.onmessage = function(msg) {};
+
+// Set up interval for processing queue.
+Reliable.prototype._setupInterval = function() {
+  var self = this;
+  this._timeout = setInterval(function() {
+    // FIXME: String stuff makes things terribly async.
+    var msg = self._queue.shift();
+    util.log('Sending...', msg);
+    msg = util.pack(msg);
+    util.blobToBinaryString(msg, function(str) {
+      self._dc.send(str);
+      if (self._queue.length === 0) {
+        clearTimeout(self._timeout);
+        self._timeout = null;
+        self._processAcks();
+      }
+    });
+  }, this._interval);
+};
+
+// Go through ACKs to send missing pieces.
+Reliable.prototype._processAcks = function() {
+  for (var id in this._outgoing) {
+    if (this._outgoing.hasOwnProperty(id)) {
+      this._sendWindowedChunks(id);
+    }
+  }
+};
+
+// Handle sending a message.
+// FIXME: Don't wait for interval time for all messages...
+Reliable.prototype._handleSend = function(msg) {
+  if (this._queue.indexOf(msg) === -1) {
+    this._queue.push(msg);
+    if (!this._timeout) {
+      this._setupInterval();
+    }
+  }
+};
+
+// Set up DataChannel handlers.
+Reliable.prototype._setupDC = function() {
+  // Handle various message types.
+  var self = this;
+  this._dc.onmessage = function(e) {
+    var msg = e.data;
+    var datatype = msg.constructor;
+    // FIXME: msg is String until binary is supported.
+    // Once that happens, this will have to be smarter.
+    if (datatype === String) {
+      var ab = util.binaryStringToArrayBuffer(msg);
+      msg = util.unpack(ab);
+      self._handleMessage(msg);
+    }
+  };
+};
+
+// Handles an incoming message.
+Reliable.prototype._handleMessage = function(msg) {
+  util.log('handleMessage: ', msg);
+  var id = msg[1];
+  var idata = this._incoming[id];
+  var odata = this._outgoing[id];
+  var data;
+  switch (msg[0]) {
+    // No chunking was done.
+    case 'no':
+      var message = id;
+      if (!!message) {
+        // TODO: What fancy timeout stuff to do with ACK?
+        this.onmessage(util.unpack(message));
+      }
+      break;
+    // Reached the end of the message.
+    case 'end':
+      // What if 'end' is sent out of order? Eventually we will ACK for
+      // and receive it, so shouldn't be a problem.
+      data = idata;
+      if (!!data && data.ack[2] === msg[2]) {
+        this._complete(id);
+        this._received[id] = true;
+      } else if (!!data) {
+        this._ack(id, data.ack);
+      }
+      break;
+    case 'ack':
+      data = odata;
+      if (!!data) {
+        // TODO: more optimization for window size & data sent.
+        // TODO: possibly process ACKs only after queue cleared to avoid dups?
+        var ack = msg[2];
+        // Take the larger ACK, for out of order messages.
+        data.ack = Math.max(ack, data.ack);
+
+        // Clean up when all chunks are ACKed.
+        if (data.ack >= data.chunks.length) {
+          util.log('Time: ', new Date() - data.timer);
+          delete this._outgoing[id];
+        }
+      }
+      // If !data, just ignore.
+      break;
+    // Received a chunk of data.
+    case 'chunk':
+      // Create a new entry if none exists.
+      data = idata;
+      if (!data) {
+        if (this._received[id] !== undefined) {
+          break;
+        }
+        data = {
+          ack: ['ack', id, 0],
+          chunks: []
+        };
+        this._incoming[id] = data;
+      }
+
+      var n = msg[2];
+      var chunk = msg[3];
+      data.chunks[n] = chunk;
+
+      // If we get the chunk we're looking for, ACK for next missing.
+      // Otherwise, ACK the same N again.
+      if (n === data.ack[2]) {
+        this._calculateNextAck(id);
+      }
+      this._ack(id);
+      break;
+    default:
+      // Shouldn't happen, but would make sense for message to just go
+      // through as is.
+      this._handleSend(msg);
+      break;
+  }
+};
+
+// Chunks BL into smaller messages.
+Reliable.prototype._chunk = function(bl) {
+  var chunks = [];
+  var size = bl.size;
+  var start = 0;
+  while (start < size) {
+    var end = Math.min(size, start + this._mtu);
+    var b = bl.slice(start, end);
+    var chunk = {
+      payload: b
+    }
+    chunks.push(chunk);
+    start = end;
+  }
+  return chunks;
+};
+
+// Sends ACK N, expecting Nth blob chunk for message ID.
+Reliable.prototype._ack = function(id, n) {
+  var ack = this._incoming[id].ack;
+  this._handleSend(ack);
+};
+
+// Sends END.
+Reliable.prototype._end = function(id, n) {
+  this._handleSend(['end', id, n]);
+};
+
+// Calculates the next ACK number, given chunks.
+Reliable.prototype._calculateNextAck = function(id) {
+  var data = this._incoming[id];
+  var chunks = data.chunks;
+  for (var i = 0, ii = chunks.length; i < ii; i += 1) {
+    // This chunk is missing!!! Better ACK for it.
+    if (chunks[i] === undefined) {
+      data.ack[2] = i;
+      return;
+    }
+  }
+  data.ack[2] = chunks.length;
+};
+
+// Sends the next window of chunks.
+Reliable.prototype._sendWindowedChunks = function(id) {
+  util.log('sendWindowedChunks for: ', id);
+  var data = this._outgoing[id];
+  var ch = data.chunks;
+  var limit = Math.min(data.ack + this._window, ch.length);
+  var timeout = 0;
+  for (var i = data.ack; i < limit; i += 1) {
+    if (!ch[i].sent || i === data.ack) {
+      ch[i].sent = true;
+      // TODO: set timer.
+      this._sendChunk(id, i, ch[i].payload);
+    }
+  }
+  if (data.ack + this._window >= ch.length) {
+    this._end(id, ch.length);
+  }
+  // TODO: set retry timer.
+};
+
+// Sends one chunk.
+Reliable.prototype._sendChunk = function(id, n, payload) {
+  util.log('sendChunk', payload);
+  this._handleSend(['chunk', id, n, payload]);
+};
+
+// Puts together a message from chunks.
+Reliable.prototype._complete = function(id) {
+  util.log('complete', id);
+  // FIXME: handle errors.
+  var self = this;
+  var chunks = this._incoming[id].chunks;
+  var bl = new Blob(chunks);
+  util.blobToArrayBuffer(bl, function(ab) {
+    self.onmessage(util.unpack(ab));
+  });
+  delete this._incoming[id];
+};
+
+exports.Reliable = Reliable;
 var RTCPeerConnection = null;
 var getUserMedia = null;
 var attachMediaStream = null;
@@ -940,6 +1242,7 @@ Peer.prototype._handleServerJSONMessage = function(message) {
         metadata: payload.metadata,
         serialization: payload.serialization,
         sdp: payload.sdp,
+        reliable: payload.reliable,
         config: this._options.config
       };
       var connection = new DataConnection(this.id, peer, this._socket, options);
@@ -1147,13 +1450,22 @@ DataConnection.prototype._setupDataChannel = function() {
   var self = this;
   if (this._originator) {
     util.log('Creating data channel');
-    this._dc = this._pc.createDataChannel(this.peer, { reliable: this._options.reliable });
+    // FOR NOW: reliable DC is not supported.
+    this._dc = this._pc.createDataChannel(this.peer, { reliable: false });
+    // Experimental reliable wrapper.
+    if (this._options.reliable) {
+      this._reliable = new Reliable(this._dc);
+    }
     this._configureDataChannel();
   } else {
     util.log('Listening for data channel');
     this._pc.ondatachannel = function(evt) {
       util.log('Received data channel');
       self._dc = evt.channel;
+      // Experimental reliable wrapper.
+      if (self._options.reliable) {
+        self._reliable = new Reliable(self._dc);
+      }
       self._configureDataChannel();
     };
   }
@@ -1214,9 +1526,15 @@ DataConnection.prototype._configureDataChannel = function() {
     self.open = true;
     self.emit('open');
   };
-  this._dc.onmessage = function(e) {
-    self._handleDataMessage(e);
-  };
+  if (this._reliable) {
+    this._reliable.onmessage = function(msg) {
+      self.emit('data', msg);
+    };
+  } else {
+    this._dc.onmessage = function(e) {
+      self._handleDataMessage(e);
+    };
+  }
   this._dc.onclose = function(e) {
     self.emit('close');
   };
@@ -1245,7 +1563,8 @@ DataConnection.prototype._makeOffer = function() {
         payload: {
           sdp: offer,
           serialization: self.serialization,
-          metadata: self.metadata
+          metadata: self.metadata,
+          reliable: self._options.reliable
         },
         dst: self.peer
       });
@@ -1301,15 +1620,15 @@ DataConnection.prototype._handleDataMessage = function(e) {
   if (this.serialization === 'binary' || this.serialization === 'binary-utf8') {
     if (datatype === Blob) {
       util.blobToArrayBuffer(data, function(ab) {
-        data = BinaryPack.unpack(ab);
+        data = util.unpack(ab);
         self.emit('data', data);
       });
       return;
     } else if (datatype === ArrayBuffer) {
-      data = BinaryPack.unpack(data);
+      data = util.unpack(data);
     } else if (datatype === String) {
       var ab = util.binaryStringToArrayBuffer(data);
-      data = BinaryPack.unpack(ab);
+      data = util.unpack(ab);
     }
   } else if (this.serialization === 'json') {
     data = JSON.parse(data);
@@ -1339,6 +1658,12 @@ DataConnection.prototype.close = function() {
 
 /** Allows user to send data. */
 DataConnection.prototype.send = function(data) {
+  if (this._reliable) {
+    // Note: reliable sending will make it so that you cannot customize
+    // serialization.
+    this._reliable.send(data);
+    return;
+  }
   var self = this;
   if (this.serialization === 'none') {
     this._dc.send(data);
@@ -1346,7 +1671,7 @@ DataConnection.prototype.send = function(data) {
     this._dc.send(JSON.stringify(data));
   } else {
     var utf8 = (this.serialization === 'binary-utf8');
-    var blob = BinaryPack.pack(data, utf8);
+    var blob = util.pack(data, utf8);
     // DataChannel currently only supports strings.
     if (util.browserisms === 'Webkit') {
       util.blobToBinaryString(blob, function(str){

Failā izmaiņas netiks attēlotas, jo tās ir par lielu
+ 0 - 0
dist/peer.min.js


+ 27 - 5
lib/connection.js

@@ -87,13 +87,22 @@ DataConnection.prototype._setupDataChannel = function() {
   var self = this;
   if (this._originator) {
     util.log('Creating data channel');
-    this._dc = this._pc.createDataChannel(this.peer, { reliable: this._options.reliable });
+    // FOR NOW: reliable DC is not supported.
+    this._dc = this._pc.createDataChannel(this.peer, { reliable: false });
+    // Experimental reliable wrapper.
+    if (this._options.reliable) {
+      this._reliable = new Reliable(this._dc);
+    }
     this._configureDataChannel();
   } else {
     util.log('Listening for data channel');
     this._pc.ondatachannel = function(evt) {
       util.log('Received data channel');
       self._dc = evt.channel;
+      // Experimental reliable wrapper.
+      if (self._options.reliable) {
+        self._reliable = new Reliable(self._dc);
+      }
       self._configureDataChannel();
     };
   }
@@ -154,9 +163,15 @@ DataConnection.prototype._configureDataChannel = function() {
     self.open = true;
     self.emit('open');
   };
-  this._dc.onmessage = function(e) {
-    self._handleDataMessage(e);
-  };
+  if (this._reliable) {
+    this._reliable.onmessage = function(msg) {
+      self.emit('data', msg);
+    };
+  } else {
+    this._dc.onmessage = function(e) {
+      self._handleDataMessage(e);
+    };
+  }
   this._dc.onclose = function(e) {
     self.emit('close');
   };
@@ -185,7 +200,8 @@ DataConnection.prototype._makeOffer = function() {
         payload: {
           sdp: offer,
           serialization: self.serialization,
-          metadata: self.metadata
+          metadata: self.metadata,
+          reliable: self._options.reliable
         },
         dst: self.peer
       });
@@ -279,6 +295,12 @@ DataConnection.prototype.close = function() {
 
 /** Allows user to send data. */
 DataConnection.prototype.send = function(data) {
+  if (this._reliable) {
+    // Note: reliable sending will make it so that you cannot customize
+    // serialization.
+    this._reliable.send(data);
+    return;
+  }
   var self = this;
   if (this.serialization === 'none') {
     this._dc.send(data);

+ 1 - 0
lib/peer.js

@@ -111,6 +111,7 @@ Peer.prototype._handleServerJSONMessage = function(message) {
         metadata: payload.metadata,
         serialization: payload.serialization,
         sdp: payload.sdp,
+        reliable: payload.reliable,
         config: this._options.config
       };
       var connection = new DataConnection(this.id, peer, this._socket, options);

Daži faili netika attēloti, jo izmaiņu fails ir pārāk liels