123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- var util = require('./util');
- var express = require('express');
- var http = require('http');
- var EventEmitter = require('events').EventEmitter;
- var WebSocketServer = require('ws').Server;
- var url = require('url');
- function PeerServer(options) {
- if (!(this instanceof PeerServer)) return new PeerServer(options);
- EventEmitter.call(this);
- this._app = express();
- this._httpServer = http.createServer(this._app);
- this._app.use(express.bodyParser());
- this._app.use(this._allowCrossDomain);
- this._options = util.extend({
- port: 80,
- debug: false,
- timeout: 5000
- }, options);
- util.debug = this._options.debug;
- // Listen on user-specified port and create WebSocket server as well.
- this._httpServer.listen(this._options.port);
- this._wss = new WebSocketServer({ path: '/ws', server: this._httpServer });
- // WebSockets that are opened or HTTP responses (which are paired with
- // something in timeouts.
- this._clients = {};
- // Timeouts for HTTP responses.
- this._timeouts = {};
- // Connections waiting for another peer.
- this._outstandingOffers = {};
- // Initailize WebSocket server handlers.
- this._initializeWSS();
- // Initialize HTTP routes. This is only used for the first few milliseconds
- // before a socket is opened for a Peer.
- this._initializeHTTP();
- };
- util.inherits(PeerServer, EventEmitter);
- /** Handle CORS */
- PeerServer.prototype._allowCrossDomain = function(req, res, next) {
- res.setHeader('Access-Control-Allow-Origin', '*');
- res.setHeader('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE');
- res.setHeader('Access-Control-Allow-Headers', 'Content-Type');
- next();
- }
- /** Initialize WebSocket server. */
- PeerServer.prototype._initializeWSS = function() {
- var self = this;
- this._wss.on('connection', function(socket) {
- var id = url.parse(socket.upgradeReq.url, true).query.id;
- if (!!id && !!self._clients[id]) {
- // If response client and timeout exist, overwrite and clear.
- if (!!self._timeouts[id]) {
- clearTimeout(self._timeouts[id]);
- delete self._timeouts[id];
- self._clients[id].end(JSON.stringify({ type: 'HTTP-SOCKET' }));
- } else {
- socket.send(JSON.stringify({ type: 'ID-TAKEN', msg: 'ID is taken' }));
- socket.close();
- return;
- }
- } else if (id === undefined) {
- id = self._generateClientId();
- }
- socket.send(JSON.stringify({ type: 'OPEN', id: id }));
- // Save the socket for this id.
- self._clients[id] = socket;
- self._processOutstandingOffers(id);
- // Cleanup after a socket closes.
- socket.on('close', function() {
- util.log('Socket closed:', id);
- self._removePeer(id);
- });
- // Handle messages from peers.
- socket.on('message', function(data) {
- try {
- var message = JSON.parse(data);
- util.log(message);
- switch (message.type) {
- case 'LEAVE':
- // Clean up if a Peer sends a LEAVE.
- if (!message.dst) {
- self._removePeer(id);
- break;
- }
- // ICE candidates
- case 'CANDIDATE':
- // Offer or answer between peers.
- case 'OFFER':
- case 'ANSWER':
- // Firefoxism (connectDataConnection ports)
- case 'PORT':
- // Use the ID we know to be correct to prevent spoofing.
- message.src = id;
- self._handleTransmission(message);
- break;
- default:
- util.prettyError('Message unrecognized');
- }
- } catch(e) {
- util.log('Invalid message', data);
- }
- });
- });
- };
- /** Process outstanding peer offers. */
- PeerServer.prototype._processOutstandingOffers = function(id) {
- var offers = this._outstandingOffers[id];
- if (offers === undefined) {
- return;
- }
- var sources = Object.keys(offers);
- for (var i = 0, ii = sources.length; i < ii; i += 1) {
- var messages = offers[sources[i]];
- for (var j = 0, jj = messages.length; j < jj; j += 1) {
- this._handleTransmission.apply(this, messages[j]);
- }
- delete this._outstandingOffers[id][sources[i]];
- }
- };
- /** Initialize HTTP server routes. */
- PeerServer.prototype._initializeHTTP = function() {
- var self = this;
- this._app.options('/*', function(req, res, next) {
- res.send(200);
- });
- // Server sets up HTTP streaming whether you get or post an ID.
- // Retrieve guaranteed random ID.
- this._app.get('/id', function(req, res) {
- var clientId = util.randomId();
- while (!!self._clients[clientId]) {
- clientId = util.randomId();
- }
- self._startStreaming(res, clientId, function() {
- // Chrome hacks.
- res.write('{"id":"' + clientId + '"}\n');
- });
- });
- this._app.post('/id', function(req, res) {
- var id = req.body.id;
- self._startStreaming(res, id);
- });
- this._app.post('/offer', function(req, res) {
- self._handleTransmission(req.body, res);
- });
- this._app.post('/candidate', function(req, res) {
- self._handleTransmission(req.body, res);
- });
- this._app.post('/answer', function(req, res) {
- self._handleTransmission(req.body, res);
- });
- this._app.post('/leave', function(req, res) {
- self._handleTransmission(req.body, res);
- });
- this._app.post('/port', function(req, res) {
- self._handleTransmission(req.body, res);
- });
- };
- PeerServer.prototype._removePeer = function(id) {
- delete this._clients[id];
- if (this._timeouts[id]) {
- clearTimeout(this._timeouts[id]);
- delete this._timeouts[id];
- }
- };
- /** Saves a streaming response and takes care of timeouts and headers. */
- PeerServer.prototype._startStreaming = function(res, id, write) {
- res.writeHead(200, {'Content-Type': 'application/octet-stream'});
- if (!!write) {
- write();
- }
- var pad = '00';
- var iterations = 10;
- for (var i = 0; i < iterations; i++) {
- pad += pad;
- }
- res.write(pad + '\n');
- // Save res so we can write to it.
- if (!this._clients[id]) {
- this._clients[id] = res;
- // Set timeout to expire.
- var self = this;
- this._timeouts[id] = setTimeout(function() {
- self._removePeer(id);
- res.end(JSON.stringify({ type: 'HTTP-END' }));
- }, 30000);
- this._processOutstandingOffers(id);
- } else {
- res.end(JSON.stringify({ type: 'HTTP-ERROR' }));
- }
- };
- /** Handles passing on a message. */
- PeerServer.prototype._handleTransmission = function(message, res) {
- var type = message.type;
- var src = message.src;
- var dst = message.dst;
- var data = JSON.stringify(message);
- var destination = this._clients[dst];
- if (!!destination) {
- try {
- if (this._timeouts[dst]) {
- data += '\n';
- destination.write(data);
- } else {
- destination.send(data);
- }
- if (!!res) {
- res.send(200);
- }
- } catch (e) {
- // This happens when a peer disconnects without closing connections and
- // the associated WebSocket has not closed.
- util.prettyError(e);
- // Tell other side to stop trying.
- this._removePeer(dst);
- this._handleTransmission({
- type: 'LEAVE',
- src: dst,
- dst: src
- });
- if (!!res) res.send(501);
- }
- } else {
- // Wait 5 seconds for this client to connect/reconnect (XHR) for important
- // messages.
- if (type !== 'LEAVE') {
- var self = this;
- if (!this._outstandingOffers[dst]) {
- this._outstandingOffers[dst] = {};
- }
- if (!this._outstandingOffers[dst][src]) {
- this._outstandingOffers[dst][src] = [];
- setTimeout(function() {
- if(!!self._outstandingOffers[dst][src]) {
- delete self._outstandingOffers[dst][src];
- self._handleTransmission({
- type: 'EXPIRE',
- src: dst,
- dst: src
- });
- }
- }, this._options.timeout);
- }
- this._outstandingOffers[dst][src].push([message]);
- if (!!res) res.send(200);
- } else if (type === 'LEAVE' && !dst) {
- this._removePeer(src);
- if (!!res) res.send(200);
- } else if (src) {
- // Assume a disconnect if the client no longer exists.
- // Unless it's a message from the server.
- this._handleTransmission({
- type: 'EXPIRE',
- src: dst,
- dst: src
- });
- // 410: Resource not available.
- if (!!res) res.send(410);
- }
- }
- };
- PeerServer.prototype._generateClientId = function() {
- var clientId = util.randomId();
- while (!!this._clients[clientId]) {
- clientId = util.randomId();
- }
- return clientId;
- };
- exports.PeerServer = PeerServer;
|