dataconnection.js 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. var util = require('./util');
  2. var EventEmitter = require('eventemitter3');
  3. var Negotiator = require('./negotiator');
  4. var Reliable = require('reliable');
  5. /**
  6. * Wraps a DataChannel between two Peers.
  7. */
  8. function DataConnection(peer, provider, options) {
  9. if (!(this instanceof DataConnection)) return new DataConnection(peer, provider, options);
  10. EventEmitter.call(this);
  11. this.options = util.extend({
  12. serialization: 'binary',
  13. reliable: false
  14. }, options);
  15. // Connection is not open yet.
  16. this.open = false;
  17. this.type = 'data';
  18. this.peer = peer;
  19. this.provider = provider;
  20. this.id = this.options.connectionId || DataConnection._idPrefix + util.randomToken();
  21. this.label = this.options.label || this.id;
  22. this.metadata = this.options.metadata;
  23. this.serialization = this.options.serialization;
  24. this.reliable = this.options.reliable;
  25. // Data channel buffering.
  26. this._buffer = [];
  27. this._buffering = false;
  28. this.bufferSize = 0;
  29. // For storing large data.
  30. this._chunkedData = {};
  31. if (this.options._payload) {
  32. this._peerBrowser = this.options._payload.browser;
  33. }
  34. Negotiator.startConnection(
  35. this,
  36. this.options._payload || {
  37. originator: true
  38. }
  39. );
  40. }
  41. util.inherits(DataConnection, EventEmitter);
  42. DataConnection._idPrefix = 'dc_';
  43. /** Called by the Negotiator when the DataChannel is ready. */
  44. DataConnection.prototype.initialize = function(dc) {
  45. this._dc = this.dataChannel = dc;
  46. this._configureDataChannel();
  47. }
  48. DataConnection.prototype._configureDataChannel = function() {
  49. var self = this;
  50. if (util.supports.sctp) {
  51. this._dc.binaryType = 'arraybuffer';
  52. }
  53. this._dc.onopen = function() {
  54. util.log('Data channel connection success');
  55. self.open = true;
  56. self.emit('open');
  57. }
  58. // Use the Reliable shim for non Firefox browsers
  59. if (!util.supports.sctp && this.reliable) {
  60. this._reliable = new Reliable(this._dc, util.debug);
  61. }
  62. if (this._reliable) {
  63. this._reliable.onmessage = function(msg) {
  64. self.emit('data', msg);
  65. };
  66. } else {
  67. this._dc.onmessage = function(e) {
  68. self._handleDataMessage(e);
  69. };
  70. }
  71. this._dc.onclose = function(e) {
  72. util.log('DataChannel closed for:', self.peer);
  73. self.close();
  74. };
  75. }
  76. // Handles a DataChannel message.
  77. DataConnection.prototype._handleDataMessage = function(e) {
  78. var self = this;
  79. var data = e.data;
  80. var datatype = data.constructor;
  81. if (this.serialization === 'binary' || this.serialization === 'binary-utf8') {
  82. if (datatype === Blob) {
  83. // Datatype should never be blob
  84. util.blobToArrayBuffer(data, function(ab) {
  85. data = util.unpack(ab);
  86. self.emit('data', data);
  87. });
  88. return;
  89. } else if (datatype === ArrayBuffer) {
  90. data = util.unpack(data);
  91. } else if (datatype === String) {
  92. // String fallback for binary data for browsers that don't support binary yet
  93. var ab = util.binaryStringToArrayBuffer(data);
  94. data = util.unpack(ab);
  95. }
  96. } else if (this.serialization === 'json') {
  97. data = JSON.parse(data);
  98. }
  99. // Check if we've chunked--if so, piece things back together.
  100. // We're guaranteed that this isn't 0.
  101. if (data.__peerData) {
  102. var id = data.__peerData;
  103. var chunkInfo = this._chunkedData[id] || {data: [], count: 0, total: data.total};
  104. chunkInfo.data[data.n] = data.data;
  105. chunkInfo.count += 1;
  106. if (chunkInfo.total === chunkInfo.count) {
  107. // Clean up before making the recursive call to `_handleDataMessage`.
  108. delete this._chunkedData[id];
  109. // We've received all the chunks--time to construct the complete data.
  110. data = new Blob(chunkInfo.data);
  111. this._handleDataMessage({data: data});
  112. }
  113. this._chunkedData[id] = chunkInfo;
  114. return;
  115. }
  116. this.emit('data', data);
  117. }
  118. /**
  119. * Exposed functionality for users.
  120. */
  121. /** Allows user to close connection. */
  122. DataConnection.prototype.close = function() {
  123. if (!this.open) {
  124. return;
  125. }
  126. this.open = false;
  127. Negotiator.cleanup(this);
  128. this.emit('close');
  129. }
  130. /** Allows user to send data. */
  131. DataConnection.prototype.send = function(data, chunked) {
  132. if (!this.open) {
  133. this.emit('error', new Error('Connection is not open. You should listen for the `open` event before sending messages.'));
  134. return;
  135. }
  136. if (this._reliable) {
  137. // Note: reliable shim sending will make it so that you cannot customize
  138. // serialization.
  139. this._reliable.send(data);
  140. return;
  141. }
  142. var self = this;
  143. if (this.serialization === 'json') {
  144. this._bufferedSend(JSON.stringify(data));
  145. } else if (this.serialization === 'binary' || this.serialization === 'binary-utf8') {
  146. var blob = util.pack(data);
  147. // For Chrome-Firefox interoperability, we need to make Firefox "chunk"
  148. // the data it sends out.
  149. var needsChunking = util.chunkedBrowsers[this._peerBrowser] || util.chunkedBrowsers[util.browser];
  150. if (needsChunking && !chunked && blob.size > util.chunkedMTU) {
  151. this._sendChunks(blob);
  152. return;
  153. }
  154. // DataChannel currently only supports strings.
  155. if (!util.supports.sctp) {
  156. util.blobToBinaryString(blob, function(str) {
  157. self._bufferedSend(str);
  158. });
  159. } else if (!util.supports.binaryBlob) {
  160. // We only do this if we really need to (e.g. blobs are not supported),
  161. // because this conversion is costly.
  162. util.blobToArrayBuffer(blob, function(ab) {
  163. self._bufferedSend(ab);
  164. });
  165. } else {
  166. this._bufferedSend(blob);
  167. }
  168. } else {
  169. this._bufferedSend(data);
  170. }
  171. }
  172. DataConnection.prototype._bufferedSend = function(msg) {
  173. if (this._buffering || !this._trySend(msg)) {
  174. this._buffer.push(msg);
  175. this.bufferSize = this._buffer.length;
  176. }
  177. }
  178. // Returns true if the send succeeds.
  179. DataConnection.prototype._trySend = function(msg) {
  180. try {
  181. this._dc.send(msg);
  182. } catch (e) {
  183. this._buffering = true;
  184. var self = this;
  185. setTimeout(function() {
  186. // Try again.
  187. self._buffering = false;
  188. self._tryBuffer();
  189. }, 100);
  190. return false;
  191. }
  192. return true;
  193. }
  194. // Try to send the first message in the buffer.
  195. DataConnection.prototype._tryBuffer = function() {
  196. if (this._buffer.length === 0) {
  197. return;
  198. }
  199. var msg = this._buffer[0];
  200. if (this._trySend(msg)) {
  201. this._buffer.shift();
  202. this.bufferSize = this._buffer.length;
  203. this._tryBuffer();
  204. }
  205. }
  206. DataConnection.prototype._sendChunks = function(blob) {
  207. var blobs = util.chunk(blob);
  208. for (var i = 0, ii = blobs.length; i < ii; i += 1) {
  209. var blob = blobs[i];
  210. this.send(blob, true);
  211. }
  212. }
  213. DataConnection.prototype.handleMessage = function(message) {
  214. var payload = message.payload;
  215. switch (message.type) {
  216. case 'ANSWER':
  217. this._peerBrowser = payload.browser;
  218. // Forward to negotiator
  219. Negotiator.handleSDP(message.type, this, payload.sdp);
  220. break;
  221. case 'CANDIDATE':
  222. Negotiator.handleCandidate(this, payload.candidate);
  223. break;
  224. default:
  225. util.warn('Unrecognized message type:', message.type, 'from peer:', this.peer);
  226. break;
  227. }
  228. }
  229. module.exports = DataConnection;